"""Add transducer observation deployment lookup index.

Revision ID: p9c0d1e2f3a4
Revises: o8b9c0d1e2f3
Create Date: 2026-03-19 11:05:00.000000
"""

from alembic import op

# revision identifiers, used by Alembic.
revision = "p9c0d1e2f3a4"
down_revision = "o8b9c0d1e2f3"
branch_labels = None
depends_on = None

# Shared by upgrade() and downgrade() so the created and dropped index cannot
# drift apart. The same name and column order are declared on the
# TransducerObservation model's __table_args__ in db/transducer.py.
_INDEX_NAME = "ix_transducer_observation_deployment_parameter_datetime"
_TABLE_NAME = "transducer_observation"
_INDEX_COLUMNS = ("deployment_id", "parameter_id", "observation_datetime")


def upgrade() -> None:
    """Create the non-unique composite index on ``transducer_observation``.

    Column order is (deployment_id, parameter_id, observation_datetime).
    """
    op.create_index(
        _INDEX_NAME,
        _TABLE_NAME,
        list(_INDEX_COLUMNS),
        unique=False,
    )


def downgrade() -> None:
    """Drop the index that :func:`upgrade` creates."""
    op.drop_index(_INDEX_NAME, table_name=_TABLE_NAME)
-39,6 +39,7 @@ services: - PYGEOAPI_POSTGRES_DB=ocotilloapi_dev - PYGEOAPI_POSTGRES_USER=${POSTGRES_USER} - PYGEOAPI_POSTGRES_PASSWORD=${POSTGRES_PASSWORD} + - GOOGLE_APPLICATION_CREDENTIALS=/app/gcs_credentials.json ports: - 8000:8000 depends_on: diff --git a/services/observation_helper.py b/services/observation_helper.py index af24af05..c049d5b1 100644 --- a/services/observation_helper.py +++ b/services/observation_helper.py @@ -52,34 +52,25 @@ def get_transducer_observations( order: str | None = None, filter_: str = Query(alias="filter", default=None), ): + deployment_rows: list[tuple[int, int]] = [] + deployment_to_thing: dict[int, int] = {} + if thing_id: item = session.get(Thing, thing_id) if item is None: empty_query = select(TransducerObservation).where(False) return paginate(query=empty_query, conn=session) - - # Subquery to get latest block for each observation - block_subq = ( - select(TransducerObservationBlock.id) - .where( - TransducerObservationBlock.parameter_id - == TransducerObservation.parameter_id, - TransducerObservationBlock.start_datetime - <= TransducerObservation.observation_datetime, - TransducerObservationBlock.end_datetime - >= TransducerObservation.observation_datetime, - ) - .order_by(desc(TransducerObservationBlock.start_datetime)) - .limit(1) - .correlate(TransducerObservation) - .scalar_subquery() - ) - - query = ( - select(TransducerObservation, TransducerObservationBlock) - .join(Deployment, TransducerObservation.deployment_id == Deployment.id) - .join(TransducerObservationBlock, TransducerObservationBlock.id == block_subq) - ) + deployment_rows = session.execute( + select(Deployment.id, Deployment.thing_id).where( + Deployment.thing_id == thing_id + ) + ).all() + deployment_to_thing = { + deployment_id: deployment_thing_id + for deployment_id, deployment_thing_id in deployment_rows + } + + query = select(TransducerObservation) if start_time: query = query.where(TransducerObservation.observation_datetime >= start_time) @@ -89,23 
def transformer(observations):
    """Pair each transducer observation with the block that contains it.

    Nested helper of ``get_transducer_observations``. It closes over that
    function's ``session`` and its ``deployment_to_thing`` cache (deployment
    id -> thing id); deployments it has to look up are added to that cache.

    Args:
        observations: ``TransducerObservation`` objects handed over by
            ``paginate`` (presumably the rows of the current page -- confirm
            against ``paginate``). Each must expose ``deployment_id``,
            ``parameter_id`` and ``observation_datetime``.

    Returns:
        A list of ``TransducerObservationWithBlockResponse.model_dump()``
        dicts, in the order of ``observations``. An observation is omitted
        when its deployment cannot be mapped to a thing or when no block of
        the same thing and parameter contains its timestamp, so the list can
        be shorter than the input.
        NOTE(review): if ``paginate`` derives its total from the query rather
        than from this list, the reported total can exceed the number of
        items returned -- confirm.
    """
    from schemas.transducer import (
        TransducerObservationWithBlockResponse,
        TransducerObservationResponse,
        TransducerObservationBlockResponse,
    )

    if not observations:
        return []

    # Query only the deployments the cache does not know yet. When the caller
    # filtered by thing_id the cache already covers every row and no query is
    # issued at all.
    unmapped_deployment_ids = {
        observation.deployment_id for observation in observations
    }.difference(deployment_to_thing)
    if unmapped_deployment_ids:
        fetched_rows = session.execute(
            select(Deployment.id, Deployment.thing_id).where(
                Deployment.id.in_(unmapped_deployment_ids)
            )
        ).all()
        deployment_to_thing.update(
            {
                deployment_id: deployment_thing_id
                for deployment_id, deployment_thing_id in fetched_rows
            }
        )

    thing_ids = {
        deployment_to_thing[observation.deployment_id]
        for observation in observations
        if observation.deployment_id in deployment_to_thing
    }
    if not thing_ids:
        # No observation resolves to a thing, so no block can match; skip the
        # block query entirely.
        return []

    parameter_ids = {observation.parameter_id for observation in observations}
    observation_datetimes = [
        observation.observation_datetime for observation in observations
    ]

    # One query for every block that could contain any observation in this
    # batch: same things, same parameters, and a time range overlapping the
    # batch's [min, max] span. Exact containment is checked per observation
    # further down.
    candidate_blocks = session.scalars(
        select(TransducerObservationBlock)
        .where(
            TransducerObservationBlock.thing_id.in_(thing_ids),
            TransducerObservationBlock.parameter_id.in_(parameter_ids),
            TransducerObservationBlock.start_datetime <= max(observation_datetimes),
            TransducerObservationBlock.end_datetime >= min(observation_datetimes),
        )
        .order_by(
            TransducerObservationBlock.thing_id,
            TransducerObservationBlock.parameter_id,
            desc(TransducerObservationBlock.start_datetime),
        )
    ).all()

    # Group by (thing, parameter). Within a group the query order is kept,
    # i.e. latest start_datetime first.
    blocks_by_key: dict[tuple[int, int], list[TransducerObservationBlock]] = {}
    for block in candidate_blocks:
        blocks_by_key.setdefault((block.thing_id, block.parameter_id), []).append(
            block
        )

    response_items = []
    for observation in observations:
        observation_thing_id = deployment_to_thing.get(observation.deployment_id)
        if observation_thing_id is None:
            continue

        # First hit wins: because of the descending sort this is the
        # latest-starting block whose range contains the observation.
        matching_block = next(
            (
                block
                for block in blocks_by_key.get(
                    (observation_thing_id, observation.parameter_id), ()
                )
                if block.start_datetime
                <= observation.observation_datetime
                <= block.end_datetime
            ),
            None,
        )
        if matching_block is None:
            continue

        response_items.append(
            TransducerObservationWithBlockResponse(
                observation=TransducerObservationResponse.model_validate(observation),
                block=TransducerObservationBlockResponse.model_validate(
                    matching_block
                ),
            ).model_dump()
        )

    return response_items
def test_get_transducer_groundwater_level_observations_uses_blocks_for_same_thing(
    location, second_location, sensor
):
    """The transducer endpoint pairs an observation with its own thing's block.

    Two wells each get a deployment and a groundwater-level block. Both blocks
    cover the single observation's timestamp, but the observation is recorded
    on the target well's deployment. Requesting the target ``thing_id`` must
    return exactly one item whose block is the target well's block.
    """
    observation_time = datetime.now(timezone.utc)
    # (model, primary key) pairs to delete afterwards, in deletion order.
    # Stays empty unless the setup transaction committed.
    created_rows = []

    # Constructor arguments the two wells / two deployments have in common.
    shared_well_fields = dict(
        thing_type="water well",
        release_status="draft",
        well_depth=10,
        hole_depth=10,
        well_casing_diameter=5.0,
        well_casing_depth=10.0,
    )
    shared_deployment_fields = dict(
        installation_date="2023-01-01",
        recording_interval=24,
        recording_interval_units="hour",
        hanging_cable_length=10,
        hanging_point_height=0,
    )

    try:
        with session_ctx() as session:
            target_thing = Thing(
                name="Transducer Target Well",
                first_visit_date="2023-03-03",
                **shared_well_fields,
            )
            other_thing = Thing(
                name="Transducer Other Well",
                first_visit_date="2023-03-04",
                **shared_well_fields,
            )
            session.add_all([target_thing, other_thing])
            session.flush()

            session.add_all(
                [
                    LocationThingAssociation(
                        location_id=place.id,
                        thing_id=well.id,
                        effective_start="2025-02-01T00:00:00Z",
                    )
                    for place, well in (
                        (location, target_thing),
                        (second_location, other_thing),
                    )
                ]
            )

            # The target well reuses the ``sensor`` fixture; the other well
            # gets its own sensor with uuid-based name/serial/pcn values.
            other_sensor = Sensor(
                name=f"Transducer Other Sensor {uuid.uuid4()}",
                sensor_type="Pressure Transducer",
                model="Model X",
                serial_no=f"serial-{uuid.uuid4()}",
                pcn_number=f"pcn-{uuid.uuid4()}",
                owner_agency="NMBGMR",
                sensor_status="In Service",
                notes="other sensor",
                release_status="draft",
            )
            session.add(other_sensor)
            session.flush()

            target_deployment = Deployment(
                sensor_id=sensor.id,
                thing_id=target_thing.id,
                hanging_point_description="target deployment",
                notes="target deployment",
                **shared_deployment_fields,
            )
            other_deployment = Deployment(
                sensor_id=other_sensor.id,
                thing_id=other_thing.id,
                hanging_point_description="other deployment",
                notes="other deployment",
                **shared_deployment_fields,
            )
            session.add_all([target_deployment, other_deployment])
            session.flush()

            # Both blocks contain ``observation_time``; only the thing differs.
            target_block = TransducerObservationBlock(
                thing_id=target_thing.id,
                parameter_id=_groundwater_level_parameter_id(),
                start_datetime=observation_time - timedelta(days=10),
                end_datetime=observation_time + timedelta(days=10),
                review_status="not reviewed",
            )
            other_block = TransducerObservationBlock(
                thing_id=other_thing.id,
                parameter_id=_groundwater_level_parameter_id(),
                start_datetime=observation_time - timedelta(days=1),
                end_datetime=observation_time + timedelta(days=1),
                review_status="not reviewed",
            )
            session.add_all([target_block, other_block])
            session.flush()

            observation = TransducerObservation(
                parameter_id=_groundwater_level_parameter_id(),
                deployment_id=target_deployment.id,
                observation_datetime=observation_time,
                value=12.34,
            )
            session.add(observation)
            session.commit()

            # Capture primary keys while the session is still open.
            matching_block_id = target_block.id
            other_block_id = other_block.id
            target_thing_id = target_thing.id
            created_rows = [
                (TransducerObservation, observation.id),
                (TransducerObservationBlock, matching_block_id),
                (TransducerObservationBlock, other_block_id),
                (Deployment, target_deployment.id),
                (Deployment, other_deployment.id),
                (Sensor, other_sensor.id),
                (Thing, target_thing_id),
                (Thing, other_thing.id),
            ]

        response = client.get(
            f"/observation/transducer-groundwater-level?thing_id={target_thing_id}"
        )

        assert response.status_code == 200
        data = response.json()
        assert data["total"] == 1
        first_block = data["items"][0]["block"]
        assert first_block["id"] == matching_block_id
        assert first_block["id"] != other_block_id
    finally:
        # Remove whatever was committed, in a fresh session.
        with session_ctx() as session:
            for model, pk in created_rows:
                if pk is None:
                    continue
                instance = session.get(model, pk)
                if instance is not None:
                    session.delete(instance)
            session.commit()