Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,13 @@ services:
environment:
ENV: "DEV"
PORT: "7010"
DATABASE_HOST: ${DATABASE_HOST}
DATABASE_PORT: ${DATABASE_PORT}
DATABASE_NAME: ${DATABASE_NAME}
DATABASE_USER: ${DATABASE_USER}
DATABASE_PASSWORD: ${DATABASE_PASSWORD}
# Query reads telemetry + its own metadata tables from ClickHouse over the
# HTTP interface (clickhouse-connect), so port 8123 — gr26 uses native 9000.
CLICKHOUSE_HOST: "clickhouse"
CLICKHOUSE_PORT: "8123"
CLICKHOUSE_USER: "default"
CLICKHOUSE_PASSWORD: ""
CLICKHOUSE_DATABASE: "mapache"
KERBECS_ENDPOINT: "http://kerbecs:10300"
KERBECS_USER: "admin"
KERBECS_PASSWORD: "admin"
Expand Down
8 changes: 8 additions & 0 deletions example.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ DATABASE_NAME="mapache"
DATABASE_USER="postgres"
DATABASE_PASSWORD="password"

# Query service telemetry + metadata tables live in ClickHouse, reached over the
# HTTP interface via clickhouse-connect (port 8123; gr26 uses native 9000).
CLICKHOUSE_HOST="clickhouse"
CLICKHOUSE_PORT="8123"
CLICKHOUSE_USER="default"
CLICKHOUSE_PASSWORD=""
CLICKHOUSE_DATABASE="mapache"

SENTINEL_URL="https://sentinel-api.gauchoracing.com"
SENTINEL_JWKS_URL="https://sso.gauchoracing.com/.well-known/jwks.json"
SENTINEL_CLIENT_ID="z6V9NREjMFhf"
Expand Down
6 changes: 5 additions & 1 deletion query/Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,8 @@ RUN pip install uv

WORKDIR /app/query

CMD ["uv", "run", "uvicorn", "query.main:create_app", "--host", "0.0.0.0", "--port", "7010", "--reload"]
# Only watch the application package. The bind-mounted /app/query also holds
# .venv, scripts/, tests/ and .pytest_cache; if those are watched, `uv run`
# touching the venv triggers an endless reload loop, which takes the service
# down and drops it from Rincon's registry (the gateway then 404s /query/* routes).
CMD ["uv", "run", "uvicorn", "query.main:create_app", "--host", "0.0.0.0", "--port", "7010", "--reload", "--reload-dir", "/app/query/query"]
9 changes: 7 additions & 2 deletions query/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ dependencies = [
"fastapi>=0.115.10,<0.116.0",
"uvicorn>=0.34.0,<0.35.0",
"dotenv>=0.9.9,<0.10.0",
"sqlalchemy>=2.0.38,<3.0.0",
"psycopg2-binary>=2.9.10,<3.0.0",
"clickhouse-connect>=0.8.0,<0.9.0",
"numpy>=2.2.3,<3.0.0",
"pandas>=2.2.3,<3.0.0",
"loguru>=0.7.3,<0.8.0",
Expand All @@ -28,6 +27,12 @@ dependencies = [
[project.scripts]
query = "query.main:main"

[dependency-groups]
dev = [
"pytest>=8.0.0,<9.0.0",
"httpx>=0.27.0,<0.28.0",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
36 changes: 12 additions & 24 deletions query/query/config/config.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
import os

from sqlalchemy import URL

class Config:
"""Configuration settings for the application"""

# Server settings
VERSION: str = "3.3.0"
PORT: int = int(os.getenv('PORT', 7000))

# Database settings
DATABASE_HOST: str = os.getenv('DATABASE_HOST')
DATABASE_PORT: int = int(os.getenv('DATABASE_PORT'))
DATABASE_USER: str = os.getenv('DATABASE_USER')
DATABASE_PASSWORD: str = os.getenv('DATABASE_PASSWORD')
DATABASE_NAME: str = os.getenv('DATABASE_NAME')
# ClickHouse settings. Telemetry (signal) plus the query service's own
# metadata tables (signal_definition, query_log, query_token) all live in
# ClickHouse now. The query service talks to it over the HTTP interface
# (port 8123) via clickhouse-connect — distinct from gr26, which uses the
# native protocol (9000) via clickhouse-go.
CLICKHOUSE_HOST: str = os.getenv('CLICKHOUSE_HOST')
CLICKHOUSE_PORT: int = int(os.getenv('CLICKHOUSE_PORT', 8123))
CLICKHOUSE_USER: str = os.getenv('CLICKHOUSE_USER')
CLICKHOUSE_PASSWORD: str = os.getenv('CLICKHOUSE_PASSWORD')
CLICKHOUSE_DATABASE: str = os.getenv('CLICKHOUSE_DATABASE')
# Set CLICKHOUSE_SECURE=true when the HTTP endpoint is TLS (port 8443).
CLICKHOUSE_SECURE: bool = os.getenv('CLICKHOUSE_SECURE', 'false').lower() == 'true'

# Kerbecs admin endpoint — used to resolve service-to-service routes.
KERBECS_ENDPOINT: str = os.getenv('KERBECS_ENDPOINT')
Expand All @@ -26,19 +30,3 @@ class Config:
SENTINEL_URL: str = os.getenv('SENTINEL_URL')
SENTINEL_JWKS_URL: str = os.getenv('SENTINEL_JWKS_URL')
SENTINEL_CLIENT_ID: str = os.getenv('SENTINEL_CLIENT_ID')

@staticmethod
def get_database_url() -> URL:
# Build via URL.create rather than an f-string so credentials with
# special characters (e.g. '@', '%', or non-ASCII bytes in the
# password) are escaped instead of corrupting the DSN — an unescaped
# '@' in the password otherwise bleeds into the host portion.
return URL.create(
"postgresql+psycopg2",
username=Config.DATABASE_USER,
password=Config.DATABASE_PASSWORD,
host=Config.DATABASE_HOST,
port=Config.DATABASE_PORT,
database=Config.DATABASE_NAME,
)

157 changes: 94 additions & 63 deletions query/query/database/connection.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,104 @@
from query.model.base import Base
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
"""ClickHouse connection + schema bootstrap for the query service.

The signal/telemetry tables (signal, gr26_can, ping) are created and written by
gr26; this service only reads them. The three relational metadata tables the
query service owns — signal_definition, query_log, query_token — used to live in
Postgres and are now created here in ClickHouse:

- query_log is append-only, so a plain MergeTree.
- query_token and signal_definition are mutable by id (token revoke rewrites
expires_at; definitions get re-seeded), so ReplacingMergeTree keyed on id with
an updated_at version column. Readers use FINAL to collapse to the latest row.
"""

import clickhouse_connect
from clickhouse_connect.driver.client import Client

from query.config.config import Config
from contextlib import contextmanager

DATABASE_URL = Config.get_database_url()
_client: Client | None = None

# DDL for the three metadata tables this module creates. Every statement uses
# CREATE TABLE IF NOT EXISTS, so it can be issued again against an existing schema.

# signal_definition: ReplacingMergeTree versioned by updated_at and ordered by id.
SIGNAL_DEFINITION_DDL = """
CREATE TABLE IF NOT EXISTS signal_definition (
id String,
vehicle_type String,
name String,
description String,
updated_at DateTime64(6, 'UTC') DEFAULT now64(6)
) ENGINE = ReplacingMergeTree(updated_at) ORDER BY id
"""

# query_log: plain MergeTree ordered by (timestamp, id); timestamp defaults to
# the insert time (now64(6)).
QUERY_LOG_DDL = """
CREATE TABLE IF NOT EXISTS query_log (
id String,
user_id String,
parameters String,
status_code Int32,
latency Int32,
error_message String,
timestamp DateTime64(6, 'UTC') DEFAULT now64(6)
) ENGINE = MergeTree ORDER BY (timestamp, id)
"""

# query_token: ReplacingMergeTree versioned by updated_at and ordered by id.
# expires_at has no DEFAULT clause, unlike created_at and updated_at.
QUERY_TOKEN_DDL = """
CREATE TABLE IF NOT EXISTS query_token (
id String,
user_id String,
created_at DateTime64(6, 'UTC') DEFAULT now64(6),
expires_at DateTime64(6, 'UTC'),
updated_at DateTime64(6, 'UTC') DEFAULT now64(6)
) ENGINE = ReplacingMergeTree(updated_at) ORDER BY id
"""


def _build_client() -> Client:
    """Create a ClickHouse client from the ``Config.CLICKHOUSE_*`` settings.

    Returns:
        The client produced by ``clickhouse_connect.get_client``.
    """
    # Function-scope import so this block does not depend on the file's import
    # list changing.
    from clickhouse_connect import common

    # This module keeps a single client for every request. By default
    # clickhouse-connect attaches an auto-generated session id to each client,
    # and ClickHouse refuses concurrent queries inside one session, so two
    # overlapping requests would fail. Session-id generation therefore has to
    # be switched off before the client is created.
    common.set_setting("autogenerate_session_id", False)

    return clickhouse_connect.get_client(
        host=Config.CLICKHOUSE_HOST,
        port=Config.CLICKHOUSE_PORT,
        username=Config.CLICKHOUSE_USER,
        # An unset password is sent as the empty string.
        password=Config.CLICKHOUSE_PASSWORD or "",
        database=Config.CLICKHOUSE_DATABASE,
        secure=Config.CLICKHOUSE_SECURE,
    )

db_session = None

def init_db():
# NOTE(review): the diff view shows the removed and the added body of this
# function back to back, with indentation stripped. The first part appears to
# be the previous SQLAlchemy/Postgres body: it checks the DATABASE_* settings,
# builds a scoped session and creates the ORM tables.
"""Initialize the database session"""
if not Config.DATABASE_HOST:
raise ValueError("DATABASE_HOST is not set")
elif not Config.DATABASE_PORT:
raise ValueError("DATABASE_PORT is not set")
elif not Config.DATABASE_USER:
raise ValueError("DATABASE_USER is not set")
elif not Config.DATABASE_PASSWORD:
raise ValueError("DATABASE_PASSWORD is not set")
elif not Config.DATABASE_NAME:
raise ValueError("DATABASE_NAME is not set")
else:
global db_session
engine = create_engine(DATABASE_URL)
db_session = scoped_session(
sessionmaker(
autocommit=False,
autoflush=False,
expire_on_commit=False,
bind=engine
)
)

from query.model.log import QueryLog
from query.model.token import QueryToken
from query.model.signal_definition import SignalDefinition

# Create all tables
Base.metadata.create_all(bind=engine)
print("Database initialized")
# The second part appears to be the ClickHouse body that replaces the one above.
# It raises ValueError when the host, port, user or database setting is falsy.
# CLICKHOUSE_PASSWORD is not checked here; _build_client passes "" when it is unset.
"""Open the ClickHouse client and create the metadata tables."""
if not Config.CLICKHOUSE_HOST:
raise ValueError("CLICKHOUSE_HOST is not set")
elif not Config.CLICKHOUSE_PORT:
raise ValueError("CLICKHOUSE_PORT is not set")
elif not Config.CLICKHOUSE_USER:
raise ValueError("CLICKHOUSE_USER is not set")
elif not Config.CLICKHOUSE_DATABASE:
raise ValueError("CLICKHOUSE_DATABASE is not set")

# Store the client in the module-level _client, then run each CREATE TABLE
# IF NOT EXISTS statement through it.
global _client
_client = _build_client()
for ddl in (SIGNAL_DEFINITION_DDL, QUERY_LOG_DDL, QUERY_TOKEN_DDL):
_client.command(ddl)
print("Database initialized")


def init_test_db():
# NOTE(review): the diff view shows removed and added lines together. The
# db_session/engine lines appear to be the previous SQLAlchemy body.
global db_session
engine = create_engine(DATABASE_URL)
db_session = scoped_session(
sessionmaker(
autocommit=False,
autoflush=False,
expire_on_commit=False,
bind=engine
)
)
# Added body: build the module-level client only. Unlike init_db, no settings
# are validated and no DDL is run.
global _client
_client = _build_client()


def get_client() -> Client:
    """Hand out the module-level ClickHouse client.

    Returns:
        The client stored in ``_client`` by ``init_db`` / ``init_test_db``.

    Raises:
        ValueError: If ``_client`` is still ``None``, i.e. no client has been
            created yet or it has been cleared again.
    """
    client = _client
    if client is not None:
        return client
    raise ValueError("Database client is not initialized")

@contextmanager
def get_db():
"""Get the database session with proper error handling"""
# NOTE(review): this appears to be the removed SQLAlchemy helper, since it
# depends on db_session, which the added code no longer assigns. Confirm
# against the raw diff.
# Refuse to hand out a session before one has been created.
if not db_session:
raise ValueError("Database session is not initialized")

# Commit when the caller's block finishes, roll back and re-raise on any
# exception, and always remove the scoped session afterwards.
try:
yield db_session
db_session.commit()
except Exception as e:
db_session.rollback()
raise e
finally:
db_session.remove()

def shutdown_session(exception=None):
# NOTE(review): the diff view shows removed and added lines together. The
# first docstring and the db_session lines appear to be the previous body.
"""Remove the session at the end of request"""
if db_session:
db_session.remove()
# Added body: close the module-level client if one exists, then reset _client
# to None so get_client() raises until a new client is built. The `exception`
# argument is not used.
"""Close the client (best-effort) on shutdown."""
global _client
if _client is not None:
_client.close()
_client = None
23 changes: 11 additions & 12 deletions query/query/model/log.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
from sqlalchemy import Column, Integer, String, DateTime, Text
from dataclasses import dataclass
from datetime import datetime
from datetime import timezone
from query.model.base import Base
from typing import Optional

class QueryLog(Base):
__tablename__ = "query_log"

id = Column(String(255), primary_key=True)
user_id = Column(String(255))
parameters = Column(Text)
status_code = Column(Integer)
latency = Column(Integer)
error_message = Column(Text)
timestamp = Column(DateTime, default=lambda: datetime.now(timezone.utc))
@dataclass
class QueryLog:
user_id: Optional[str] = None
parameters: Optional[str] = None
status_code: Optional[int] = None
latency: Optional[int] = None
error_message: Optional[str] = None
id: Optional[str] = None
timestamp: Optional[datetime] = None

def to_dict(self):
return {
Expand Down
16 changes: 8 additions & 8 deletions query/query/model/signal_definition.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from sqlalchemy import Column, String, Text
from query.model.base import Base
from dataclasses import dataclass
from typing import Optional

class SignalDefinition(Base):
__tablename__ = "signal_definition"

id = Column(String(255), primary_key=True)
vehicle_type = Column(String(255))
name = Column(String(255))
description = Column(Text)
@dataclass
class SignalDefinition:
id: Optional[str] = None
vehicle_type: Optional[str] = None
name: Optional[str] = None
description: Optional[str] = None

def to_dict(self):
return {
Expand Down
19 changes: 9 additions & 10 deletions query/query/model/token.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
from sqlalchemy import Column, String, DateTime
from dataclasses import dataclass
from datetime import datetime
from datetime import timezone
from query.model.base import Base
from typing import Optional

class QueryToken(Base):
# NOTE(review): the diff view shows removed and added lines together, with
# indentation stripped. The Base subclass and the Column attributes appear to
# be the previous SQLAlchemy model; the @dataclass version follows.
__tablename__ = "query_token"

id = Column(String(255), primary_key=True)
user_id = Column(String(255))
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
expires_at = Column(DateTime)
# Added model: a plain dataclass whose fields all default to None.
@dataclass
class QueryToken:
id: Optional[str] = None
user_id: Optional[str] = None
created_at: Optional[datetime] = None
expires_at: Optional[datetime] = None

# Serialize to a dict; unset timestamps become None.
# NOTE(review): "Z" is appended to isoformat() unconditionally. If these
# datetimes are ever timezone-aware, isoformat() already carries an offset and
# the result would read like "...+00:00Z". Confirm what the ClickHouse driver
# returns for the DateTime64(6, 'UTC') columns.
def to_dict(self):
return {
"id": self.id,
"user_id": self.user_id,
"created_at": self.created_at.isoformat() + "Z" if self.created_at else None,
"expires_at": self.expires_at.isoformat() + "Z" if self.expires_at else None
}
}
Loading
Loading