Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ celerybeat.pid

# Environments
.env
.remote-test.env
.venv
env/
venv/
Expand Down
4 changes: 3 additions & 1 deletion hatch_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ def initialize(self, version: str, build_data: dict) -> None:
except Exception:
commit = "unknown"

version_file = Path(self.root) / "src" / "expb" / "_version.py"
version_file = (
Path(self.root) / "src" / "expb" / "cli" / "version" / "_version.py"
)
version_file.write_text(f'__version__ = "{version}"\n__commit__ = "{commit}"\n')
10 changes: 7 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ authors = [
requires-python = ">=3.13"
dependencies = [
"docker>=7.1.0",
"fastapi>=0.133.0",
"filelock>=3.24.3",
"jinja2>=3.1.6",
"pydantic>=2.11.7",
"pyyaml>=6.0.2",
"rich>=14.0.0",
"sqlalchemy>=2.0.46",
"structlog>=25.4.0",
"typer>=0.16.0",
"uvicorn[standard]>=0.41.0",
"web3>=7.12.0",
]

Expand All @@ -35,6 +38,7 @@ packages = ["src/expb"]
"src" = ""

[dependency-groups]
dev = [
"hatchling>=1.29.0",
]
dev = ["httpx>=0.28.1", "pytest>=9.0.2", "hatchling>=1.29.0"]

[tool.pytest.ini_options]
filterwarnings = ["ignore::DeprecationWarning:websockets", "hatchling>=1.29.0"]
21 changes: 3 additions & 18 deletions src/expb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,8 @@
import typer

from expb.compress_payloads import app as compress_payloads_app
from expb.execute_scenario import app as execute_scenario_app
from expb.execute_scenarios import app as execute_scenarios_app
from expb.generate_payloads import app as generate_payloads_app
from expb.send_payloads import app as send_payloads_app
from expb.version import app as version_app
from expb.cli import app as cli_app

app = typer.Typer()

typer_apps = [
generate_payloads_app,
execute_scenario_app,
execute_scenarios_app,
compress_payloads_app,
send_payloads_app,
version_app,
]


for typer_app in typer_apps:
app.add_typer(typer_app)
# All commands (including the `api` sub-group) are registered via cli/
app.add_typer(cli_app)
2 changes: 0 additions & 2 deletions src/expb/_version.py

This file was deleted.

Empty file added src/expb/api/__init__.py
Empty file.
73 changes: 73 additions & 0 deletions src/expb/api/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from contextlib import asynccontextmanager
from pathlib import Path

import yaml
from fastapi import FastAPI

from expb.api.db.engine import init_db
from expb.api.worker import BenchmarkWorker
from expb.configs.scenarios import Scenarios


def create_app(
config_file: Path,
db_path: Path,
log_level: str = "INFO",
) -> FastAPI:
"""
FastAPI application factory.

Creates the app, wires up the DB, loads the scenarios config, starts the
background benchmark worker, and registers all routers.

Parameters
----------
config_file:
Path to the expb YAML configuration file.
db_path:
Path to the SQLite database file.
log_level:
Log level string passed to the worker's structured logger.
"""

@asynccontextmanager
async def lifespan(app: FastAPI):
# 1. Initialise DB (creates tables, enables WAL mode)
init_db(db_path)

# 2. Load scenarios config and stash on app.state for routes to access
with config_file.open() as f:
raw = yaml.safe_load(f)
scenarios = Scenarios(**raw)
app.state.scenarios = scenarios
app.state.config_file = config_file

# 3. Start the background benchmark worker thread
worker = BenchmarkWorker(scenarios=scenarios, log_level=log_level)
app.state.worker = worker
worker.start()

yield

# 4. Graceful shutdown: signal worker to finish current job then stop
worker.stop()

app = FastAPI(
title="expb Benchmark Queue API",
description=(
"Queue and monitor Ethereum execution client benchmark runs. "
"All endpoints require Bearer token authentication."
Comment thread
cbermudez97 marked this conversation as resolved.
Outdated
),
version="0.1.0",
lifespan=lifespan,
)

from expb.api.routes.health import router as health_router
from expb.api.routes.runs import router as runs_router
from expb.api.routes.scenarios import router as scenarios_router

app.include_router(health_router)
app.include_router(runs_router, prefix="/runs", tags=["runs"])
app.include_router(scenarios_router, prefix="/scenarios", tags=["scenarios"])

return app
40 changes: 40 additions & 0 deletions src/expb/api/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import hashlib
import hmac
from datetime import datetime, timezone

from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session

from expb.api.dependencies import get_db
from expb.api.db.models import ApiToken

_bearer_scheme = HTTPBearer(auto_error=True)


def _hash_token(raw_token: str) -> str:
return hashlib.sha256(raw_token.encode()).hexdigest()


def verify_token(
credentials: HTTPAuthorizationCredentials = Security(_bearer_scheme),
db: Session = Depends(get_db),
) -> None:
"""
FastAPI dependency that validates a Bearer token against the DB.

On success, updates the token's ``last_used_at`` timestamp.
Raises HTTP 401 if the token is missing, invalid, or revoked.
Use as: ``_: None = Depends(verify_token)``
Comment thread
cbermudez97 marked this conversation as resolved.
"""
computed_hash = _hash_token(credentials.credentials)

# Fetch all tokens and compare with hmac.compare_digest to resist timing attacks.
tokens = db.query(ApiToken).all()
for token in tokens:
if hmac.compare_digest(token.token_hash, computed_hash):
token.last_used_at = datetime.now(timezone.utc)
db.commit()
return

raise HTTPException(status_code=401, detail="Invalid or revoked token.")
Comment thread
cbermudez97 marked this conversation as resolved.
Outdated
Empty file added src/expb/api/db/__init__.py
Empty file.
41 changes: 41 additions & 0 deletions src/expb/api/db/engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from pathlib import Path

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session, sessionmaker

_engine = None
_SessionLocal = None


def init_db(db_path: Path) -> None:
global _engine, _SessionLocal

_engine = create_engine(
f"sqlite:///{db_path}",
# Required: SQLite connections may be used from multiple threads
# (FastAPI request threads + the background worker thread).
connect_args={"check_same_thread": False},
)

# Enable WAL journal mode so that concurrent reads from FastAPI handlers
# do not block the worker's writes, and vice versa.
with _engine.connect() as conn:
conn.execute(text("PRAGMA journal_mode=WAL"))

_SessionLocal = sessionmaker(bind=_engine, autoflush=False, autocommit=False)

from expb.api.db.models import Base

Base.metadata.create_all(_engine)


def get_engine():
if _engine is None:
raise RuntimeError("Database has not been initialised. Call init_db() first.")
return _engine


def get_session() -> Session:
if _SessionLocal is None:
raise RuntimeError("Database has not been initialised. Call init_db() first.")
return _SessionLocal()
63 changes: 63 additions & 0 deletions src/expb/api/db/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import enum
import uuid
from datetime import datetime, timezone

from sqlalchemy import JSON, DateTime, String, Text
from sqlalchemy import Enum as SAEnum
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
pass


class RunStatus(str, enum.Enum):
QUEUED = "queued"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"


class Run(Base):
__tablename__ = "runs"

run_id: Mapped[str] = mapped_column(
String(36), primary_key=True, default=lambda: str(uuid.uuid4())
)
scenario_name: Mapped[str] = mapped_column(String(255), nullable=False)
status: Mapped[str] = mapped_column(
Comment thread
cbermudez97 marked this conversation as resolved.
Outdated
SAEnum(RunStatus), nullable=False, default=RunStatus.QUEUED, index=True
)
queued_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, default=datetime.now(timezone.utc)
)
Comment thread
cbermudez97 marked this conversation as resolved.
started_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
completed_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
# Absolute path to the expb-executor-<scenario>-<timestamp>/ output directory
output_dir: Mapped[str | None] = mapped_column(Text, nullable=True)
error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
# Parsed K6 metrics from k6-summary.json, keyed by group name
# {"engine_newPayload": {"avg": ..., "min": ..., "max": ...,
# "med": ..., "p90": ..., "p95": ..., "p99": ...},
# "engine_forkchoiceUpdated": {...}}
k6_metrics: Mapped[dict | None] = mapped_column(JSON, nullable=True)
# Full override dict from the API request, stored for audit/replay
overrides: Mapped[dict | None] = mapped_column(JSON, nullable=True)


class ApiToken(Base):
__tablename__ = "api_tokens"

token_id: Mapped[str] = mapped_column(
String(36), primary_key=True, default=lambda: str(uuid.uuid4())
)
name: Mapped[str] = mapped_column(
String(255), nullable=False, unique=True, index=True
)
# SHA-256 hex digest of the raw token — the raw value is never stored
token_hash: Mapped[str] = mapped_column(String(64), nullable=False, unique=True)
created_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, default=datetime.now(timezone.utc)
)
Comment thread
cbermudez97 marked this conversation as resolved.
last_used_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
14 changes: 14 additions & 0 deletions src/expb/api/dependencies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from collections.abc import Generator

from sqlalchemy.orm import Session

from expb.api.db.engine import get_session


def get_db() -> Generator[Session, None, None]:
"""FastAPI dependency that provides a per-request DB session."""
db = get_session()
try:
yield db
finally:
db.close()
60 changes: 60 additions & 0 deletions src/expb/api/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import json
from pathlib import Path


def parse_k6_summary(summary_path: Path) -> dict | None:
"""
Parse a k6-summary.json file (produced by K6's ``--summary-export`` flag)
and extract per-group HTTP request duration statistics.

Returns a dict of the form::

{
"engine_newPayload": {
"avg": float, "min": float, "max": float,
"med": float, "p90": float, "p95": float, "p99": float,
},
"engine_forkchoiceUpdated": { ... },
}

Keys whose values are missing from the summary file are set to ``None``.
Returns ``None`` if the file cannot be read or parsed.
"""
try:
with summary_path.open() as f:
data = json.load(f)
except (OSError, json.JSONDecodeError):
return None

metrics = data.get("metrics", {})
if not metrics:
return None

# K6 stores group-scoped metrics with keys like:
# "http_req_duration{group:::engine_newPayload}"
# "http_req_duration{group:::engine_forkchoiceUpdated}"
group_keys = {
"engine_newPayload": "http_req_duration{group:::engine_newPayload}",
"engine_forkchoiceUpdated": "http_req_duration{group:::engine_forkchoiceUpdated}",
}

result: dict[str, dict] = {}

for group_name, metric_key in group_keys.items():
metric_data = metrics.get(metric_key)
if metric_data is None:
continue

# K6 uses "p(90)" notation; normalise to "p90" for clean storage / API output.
# Values are stored directly on the metric object (no "values" sub-key).
result[group_name] = {
"avg": metric_data.get("avg"),
"min": metric_data.get("min"),
"max": metric_data.get("max"),
"med": metric_data.get("med"),
"p90": metric_data.get("p(90)"),
"p95": metric_data.get("p(95)"),
"p99": metric_data.get("p(99)"),
}

return result if result else None
Empty file added src/expb/api/routes/__init__.py
Empty file.
Loading