Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
2491e15
feat: implement Radionuclides backfill job and BDD step definitions
kbighorse Feb 28, 2026
90e87b7
fix: harden chemistry backfill test cleanup
kbighorse Feb 28, 2026
03df1f1
fix: harden backfill robustness and remove misleading batch_size
kbighorse Feb 28, 2026
2cba0ae
fix: make migration idempotent, remove batch-size drift, scope test c…
kbighorse Feb 28, 2026
20cee26
fix: harden migration FK lookup, add row-error tracebacks, scope test…
kbighorse Feb 28, 2026
56fa0a6
Formatting changes
kbighorse Feb 28, 2026
c1d1b3f
fix: scope sample assertions, normalize existing_keys case, broaden r…
kbighorse Feb 28, 2026
848524a
Formatting changes
kbighorse Feb 28, 2026
4d867ee
Merge branch 'staging' into 558-radionuclides-backfill
kbighorse Mar 2, 2026
ac44f6b
fix(tests): scope analysis method cleanup to only backfill-created rows
kbighorse Mar 2, 2026
10f3e41
fix(tests): clear backfill tracking after scenario cleanup
kbighorse Mar 2, 2026
1f09e20
fix: deterministic volume handling and explicit CLI arg rejection
kbighorse Mar 2, 2026
ae60c1f
Formatting changes
kbighorse Mar 2, 2026
243f5cf
Merge remote-tracking branch 'origin/staging' into 558-radionuclides-…
kbighorse Mar 2, 2026
f7cef22
Merge branch '558-radionuclides-backfill' of https://github.com/DataI…
kbighorse Mar 2, 2026
09c31ff
fix: make pg_cron optional for local development
kbighorse Mar 2, 2026
72cdf83
fix: make pg_cron optional for local development
kbighorse Mar 2, 2026
0acbfb6
Merge branch '558-radionuclides-backfill' of https://github.com/DataI…
kbighorse Mar 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 204 additions & 0 deletions alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
"""add chemistry backfill columns to observation and sample

Revision ID: 545a5b77e5e8
Revises: d5e6f7a8b9c0
Create Date: 2026-02-27 11:30:45.380002

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision: str = "545a5b77e5e8"
down_revision: Union[str, Sequence[str], None] = "d5e6f7a8b9c0"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Add chemistry backfill columns to observation and sample tables.

    Adds four audit/chemistry columns to ``observation`` (mirrored onto the
    sqlalchemy-continuum ``observation_version`` table), three to ``sample``,
    unique constraints on the NM_Aquifer audit keys, and finally drops the
    stale ``NMA_Radionuclides.thing_id`` column when it is still present.
    """
    # (column name, type, DB comment) specs shared by the live observation
    # table and its continuum version table.
    observation_specs = [
        (
            "nma_pk_chemistryresults",
            sa.String(),
            "NM_Aquifer GlobalID for chemistry results — transfer audit and idempotent upsert key",
        ),
        (
            "detect_flag",
            sa.Boolean(),
            "True=detected, False=below detection limit (legacy Symbol '<'), None=no qualifier",
        ),
        (
            "uncertainty",
            sa.Float(),
            "Measurement uncertainty for the observation value",
        ),
        (
            "analysis_agency",
            sa.String(),
            "Agency or lab that performed the analysis",
        ),
    ]

    # Observation table: 4 new columns plus a unique audit-key constraint.
    for col_name, col_type, col_comment in observation_specs:
        op.add_column(
            "observation",
            sa.Column(col_name, col_type, nullable=True, comment=col_comment),
        )
    op.create_unique_constraint(
        "uq_observation_nma_pk_chemistryresults",
        "observation",
        ["nma_pk_chemistryresults"],
    )

    # Observation version table (sqlalchemy-continuum): same columns, with
    # autoincrement disabled as continuum version tables require.
    for col_name, col_type, col_comment in observation_specs:
        op.add_column(
            "observation_version",
            sa.Column(
                col_name,
                col_type,
                autoincrement=False,
                nullable=True,
                comment=col_comment,
            ),
        )

    # Sample table: 3 new columns plus a unique audit-key constraint.
    sample_specs = [
        (
            "nma_pk_chemistrysample",
            sa.String(),
            "NM_Aquifer SamplePtID for chemistry samples — transfer audit key",
        ),
        (
            "volume",
            sa.Float(),
            "Volume of the sample collected",
        ),
        (
            "volume_unit",
            sa.String(),
            "Unit for the sample volume (e.g. mL, L)",
        ),
    ]
    for col_name, col_type, col_comment in sample_specs:
        op.add_column(
            "sample",
            sa.Column(col_name, col_type, nullable=True, comment=col_comment),
        )
    op.create_unique_constraint(
        "uq_sample_nma_pk_chemistrysample",
        "sample",
        ["nma_pk_chemistrysample"],
    )

    # The model no longer defines NMA_Radionuclides.thing_id (relationships go
    # through NMA_Chemistry_SampleInfo.thing_id instead), so drop the stale
    # column. Skip gracefully in environments that already removed it.
    # NOTE(review): the existence check is not schema-qualified, so a same-named
    # table in another schema could match — confirm search_path assumptions.
    bind = op.get_bind()
    column_present = bind.execute(
        sa.text(
            "SELECT 1 FROM information_schema.columns "
            "WHERE table_name = 'NMA_Radionuclides' AND column_name = 'thing_id'"
        )
    ).scalar()
    if column_present:
        # The FK constraint name differs across environments; discover it
        # dynamically instead of hard-coding one.
        for fk in sa.inspect(bind).get_foreign_keys("NMA_Radionuclides"):
            if "thing_id" in fk["constrained_columns"] and fk.get("name"):
                op.drop_constraint(fk["name"], "NMA_Radionuclides", type_="foreignkey")
        op.drop_column("NMA_Radionuclides", "thing_id")


def downgrade() -> None:
    """Remove chemistry backfill columns.

    Restores ``NMA_Radionuclides.thing_id`` (added nullable, backfilled from
    NMA_Chemistry_SampleInfo, then tightened to NOT NULL with its FK), and
    drops every column/constraint that :func:`upgrade` introduced, in reverse
    order.
    """
    # Restore NMA_Radionuclides.thing_id: add it nullable first so the
    # backfill UPDATE can run, then enforce NOT NULL and re-create the FK.
    op.add_column(
        "NMA_Radionuclides",
        sa.Column("thing_id", sa.Integer(), nullable=True),
    )
    op.execute(
        'UPDATE "NMA_Radionuclides" r '
        "SET thing_id = csi.thing_id "
        'FROM "NMA_Chemistry_SampleInfo" csi '
        "WHERE r.chemistry_sample_info_id = csi.id"
    )
    op.alter_column("NMA_Radionuclides", "thing_id", nullable=False)
    op.create_foreign_key(
        "NMA_Radionuclides_thing_id_fkey",
        "NMA_Radionuclides",
        "thing",
        ["thing_id"],
        ["id"],
        ondelete="CASCADE",
    )

    # Sample: drop the audit-key constraint, then the three added columns.
    op.drop_constraint("uq_sample_nma_pk_chemistrysample", "sample", type_="unique")
    for col_name in ("volume_unit", "volume", "nma_pk_chemistrysample"):
        op.drop_column("sample", col_name)

    # Continuum version table mirrors of the observation columns.
    for col_name in (
        "analysis_agency",
        "uncertainty",
        "detect_flag",
        "nma_pk_chemistryresults",
    ):
        op.drop_column("observation_version", col_name)

    # Observation: drop the audit-key constraint, then the four added columns.
    op.drop_constraint(
        "uq_observation_nma_pk_chemistryresults", "observation", type_="unique"
    )
    for col_name in (
        "analysis_agency",
        "uncertainty",
        "detect_flag",
        "nma_pk_chemistryresults",
    ):
        op.drop_column("observation", col_name)
22 changes: 14 additions & 8 deletions alembic/versions/d5e6f7a8b9c0_create_pygeoapi_supporting_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,8 @@ def upgrade() -> None:
")"
)
).scalar()
if not pg_cron_available:
raise RuntimeError(
"Cannot schedule nightly pygeoapi materialized view refresh job: "
"pg_cron extension is not available on this PostgreSQL server."
)
op.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron"))
if pg_cron_available:
op.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron"))

for view_id, thing_type in THING_COLLECTIONS:
safe_view_id = _safe_view_id(view_id)
Expand Down Expand Up @@ -364,11 +360,21 @@ def upgrade() -> None:
_create_matview_indexes()

op.execute(text(_create_refresh_function()))
op.execute(text(_schedule_refresh_job()))
if pg_cron_available:
op.execute(text(_schedule_refresh_job()))


def downgrade() -> None:
op.execute(text(_unschedule_refresh_job()))
bind = op.get_bind()
pg_cron_available = bind.execute(
text(
"SELECT EXISTS ("
"SELECT 1 FROM pg_available_extensions WHERE name = 'pg_cron'"
")"
)
).scalar()
if pg_cron_available:
op.execute(text(_unschedule_refresh_job()))
op.execute(text(f"DROP FUNCTION IF EXISTS public.{REFRESH_FUNCTION_NAME}()"))
_drop_view_or_materialized_view("ogc_avg_tds_wells")
_drop_view_or_materialized_view("ogc_latest_depth_to_water_wells")
Expand Down
8 changes: 2 additions & 6 deletions core/initializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,8 @@ def erase_and_rebuild_db():
")"
)
).scalar()
if not pg_cron_available:
raise RuntimeError(
"Cannot erase and rebuild database: pg_cron extension is not "
"available on this PostgreSQL server."
)
session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron"))
if pg_cron_available:
session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron"))
session.commit()
Base.metadata.drop_all(session.bind)
Base.metadata.create_all(session.bind)
Expand Down
14 changes: 14 additions & 0 deletions core/lexicon.json
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,13 @@
"term": "mg/L",
"definition": "Milligrams per Liter"
},
{
"categories": [
"unit"
],
"term": "pCi/L",
"definition": "Picocuries per Liter"
},
{
"categories": [
"unit"
Expand Down Expand Up @@ -8213,6 +8220,13 @@
"term": "Site Notes (legacy)",
"definition": "Legacy site notes field from WaterLevels"
},
{
"categories": [
"note_type"
],
"term": "Chemistry Observation",
"definition": "Notes from chemistry observation results"
},
{
"categories": [
"well_pump_type"
Expand Down
14 changes: 7 additions & 7 deletions db/initialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import os

from sqlalchemy import text
from sqlalchemy import inspect as sa_inspect, text
from sqlalchemy.engine import Connection
from sqlalchemy.orm import Session
from sqlalchemy_searchable import sync_trigger
Expand Down Expand Up @@ -69,12 +69,8 @@ def recreate_public_schema(session: Session) -> None:
")"
)
).scalar()
if not pg_cron_available:
raise RuntimeError(
"Cannot initialize database schema: pg_cron extension is not available "
"on this PostgreSQL server."
)
session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron"))
if pg_cron_available:
session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron"))
session.execute(APP_READ_GRANT_SQL)
grant_app_read_members(session)
session.commit()
Expand All @@ -83,7 +79,11 @@ def recreate_public_schema(session: Session) -> None:
def sync_search_vector_triggers(session: Session) -> None:
"""Ensure SQLAlchemy-searchable triggers exist for every TSVector column."""
conn = session.connection()
inspector = sa_inspect(conn)
existing_tables = set(inspector.get_table_names())
for table in Base.metadata.tables.values():
if table.name not in existing_tables:
continue
for column in table.columns:
if isinstance(column.type, TSVectorType):
sync_trigger(conn, table.name, column.name, list(column.type.columns))
Expand Down
19 changes: 19 additions & 0 deletions db/observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,25 @@ class Observation(Base, AutoBaseMixin, ReleaseMixin):

# NM_Aquifer fields for audits
nma_pk_waterlevels: Mapped[str] = mapped_column(nullable=True)
nma_pk_chemistryresults: Mapped[str] = mapped_column(
nullable=True,
unique=True,
comment="NM_Aquifer GlobalID for chemistry results — transfer audit and idempotent upsert key",
)

# Chemistry-specific columns
detect_flag: Mapped[bool] = mapped_column(
nullable=True,
comment="True=detected, False=below detection limit (legacy Symbol '<'), None=no qualifier",
)
uncertainty: Mapped[float] = mapped_column(
nullable=True,
comment="Measurement uncertainty for the observation value",
)
analysis_agency: Mapped[str] = mapped_column(
nullable=True,
comment="Agency or lab that performed the analysis",
)

# --- Foreign Keys ---
sample_id: Mapped[int] = mapped_column(
Expand Down
15 changes: 15 additions & 0 deletions db/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,21 @@ class Sample(Base, AutoBaseMixin, ReleaseMixin):
nullable=True,
comment="NM_Aquifer primary key for waterlevels - to be used for transfer audits",
)
nma_pk_chemistrysample: Mapped[str] = mapped_column(
nullable=True,
unique=True,
comment="NM_Aquifer SamplePtID for chemistry samples — transfer audit key",
)

# Chemistry sample attributes
volume: Mapped[float] = mapped_column(
nullable=True,
comment="Volume of the sample collected",
)
volume_unit: Mapped[str] = mapped_column(
nullable=True,
comment="Unit for the sample volume (e.g. mL, L)",
)

# --- Relationship Definitions ---
field_activity: Mapped["FieldActivity"] = relationship(back_populates="samples")
Expand Down
5 changes: 2 additions & 3 deletions run_backfill.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# e.g. state, county, quad_name, etc. It will also be used to handle data refactors/corrections in the future.

# Load environment variables from .env and run the staging backfill.
# Usage: ./run_backfill.sh [--batch-size N]
# Usage: ./run_backfill.sh

# github workflow equivalent: for reference only
#- name: Run backfill script on staging database
Expand Down Expand Up @@ -38,5 +38,4 @@ set +a

uv run alembic upgrade head

# Forward any args (e.g., --batch-size 500)
python -m transfers.backfill.backfill "$@"
python -m transfers.backfill.backfill
4 changes: 4 additions & 0 deletions schemas/observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ class BaseObservationResponse(BaseResponseModel):
value: float | None
unit: Unit
nma_data_quality: str | None = None
nma_pk_chemistryresults: str | None = None
detect_flag: bool | None = None
uncertainty: float | None = None
analysis_agency: str | None = None


class GroundwaterLevelObservationResponse(BaseObservationResponse):
Expand Down
Loading