diff --git a/backend/app/alembic/versions/050_add_cost_to_evaluation_run.py b/backend/app/alembic/versions/050_add_cost_to_evaluation_run.py new file mode 100644 index 000000000..6d63de3e8 --- /dev/null +++ b/backend/app/alembic/versions/050_add_cost_to_evaluation_run.py @@ -0,0 +1,33 @@ +"""add cost tracking to evaluation_run + +Revision ID: 050 +Revises: 049 +Create Date: 2026-04-09 12:00:00.000000 + +""" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "050" +down_revision = "049" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "evaluation_run", + sa.Column( + "cost", + postgresql.JSONB(astext_type=sa.Text()), + nullable=True, + comment="Cost tracking (response/embedding tokens and USD)", + ), + ) + + +def downgrade(): + op.drop_column("evaluation_run", "cost") diff --git a/backend/app/crud/evaluations/__init__.py b/backend/app/crud/evaluations/__init__.py index a5824c0a2..64dfb8a3a 100644 --- a/backend/app/crud/evaluations/__init__.py +++ b/backend/app/crud/evaluations/__init__.py @@ -31,6 +31,12 @@ update_traces_with_cosine_scores, upload_dataset_to_langfuse, ) +from app.crud.evaluations.pricing import ( + build_cost_dict, + build_embedding_cost_entry, + build_response_cost_entry, + calculate_token_cost, +) from app.crud.evaluations.processing import ( check_and_process_evaluation, poll_all_pending_evaluations, @@ -74,6 +80,11 @@ "calculate_average_similarity", "calculate_cosine_similarity", "start_embedding_batch", + # Pricing + "build_cost_dict", + "build_embedding_cost_entry", + "build_response_cost_entry", + "calculate_token_cost", # Langfuse "create_langfuse_dataset_run", "fetch_trace_scores_from_langfuse", diff --git a/backend/app/crud/evaluations/core.py b/backend/app/crud/evaluations/core.py index 79a3c9d3f..e52d77cdc 100644 --- a/backend/app/crud/evaluations/core.py +++ b/backend/app/crud/evaluations/core.py @@ -197,6 
+197,7 @@ def update_evaluation_run( object_store_url: str | None = None, score_trace_url: str | None = None, score: dict | None = None, + cost: dict | None = None, embedding_batch_job_id: int | None = None, ) -> EvaluationRun: """ @@ -211,7 +212,9 @@ def update_evaluation_run( status: New status value (optional) error_message: New error message (optional) object_store_url: New object store URL (optional) + score_trace_url: New per-trace score S3 URL (optional) score: New score dict (optional) + cost: New cost dict (optional) embedding_batch_job_id: New embedding batch job ID (optional) Returns: @@ -226,6 +229,8 @@ def update_evaluation_run( eval_run.object_store_url = object_store_url if score is not None: eval_run.score = score + if cost is not None: + eval_run.cost = cost if embedding_batch_job_id is not None: eval_run.embedding_batch_job_id = embedding_batch_job_id if score_trace_url is not None: diff --git a/backend/app/crud/evaluations/pricing.py b/backend/app/crud/evaluations/pricing.py new file mode 100644 index 000000000..be3d98791 --- /dev/null +++ b/backend/app/crud/evaluations/pricing.py @@ -0,0 +1,225 @@ +""" +Pricing utilities for evaluation cost tracking. + +This module provides model pricing data and cost calculation functions +for both response generation and embedding stages of evaluation runs. + +Pricing uses OpenAI Batch API rates (50% cheaper than real-time). +Source: https://github.com/BerriAI/litellm/blob/main/model_prices_and_context_window.json +""" + +import logging +from collections.abc import Callable, Iterable +from typing import Any + +from app.crud.evaluations.embeddings import EMBEDDING_MODEL + +logger = logging.getLogger(__name__) + +# Number of decimals to round USD cost values to. +COST_USD_DECIMALS = 6 + +# Batch API pricing in USD per token. 
+MODEL_PRICING: dict[str, dict[str, float]] = { + # GPT-4o (batch pricing) + "gpt-4o": { + "input_cost_per_token": 1.25e-06, + "output_cost_per_token": 5e-06, + }, + "gpt-4o-mini": { + "input_cost_per_token": 7.5e-08, + "output_cost_per_token": 3e-07, + }, + # GPT-4.1 (batch pricing) + "gpt-4.1": { + "input_cost_per_token": 1e-06, + "output_cost_per_token": 4e-06, + }, + # GPT-5 (batch pricing) + "gpt-5": { + "input_cost_per_token": 6.25e-07, + "output_cost_per_token": 5e-06, + }, + "gpt-5-mini": { + "input_cost_per_token": 1.25e-07, + "output_cost_per_token": 1e-06, + }, + "gpt-5-nano": { + "input_cost_per_token": 2.5e-08, + "output_cost_per_token": 2e-07, + }, + # GPT-5.4 (batch pricing) + "gpt-5.4": { + "input_cost_per_token": 1.25e-06, + "output_cost_per_token": 7.5e-06, + }, + "gpt-5.4-pro": { + "input_cost_per_token": 1.5e-05, + "output_cost_per_token": 9e-05, + }, + "gpt-5.4-mini": { + "input_cost_per_token": 3.75e-07, + "output_cost_per_token": 2.25e-06, + }, + "gpt-5.4-nano": { + "input_cost_per_token": 1e-07, + "output_cost_per_token": 6.25e-07, + }, + # Embedding models (batch pricing) + EMBEDDING_MODEL: { + "input_cost_per_token": 6.5e-08, + }, +} + + +def calculate_token_cost( + model: str, input_tokens: int, output_tokens: int = 0 +) -> float: + """ + Calculate USD cost for a model call given input and output token counts. + + Used for both response generation (input + output tokens) and embeddings + (input tokens only — pass output_tokens=0 or omit). + + Args: + model: OpenAI model name (e.g., "gpt-4o", "text-embedding-3-large") + input_tokens: Number of input/prompt tokens + output_tokens: Number of output tokens (default 0 for embeddings) + + Returns: + Cost in USD. Returns 0.0 if model is unknown. 
+ """ + pricing = MODEL_PRICING.get(model) + if not pricing: + logger.warning( + f"[calculate_token_cost] Unknown model '{model}', returning cost 0.0" + ) + return 0.0 + + input_cost = input_tokens * pricing.get("input_cost_per_token", 0) + output_cost = output_tokens * pricing.get("output_cost_per_token", 0) + return input_cost + output_cost + + +def _sum_usage( + items: Iterable[dict[str, Any]], + usage_extractor: Callable[[dict[str, Any]], dict[str, Any] | None], + fields: tuple[str, ...], +) -> dict[str, int]: + """ + Sum named token fields across items, using a caller-supplied extractor + to locate the per-item usage dict. + + Args: + items: Iterable of items to aggregate + usage_extractor: Function returning the usage dict for an item, or None + fields: Token field names to sum (e.g., "input_tokens", "total_tokens") + + Returns: + Mapping of field name to summed value + """ + totals: dict[str, int] = {field: 0 for field in fields} + for item in items: + usage = usage_extractor(item) + if not usage: + continue + for field in fields: + totals[field] += usage.get(field, 0) + return totals + + +def build_response_cost_entry( + model: str, results: list[dict[str, Any]] +) -> dict[str, Any]: + """ + Aggregate token usage from parsed evaluation results and calculate cost. 
def build_embedding_cost_entry(
    model: str, raw_results: list[dict[str, Any]]
) -> dict[str, Any]:
    """
    Aggregate token usage from raw embedding batch results and calculate cost.

    Rows that carry no usable usage payload are skipped rather than raising:
    this covers a missing "response"/"body"/"usage" key as well as a key that
    is present with a null or non-dict value (presumably how failed batch
    rows look, e.g. `"response": null` — confirm against the provider's
    output format).

    Args:
        model: OpenAI embedding model name
        raw_results: Raw JSONL lines from embedding batch output,
            each containing response.body.usage with prompt_tokens/total_tokens

    Returns:
        Embedding cost entry for the cost JSONB field, with keys
        "model", "prompt_tokens", "total_tokens" and "cost_usd"
        (rounded to COST_USD_DECIMALS)
    """

    def _usage_of(row: dict[str, Any]) -> dict[str, Any] | None:
        # Walk response -> body -> usage one level at a time. A chained
        # .get(key, {}) only protects against a *missing* key; an explicit
        # null would make the next .get() raise AttributeError and lose the
        # cost of the whole batch.
        node: Any = row
        for key in ("response", "body", "usage"):
            if not isinstance(node, dict):
                return None
            node = node.get(key)
        return node if isinstance(node, dict) else None

    totals = _sum_usage(
        items=raw_results,
        usage_extractor=_usage_of,
        fields=("prompt_tokens", "total_tokens"),
    )

    # Embeddings are billed on input tokens only, so output_tokens is omitted.
    cost_usd = calculate_token_cost(model=model, input_tokens=totals["prompt_tokens"])

    return {
        "model": model,
        "prompt_tokens": totals["prompt_tokens"],
        "total_tokens": totals["total_tokens"],
        "cost_usd": round(cost_usd, COST_USD_DECIMALS),
    }
def _safe_attach_cost(
    eval_run: EvaluationRun,
    log_prefix: str,
    *,
    response_model: str | None = None,
    response_results: list[dict[str, Any]] | None = None,
    embedding_model: str | None = None,
    embedding_raw_results: list[dict[str, Any]] | None = None,
) -> None:
    """
    Compute and attach a cost dict to eval_run.cost without raising.

    Cost-tracking failures must never block evaluation completion, so any
    exception is logged (with traceback) and swallowed; in that case
    eval_run.cost is left exactly as it was. The caller is responsible for
    persisting eval_run via update_evaluation_run.

    When called for the embedding stage only, any previously-computed
    response entry on eval_run.cost is preserved. The reverse does not hold:
    a call without embedding arguments produces a cost dict with no
    "embedding" entry.

    Args:
        eval_run: EvaluationRun whose cost field will be set
        log_prefix: Caller-provided log prefix (org/project/eval ids)
        response_model: Model name for response cost (response stage only)
        response_results: Parsed evaluation results (response stage only)
        embedding_model: Model name for embedding cost (embedding stage only)
        embedding_raw_results: Raw embedding batch results (embedding stage only)
    """
    try:
        if response_model is not None and response_results is not None:
            response_entry = build_response_cost_entry(
                model=response_model, results=response_results
            )
        else:
            # Preserve any response entry computed during an earlier stage.
            # A stored cost that is not a dict has nothing to preserve and
            # must not abort the embedding-cost computation below.
            stored_cost = eval_run.cost if isinstance(eval_run.cost, dict) else {}
            response_entry = stored_cost.get("response")

        embedding_entry: dict[str, Any] | None = None
        if embedding_model is not None and embedding_raw_results is not None:
            embedding_entry = build_embedding_cost_entry(
                model=embedding_model, raw_results=embedding_raw_results
            )

        # Assigned last so a failure above leaves the previous cost untouched.
        eval_run.cost = build_cost_dict(
            response_entry=response_entry,
            embedding_entry=embedding_entry,
        )
    except Exception as cost_err:
        # Deliberate catch-all: cost tracking is best-effort. Keep the
        # traceback so a swallowed failure is still diagnosable from logs.
        logger.warning(
            f"[_safe_attach_cost] {log_prefix} Failed to compute cost | {cost_err}",
            exc_info=True,
        )
exc_info=True, ) - # Step 7: Mark evaluation as completed + # Step 7: Accumulate embedding cost onto existing response cost + _safe_attach_cost( + eval_run=eval_run, + log_prefix=log_prefix, + embedding_model=EMBEDDING_MODEL, + embedding_raw_results=raw_results, + ) + + # Step 8: Mark evaluation as completed eval_run = update_evaluation_run( - session=session, eval_run=eval_run, status="completed", score=eval_run.score + session=session, + eval_run=eval_run, + status="completed", + score=eval_run.score, + cost=eval_run.cost, ) logger.info( diff --git a/backend/app/models/evaluation.py b/backend/app/models/evaluation.py index d2d2beecc..fddc255cb 100644 --- a/backend/app/models/evaluation.py +++ b/backend/app/models/evaluation.py @@ -313,6 +313,17 @@ class EvaluationRun(SQLModel, table=True): description="Evaluation scores (e.g., correctness, cosine_similarity, etc.)", ) + # Cost tracking field + cost: dict[str, Any] | None = SQLField( + default=None, + sa_column=Column( + JSONB, + nullable=True, + comment="Cost tracking (response/embedding tokens and USD)", + ), + description="Cost breakdown by stage (response, embedding) with token counts and USD", + ) + # Error message field error_message: str | None = SQLField( default=None, @@ -397,6 +408,7 @@ class EvaluationRunPublic(SQLModel): object_store_url: str | None total_items: int score: dict[str, Any] | None + cost: dict[str, Any] | None error_message: str | None organization_id: int project_id: int diff --git a/backend/app/tests/crud/evaluations/test_pricing.py b/backend/app/tests/crud/evaluations/test_pricing.py new file mode 100644 index 000000000..b938cb7dc --- /dev/null +++ b/backend/app/tests/crud/evaluations/test_pricing.py @@ -0,0 +1,287 @@ +import pytest + +from app.crud.evaluations.pricing import ( + COST_USD_DECIMALS, + MODEL_PRICING, + build_cost_dict, + build_embedding_cost_entry, + build_response_cost_entry, + calculate_token_cost, +) + + +class TestCalculateTokenCost: + """Tests for 
calculate_token_cost function.""" + + def test_known_chat_model_input_and_output(self) -> None: + """Cost is sum of input and output token costs for a known chat model.""" + pricing = MODEL_PRICING["gpt-4o"] + expected = ( + 1000 * pricing["input_cost_per_token"] + + 500 * pricing["output_cost_per_token"] + ) + + cost = calculate_token_cost( + model="gpt-4o", input_tokens=1000, output_tokens=500 + ) + + assert cost == pytest.approx(expected) + + def test_known_embedding_model_defaults_output_tokens_to_zero(self) -> None: + """Embedding models charge only for input tokens; output_tokens defaults to 0.""" + pricing = MODEL_PRICING["text-embedding-3-large"] + expected = 2000 * pricing["input_cost_per_token"] + + cost = calculate_token_cost(model="text-embedding-3-large", input_tokens=2000) + + assert cost == pytest.approx(expected) + + def test_unknown_model_returns_zero(self) -> None: + """Unknown models return 0.0 instead of raising.""" + cost = calculate_token_cost( + model="not-a-real-model", input_tokens=100, output_tokens=50 + ) + + assert cost == 0.0 + + def test_zero_tokens_returns_zero(self) -> None: + """Zero tokens for a known model returns zero cost.""" + cost = calculate_token_cost(model="gpt-4o", input_tokens=0, output_tokens=0) + + assert cost == 0.0 + + def test_embedding_model_with_explicit_output_tokens(self) -> None: + """Passing output_tokens to an embedding model adds 0 cost (no output rate).""" + pricing = MODEL_PRICING["text-embedding-3-large"] + expected = 100 * pricing["input_cost_per_token"] + + cost = calculate_token_cost( + model="text-embedding-3-large", input_tokens=100, output_tokens=999 + ) + + assert cost == pytest.approx(expected) + + +class TestBuildResponseCostEntry: + """Tests for build_response_cost_entry function.""" + + def test_basic_aggregation(self) -> None: + """Sums input/output/total tokens across results and computes USD cost.""" + results = [ + { + "usage": { + "input_tokens": 100, + "output_tokens": 50, + 
"total_tokens": 150, + } + }, + { + "usage": { + "input_tokens": 200, + "output_tokens": 75, + "total_tokens": 275, + } + }, + ] + + entry = build_response_cost_entry(model="gpt-4o", results=results) + + assert entry["model"] == "gpt-4o" + assert entry["input_tokens"] == 300 + assert entry["output_tokens"] == 125 + assert entry["total_tokens"] == 425 + pricing = MODEL_PRICING["gpt-4o"] + expected_cost = round( + 300 * pricing["input_cost_per_token"] + + 125 * pricing["output_cost_per_token"], + COST_USD_DECIMALS, + ) + assert entry["cost_usd"] == expected_cost + + def test_empty_results(self) -> None: + """Empty results yields zero tokens and zero cost.""" + entry = build_response_cost_entry(model="gpt-4o", results=[]) + + assert entry["input_tokens"] == 0 + assert entry["output_tokens"] == 0 + assert entry["total_tokens"] == 0 + assert entry["cost_usd"] == 0.0 + + def test_results_missing_usage_are_skipped(self) -> None: + """Items without a usage dict are skipped without raising.""" + results = [ + {"usage": {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}}, + {}, # No usage key + {"usage": None}, # Explicit None + ] + + entry = build_response_cost_entry(model="gpt-4o", results=results) + + assert entry["input_tokens"] == 10 + assert entry["output_tokens"] == 5 + assert entry["total_tokens"] == 15 + + def test_unknown_model_yields_zero_cost(self) -> None: + """Unknown model still aggregates token counts but reports zero cost.""" + results = [ + {"usage": {"input_tokens": 100, "output_tokens": 50, "total_tokens": 150}} + ] + + entry = build_response_cost_entry(model="mystery-model", results=results) + + assert entry["input_tokens"] == 100 + assert entry["output_tokens"] == 50 + assert entry["cost_usd"] == 0.0 + + +class TestBuildEmbeddingCostEntry: + """Tests for build_embedding_cost_entry function.""" + + def test_basic_aggregation(self) -> None: + """Sums prompt/total tokens from raw batch results and computes USD cost.""" + raw_results = [ + { + 
"response": { + "body": {"usage": {"prompt_tokens": 100, "total_tokens": 100}} + } + }, + { + "response": { + "body": {"usage": {"prompt_tokens": 250, "total_tokens": 250}} + } + }, + ] + + entry = build_embedding_cost_entry( + model="text-embedding-3-large", raw_results=raw_results + ) + + assert entry["model"] == "text-embedding-3-large" + assert entry["prompt_tokens"] == 350 + assert entry["total_tokens"] == 350 + pricing = MODEL_PRICING["text-embedding-3-large"] + expected_cost = round(350 * pricing["input_cost_per_token"], COST_USD_DECIMALS) + assert entry["cost_usd"] == expected_cost + + def test_empty_raw_results(self) -> None: + """Empty raw_results yields zero tokens and zero cost.""" + entry = build_embedding_cost_entry( + model="text-embedding-3-large", raw_results=[] + ) + + assert entry["prompt_tokens"] == 0 + assert entry["total_tokens"] == 0 + assert entry["cost_usd"] == 0.0 + + def test_results_missing_usage_are_skipped(self) -> None: + """Items without nested usage are skipped (e.g., error rows).""" + raw_results = [ + { + "response": { + "body": {"usage": {"prompt_tokens": 50, "total_tokens": 50}} + } + }, + {"error": {"message": "Rate limited"}}, # No response.body.usage + {"response": {"body": {}}}, # body present, usage missing + ] + + entry = build_embedding_cost_entry( + model="text-embedding-3-large", raw_results=raw_results + ) + + assert entry["prompt_tokens"] == 50 + assert entry["total_tokens"] == 50 + + def test_unknown_model_yields_zero_cost(self) -> None: + """Unknown embedding model still aggregates tokens but reports zero cost.""" + raw_results = [ + { + "response": { + "body": {"usage": {"prompt_tokens": 100, "total_tokens": 100}} + } + } + ] + + entry = build_embedding_cost_entry( + model="mystery-embed", raw_results=raw_results + ) + + assert entry["prompt_tokens"] == 100 + assert entry["cost_usd"] == 0.0 + + +class TestBuildCostDict: + """Tests for build_cost_dict function.""" + + def test_response_only(self) -> None: + """Only 
response entry → embedding key absent, total = response cost.""" + response_entry = { + "model": "gpt-4o", + "input_tokens": 100, + "output_tokens": 50, + "total_tokens": 150, + "cost_usd": 0.001234, + } + + cost = build_cost_dict(response_entry=response_entry) + + assert cost["response"] == response_entry + assert "embedding" not in cost + assert cost["total_cost_usd"] == 0.001234 + + def test_embedding_only(self) -> None: + """Only embedding entry → response key absent, total = embedding cost.""" + embedding_entry = { + "model": "text-embedding-3-large", + "prompt_tokens": 200, + "total_tokens": 200, + "cost_usd": 0.000013, + } + + cost = build_cost_dict(embedding_entry=embedding_entry) + + assert cost["embedding"] == embedding_entry + assert "response" not in cost + assert cost["total_cost_usd"] == 0.000013 + + def test_both_entries(self) -> None: + """Both entries → both keys present, total = sum of both costs.""" + response_entry = {"cost_usd": 0.001234} + embedding_entry = {"cost_usd": 0.000013} + + cost = build_cost_dict( + response_entry=response_entry, embedding_entry=embedding_entry + ) + + assert cost["response"] == response_entry + assert cost["embedding"] == embedding_entry + assert cost["total_cost_usd"] == round(0.001234 + 0.000013, COST_USD_DECIMALS) + + def test_neither_entry(self) -> None: + """No entries → only total_cost_usd present, equal to 0.0.""" + cost = build_cost_dict() + + assert cost == {"total_cost_usd": 0.0} + + def test_total_is_rounded(self) -> None: + """total_cost_usd is rounded to COST_USD_DECIMALS.""" + response_entry = {"cost_usd": 0.0000001} + embedding_entry = {"cost_usd": 0.0000002} + + cost = build_cost_dict( + response_entry=response_entry, embedding_entry=embedding_entry + ) + + # 0.0000003 rounded to 6 decimals → 0.0 + assert cost["total_cost_usd"] == 0.0 + + def test_entry_missing_cost_usd_treated_as_zero(self) -> None: + """Entries without a cost_usd key default to 0 in the total.""" + response_entry = {"model": 
"gpt-4o"} # No cost_usd + embedding_entry = {"cost_usd": 0.000050} + + cost = build_cost_dict( + response_entry=response_entry, embedding_entry=embedding_entry + ) + + assert cost["total_cost_usd"] == 0.000050 diff --git a/backend/app/tests/crud/evaluations/test_processing.py b/backend/app/tests/crud/evaluations/test_processing.py index 52162654d..51e2321db 100644 --- a/backend/app/tests/crud/evaluations/test_processing.py +++ b/backend/app/tests/crud/evaluations/test_processing.py @@ -357,7 +357,11 @@ async def test_process_completed_evaluation_success( "body": { "id": "resp_123", "output": "Answer 1", - "usage": {"total_tokens": 10}, + "usage": { + "input_tokens": 100, + "output_tokens": 50, + "total_tokens": 150, + }, } }, } @@ -397,6 +401,20 @@ async def test_process_completed_evaluation_success( mock_create_langfuse.assert_called_once() mock_start_embedding.assert_called_once() + # Cost tracking: response cost should be aggregated and persisted. + db.refresh(result) + assert result.cost is not None + assert "response" in result.cost + response_cost = result.cost["response"] + assert response_cost["model"] == "gpt-4o" + assert response_cost["input_tokens"] == 100 + assert response_cost["output_tokens"] == 50 + assert response_cost["total_tokens"] == 150 + assert response_cost["cost_usd"] > 0 + assert result.cost["total_cost_usd"] == response_cost["cost_usd"] + # Embedding cost is added later by process_completed_embedding_batch. + assert "embedding" not in result.cost + @pytest.mark.asyncio @patch("app.crud.evaluations.processing.download_batch_results") @patch("app.crud.evaluations.processing.fetch_dataset_items") @@ -547,7 +565,31 @@ async def test_process_completed_embedding_batch_success( eval_run_with_embedding_batch, ): """Test successfully processing completed embedding batch.""" - mock_download.return_value = [] + # Pre-populate eval_run.cost with a response entry to verify that the + # embedding stage merges (not overwrites) existing cost data. 
+ eval_run_with_embedding_batch.cost = { + "response": { + "model": "gpt-4o", + "input_tokens": 100, + "output_tokens": 50, + "total_tokens": 150, + "cost_usd": 0.000375, + }, + "total_cost_usd": 0.000375, + } + db.add(eval_run_with_embedding_batch) + db.commit() + db.refresh(eval_run_with_embedding_batch) + + # Raw results carry the usage payload that build_embedding_cost_entry reads. + mock_download.return_value = [ + { + "custom_id": "trace_123", + "response": { + "body": {"usage": {"prompt_tokens": 200, "total_tokens": 200}} + }, + } + ] mock_parse.return_value = [ { "item_id": "item1", @@ -586,6 +628,21 @@ async def test_process_completed_embedding_batch_success( assert cosine_score is not None assert cosine_score["avg"] == 0.95 + # Cost tracking: embedding entry is added, response entry is preserved, + # and total_cost_usd is the sum of both. + assert result.cost is not None + assert "response" in result.cost + assert "embedding" in result.cost + assert result.cost["response"]["cost_usd"] == 0.000375 + embedding_cost = result.cost["embedding"] + assert embedding_cost["model"] == "text-embedding-3-large" + assert embedding_cost["prompt_tokens"] == 200 + assert embedding_cost["total_tokens"] == 200 + assert embedding_cost["cost_usd"] > 0 + assert result.cost["total_cost_usd"] == pytest.approx( + 0.000375 + embedding_cost["cost_usd"] + ) + @pytest.mark.asyncio @patch("app.crud.evaluations.processing.download_batch_results") @patch("app.crud.evaluations.processing.parse_embedding_results")