diff --git a/pyproject.toml b/pyproject.toml index b1976261..e202fb59 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,7 @@ dependencies = [ "sentry-arroyo>=2.38.7", "sentry-sdk>=2.36.0", "sortedcontainers>=2.4.0", - "taskbroker-client>=0.20.0,<1", + "taskbroker-client>=0.20.1,<1", "typing-extensions>=4.15.0", "zipfile-zstd==0.0.4", ] diff --git a/src/launchpad/worker/app.py b/src/launchpad/worker/app.py index c46c07ea..e34aca90 100644 --- a/src/launchpad/worker/app.py +++ b/src/launchpad/worker/app.py @@ -1,96 +1,10 @@ import os -import platform -import resource -import time - -from collections.abc import Generator -from contextlib import contextmanager from arroyo.backends.kafka import KafkaProducer from taskbroker_client.app import TaskbrokerApp -from taskbroker_client.metrics import MetricsBackend, Tags +from taskbroker_client.metrics import DatadogMetrics, MetricsBackend from taskbroker_client.router import TaskRouter -from launchpad.utils.statsd import create_dogstatsd_client - -_RUSAGE_TO_BYTES = 1 if platform.system() == "Darwin" else 1024 - - -def _convert_tags(tags: Tags | None) -> list[str] | None: - if tags is None: - return None - return [f"{k}:{v}" for k, v in tags.items()] - - -class TaskworkerMetricsBackend(MetricsBackend): - def __init__(self) -> None: - self._dogstatsd = create_dogstatsd_client("launchpad") - - def incr( - self, - name: str, - value: int | float = 1, - tags: Tags | None = None, - sample_rate: float | None = None, - ) -> None: - kwargs: dict = {"tags": _convert_tags(tags)} - if sample_rate is not None: - kwargs["sample_rate"] = sample_rate - self._dogstatsd.increment(name, int(value), **kwargs) - - def distribution( - self, - name: str, - value: int | float, - tags: Tags | None = None, - unit: str | None = None, - sample_rate: float | None = None, - ) -> None: - kwargs: dict = {"tags": _convert_tags(tags)} - if sample_rate is not None: - kwargs["sample_rate"] = sample_rate - self._dogstatsd.distribution(name, value, **kwargs) - - def gauge( - self, - name: str, - value: int | float, - tags: Tags | None = None, - sample_rate: float | None = None, - ) -> None: - kwargs: dict = {"tags": _convert_tags(tags)} - if sample_rate is not None: - kwargs["sample_rate"] = sample_rate - self._dogstatsd.gauge(name, value, **kwargs) - - @contextmanager - def timer( - self, - key: str, - tags: Tags | None = None, - sample_rate: float | None = None, - stacklevel: int = 0, - ) -> Generator[None]: - start = time.monotonic() - try: - yield - finally: - duration_ms = (time.monotonic() - start) * 1000 - self.distribution(key, duration_ms, tags=tags, unit="millisecond", sample_rate=sample_rate) - - @contextmanager - def track_memory_usage( - self, - key: str, - tags: Tags | None = None, - ) -> Generator[None]: - before = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss - try: - yield - finally: - after = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss - self.distribution(key, (after - before) * _RUSAGE_TO_BYTES, tags=tags, unit="byte") - class CustomRouter(TaskRouter): def route_namespace(self, name: str) -> str: @@ -107,11 +21,28 @@ def producer_factory(topic: str) -> KafkaProducer: return KafkaProducer(config) +def create_metrics() -> MetricsBackend: + host = os.getenv("STATSD_HOST", "127.0.0.1") + port_str = os.getenv("STATSD_PORT", "8125") + try: + port = int(port_str) + except ValueError: + raise ValueError(f"STATSD_PORT must be a valid integer, got: {port_str}") + + return DatadogMetrics( + application="launchpad", + processing_pool="launchpad", + statsd_host=host, + statsd_port=port, + enable_prefixed_metrics=True, + ) + + app = TaskbrokerApp( name="launchpad", producer_factory=producer_factory, router_class=CustomRouter(), - metrics_class=TaskworkerMetricsBackend(), + metrics_class=create_metrics(), ) app.set_config( diff --git a/uv.lock b/uv.lock index aeb6e799..37363b73 100644 --- a/uv.lock +++ b/uv.lock @@ -856,7 +856,7 @@ requires-dist = [ { name = "sentry-arroyo", specifier = ">=2.38.7" }, { name = "sentry-sdk", specifier = ">=2.36.0" }, { name = "sortedcontainers", specifier = ">=2.4.0" }, - { name = "taskbroker-client", specifier = ">=0.20.0,<1" }, + { name = "taskbroker-client", specifier = ">=0.20.1,<1" }, { name = "typing-extensions", specifier = ">=4.15.0" }, { name = "zipfile-zstd", specifier = "==0.0.4" }, ] @@ -1944,7 +1944,7 @@ wheels = [ [[package]] name = "taskbroker-client" -version = "0.20.0" +version = "0.20.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "confluent-kafka" }, @@ -1960,9 +1960,9 @@ dependencies = [ { name = "sentry-sdk", extra = ["http2"] }, { name = "zstandard" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/0f/66/d54064c888d14680a037c982aaa2357241c933b565e6c0a6311f9fef5904/taskbroker_client-0.20.0.tar.gz", hash = "sha256:2c98838536974599f2b3d502e00a4d650c82da658fb4b8ede0f57aaf001c3a97", size = 38462, upload-time = "2026-06-19T18:48:03.639Z" } +sdist = { url = "https://files.pythonhosted.org/packages/f1/d0/f431bddc5439bc4380f974a0fb6bb48039ef9de2db9ebd263d27a21aee00/taskbroker_client-0.20.1.tar.gz", hash = "sha256:72c96ced88d699bf110f5fa91483a55a2facd5862a1b889036e4a6683a222504", size = 38598, upload-time = "2026-06-22T16:30:05.21Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/4b/45/0cc0a6e21a11e9b5e154c734fcd06ee0cc1d3aa02b14072d62e8c125a90f/taskbroker_client-0.20.0-py3-none-any.whl", hash = "sha256:0c7af97508162af86f3d925ded28101c4a4266437f382a5d8212cdb889cfc9fa", size = 49395, upload-time = "2026-06-19T18:48:02.48Z" }, + { url = "https://files.pythonhosted.org/packages/05/54/bfa8cb55b7befbae37de85d1895b11583cb1e967966c0bee9c69f0a6a596/taskbroker_client-0.20.1-py3-none-any.whl", hash = "sha256:56c9de6d8d2f2a483516cebe27c27704a4613c96335b124ecdbf485c363a77d8", size = 49540, upload-time = "2026-06-22T16:30:04.147Z" }, ] [[package]]