Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ celerybeat.pid

# Environments
.env
.remote-test.env
.venv
env/
venv/
Expand Down
4 changes: 3 additions & 1 deletion hatch_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ def initialize(self, version: str, build_data: dict) -> None:
except Exception:
commit = "unknown"

version_file = Path(self.root) / "src" / "expb" / "_version.py"
version_file = (
Path(self.root) / "src" / "expb" / "cli" / "version" / "_version.py"
)
version_file.write_text(f'__version__ = "{version}"\n__commit__ = "{commit}"\n')
10 changes: 7 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ authors = [
requires-python = ">=3.13"
dependencies = [
"docker>=7.1.0",
"fastapi>=0.133.0",
"filelock>=3.24.3",
"jinja2>=3.1.6",
"pydantic>=2.11.7",
"pyyaml>=6.0.2",
"rich>=14.0.0",
"sqlalchemy>=2.0.46",
"structlog>=25.4.0",
"typer>=0.16.0",
"uvicorn[standard]>=0.41.0",
"web3>=7.12.0",
]

Expand All @@ -35,6 +38,7 @@ packages = ["src/expb"]
"src" = ""

[dependency-groups]
dev = [
"hatchling>=1.29.0",
]
dev = ["httpx>=0.28.1", "pytest>=9.0.2", "hatchling>=1.29.0"]

[tool.pytest.ini_options]
filterwarnings = ["ignore::DeprecationWarning:websockets"]
21 changes: 3 additions & 18 deletions src/expb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,8 @@
import typer

from expb.compress_payloads import app as compress_payloads_app
from expb.execute_scenario import app as execute_scenario_app
from expb.execute_scenarios import app as execute_scenarios_app
from expb.generate_payloads import app as generate_payloads_app
from expb.send_payloads import app as send_payloads_app
from expb.version import app as version_app
from expb.cli import app as cli_app

app = typer.Typer()

typer_apps = [
generate_payloads_app,
execute_scenario_app,
execute_scenarios_app,
compress_payloads_app,
send_payloads_app,
version_app,
]


for typer_app in typer_apps:
app.add_typer(typer_app)
# All commands (including the `api` sub-group) are registered via cli/
app.add_typer(cli_app)
2 changes: 0 additions & 2 deletions src/expb/_version.py

This file was deleted.

Empty file added src/expb/api/__init__.py
Empty file.
73 changes: 73 additions & 0 deletions src/expb/api/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from contextlib import asynccontextmanager
from pathlib import Path

import yaml
from fastapi import FastAPI

from expb.api.db.engine import init_db
from expb.api.worker import BenchmarkWorker
from expb.configs.scenarios import Scenarios


def create_app(
    config_file: Path,
    db_path: Path,
    log_level: str = "INFO",
) -> FastAPI:
    """
    Build and return the FastAPI application.

    Wires up the SQLite database, loads the scenarios configuration, launches
    the background benchmark worker thread, and attaches all routers.

    Parameters
    ----------
    config_file:
        Path to the expb YAML configuration file.
    db_path:
        Path to the SQLite database file.
    log_level:
        Log level string passed to the worker's structured logger.
    """

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Database must come up first: creates tables and enables WAL mode.
        init_db(db_path)

        # Parse the YAML config and expose it on app.state for route handlers.
        raw_config = yaml.safe_load(config_file.read_text())
        scenarios = Scenarios(**raw_config)
        app.state.scenarios = scenarios
        app.state.config_file = config_file

        # Launch the background benchmark worker thread.
        benchmark_worker = BenchmarkWorker(scenarios=scenarios, log_level=log_level)
        app.state.worker = benchmark_worker
        benchmark_worker.start()

        yield

        # Graceful shutdown: signal the worker to finish its current job.
        benchmark_worker.stop()

    application = FastAPI(
        title="expb Benchmark Queue API",
        description=(
            "Queue and monitor Ethereum execution client benchmark runs. "
            "All endpoints except /health require Bearer token authentication."
        ),
        version="0.1.0",
        lifespan=lifespan,
    )

    # Imported here rather than at module level, presumably to avoid circular
    # imports — the route modules depend on this package.
    from expb.api.routes.health import router as health_router
    from expb.api.routes.runs import router as runs_router
    from expb.api.routes.scenarios import router as scenarios_router

    application.include_router(health_router)
    application.include_router(runs_router, prefix="/runs", tags=["runs"])
    application.include_router(scenarios_router, prefix="/scenarios", tags=["scenarios"])

    return application
38 changes: 38 additions & 0 deletions src/expb/api/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import hashlib
from datetime import datetime, timezone

from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session

from expb.api.dependencies import get_db
from expb.api.db.models import ApiToken

_bearer_scheme = HTTPBearer(auto_error=True)


def _hash_token(raw_token: str) -> str:
return hashlib.sha256(raw_token.encode()).hexdigest()


def verify_token(
    credentials: HTTPAuthorizationCredentials = Security(_bearer_scheme),
    db: Session = Depends(get_db),
) -> None:
    """
    FastAPI dependency that validates a Bearer token against the DB.

    On success, updates the token's ``last_used_at`` timestamp.
    Raises HTTP 403 if the ``Authorization`` header is missing (FastAPI default
    for ``HTTPBearer``). Raises HTTP 401 if the token is invalid or revoked.
    Use as: ``_: None = Depends(verify_token)``.
    """
    # Hash the presented token; only hashes are stored, never raw values.
    computed_hash = _hash_token(credentials.credentials)

    # Query directly by the indexed hash column — no need to scan all tokens.
    token = db.query(ApiToken).filter(ApiToken.token_hash == computed_hash).first()
    if token is None:
        raise HTTPException(status_code=401, detail="Invalid or revoked token.")

    # NOTE(review): an aware UTC datetime is written into a naive DateTime
    # column — SQLite drops the tzinfo. Confirm readers treat values as UTC.
    token.last_used_at = datetime.now(timezone.utc)
    db.commit()
Empty file added src/expb/api/db/__init__.py
Empty file.
41 changes: 41 additions & 0 deletions src/expb/api/db/engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from pathlib import Path

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session, sessionmaker

_engine = None
_SessionLocal = None


def init_db(db_path: Path) -> None:
    """Create the SQLite engine, enable WAL mode, and build all tables."""
    global _engine, _SessionLocal

    # check_same_thread=False is required: the connection pool serves both
    # FastAPI request threads and the background worker thread.
    _engine = create_engine(
        f"sqlite:///{db_path}",
        connect_args={"check_same_thread": False},
    )

    # WAL journal mode lets concurrent reads from FastAPI handlers and the
    # worker's writes proceed without blocking each other.
    with _engine.connect() as connection:
        connection.execute(text("PRAGMA journal_mode=WAL"))

    _SessionLocal = sessionmaker(bind=_engine, autoflush=False, autocommit=False)

    # Imported here, presumably to avoid a circular import at module load.
    from expb.api.db.models import Base

    Base.metadata.create_all(_engine)


def get_engine():
    """Return the module-level engine; raise if ``init_db`` was never called."""
    engine = _engine
    if engine is None:
        raise RuntimeError("Database has not been initialised. Call init_db() first.")
    return engine


def get_session() -> Session:
    """Open and return a new ORM session; raise if ``init_db`` was never called."""
    factory = _SessionLocal
    if factory is None:
        raise RuntimeError("Database has not been initialised. Call init_db() first.")
    return factory()
63 changes: 63 additions & 0 deletions src/expb/api/db/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import enum
import uuid
from datetime import datetime, timezone

from sqlalchemy import JSON, DateTime, String, Text
from sqlalchemy import Enum as SAEnum
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    """Declarative base shared by all ORM models in this package."""
    pass


class RunStatus(str, enum.Enum):
    """Lifecycle states of a benchmark run; string values are stored in the DB."""

    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


class Run(Base):
    """A queued or executed benchmark run and its lifecycle metadata."""

    __tablename__ = "runs"

    # UUID4 primary key, generated at insert time.
    run_id: Mapped[str] = mapped_column(
        String(36), primary_key=True, default=lambda: str(uuid.uuid4())
    )
    scenario_name: Mapped[str] = mapped_column(String(255), nullable=False)
    # Indexed so pending work can be looked up cheaply by status.
    status: Mapped[RunStatus] = mapped_column(
        SAEnum(RunStatus), nullable=False, default=RunStatus.QUEUED, index=True
    )
    # NOTE(review): naive DateTime column receiving an aware UTC datetime —
    # tzinfo is dropped on write; readers should interpret values as UTC.
    queued_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, default=lambda: datetime.now(timezone.utc)
    )
    started_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    completed_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    # Absolute path to the expb-executor-<scenario>-<timestamp>/ output directory
    output_dir: Mapped[str | None] = mapped_column(Text, nullable=True)
    error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
    # Parsed K6 metrics from k6-summary.json, keyed by group name
    # {"engine_newPayload": {"avg": ..., "min": ..., "max": ...,
    #   "med": ..., "p90": ..., "p95": ..., "p99": ...},
    #  "engine_forkchoiceUpdated": {...}}
    k6_metrics: Mapped[dict | None] = mapped_column(JSON, nullable=True)
    # Full override dict from the API request, stored for audit/replay
    overrides: Mapped[dict | None] = mapped_column(JSON, nullable=True)


class ApiToken(Base):
    """A named API bearer token; only a SHA-256 hash of the token is stored."""

    __tablename__ = "api_tokens"

    # UUID4 primary key, generated at insert time.
    token_id: Mapped[str] = mapped_column(
        String(36), primary_key=True, default=lambda: str(uuid.uuid4())
    )
    # Human-readable label; unique so tokens can be addressed by name.
    name: Mapped[str] = mapped_column(
        String(255), nullable=False, unique=True, index=True
    )
    # SHA-256 hex digest of the raw token — the raw value is never stored
    token_hash: Mapped[str] = mapped_column(String(64), nullable=False, unique=True)
    # NOTE(review): naive DateTime column receiving an aware UTC datetime —
    # tzinfo is dropped on write; readers should interpret values as UTC.
    created_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, default=lambda: datetime.now(timezone.utc)
    )
    # Updated on every successful authentication (see verify_token).
    last_used_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
14 changes: 14 additions & 0 deletions src/expb/api/dependencies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from collections.abc import Generator

from sqlalchemy.orm import Session

from expb.api.db.engine import get_session


def get_db() -> Generator[Session, None, None]:
    """FastAPI dependency that yields a per-request DB session and always closes it."""
    session = get_session()
    try:
        yield session
    finally:
        # Runs even if the request handler raises.
        session.close()
60 changes: 60 additions & 0 deletions src/expb/api/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import json
from pathlib import Path


def parse_k6_summary(summary_path: Path) -> dict | None:
"""
Parse a k6-summary.json file (produced by K6's ``--summary-export`` flag)
and extract per-group HTTP request duration statistics.

Returns a dict of the form::

{
"engine_newPayload": {
"avg": float, "min": float, "max": float,
"med": float, "p90": float, "p95": float, "p99": float,
},
"engine_forkchoiceUpdated": { ... },
}

Keys whose values are missing from the summary file are set to ``None``.
Returns ``None`` if the file cannot be read or parsed.
"""
try:
with summary_path.open() as f:
data = json.load(f)
except (OSError, json.JSONDecodeError):
return None

metrics = data.get("metrics", {})
if not metrics:
return None

# K6 stores group-scoped metrics with keys like:
# "http_req_duration{group:::engine_newPayload}"
# "http_req_duration{group:::engine_forkchoiceUpdated}"
group_keys = {
"engine_newPayload": "http_req_duration{group:::engine_newPayload}",
"engine_forkchoiceUpdated": "http_req_duration{group:::engine_forkchoiceUpdated}",
}

result: dict[str, dict] = {}

for group_name, metric_key in group_keys.items():
metric_data = metrics.get(metric_key)
if metric_data is None:
continue

# K6 uses "p(90)" notation; normalise to "p90" for clean storage / API output.
# Values are stored directly on the metric object (no "values" sub-key).
result[group_name] = {
"avg": metric_data.get("avg"),
"min": metric_data.get("min"),
"max": metric_data.get("max"),
"med": metric_data.get("med"),
"p90": metric_data.get("p(90)"),
"p95": metric_data.get("p(95)"),
"p99": metric_data.get("p(99)"),
}

return result if result else None
Empty file added src/expb/api/routes/__init__.py
Empty file.
Loading