From 631f3f40e9ff8b97abb3d3dd8197eb561534b9fc Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Thu, 9 Apr 2026 22:36:45 +0530 Subject: [PATCH 1/3] first stab at costing --- .../050_add_cost_to_evaluation_run.py | 33 +++ backend/app/crud/evaluations/__init__.py | 13 ++ backend/app/crud/evaluations/core.py | 3 + backend/app/crud/evaluations/embeddings.py | 4 + backend/app/crud/evaluations/pricing.py | 200 ++++++++++++++++++ backend/app/crud/evaluations/processing.py | 36 +++- backend/app/models/evaluation.py | 12 ++ 7 files changed, 300 insertions(+), 1 deletion(-) create mode 100644 backend/app/alembic/versions/050_add_cost_to_evaluation_run.py create mode 100644 backend/app/crud/evaluations/pricing.py diff --git a/backend/app/alembic/versions/050_add_cost_to_evaluation_run.py b/backend/app/alembic/versions/050_add_cost_to_evaluation_run.py new file mode 100644 index 000000000..6d63de3e8 --- /dev/null +++ b/backend/app/alembic/versions/050_add_cost_to_evaluation_run.py @@ -0,0 +1,33 @@ +"""add cost tracking to evaluation_run + +Revision ID: 050 +Revises: 049 +Create Date: 2026-04-09 12:00:00.000000 + +""" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "050" +down_revision = "049" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "evaluation_run", + sa.Column( + "cost", + postgresql.JSONB(astext_type=sa.Text()), + nullable=True, + comment="Cost tracking (response/embedding tokens and USD)", + ), + ) + + +def downgrade(): + op.drop_column("evaluation_run", "cost") diff --git a/backend/app/crud/evaluations/__init__.py b/backend/app/crud/evaluations/__init__.py index a5824c0a2..8515d81da 100644 --- a/backend/app/crud/evaluations/__init__.py +++ b/backend/app/crud/evaluations/__init__.py @@ -31,6 +31,13 @@ update_traces_with_cosine_scores, upload_dataset_to_langfuse, ) +from app.crud.evaluations.pricing import ( + build_cost_dict, + build_embedding_cost_entry, + build_response_cost_entry, + calculate_embedding_cost, + calculate_response_cost, +) from app.crud.evaluations.processing import ( check_and_process_evaluation, poll_all_pending_evaluations, @@ -74,6 +81,12 @@ "calculate_average_similarity", "calculate_cosine_similarity", "start_embedding_batch", + # Pricing + "build_cost_dict", + "build_embedding_cost_entry", + "build_response_cost_entry", + "calculate_embedding_cost", + "calculate_response_cost", # Langfuse "create_langfuse_dataset_run", "fetch_trace_scores_from_langfuse", diff --git a/backend/app/crud/evaluations/core.py b/backend/app/crud/evaluations/core.py index 79a3c9d3f..5f1b22ee0 100644 --- a/backend/app/crud/evaluations/core.py +++ b/backend/app/crud/evaluations/core.py @@ -197,6 +197,7 @@ def update_evaluation_run( object_store_url: str | None = None, score_trace_url: str | None = None, score: dict | None = None, + cost: dict | None = None, embedding_batch_job_id: int | None = None, ) -> EvaluationRun: """ @@ -226,6 +227,8 @@ def update_evaluation_run( eval_run.object_store_url = object_store_url if score is not None: eval_run.score = score + if cost is not None: + eval_run.cost = cost if embedding_batch_job_id is not None: eval_run.embedding_batch_job_id = embedding_batch_job_id if score_trace_url is not None: diff --git a/backend/app/crud/evaluations/embeddings.py b/backend/app/crud/evaluations/embeddings.py index d21f186cc..6c2456a04 100644 --- a/backend/app/crud/evaluations/embeddings.py +++ b/backend/app/crud/evaluations/embeddings.py @@ -204,11 +204,15 @@ def parse_embedding_results(raw_results: list[dict[str, Any]]) -> list[dict[str, ) continue + # Extract usage for cost tracking + usage = response_body.get("usage") + embedding_pairs.append( { "trace_id": trace_id, "output_embedding": output_embedding, "ground_truth_embedding": ground_truth_embedding, + "usage": usage, } ) diff --git a/backend/app/crud/evaluations/pricing.py b/backend/app/crud/evaluations/pricing.py new file mode 100644 index 000000000..0702a9385 --- /dev/null +++ b/backend/app/crud/evaluations/pricing.py @@ -0,0 +1,200 @@ +""" +Pricing utilities for evaluation cost tracking. + +This module provides model pricing data and cost calculation functions +for both response generation and embedding stages of evaluation runs. + +Pricing uses OpenAI Batch API rates (50% cheaper than real-time). +Source: https://github.com/BerriAI/litellm/blob/main/model_prices_and_context_window.json +""" + +import logging +from typing import Any + +logger = logging.getLogger(__name__) + +# Batch API pricing in USD per token +MODEL_PRICING: dict[str, dict[str, Any]] = { + # Chat models (batch pricing) + "gpt-4o": { + "mode": "chat", + "input_cost_per_token": 1.25e-06, + "output_cost_per_token": 5e-06, + }, + "gpt-4o-2024-08-06": { + "mode": "chat", + "input_cost_per_token": 1.25e-06, + "output_cost_per_token": 5e-06, + }, + "gpt-4o-mini": { + "mode": "chat", + "input_cost_per_token": 7.5e-08, + "output_cost_per_token": 3e-07, + }, + "gpt-4o-mini-2024-07-18": { + "mode": "chat", + "input_cost_per_token": 7.5e-08, + "output_cost_per_token": 3e-07, + }, + # Embedding models (batch pricing) + "text-embedding-3-large": { + "mode": "embedding", + "input_cost_per_token": 6.5e-08, + }, + "text-embedding-3-small": { + "mode": "embedding", + "input_cost_per_token": 1e-08, + }, + "text-embedding-ada-002": { + "mode": "embedding", + "input_cost_per_token": 1e-07, + }, +} + + +def calculate_response_cost(model: str, input_tokens: int, output_tokens: int) -> float: + """ + Calculate USD cost for response generation. + + Args: + model: OpenAI model name (e.g., "gpt-4o") + input_tokens: Number of input tokens + output_tokens: Number of output tokens + + Returns: + Cost in USD. Returns 0.0 if model is unknown. + """ + pricing = MODEL_PRICING.get(model) + if not pricing: + logger.warning( + f"[calculate_response_cost] Unknown model '{model}', returning cost 0.0" + ) + return 0.0 + + input_cost = input_tokens * pricing.get("input_cost_per_token", 0) + output_cost = output_tokens * pricing.get("output_cost_per_token", 0) + return input_cost + output_cost + + +def calculate_embedding_cost(model: str, prompt_tokens: int) -> float: + """ + Calculate USD cost for embeddings. + + Args: + model: OpenAI embedding model name (e.g., "text-embedding-3-large") + prompt_tokens: Number of prompt tokens + + Returns: + Cost in USD. Returns 0.0 if model is unknown. + """ + pricing = MODEL_PRICING.get(model) + if not pricing: + logger.warning( + f"[calculate_embedding_cost] Unknown model '{model}', returning cost 0.0" + ) + return 0.0 + + return prompt_tokens * pricing.get("input_cost_per_token", 0) + + +def build_response_cost_entry( + model: str, results: list[dict[str, Any]] +) -> dict[str, Any]: + """ + Aggregate token usage from parsed evaluation results and calculate cost. + + Args: + model: OpenAI model name used for response generation + results: Parsed evaluation results from parse_evaluation_output(), + each containing a "usage" dict with input_tokens/output_tokens/total_tokens + + Returns: + Response cost entry for the cost JSONB field + """ + total_input_tokens = 0 + total_output_tokens = 0 + total_tokens = 0 + + for result in results: + usage = result.get("usage") + if not usage: + continue + total_input_tokens += usage.get("input_tokens", 0) + total_output_tokens += usage.get("output_tokens", 0) + total_tokens += usage.get("total_tokens", 0) + + cost_usd = calculate_response_cost(model, total_input_tokens, total_output_tokens) + + return { + "model": model, + "input_tokens": total_input_tokens, + "output_tokens": total_output_tokens, + "total_tokens": total_tokens, + "cost_usd": round(cost_usd, 6), + } + + +def build_embedding_cost_entry( + model: str, raw_results: list[dict[str, Any]] +) -> dict[str, Any]: + """ + Aggregate token usage from raw embedding batch results and calculate cost. + + Args: + model: OpenAI embedding model name + raw_results: Raw JSONL lines from embedding batch output, + each containing response.body.usage with prompt_tokens/total_tokens + + Returns: + Embedding cost entry for the cost JSONB field + """ + total_prompt_tokens = 0 + total_tokens = 0 + + for response in raw_results: + usage = response.get("response", {}).get("body", {}).get("usage") + if not usage: + continue + total_prompt_tokens += usage.get("prompt_tokens", 0) + total_tokens += usage.get("total_tokens", 0) + + cost_usd = calculate_embedding_cost(model, total_prompt_tokens) + + return { + "model": model, + "prompt_tokens": total_prompt_tokens, + "total_tokens": total_tokens, + "cost_usd": round(cost_usd, 6), + } + + +def build_cost_dict( + response_entry: dict[str, Any] | None = None, + embedding_entry: dict[str, Any] | None = None, +) -> dict[str, Any]: + """ + Combine response and embedding cost entries into the final cost JSONB structure. + + Args: + response_entry: Response cost entry from build_response_cost_entry() + embedding_entry: Embedding cost entry from build_embedding_cost_entry() + + Returns: + Combined cost dict with total_cost_usd + """ + cost: dict[str, Any] = {} + + response_cost = 0.0 + embedding_cost = 0.0 + + if response_entry: + cost["response"] = response_entry + response_cost = response_entry.get("cost_usd", 0.0) + + if embedding_entry: + cost["embedding"] = embedding_entry + embedding_cost = embedding_entry.get("cost_usd", 0.0) + + cost["total_cost_usd"] = round(response_cost + embedding_cost, 6) + + return cost diff --git a/backend/app/crud/evaluations/processing.py b/backend/app/crud/evaluations/processing.py index 1fa82b39f..e91a13002 100644 --- a/backend/app/crud/evaluations/processing.py +++ b/backend/app/crud/evaluations/processing.py @@ -29,10 +29,16 @@ from app.crud.evaluations.batch import fetch_dataset_items from app.crud.evaluations.core import update_evaluation_run, resolve_model_from_config from app.crud.evaluations.embeddings import ( + EMBEDDING_MODEL, calculate_average_similarity, parse_embedding_results, start_embedding_batch, ) +from app.crud.evaluations.pricing import ( + build_cost_dict, + build_embedding_cost_entry, + build_response_cost_entry, +) from app.crud.evaluations.langfuse import ( create_langfuse_dataset_run, update_traces_with_cosine_scores, @@ -332,6 +338,18 @@ async def process_completed_evaluation( # Use model stored at creation time for cost tracking model = resolve_model_from_config(session=session, eval_run=eval_run) + # Aggregate response generation cost + try: + response_cost_entry = build_response_cost_entry( + model=model, results=results + ) + cost = build_cost_dict(response_entry=response_cost_entry) + update_evaluation_run(session=session, eval_run=eval_run, cost=cost) + except Exception as cost_err: + logger.warning( + f"[process_completed_evaluation] {log_prefix} Failed to calculate response cost | {cost_err}" + ) + trace_id_mapping = create_langfuse_dataset_run( langfuse=langfuse, dataset_name=eval_run.dataset_name, @@ -488,7 +506,23 @@ async def process_completed_embedding_batch( exc_info=True, ) - # Step 7: Mark evaluation as completed + # Step 7: Accumulate embedding cost onto existing response cost + try: + embedding_cost_entry = build_embedding_cost_entry( + model=EMBEDDING_MODEL, raw_results=raw_results + ) + existing_cost = eval_run.cost or {} + response_entry = existing_cost.get("response") + eval_run.cost = build_cost_dict( + response_entry=response_entry, + embedding_entry=embedding_cost_entry, + ) + except Exception as cost_err: + logger.warning( + f"[process_completed_embedding_batch] {log_prefix} Failed to calculate embedding cost | {cost_err}" + ) + + # Step 8: Mark evaluation as completed eval_run = update_evaluation_run( session=session, eval_run=eval_run, status="completed", score=eval_run.score ) diff --git a/backend/app/models/evaluation.py b/backend/app/models/evaluation.py index d2d2beecc..fddc255cb 100644 --- a/backend/app/models/evaluation.py +++ b/backend/app/models/evaluation.py @@ -313,6 +313,17 @@ class EvaluationRun(SQLModel, table=True): description="Evaluation scores (e.g., correctness, cosine_similarity, etc.)", ) + # Cost tracking field + cost: dict[str, Any] | None = SQLField( + default=None, + sa_column=Column( + JSONB, + nullable=True, + comment="Cost tracking (response/embedding tokens and USD)", + ), + description="Cost breakdown by stage (response, embedding) with token counts and USD", + ) + # Error message field error_message: str | None = SQLField( default=None, @@ -397,6 +408,7 @@ class EvaluationRunPublic(SQLModel): object_store_url: str | None total_items: int score: dict[str, Any] | None + cost: dict[str, Any] | None error_message: str | None organization_id: int project_id: int From b6750f0509503646d145891fdc73e42935b67b43 Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Thu, 9 Apr 2026 23:27:49 +0530 Subject: [PATCH 2/3] minor fixes --- backend/app/crud/evaluations/pricing.py | 59 ++++++++++++++++++------- 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/backend/app/crud/evaluations/pricing.py b/backend/app/crud/evaluations/pricing.py index 0702a9385..ffd14c8fa 100644 --- a/backend/app/crud/evaluations/pricing.py +++ b/backend/app/crud/evaluations/pricing.py @@ -15,40 +15,65 @@ # Batch API pricing in USD per token MODEL_PRICING: dict[str, dict[str, Any]] = { - # Chat models (batch pricing) + # GPT-4o (batch pricing) "gpt-4o": { "mode": "chat", "input_cost_per_token": 1.25e-06, "output_cost_per_token": 5e-06, }, - "gpt-4o-2024-08-06": { - "mode": "chat", - "input_cost_per_token": 1.25e-06, - "output_cost_per_token": 5e-06, - }, "gpt-4o-mini": { "mode": "chat", "input_cost_per_token": 7.5e-08, "output_cost_per_token": 3e-07, }, - "gpt-4o-mini-2024-07-18": { + # GPT-4.1 (batch pricing) + "gpt-4.1": { "mode": "chat", - "input_cost_per_token": 7.5e-08, - "output_cost_per_token": 3e-07, + "input_cost_per_token": 1e-06, + "output_cost_per_token": 4e-06, + }, + # GPT-5 (batch pricing) + "gpt-5": { + "mode": "chat", + "input_cost_per_token": 6.25e-07, + "output_cost_per_token": 5e-06, + }, + "gpt-5-mini": { + "mode": "chat", + "input_cost_per_token": 1.25e-07, + "output_cost_per_token": 1e-06, + }, + "gpt-5-nano": { + "mode": "chat", + "input_cost_per_token": 2.5e-08, + "output_cost_per_token": 2e-07, + }, + # GPT-5.4 (batch pricing) + "gpt-5.4": { + "mode": "chat", + "input_cost_per_token": 1.25e-06, + "output_cost_per_token": 7.5e-06, + }, + "gpt-5.4-pro": { + "mode": "chat", + "input_cost_per_token": 1.5e-05, + "output_cost_per_token": 9e-05, + }, + "gpt-5.4-mini": { + "mode": "chat", + "input_cost_per_token": 3.75e-07, + "output_cost_per_token": 2.25e-06, + }, + "gpt-5.4-nano": { + "mode": "chat", + "input_cost_per_token": 1e-07, + "output_cost_per_token": 6.25e-07, }, # Embedding models (batch pricing) "text-embedding-3-large": { "mode": "embedding", "input_cost_per_token": 6.5e-08, }, - "text-embedding-3-small": { - "mode": "embedding", - "input_cost_per_token": 1e-08, - }, - "text-embedding-ada-002": { - "mode": "embedding", - "input_cost_per_token": 1e-07, - }, } From 63eb9428a435244e844fc5f639d30063427cd99f Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Sat, 11 Apr 2026 11:40:46 +0530 Subject: [PATCH 3/3] cleanup --- backend/app/crud/evaluations/__init__.py | 6 +- backend/app/crud/evaluations/core.py | 2 + backend/app/crud/evaluations/embeddings.py | 4 - backend/app/crud/evaluations/pricing.py | 128 ++++---- backend/app/crud/evaluations/processing.py | 95 ++++-- .../tests/crud/evaluations/test_pricing.py | 287 ++++++++++++++++++ .../tests/crud/evaluations/test_processing.py | 61 +++- 7 files changed, 484 insertions(+), 99 deletions(-) create mode 100644 backend/app/tests/crud/evaluations/test_pricing.py diff --git a/backend/app/crud/evaluations/__init__.py b/backend/app/crud/evaluations/__init__.py index 8515d81da..64dfb8a3a 100644 --- a/backend/app/crud/evaluations/__init__.py +++ b/backend/app/crud/evaluations/__init__.py @@ -35,8 +35,7 @@ build_cost_dict, build_embedding_cost_entry, build_response_cost_entry, - calculate_embedding_cost, - calculate_response_cost, + calculate_token_cost, ) from app.crud.evaluations.processing import ( check_and_process_evaluation, @@ -85,8 +84,7 @@ "build_cost_dict", "build_embedding_cost_entry", "build_response_cost_entry", - "calculate_embedding_cost", - "calculate_response_cost", + "calculate_token_cost", # Langfuse "create_langfuse_dataset_run", "fetch_trace_scores_from_langfuse", diff --git a/backend/app/crud/evaluations/core.py b/backend/app/crud/evaluations/core.py index 5f1b22ee0..e52d77cdc 100644 --- a/backend/app/crud/evaluations/core.py +++ b/backend/app/crud/evaluations/core.py @@ -212,7 +212,9 @@ def update_evaluation_run( status: New status value (optional) error_message: New error message (optional) object_store_url: New object store URL (optional) + score_trace_url: New per-trace score S3 URL (optional) score: New score dict (optional) + cost: New cost dict (optional) embedding_batch_job_id: New embedding batch job ID (optional) Returns: diff --git a/backend/app/crud/evaluations/embeddings.py b/backend/app/crud/evaluations/embeddings.py index 6c2456a04..d21f186cc 100644 --- a/backend/app/crud/evaluations/embeddings.py +++ b/backend/app/crud/evaluations/embeddings.py @@ -204,15 +204,11 @@ def parse_embedding_results(raw_results: list[dict[str, Any]]) -> list[dict[str, ) continue - # Extract usage for cost tracking - usage = response_body.get("usage") - embedding_pairs.append( { "trace_id": trace_id, "output_embedding": output_embedding, "ground_truth_embedding": ground_truth_embedding, - "usage": usage, } ) diff --git a/backend/app/crud/evaluations/pricing.py b/backend/app/crud/evaluations/pricing.py index ffd14c8fa..be3d98791 100644 --- a/backend/app/crud/evaluations/pricing.py +++ b/backend/app/crud/evaluations/pricing.py @@ -9,82 +9,82 @@ """ import logging +from collections.abc import Callable, Iterable from typing import Any +from app.crud.evaluations.embeddings import EMBEDDING_MODEL + logger = logging.getLogger(__name__) -# Batch API pricing in USD per token -MODEL_PRICING: dict[str, dict[str, Any]] = { +# Number of decimals to round USD cost values to. +COST_USD_DECIMALS = 6 + +# Batch API pricing in USD per token. +MODEL_PRICING: dict[str, dict[str, float]] = { # GPT-4o (batch pricing) "gpt-4o": { - "mode": "chat", "input_cost_per_token": 1.25e-06, "output_cost_per_token": 5e-06, }, "gpt-4o-mini": { - "mode": "chat", "input_cost_per_token": 7.5e-08, "output_cost_per_token": 3e-07, }, # GPT-4.1 (batch pricing) "gpt-4.1": { - "mode": "chat", "input_cost_per_token": 1e-06, "output_cost_per_token": 4e-06, }, # GPT-5 (batch pricing) "gpt-5": { - "mode": "chat", "input_cost_per_token": 6.25e-07, "output_cost_per_token": 5e-06, }, "gpt-5-mini": { - "mode": "chat", "input_cost_per_token": 1.25e-07, "output_cost_per_token": 1e-06, }, "gpt-5-nano": { - "mode": "chat", "input_cost_per_token": 2.5e-08, "output_cost_per_token": 2e-07, }, # GPT-5.4 (batch pricing) "gpt-5.4": { - "mode": "chat", "input_cost_per_token": 1.25e-06, "output_cost_per_token": 7.5e-06, }, "gpt-5.4-pro": { - "mode": "chat", "input_cost_per_token": 1.5e-05, "output_cost_per_token": 9e-05, }, "gpt-5.4-mini": { - "mode": "chat", "input_cost_per_token": 3.75e-07, "output_cost_per_token": 2.25e-06, }, "gpt-5.4-nano": { - "mode": "chat", "input_cost_per_token": 1e-07, "output_cost_per_token": 6.25e-07, }, # Embedding models (batch pricing) - "text-embedding-3-large": { - "mode": "embedding", + EMBEDDING_MODEL: { "input_cost_per_token": 6.5e-08, }, } -def calculate_response_cost(model: str, input_tokens: int, output_tokens: int) -> float: +def calculate_token_cost( + model: str, input_tokens: int, output_tokens: int = 0 +) -> float: """ - Calculate USD cost for response generation. + Calculate USD cost for a model call given input and output token counts. + + Used for both response generation (input + output tokens) and embeddings + (input tokens only — pass output_tokens=0 or omit). Args: - model: OpenAI model name (e.g., "gpt-4o") - input_tokens: Number of input tokens - output_tokens: Number of output tokens + model: OpenAI model name (e.g., "gpt-4o", "text-embedding-3-large") + input_tokens: Number of input/prompt tokens + output_tokens: Number of output tokens (default 0 for embeddings) Returns: Cost in USD. Returns 0.0 if model is unknown. @@ -92,7 +92,7 @@ def calculate_response_cost(model: str, input_tokens: int, output_tokens: int) - pricing = MODEL_PRICING.get(model) if not pricing: logger.warning( - f"[calculate_response_cost] Unknown model '{model}', returning cost 0.0" + f"[calculate_token_cost] Unknown model '{model}', returning cost 0.0" ) return 0.0 @@ -101,25 +101,31 @@ def calculate_response_cost(model: str, input_tokens: int, output_tokens: int) - return input_cost + output_cost -def calculate_embedding_cost(model: str, prompt_tokens: int) -> float: +def _sum_usage( + items: Iterable[dict[str, Any]], + usage_extractor: Callable[[dict[str, Any]], dict[str, Any] | None], + fields: tuple[str, ...], +) -> dict[str, int]: """ - Calculate USD cost for embeddings. + Sum named token fields across items, using a caller-supplied extractor + to locate the per-item usage dict. Args: - model: OpenAI embedding model name (e.g., "text-embedding-3-large") - prompt_tokens: Number of prompt tokens + items: Iterable of items to aggregate + usage_extractor: Function returning the usage dict for an item, or None + fields: Token field names to sum (e.g., "input_tokens", "total_tokens") Returns: - Cost in USD. Returns 0.0 if model is unknown. + Mapping of field name to summed value """ - pricing = MODEL_PRICING.get(model) - if not pricing: - logger.warning( - f"[calculate_embedding_cost] Unknown model '{model}', returning cost 0.0" - ) - return 0.0 - - return prompt_tokens * pricing.get("input_cost_per_token", 0) + totals: dict[str, int] = {field: 0 for field in fields} + for item in items: + usage = usage_extractor(item) + if not usage: + continue + for field in fields: + totals[field] += usage.get(field, 0) + return totals def build_response_cost_entry( @@ -136,26 +142,24 @@ def build_response_cost_entry( Returns: Response cost entry for the cost JSONB field """ - total_input_tokens = 0 - total_output_tokens = 0 - total_tokens = 0 - - for result in results: - usage = result.get("usage") - if not usage: - continue - total_input_tokens += usage.get("input_tokens", 0) - total_output_tokens += usage.get("output_tokens", 0) - total_tokens += usage.get("total_tokens", 0) - - cost_usd = calculate_response_cost(model, total_input_tokens, total_output_tokens) + totals = _sum_usage( + items=results, + usage_extractor=lambda r: r.get("usage"), + fields=("input_tokens", "output_tokens", "total_tokens"), + ) + + cost_usd = calculate_token_cost( + model=model, + input_tokens=totals["input_tokens"], + output_tokens=totals["output_tokens"], + ) return { "model": model, - "input_tokens": total_input_tokens, - "output_tokens": total_output_tokens, - "total_tokens": total_tokens, - "cost_usd": round(cost_usd, 6), + "input_tokens": totals["input_tokens"], + "output_tokens": totals["output_tokens"], + "total_tokens": totals["total_tokens"], + "cost_usd": round(cost_usd, COST_USD_DECIMALS), } @@ -173,23 +177,19 @@ def build_embedding_cost_entry( Returns: Embedding cost entry for the cost JSONB field """ - total_prompt_tokens = 0 - total_tokens = 0 - - for response in raw_results: - usage = response.get("response", {}).get("body", {}).get("usage") - if not usage: - continue - total_prompt_tokens += usage.get("prompt_tokens", 0) - total_tokens += usage.get("total_tokens", 0) + totals = _sum_usage( + items=raw_results, + usage_extractor=lambda r: r.get("response", {}).get("body", {}).get("usage"), + fields=("prompt_tokens", "total_tokens"), + ) - cost_usd = calculate_embedding_cost(model, total_prompt_tokens) + cost_usd = calculate_token_cost(model=model, input_tokens=totals["prompt_tokens"]) return { "model": model, - "prompt_tokens": total_prompt_tokens, - "total_tokens": total_tokens, - "cost_usd": round(cost_usd, 6), + "prompt_tokens": totals["prompt_tokens"], + "total_tokens": totals["total_tokens"], + "cost_usd": round(cost_usd, COST_USD_DECIMALS), } @@ -220,6 +220,6 @@ def build_cost_dict( cost["embedding"] = embedding_entry embedding_cost = embedding_entry.get("cost_usd", 0.0) - cost["total_cost_usd"] = round(response_cost + embedding_cost, 6) + cost["total_cost_usd"] = round(response_cost + embedding_cost, COST_USD_DECIMALS) return cost diff --git a/backend/app/crud/evaluations/processing.py b/backend/app/crud/evaluations/processing.py index e91a13002..77502fe8a 100644 --- a/backend/app/crud/evaluations/processing.py +++ b/backend/app/crud/evaluations/processing.py @@ -51,6 +51,58 @@ logger = logging.getLogger(__name__) +def _safe_attach_cost( + eval_run: EvaluationRun, + log_prefix: str, + *, + response_model: str | None = None, + response_results: list[dict[str, Any]] | None = None, + embedding_model: str | None = None, + embedding_raw_results: list[dict[str, Any]] | None = None, +) -> None: + """ + Compute and attach a cost dict to eval_run.cost without raising. + + Cost-tracking failures must never block evaluation completion, so any + exception is logged and swallowed. The caller is responsible for + persisting eval_run via update_evaluation_run. + + When called for the embedding stage only, any previously-computed + response entry on eval_run.cost is preserved. + + Args: + eval_run: EvaluationRun whose cost field will be set + log_prefix: Caller-provided log prefix (org/project/eval ids) + response_model: Model name for response cost (response stage only) + response_results: Parsed evaluation results (response stage only) + embedding_model: Model name for embedding cost (embedding stage only) + embedding_raw_results: Raw embedding batch results (embedding stage only) + """ + try: + if response_model is not None and response_results is not None: + response_entry = build_response_cost_entry( + model=response_model, results=response_results + ) + else: + # Preserve any response entry computed during an earlier stage. + response_entry = (eval_run.cost or {}).get("response") + + embedding_entry: dict[str, Any] | None = None + if embedding_model is not None and embedding_raw_results is not None: + embedding_entry = build_embedding_cost_entry( + model=embedding_model, raw_results=embedding_raw_results + ) + + eval_run.cost = build_cost_dict( + response_entry=response_entry, + embedding_entry=embedding_entry, + ) + except Exception as cost_err: + logger.warning( + f"[_safe_attach_cost] {log_prefix} Failed to compute cost | {cost_err}" + ) + + def _extract_batch_error_message( provider: OpenAIBatchProvider, error_file_id: str, @@ -339,16 +391,13 @@ async def process_completed_evaluation( model = resolve_model_from_config(session=session, eval_run=eval_run) # Aggregate response generation cost - try: - response_cost_entry = build_response_cost_entry( - model=model, results=results - ) - cost = build_cost_dict(response_entry=response_cost_entry) - update_evaluation_run(session=session, eval_run=eval_run, cost=cost) - except Exception as cost_err: - logger.warning( - f"[process_completed_evaluation] {log_prefix} Failed to calculate response cost | {cost_err}" - ) + _safe_attach_cost( + eval_run=eval_run, + log_prefix=log_prefix, + response_model=model, + response_results=results, + ) + update_evaluation_run(session=session, eval_run=eval_run, cost=eval_run.cost) trace_id_mapping = create_langfuse_dataset_run( langfuse=langfuse, @@ -507,24 +556,20 @@ async def process_completed_embedding_batch( ) # Step 7: Accumulate embedding cost onto existing response cost - try: - embedding_cost_entry = build_embedding_cost_entry( - model=EMBEDDING_MODEL, raw_results=raw_results - ) - existing_cost = eval_run.cost or {} - response_entry = existing_cost.get("response") - eval_run.cost = build_cost_dict( - response_entry=response_entry, - embedding_entry=embedding_cost_entry, - ) - except Exception as cost_err: - logger.warning( - f"[process_completed_embedding_batch] {log_prefix} Failed to calculate embedding cost | {cost_err}" - ) + _safe_attach_cost( + eval_run=eval_run, + log_prefix=log_prefix, + embedding_model=EMBEDDING_MODEL, + embedding_raw_results=raw_results, + ) # Step 8: Mark evaluation as completed eval_run = update_evaluation_run( - session=session, eval_run=eval_run, status="completed", score=eval_run.score + session=session, + eval_run=eval_run, + status="completed", + score=eval_run.score, + cost=eval_run.cost, ) logger.info( diff --git a/backend/app/tests/crud/evaluations/test_pricing.py b/backend/app/tests/crud/evaluations/test_pricing.py new file mode 100644 index 000000000..b938cb7dc --- /dev/null +++ b/backend/app/tests/crud/evaluations/test_pricing.py @@ -0,0 +1,287 @@ +import pytest + +from app.crud.evaluations.pricing import ( + COST_USD_DECIMALS, + MODEL_PRICING, + build_cost_dict, + build_embedding_cost_entry, + build_response_cost_entry, + calculate_token_cost, +) + + +class TestCalculateTokenCost: + """Tests for calculate_token_cost function.""" + + def test_known_chat_model_input_and_output(self) -> None: + """Cost is sum of input and output token costs for a known chat model.""" + pricing = MODEL_PRICING["gpt-4o"] + expected = ( + 1000 * pricing["input_cost_per_token"] + + 500 * pricing["output_cost_per_token"] + ) + + cost = calculate_token_cost( + model="gpt-4o", input_tokens=1000, output_tokens=500 + ) + + assert cost == pytest.approx(expected) + + def test_known_embedding_model_defaults_output_tokens_to_zero(self) -> None: + """Embedding models charge only for input tokens; output_tokens defaults to 0.""" + pricing = MODEL_PRICING["text-embedding-3-large"] + expected = 2000 * pricing["input_cost_per_token"] + + cost = calculate_token_cost(model="text-embedding-3-large", input_tokens=2000) + + assert cost == pytest.approx(expected) + + def test_unknown_model_returns_zero(self) -> None: + """Unknown models return 0.0 instead of raising.""" + cost = calculate_token_cost( + model="not-a-real-model", input_tokens=100, output_tokens=50 + ) + + assert cost == 0.0 + + def test_zero_tokens_returns_zero(self) -> None: + """Zero tokens for a known model returns zero cost.""" + cost = calculate_token_cost(model="gpt-4o", input_tokens=0, output_tokens=0) + + assert cost == 0.0 + + def test_embedding_model_with_explicit_output_tokens(self) -> None: + """Passing output_tokens to an embedding model adds 0 cost (no output rate).""" + pricing = MODEL_PRICING["text-embedding-3-large"] + expected = 100 * pricing["input_cost_per_token"] + + cost = calculate_token_cost( + model="text-embedding-3-large", input_tokens=100, output_tokens=999 + ) + + assert cost == pytest.approx(expected) + + +class TestBuildResponseCostEntry: + """Tests for build_response_cost_entry function.""" + + def test_basic_aggregation(self) -> None: + """Sums input/output/total tokens across results and computes USD cost.""" + results = [ + { + "usage": { + "input_tokens": 100, + "output_tokens": 50, + "total_tokens": 150, + } + }, + { + "usage": { + "input_tokens": 200, + "output_tokens": 75, + "total_tokens": 275, + } + }, + ] + + entry = build_response_cost_entry(model="gpt-4o", results=results) + + assert entry["model"] == "gpt-4o" + assert entry["input_tokens"] == 300 + assert entry["output_tokens"] == 125 + assert entry["total_tokens"] == 425 + pricing = MODEL_PRICING["gpt-4o"] + expected_cost = round( + 300 * pricing["input_cost_per_token"] + + 125 * pricing["output_cost_per_token"], + COST_USD_DECIMALS, + ) + assert entry["cost_usd"] == expected_cost + + def test_empty_results(self) -> None: + """Empty results yields zero tokens and zero cost.""" + entry = build_response_cost_entry(model="gpt-4o", results=[]) + + assert entry["input_tokens"] == 0 + assert entry["output_tokens"] == 0 + assert entry["total_tokens"] == 0 + assert entry["cost_usd"] == 0.0 + + def test_results_missing_usage_are_skipped(self) -> None: + """Items without a usage dict are skipped without raising.""" + results = [ + {"usage": {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}}, + {}, # No usage key + {"usage": None}, # Explicit None + ] + + entry = build_response_cost_entry(model="gpt-4o", results=results) + + assert entry["input_tokens"] == 10 + assert entry["output_tokens"] == 5 + assert entry["total_tokens"] == 15 + + def test_unknown_model_yields_zero_cost(self) -> None: + """Unknown model still aggregates token counts but reports zero cost.""" + results = [ + {"usage": {"input_tokens": 100, "output_tokens": 50, "total_tokens": 150}} + ] + + entry = build_response_cost_entry(model="mystery-model", results=results) + + assert entry["input_tokens"] == 100 + assert entry["output_tokens"] == 50 + assert entry["cost_usd"] == 0.0 + + +class TestBuildEmbeddingCostEntry: + """Tests for build_embedding_cost_entry function.""" + + def test_basic_aggregation(self) -> None: + """Sums prompt/total tokens from raw batch results and computes USD cost.""" + raw_results = [ + { + "response": { + "body": {"usage": {"prompt_tokens": 100, "total_tokens": 100}} + } + }, + { + "response": { + "body": {"usage": {"prompt_tokens": 250, "total_tokens": 250}} + } + }, + ] + + entry = build_embedding_cost_entry( + model="text-embedding-3-large", raw_results=raw_results + ) + + assert entry["model"] == "text-embedding-3-large" + assert entry["prompt_tokens"] == 350 + assert entry["total_tokens"] == 350 + pricing = MODEL_PRICING["text-embedding-3-large"] + expected_cost = round(350 * pricing["input_cost_per_token"], COST_USD_DECIMALS) + assert entry["cost_usd"] == expected_cost + + def test_empty_raw_results(self) -> None: + """Empty raw_results yields zero tokens and zero cost.""" + entry = build_embedding_cost_entry( + model="text-embedding-3-large", raw_results=[] + ) + + assert entry["prompt_tokens"] == 0 + assert entry["total_tokens"] == 0 + assert entry["cost_usd"] == 0.0 + + def test_results_missing_usage_are_skipped(self) -> None: + """Items without nested usage are skipped (e.g., error rows).""" + raw_results = [ + { + "response": { + "body": {"usage": {"prompt_tokens": 50, "total_tokens": 50}} + } + }, + {"error": {"message": "Rate limited"}}, # No response.body.usage + {"response": {"body": {}}}, # body present, usage missing + ] + + entry = build_embedding_cost_entry( + model="text-embedding-3-large", raw_results=raw_results + ) + + assert entry["prompt_tokens"] == 50 + assert entry["total_tokens"] == 50 + + def test_unknown_model_yields_zero_cost(self) -> None: + """Unknown embedding model still aggregates tokens but reports zero cost.""" + raw_results = [ + { + "response": { + "body": {"usage": {"prompt_tokens": 100, "total_tokens": 100}} + } + } + ] + + entry = build_embedding_cost_entry( + model="mystery-embed", raw_results=raw_results + ) + + assert entry["prompt_tokens"] == 100 + assert entry["cost_usd"] == 0.0 + + +class TestBuildCostDict: + """Tests for build_cost_dict function.""" + + def test_response_only(self) -> None: + """Only response entry → embedding key absent, total = response cost.""" + response_entry = { + "model": "gpt-4o", + "input_tokens": 100, + "output_tokens": 50, + "total_tokens": 150, + "cost_usd": 0.001234, + } + + cost = build_cost_dict(response_entry=response_entry) + + assert cost["response"] == response_entry + assert "embedding" not in cost + assert cost["total_cost_usd"] == 0.001234 + + def test_embedding_only(self) -> None: + """Only embedding entry → response key absent, total = embedding cost.""" + embedding_entry = { + "model": "text-embedding-3-large", + "prompt_tokens": 200, + "total_tokens": 200, + "cost_usd": 0.000013, + } + + cost = build_cost_dict(embedding_entry=embedding_entry) + + assert cost["embedding"] == embedding_entry + assert "response" not in cost + assert cost["total_cost_usd"] == 0.000013 + + def test_both_entries(self) -> None: + """Both entries → both keys present, total = sum of both costs.""" + response_entry = {"cost_usd": 0.001234} + embedding_entry = {"cost_usd": 0.000013} + + cost = build_cost_dict( + response_entry=response_entry, embedding_entry=embedding_entry + ) + + assert cost["response"] == response_entry + assert cost["embedding"] == embedding_entry + assert cost["total_cost_usd"] == round(0.001234 + 0.000013, COST_USD_DECIMALS) + + def test_neither_entry(self) -> None: + """No entries → only total_cost_usd present, equal to 0.0.""" + cost = build_cost_dict() + + assert cost == {"total_cost_usd": 0.0} + + def test_total_is_rounded(self) -> None: + """total_cost_usd is rounded to COST_USD_DECIMALS.""" + response_entry = {"cost_usd": 0.0000001} + embedding_entry = {"cost_usd": 0.0000002} + + cost = build_cost_dict( + response_entry=response_entry, embedding_entry=embedding_entry + ) + + # 0.0000003 rounded to 6 decimals → 0.0 + assert cost["total_cost_usd"] == 0.0 + + def test_entry_missing_cost_usd_treated_as_zero(self) -> None: + """Entries without a cost_usd key default to 0 in the total.""" + response_entry = {"model": "gpt-4o"} # No cost_usd + embedding_entry = {"cost_usd": 0.000050} + + cost = build_cost_dict( + response_entry=response_entry, embedding_entry=embedding_entry + ) + + assert cost["total_cost_usd"] == 0.000050 diff --git a/backend/app/tests/crud/evaluations/test_processing.py b/backend/app/tests/crud/evaluations/test_processing.py index 52162654d..51e2321db 100644 --- a/backend/app/tests/crud/evaluations/test_processing.py +++ b/backend/app/tests/crud/evaluations/test_processing.py @@ -357,7 +357,11 @@ async def test_process_completed_evaluation_success( "body": { "id": "resp_123", "output": "Answer 1", - "usage": {"total_tokens": 10}, + "usage": { + "input_tokens": 100, + "output_tokens": 50, + "total_tokens": 150, + }, } }, } @@ -397,6 +401,20 @@ async def test_process_completed_evaluation_success( mock_create_langfuse.assert_called_once() mock_start_embedding.assert_called_once() + # Cost tracking: response cost should be aggregated and persisted. + db.refresh(result) + assert result.cost is not None + assert "response" in result.cost + response_cost = result.cost["response"] + assert response_cost["model"] == "gpt-4o" + assert response_cost["input_tokens"] == 100 + assert response_cost["output_tokens"] == 50 + assert response_cost["total_tokens"] == 150 + assert response_cost["cost_usd"] > 0 + assert result.cost["total_cost_usd"] == response_cost["cost_usd"] + # Embedding cost is added later by process_completed_embedding_batch. + assert "embedding" not in result.cost + @pytest.mark.asyncio @patch("app.crud.evaluations.processing.download_batch_results") @patch("app.crud.evaluations.processing.fetch_dataset_items") @@ -547,7 +565,31 @@ async def test_process_completed_embedding_batch_success( eval_run_with_embedding_batch, ): """Test successfully processing completed embedding batch.""" - mock_download.return_value = [] + # Pre-populate eval_run.cost with a response entry to verify that the + # embedding stage merges (not overwrites) existing cost data. + eval_run_with_embedding_batch.cost = { + "response": { + "model": "gpt-4o", + "input_tokens": 100, + "output_tokens": 50, + "total_tokens": 150, + "cost_usd": 0.000375, + }, + "total_cost_usd": 0.000375, + } + db.add(eval_run_with_embedding_batch) + db.commit() + db.refresh(eval_run_with_embedding_batch) + + # Raw results carry the usage payload that build_embedding_cost_entry reads. + mock_download.return_value = [ + { + "custom_id": "trace_123", + "response": { + "body": {"usage": {"prompt_tokens": 200, "total_tokens": 200}} + }, + } + ] mock_parse.return_value = [ { "item_id": "item1", @@ -586,6 +628,21 @@ async def test_process_completed_embedding_batch_success( assert cosine_score is not None assert cosine_score["avg"] == 0.95 + # Cost tracking: embedding entry is added, response entry is preserved, + # and total_cost_usd is the sum of both. + assert result.cost is not None + assert "response" in result.cost + assert "embedding" in result.cost + assert result.cost["response"]["cost_usd"] == 0.000375 + embedding_cost = result.cost["embedding"] + assert embedding_cost["model"] == "text-embedding-3-large" + assert embedding_cost["prompt_tokens"] == 200 + assert embedding_cost["total_tokens"] == 200 + assert embedding_cost["cost_usd"] > 0 + assert result.cost["total_cost_usd"] == pytest.approx( + 0.000375 + embedding_cost["cost_usd"] + ) + @pytest.mark.asyncio @patch("app.crud.evaluations.processing.download_batch_results") @patch("app.crud.evaluations.processing.parse_embedding_results")