Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Add transducer observation deployment lookup index.

Revision ID: p9c0d1e2f3a4
Revises: o8b9c0d1e2f3
Create Date: 2026-03-19 11:05:00.000000
"""

from alembic import op

# revision identifiers, used by Alembic.
revision = "p9c0d1e2f3a4"  # unique ID of this migration
down_revision = "o8b9c0d1e2f3"  # parent migration this one applies on top of
branch_labels = None  # not part of a named Alembic branch
depends_on = None  # no cross-branch dependencies


def upgrade() -> None:
    """Create a composite lookup index on transducer_observation.

    The index covers (deployment_id, parameter_id, observation_datetime)
    to speed up deployment-scoped, parameter-filtered, time-ordered reads.
    """
    index_name = "ix_transducer_observation_deployment_parameter_datetime"
    indexed_columns = ["deployment_id", "parameter_id", "observation_datetime"]
    # Non-unique: many observations may share a deployment/parameter/timestamp.
    op.create_index(index_name, "transducer_observation", indexed_columns, unique=False)


def downgrade() -> None:
    """Drop the composite lookup index created by upgrade()."""
    index_name = "ix_transducer_observation_deployment_parameter_datetime"
    op.drop_index(index_name, table_name="transducer_observation")
1 change: 1 addition & 0 deletions cli/db_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ def restore_local_db_from_sql(
) from exc

return LocalDbRestoreResult(
sql_file=staged_sql_file,
source=source_description,
host=host,
port=port,
Expand Down
8 changes: 8 additions & 0 deletions db/transducer.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ class TransducerObservation(Base, AutoBaseMixin, ReleaseMixin):
"""

__tablename__ = "transducer_observation"
__table_args__ = (
Index(
"ix_transducer_observation_deployment_parameter_datetime",
"deployment_id",
"parameter_id",
"observation_datetime",
),
)

parameter_id: Mapped[int] = mapped_column(
ForeignKey("parameter.id", ondelete="CASCADE"), nullable=False, index=True
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ services:
- PYGEOAPI_POSTGRES_DB=ocotilloapi_dev
- PYGEOAPI_POSTGRES_USER=${POSTGRES_USER}
- PYGEOAPI_POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
- GOOGLE_APPLICATION_CREDENTIALS=/app/gcs_credentials.json
ports:
- 8000:8000
depends_on:
Expand Down
134 changes: 103 additions & 31 deletions services/observation_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,34 +52,25 @@ def get_transducer_observations(
order: str | None = None,
filter_: str = Query(alias="filter", default=None),
):
deployment_rows: list[tuple[int, int]] = []
deployment_to_thing: dict[int, int] = {}

if thing_id:
item = session.get(Thing, thing_id)
if item is None:
empty_query = select(TransducerObservation).where(False)
return paginate(query=empty_query, conn=session)

# Subquery to get latest block for each observation
block_subq = (
select(TransducerObservationBlock.id)
.where(
TransducerObservationBlock.parameter_id
== TransducerObservation.parameter_id,
TransducerObservationBlock.start_datetime
<= TransducerObservation.observation_datetime,
TransducerObservationBlock.end_datetime
>= TransducerObservation.observation_datetime,
)
.order_by(desc(TransducerObservationBlock.start_datetime))
.limit(1)
.correlate(TransducerObservation)
.scalar_subquery()
)

query = (
select(TransducerObservation, TransducerObservationBlock)
.join(Deployment, TransducerObservation.deployment_id == Deployment.id)
.join(TransducerObservationBlock, TransducerObservationBlock.id == block_subq)
)
deployment_rows = session.execute(
select(Deployment.id, Deployment.thing_id).where(
Deployment.thing_id == thing_id
)
).all()
deployment_to_thing = {
deployment_id: deployment_thing_id
for deployment_id, deployment_thing_id in deployment_rows
}

query = select(TransducerObservation)

if start_time:
query = query.where(TransducerObservation.observation_datetime >= start_time)
Expand All @@ -89,23 +80,104 @@ def get_transducer_observations(
if parameter_id:
query = query.where(TransducerObservation.parameter_id == parameter_id)
if thing_id:
query = query.where(Deployment.thing_id == thing_id)
deployment_ids = list(deployment_to_thing)
if not deployment_ids:
empty_query = select(TransducerObservation).where(False)
return paginate(query=empty_query, conn=session)
query = query.where(TransducerObservation.deployment_id.in_(deployment_ids))

def transformer(result):
def transformer(observations):
from schemas.transducer import (
TransducerObservationWithBlockResponse,
TransducerObservationResponse,
TransducerObservationBlockResponse,
)

return [
TransducerObservationWithBlockResponse(
observation=TransducerObservationResponse.model_validate(observation),
block=TransducerObservationBlockResponse.model_validate(block),
).model_dump()
for observation, block in result
if not observations:
return []

deployment_ids = {observation.deployment_id for observation in observations}
if not deployment_to_thing or not deployment_ids.issubset(deployment_to_thing):
deployment_rows = session.execute(
select(Deployment.id, Deployment.thing_id).where(
Deployment.id.in_(deployment_ids)
)
).all()
deployment_to_thing.update(
{
deployment_id: deployment_thing_id
for deployment_id, deployment_thing_id in deployment_rows
}
)

thing_ids = {
deployment_to_thing[observation.deployment_id]
for observation in observations
if observation.deployment_id in deployment_to_thing
}
parameter_ids = {observation.parameter_id for observation in observations}
observation_datetimes = [
observation.observation_datetime for observation in observations
]

block_rows = session.scalars(
select(TransducerObservationBlock)
.where(
TransducerObservationBlock.thing_id.in_(thing_ids),
TransducerObservationBlock.parameter_id.in_(parameter_ids),
TransducerObservationBlock.start_datetime <= max(observation_datetimes),
TransducerObservationBlock.end_datetime >= min(observation_datetimes),
)
.order_by(
TransducerObservationBlock.thing_id,
TransducerObservationBlock.parameter_id,
desc(TransducerObservationBlock.start_datetime),
)
).all()

block_map: dict[tuple[int, int], list[TransducerObservationBlock]] = {}
for block in block_rows:
key = (block.thing_id, block.parameter_id)
if key not in block_map:
block_map[key] = []
block_map[key].append(block)

response_items = []
for observation in observations:
thing_id_for_observation = deployment_to_thing.get(
observation.deployment_id
)
if thing_id_for_observation is None:
continue

matching_block = next(
(
block
for block in block_map.get(
(thing_id_for_observation, observation.parameter_id), []
)
if block.start_datetime
<= observation.observation_datetime
<= block.end_datetime
),
None,
)
if matching_block is None:
continue
Comment on lines +165 to +166

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Keep blockless observations out of the paginated result set

If a page contains transducer observations that do not currently fall inside any TransducerObservationBlock (for example, newly ingested data before QC blocks are created), this branch drops them after paginate() has already counted and sliced select(TransducerObservation). That makes total/pages inconsistent with items, and page 1 can be empty even though later pages still return data. The previous SQL join filtered these rows before pagination, so this is a regression in get_transducer_observations.

Useful? React with 👍 / 👎.


response_items.append(
TransducerObservationWithBlockResponse(
observation=TransducerObservationResponse.model_validate(
observation
),
block=TransducerObservationBlockResponse.model_validate(
matching_block
),
).model_dump()
)

return response_items

query = query.order_by(TransducerObservation.observation_datetime.desc())

return paginate(query=query, conn=session, transformer=transformer)
Expand Down
Loading
Loading