Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ def upgrade() -> None:
# Remove any rows that cannot be linked to a Thing, then enforce NOT NULL
op.execute('DELETE FROM "NMA_SurfaceWaterData" WHERE thing_id IS NULL')
op.alter_column(
"NMA_SurfaceWaterData", "thing_id", existing_type=sa.Integer(), nullable=False
"NMA_SurfaceWaterData",
"thing_id",
existing_type=sa.Integer(),
nullable=False,
)


Expand Down
53 changes: 40 additions & 13 deletions alembic/versions/d5e6f7a8b9c0_create_pygeoapi_supporting_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
("monitoring_wells", "monitoring well"),
("observation_wells", "observation well"),
("other_things", "other"),
("outfalls_wastewater_return_flow", "outfall of wastewater or return flow"),
(
"outfalls_wastewater_return_flow",
"outfall of wastewater or return flow",
),
("perennial_streams", "perennial stream"),
("piezometers", "piezometer"),
("production_wells", "production well"),
Expand Down Expand Up @@ -107,8 +110,11 @@ def _create_latest_depth_view() -> str:
o.observation_datetime,
o.value,
o.measuring_point_height,
-- Treat NULL measuring_point_height as 0 when computing depth_to_water_bgs
(o.value - COALESCE(o.measuring_point_height, 0)) AS depth_to_water_bgs,
-- Treat NULL measuring_point_height as 0 when computing
-- depth_to_water_bgs.
(
o.value - COALESCE(o.measuring_point_height, 0)
) AS depth_to_water_bgs,
ROW_NUMBER() OVER (
PARTITION BY fe.thing_id
ORDER BY o.observation_datetime DESC, o.id DESC
Expand Down Expand Up @@ -151,7 +157,10 @@ def _create_avg_tds_view() -> str:
SELECT
csi.thing_id,
mc.id AS major_chemistry_id,
COALESCE(mc."AnalysisDate", csi."CollectionDate")::date AS observation_date,
COALESCE(
mc."AnalysisDate",
csi."CollectionDate"
)::date AS observation_date,
mc."SampleValue" AS sample_value,
mc."Units" AS units
FROM "NMA_MajorChemistry" AS mc
Expand Down Expand Up @@ -193,15 +202,16 @@ def _drop_view_or_materialized_view(view_name: str) -> None:

def _create_matview_indexes() -> None:
# Required so REFRESH MATERIALIZED VIEW CONCURRENTLY can run.
avg_tds_index_sql = (
"CREATE UNIQUE INDEX ux_ogc_avg_tds_wells_id " "ON ogc_avg_tds_wells (id)"
)
op.execute(
text(
"CREATE UNIQUE INDEX ux_ogc_latest_depth_to_water_wells_id "
"ON ogc_latest_depth_to_water_wells (id)"
)
)
op.execute(
text("CREATE UNIQUE INDEX ux_ogc_avg_tds_wells_id " "ON ogc_avg_tds_wells (id)")
)
op.execute(text(avg_tds_index_sql))


def _create_refresh_function() -> str:
Expand All @@ -220,7 +230,11 @@ def _create_refresh_function() -> str:
WHERE schemaname = 'public'
AND matviewname LIKE 'ogc_%'
LOOP
matview_fqname := format('%I.%I', matview_record.schemaname, matview_record.matviewname);
matview_fqname := format(
'%I.%I',
matview_record.schemaname,
matview_record.matviewname
);
EXECUTE format('REFRESH MATERIALIZED VIEW %s', matview_fqname);
END LOOP;
END;
Expand All @@ -235,10 +249,15 @@ def upgrade() -> None:
required_core = {"thing", "location", "location_thing_association"}
existing_tables = set(inspector.get_table_names(schema="public"))
if not required_core.issubset(existing_tables):
missing_tables = sorted(t for t in required_core if t not in existing_tables)
missing_tables = sorted(
table_name
for table_name in required_core
if table_name not in existing_tables
)
missing_tables_str = ", ".join(missing_tables)
raise RuntimeError(
"Cannot create pygeoapi supporting views. The following required core "
"Cannot create pygeoapi supporting views. "
"The following required core "
f"tables are missing: {missing_tables_str}"
)

Expand All @@ -255,7 +274,8 @@ def upgrade() -> None:
)
missing_depth_tables_str = ", ".join(missing_depth_tables)
raise RuntimeError(
"Cannot create ogc_latest_depth_to_water_wells. The following required "
"Cannot create ogc_latest_depth_to_water_wells. "
"The following required "
f"tables are missing: {missing_depth_tables_str}"
)
op.execute(text(_create_latest_depth_view()))
Expand All @@ -269,7 +289,11 @@ def upgrade() -> None:
_drop_view_or_materialized_view("ogc_avg_tds_wells")
required_tds = {"NMA_MajorChemistry", "NMA_Chemistry_SampleInfo"}
if not required_tds.issubset(existing_tables):
missing_tds_tables = sorted(t for t in required_tds if t not in existing_tables)
missing_tds_tables = sorted(
table_name
for table_name in required_tds
if table_name not in existing_tables
)
missing_tds_tables_str = ", ".join(missing_tds_tables)
raise RuntimeError(
"Cannot create ogc_avg_tds_wells. The following required "
Expand All @@ -288,7 +312,10 @@ def upgrade() -> None:


def downgrade() -> None:
op.execute(text(f"DROP FUNCTION IF EXISTS public.{REFRESH_FUNCTION_NAME}()"))
drop_refresh_function_sql = (
f"DROP FUNCTION IF EXISTS public.{REFRESH_FUNCTION_NAME}()"
)
op.execute(text(drop_refresh_function_sql))
_drop_view_or_materialized_view("ogc_avg_tds_wells")
_drop_view_or_materialized_view("ogc_latest_depth_to_water_wells")
for view_id, _ in THING_COLLECTIONS:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""add actively monitored wells pygeoapi view

Revision ID: r2s3t4u5v6w7
Revises: p9c0d1e2f3a4
Create Date: 2026-03-19 10:10:00.000000
"""

from typing import Sequence, Union

from alembic import op
from sqlalchemy import inspect, text

# revision identifiers, used by Alembic.
revision: str = "r2s3t4u5v6w7"
down_revision: Union[str, Sequence[str], None] = "p9c0d1e2f3a4"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
DROP_VIEW_SQL = "DROP VIEW IF EXISTS ogc_actively_monitored_wells"
DROP_MATVIEW_SQL = "DROP MATERIALIZED VIEW IF EXISTS " "ogc_actively_monitored_wells"


def _create_actively_monitored_wells_view() -> str:
return """
CREATE VIEW ogc_actively_monitored_wells AS
SELECT
wws.id,
wws.name,
'water well'::text AS thing_type,
wws.well_depth,
wws.elevation,
wws.elevation_method,
wws.formation_zone,
wws.total_water_levels,
wws.last_water_level,
wws.last_water_level_datetime,
wws.min_water_level,
wws.max_water_level,
wws.water_level_trend_ft_per_year,
g.id AS group_id,
g.name AS group_name,
g.group_type,
wws.point
FROM "group" AS g
JOIN group_thing_association AS gta ON gta.group_id = g.id
JOIN ogc_water_well_summary AS wws ON wws.id = gta.thing_id
WHERE lower(trim(g.name)) = 'water level network'
"""


def upgrade() -> None:
bind = op.get_bind()
inspector = inspect(bind)
existing_tables = set(inspector.get_table_names(schema="public"))
required_tables = {
"group",
"group_thing_association",
}

if not required_tables.issubset(existing_tables):
missing = sorted(
table_name
for table_name in required_tables
if table_name not in existing_tables
)
raise RuntimeError(
"Cannot create ogc_actively_monitored_wells. "
f"Missing required tables: {', '.join(missing)}"
)

has_summary = bind.execute(
text(
"SELECT 1 FROM pg_matviews "
"WHERE schemaname = 'public' "
"AND matviewname = 'ogc_water_well_summary'"
)
).scalar()
if has_summary != 1:
raise RuntimeError(
"Cannot create ogc_actively_monitored_wells. "
"Missing required materialized view: ogc_water_well_summary"
)

op.execute(text(DROP_VIEW_SQL))
op.execute(text(DROP_MATVIEW_SQL))
op.execute(text(_create_actively_monitored_wells_view()))
op.execute(
text(
"COMMENT ON VIEW ogc_actively_monitored_wells IS "
"'Wells in the Water Level Network group for pygeoapi.'"
)
)


def downgrade() -> None:
op.execute(text(DROP_VIEW_SQL))
op.execute(text(DROP_MATVIEW_SQL))
99 changes: 99 additions & 0 deletions alembic/versions/s4t5u6v7w8x9_drop_unused_well_type_ogc_views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""drop unused well-type OGC views

Revision ID: s4t5u6v7w8x9
Revises: r2s3t4u5v6w7
Create Date: 2026-03-19 14:30:00.000000
"""

import re
from typing import Sequence, Union

from alembic import op
from sqlalchemy import text

# revision identifiers, used by Alembic.
revision: str = "s4t5u6v7w8x9"
down_revision: Union[str, Sequence[str], None] = "r2s3t4u5v6w7"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

REMOVED_THING_COLLECTIONS = [
("abandoned_wells", "abandoned well"),
("artesian_wells", "artesian well"),
("dry_holes", "dry hole"),
("dug_wells", "dug well"),
("exploration_wells", "exploration well"),
("injection_wells", "injection well"),
("monitoring_wells", "monitoring well"),
("observation_wells", "observation well"),
("piezometers", "piezometer"),
("production_wells", "production well"),
("test_wells", "test well"),
]

LATEST_LOCATION_CTE = """
SELECT DISTINCT ON (lta.thing_id)
lta.thing_id,
lta.location_id,
lta.effective_start
FROM location_thing_association AS lta
WHERE lta.effective_end IS NULL
ORDER BY lta.thing_id, lta.effective_start DESC
""".strip()


def _safe_view_id(view_id: str) -> str:
if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", view_id):
raise ValueError(f"Unsafe view id: {view_id!r}")
return view_id


def _drop_view_or_materialized_view(view_name: str) -> None:
op.execute(text(f"DROP VIEW IF EXISTS {view_name}"))
op.execute(text(f"DROP MATERIALIZED VIEW IF EXISTS {view_name}"))


def _create_thing_view(view_id: str, thing_type: str) -> str:
safe_view_id = _safe_view_id(view_id)
escaped_thing_type = thing_type.replace("'", "''")
return f"""
CREATE VIEW ogc_{safe_view_id} AS
WITH latest_location AS (
{LATEST_LOCATION_CTE}
)
SELECT
t.id,
t.name,
t.first_visit_date,
t.nma_pk_welldata,
t.well_depth,
t.hole_depth,
t.well_casing_diameter,
t.well_casing_depth,
t.well_completion_date,
t.well_driller_name,
t.well_construction_method,
t.well_pump_type,
t.well_pump_depth,
t.formation_completion_code,
t.nma_formation_zone,
t.release_status,
l.elevation,
l.point
FROM thing AS t
JOIN latest_location AS ll ON ll.thing_id = t.id
JOIN location AS l ON l.id = ll.location_id
WHERE t.thing_type = '{escaped_thing_type}'
"""


def upgrade() -> None:
for view_id, _ in REMOVED_THING_COLLECTIONS:
_drop_view_or_materialized_view(f"ogc_{_safe_view_id(view_id)}")


def downgrade() -> None:
for view_id, thing_type in REMOVED_THING_COLLECTIONS:
safe_view_id = _safe_view_id(view_id)
_drop_view_or_materialized_view(f"ogc_{safe_view_id}")
op.execute(text(_create_thing_view(view_id, thing_type)))
23 changes: 23 additions & 0 deletions core/pygeoapi-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,27 @@ resources:
table: ogc_minor_chemistry_wells
geom_field: point

actively_monitored_wells:
type: collection
title: Actively Monitored Wells
description: Wells in the collaborative network currently flagged as actively monitored.
keywords: [water-wells, monitoring, collaborative-network, actively-monitored]
extents:
spatial:
bbox: [-109.05, 31.33, -103.00, 37.00]
crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84
providers:
- type: feature
name: PostgreSQL
data:
host: {postgres_host}
port: {postgres_port}
dbname: {postgres_db}
user: {postgres_user}
password: {postgres_password_env}
search_path: [public]
id_field: id
table: ogc_actively_monitored_wells
geom_field: point

{thing_collections_block}
Loading
Loading