-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathmodels.py
More file actions
63 lines (52 loc) · 2.39 KB
/
models.py
File metadata and controls
63 lines (52 loc) · 2.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import enum
import uuid
from datetime import datetime, timezone
from sqlalchemy import JSON, DateTime, String, Text
from sqlalchemy import Enum as SAEnum
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
    """Shared declarative base that the ORM models in this module inherit from."""
class RunStatus(str, enum.Enum):
    """Status values that a run can be recorded with.

    Members also subclass ``str``, so each one compares equal to its
    lowercase string value (for example ``RunStatus.QUEUED == "queued"``).
    """

    # Used as the default for ``Run.status`` on newly created rows.
    QUEUED = "queued"
    RUNNING = "running"

    # Remaining outcomes a run can be marked with.
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
class Run(Base):
    """ORM model mapped to the ``runs`` table: one row per run.

    A row records which scenario was requested, its current status, the
    queue/start/completion timestamps, and the artefacts kept for it
    (output directory, error text, parsed K6 metrics, request overrides).
    """

    __tablename__ = "runs"

    # Primary key: a random UUID4 in its 36-character string form, produced
    # by the Python-side default when the caller does not supply one.
    run_id: Mapped[str] = mapped_column(
        String(36),
        default=lambda: str(uuid.uuid4()),
        primary_key=True,
    )

    scenario_name: Mapped[str] = mapped_column(String(255), nullable=False)

    # Indexed status column; rows default to QUEUED.
    # NOTE(review): SQLAlchemy's Enum type is documented to persist member
    # *names* unless ``values_callable`` is given -- confirm that matches
    # what readers of this table expect.
    status: Mapped[RunStatus] = mapped_column(
        SAEnum(RunStatus),
        index=True,
        default=RunStatus.QUEUED,
        nullable=False,
    )

    # Lifecycle timestamps. Only ``queued_at`` has a default (current UTC
    # time); the other two are nullable and have no default.
    # NOTE(review): the default yields a timezone-aware datetime while the
    # columns are plain ``DateTime`` (no ``timezone=True``) -- confirm how
    # the target database round-trips tzinfo before comparing values.
    queued_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=lambda: datetime.now(tz=timezone.utc),
        nullable=False,
    )
    started_at: Mapped[datetime | None] = mapped_column(
        DateTime,
        nullable=True,
    )
    completed_at: Mapped[datetime | None] = mapped_column(
        DateTime,
        nullable=True,
    )

    # Absolute path of the run's output directory, named
    # expb-executor-<scenario>-<timestamp>/
    output_dir: Mapped[str | None] = mapped_column(Text, nullable=True)

    error_message: Mapped[str | None] = mapped_column(Text, nullable=True)

    # K6 metrics parsed from k6-summary.json, keyed by group name, e.g.
    #   {
    #       "engine_newPayload": {"avg": ..., "min": ..., "max": ...,
    #                             "med": ..., "p90": ..., "p95": ..., "p99": ...},
    #       "engine_forkchoiceUpdated": {...},
    #   }
    k6_metrics: Mapped[dict | None] = mapped_column(JSON, nullable=True)

    # Complete override dict taken from the API request; kept so the run
    # can be audited or replayed.
    overrides: Mapped[dict | None] = mapped_column(JSON, nullable=True)
class ApiToken(Base):
    """ORM model mapped to the ``api_tokens`` table: one row per API token.

    Each row carries a unique name, the hash of the token, and timestamps
    for creation and most recent use.
    """

    __tablename__ = "api_tokens"

    # Primary key: a random UUID4 in its 36-character string form, produced
    # by the Python-side default when the caller does not supply one.
    token_id: Mapped[str] = mapped_column(
        String(36),
        default=lambda: str(uuid.uuid4()),
        primary_key=True,
    )

    # Token name; declared both unique and indexed.
    name: Mapped[str] = mapped_column(
        String(255),
        index=True,
        unique=True,
        nullable=False,
    )

    # SHA-256 hex digest (64 characters) of the raw token. The raw token
    # value itself is never stored.
    token_hash: Mapped[str] = mapped_column(
        String(64),
        unique=True,
        nullable=False,
    )

    # ``created_at`` defaults to the current UTC time; ``last_used_at`` is
    # nullable and has no default.
    # NOTE(review): the default yields a timezone-aware datetime while the
    # columns are plain ``DateTime`` (no ``timezone=True``) -- confirm how
    # the target database round-trips tzinfo before comparing values.
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=lambda: datetime.now(tz=timezone.utc),
        nullable=False,
    )
    last_used_at: Mapped[datetime | None] = mapped_column(
        DateTime,
        nullable=True,
    )