From 8495b4d54d8c21045b21b02a36bf8e2477a05d3c Mon Sep 17 00:00:00 2001 From: Mark Story Date: Fri, 19 Jun 2026 15:50:02 -0400 Subject: [PATCH 1/3] chore(tasks) Switch to taskbroker-client metrics I'm reworking client metrics for taskbroker clients and this aligns launchpad with that direction. The existing metric names will continue to emit alongside an unprefixed metric. The unprefixed metric will have both `application` and `processing_pool` as tags. Refs STREAM-1173 --- pyproject.toml | 2 +- src/launchpad/worker/app.py | 106 +++++++----------------------------- uv.lock | 14 ++--- 3 files changed, 27 insertions(+), 95 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index bea0db43..edd66eea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,7 @@ dependencies = [ "sentry-arroyo>=2.38.7", "sentry-sdk>=2.36.0", "sortedcontainers>=2.4.0", - "taskbroker-client>=0.1.9,<1", + "taskbroker-client>=0.19.3,<1", "typing-extensions>=4.15.0", "zipfile-zstd==0.0.4", ] diff --git a/src/launchpad/worker/app.py b/src/launchpad/worker/app.py index c46c07ea..5552cd15 100644 --- a/src/launchpad/worker/app.py +++ b/src/launchpad/worker/app.py @@ -1,95 +1,10 @@ import os -import platform -import resource -import time - -from collections.abc import Generator -from contextlib import contextmanager from arroyo.backends.kafka import KafkaProducer from taskbroker_client.app import TaskbrokerApp -from taskbroker_client.metrics import MetricsBackend, Tags +from taskbroker_client.metrics import MetricsBackend, DatadogMetrics from taskbroker_client.router import TaskRouter -from launchpad.utils.statsd import create_dogstatsd_client - -_RUSAGE_TO_BYTES = 1 if platform.system() == "Darwin" else 1024 - - -def _convert_tags(tags: Tags | None) -> list[str] | None: - if tags is None: - return None - return [f"{k}:{v}" for k, v in tags.items()] - - -class TaskworkerMetricsBackend(MetricsBackend): - def __init__(self) -> None: - self._dogstatsd = create_dogstatsd_client("launchpad") - - def incr( - self, - name: str, - value: int | float = 1, - tags: Tags | None = None, - sample_rate: float | None = None, - ) -> None: - kwargs: dict = {"tags": _convert_tags(tags)} - if sample_rate is not None: - kwargs["sample_rate"] = sample_rate - self._dogstatsd.increment(name, int(value), **kwargs) - - def distribution( - self, - name: str, - value: int | float, - tags: Tags | None = None, - unit: str | None = None, - sample_rate: float | None = None, - ) -> None: - kwargs: dict = {"tags": _convert_tags(tags)} - if sample_rate is not None: - kwargs["sample_rate"] = sample_rate - self._dogstatsd.distribution(name, value, **kwargs) - - def gauge( - self, - name: str, - value: int | float, - tags: Tags | None = None, - sample_rate: float | None = None, - ) -> None: - kwargs: dict = {"tags": _convert_tags(tags)} - if sample_rate is not None: - kwargs["sample_rate"] = sample_rate - self._dogstatsd.gauge(name, value, **kwargs) - - @contextmanager - def timer( - self, - key: str, - tags: Tags | None = None, - sample_rate: float | None = None, - stacklevel: int = 0, - ) -> Generator[None]: - start = time.monotonic() - try: - yield - finally: - duration_ms = (time.monotonic() - start) * 1000 - self.distribution(key, duration_ms, tags=tags, unit="millisecond", sample_rate=sample_rate) - - @contextmanager - def track_memory_usage( - self, - key: str, - tags: Tags | None = None, - ) -> Generator[None]: - before = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss - try: - yield - finally: - after = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss - self.distribution(key, (after - before) * _RUSAGE_TO_BYTES, tags=tags, unit="byte") class CustomRouter(TaskRouter): @@ -107,11 +22,28 @@ def producer_factory(topic: str) -> KafkaProducer: return KafkaProducer(config) +def create_metrics() -> MetricsBackend: + host = os.getenv("STATSD_HOST", "127.0.0.1") + port_str = os.getenv("STATSD_PORT", "8125") + try: + port = int(port_str) + except ValueError: + raise ValueError(f"STATSD_PORT must be a valid integer, got: {port_str}") + + return DatadogMetrics( + application="launchpad", + processing_pool="launchpad", + statsd_host=host, + statsd_port=port, + enable_prefixed_metrics=True, + ) + + app = TaskbrokerApp( name="launchpad", producer_factory=producer_factory, router_class=CustomRouter(), - metrics_class=TaskworkerMetricsBackend(), + metrics_class=create_metrics(), ) app.set_config( diff --git a/uv.lock b/uv.lock index 55e481b8..b63a3462 100644 --- a/uv.lock +++ b/uv.lock @@ -856,7 +856,7 @@ requires-dist = [ { name = "sentry-arroyo", specifier = ">=2.38.7" }, { name = "sentry-sdk", specifier = ">=2.36.0" }, { name = "sortedcontainers", specifier = ">=2.4.0" }, - { name = "taskbroker-client", specifier = ">=0.1.9,<1" }, + { name = "taskbroker-client", specifier = ">=0.19.3,<1" }, { name = "typing-extensions", specifier = ">=4.15.0" }, { name = "zipfile-zstd", specifier = "==0.0.4" }, ] @@ -1877,16 +1877,16 @@ wheels = [ [[package]] name = "sentry-protos" -version = "0.22.1" +version = "0.32.2" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "grpc-stubs" }, { name = "grpcio" }, { name = "protobuf" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/10/6c/9345de057b9c2b7eaae2ebc28f3234df9560f0151c6e7204508d1d12566b/sentry_protos-0.22.1.tar.gz", hash = "sha256:17821439e47d22df493d6350084d0c9fc6966aaa7f9d419261597f4fe66ebf4c", size = 135668, upload-time = "2026-06-03T23:29:05.479Z" } +sdist = { url = "https://files.pythonhosted.org/packages/54/69/d0c32256fe22470eb2e09d1136b97f452e1c9d50c790680807e881e72e94/sentry_protos-0.32.2.tar.gz", hash = "sha256:e76f89de38485b81052f1cb5049c428cb300faf11f56670c3fbf6e0301804c65", size = 151052, upload-time = "2026-06-18T20:44:10.344Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/85/99/edf6f7e43a73e65ca5144c957f4790a346ef6f798c53463436d3e20a5c65/sentry_protos-0.22.1-py3-none-any.whl", hash = "sha256:c6bb445ab288d9309b1e5e57c7a43b545eb5aa0c805dff83b8d8d9a18c001531", size = 352722, upload-time = "2026-06-03T23:29:04.204Z" }, + { url = "https://files.pythonhosted.org/packages/bf/89/7f2d40aa4ef6300d03f5cf550635364500f2bf2ea3cc86400deefd1dc1f0/sentry_protos-0.32.2-py3-none-any.whl", hash = "sha256:4ae5023f27ea05b21b6a8fb5c7ef724921fd209f81c5d64a11ef346077274b66", size = 402246, upload-time = "2026-06-18T20:44:08.987Z" }, ] [[package]] @@ -1967,7 +1967,7 @@ wheels = [ [[package]] name = "taskbroker-client" -version = "0.18.2" +version = "0.19.3" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "confluent-kafka" }, @@ -1984,9 +1984,9 @@ dependencies = [ { name = "sentry-sdk", extra = ["http2"] }, { name = "zstandard" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/0e/4d/8feea0a9ee72e492f63aaaee480ab2fb6cf1ce1afcabe41c86fa0dc7e956/taskbroker_client-0.18.2.tar.gz", hash = "sha256:8679fc0832d2a6ae085086f247249266bbea1a255a9fc5783c1d979d5be98cef", size = 35153, upload-time = "2026-06-04T20:34:55.437Z" } +sdist = { url = "https://files.pythonhosted.org/packages/f2/04/6cb05381207feb41f5257c3964feb939b23308e9d08133fe15c39ccc9b5d/taskbroker_client-0.19.3.tar.gz", hash = "sha256:8e3c8f159a832d919c6fc06c1363d37019a37fbf329f2414f120b246d8089ab5", size = 39250, upload-time = "2026-06-18T19:51:05.012Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/35/b7/6b38da76f535731d30dbe1b2dc48cbb379dc5ea7231c4a6084e91d30ba20/taskbroker_client-0.18.2-py3-none-any.whl", hash = "sha256:a968e1da5f778af5136894f3d67c1bc42e349b9197852d50d611df19739675c0", size = 44913, upload-time = "2026-06-04T20:34:54.544Z" }, + { url = "https://files.pythonhosted.org/packages/35/d0/2ca1e4d268f9876514e06979dffc92f61a9c8d93f6bf3c8c1ebb5877e2d3/taskbroker_client-0.19.3-py3-none-any.whl", hash = "sha256:4359b5906902e254d8c3efa1492dc249d4ed0678146e87b14c82769742da433f", size = 50155, upload-time = "2026-06-18T19:51:03.943Z" }, ] [[package]] From c9b8fbc2a95d1e62ca2804918e01e3451509508f Mon Sep 17 00:00:00 2001 From: Mark Story Date: Fri, 19 Jun 2026 16:39:19 -0400 Subject: [PATCH 2/3] Fix lint --- src/launchpad/worker/app.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/launchpad/worker/app.py b/src/launchpad/worker/app.py index 5552cd15..e34aca90 100644 --- a/src/launchpad/worker/app.py +++ b/src/launchpad/worker/app.py @@ -2,11 +2,10 @@ from arroyo.backends.kafka import KafkaProducer from taskbroker_client.app import TaskbrokerApp -from taskbroker_client.metrics import MetricsBackend, DatadogMetrics +from taskbroker_client.metrics import DatadogMetrics, MetricsBackend from taskbroker_client.router import TaskRouter - class CustomRouter(TaskRouter): def route_namespace(self, name: str) -> str: return os.getenv("TASKWORKER_TOPIC", "taskworker") From b053a1b571172d32cb5a98c4912c1a4f342fa87d Mon Sep 17 00:00:00 2001 From: Mark Story Date: Mon, 22 Jun 2026 13:44:48 -0400 Subject: [PATCH 3/3] Update taskbroker-client version --- pyproject.toml | 2 +- uv.lock | 32 ++++---------------------------- 2 files changed, 5 insertions(+), 29 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index edd66eea..e202fb59 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,7 @@ dependencies = [ "sentry-arroyo>=2.38.7", "sentry-sdk>=2.36.0", "sortedcontainers>=2.4.0", - "taskbroker-client>=0.19.3,<1", + "taskbroker-client>=0.20.1,<1", "typing-extensions>=4.15.0", "zipfile-zstd==0.0.4", ] diff --git a/uv.lock b/uv.lock index b63a3462..37363b73 100644 --- a/uv.lock +++ b/uv.lock @@ -856,7 +856,7 @@ requires-dist = [ { name = "sentry-arroyo", specifier = ">=2.38.7" }, { name = "sentry-sdk", specifier = ">=2.36.0" }, { name = "sortedcontainers", specifier = ">=2.4.0" }, - { name = "taskbroker-client", specifier = ">=0.19.3,<1" }, + { name = "taskbroker-client", specifier = ">=0.20.1,<1" }, { name = "typing-extensions", specifier = ">=4.15.0" }, { name = "zipfile-zstd", specifier = "==0.0.4" }, ] @@ -1149,29 +1149,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8c/0c/e71a00f2152e0400d3ac0a764b51dc16818e1e07e03e30a173bf7582af36/objectstore_client-0.1.10-py3-none-any.whl", hash = "sha256:01235fdaab87beb27b23fb016b51571a4f01ecc46dcf5669395612aa521f4485", size = 21824, upload-time = "2026-06-04T21:31:38.826Z" }, ] -[[package]] -name = "orjson" -version = "3.11.9" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/7e/0c/964746fcafbd16f8ff53219ad9f6b412b34f345c75f384ad434ceaadb538/orjson-3.11.9.tar.gz", hash = "sha256:4fef17e1f8722c11587a6ef18e35902450221da0028e65dbaaa543619e68e48f", size = 5599163, upload-time = "2026-05-06T15:11:08.309Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/8e/eb/5da01e356015aee6ecfa1187ced87aef51364e306f5e695dd52719bf0e78/orjson-3.11.9-cp314-cp314-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:b6ef1979adc4bc243523f1a2ba91418030a8e29b0a99cbe7e0e2d6807d4dce6e", size = 228465, upload-time = "2026-05-06T15:10:44.097Z" }, - { url = "https://files.pythonhosted.org/packages/64/62/3e0e0c14c957133bcd855395c62b55ed4e3b0af23ffea11b032cb1dcbdb1/orjson-3.11.9-cp314-cp314-macosx_15_0_arm64.whl", hash = "sha256:f36b7f32c7c0db4a719f1fc5824db4a9c6f8bd1a354debb91faf26ebf3a4c71e", size = 128364, upload-time = "2026-05-06T15:10:45.839Z" }, - { url = "https://files.pythonhosted.org/packages/5a/5a/07d8aa117211a8ed7630bda80c8c0b14d04e0f8dcf99bcf49656e4a710eb/orjson-3.11.9-cp314-cp314-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:08f4d8ebb44925c794e535b2bebc507cebf32209df81de22ae285fb0d8d66de0", size = 132063, upload-time = "2026-05-06T15:10:47.267Z" }, - { url = "https://files.pythonhosted.org/packages/d6/ec/4acaf21483e18aa945be74a474c74b434f284b549f275a0a39b9f98956e9/orjson-3.11.9-cp314-cp314-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:6cc7923789694fd58f001cbcac7e47abc13af4d560ebbfcf3b41a8b1a0748124", size = 122356, upload-time = "2026-05-06T15:10:48.765Z" }, - { url = "https://files.pythonhosted.org/packages/13/d8/5f0555e7638801323b7a75850f92e7dfa891bc84fe27a1ba4449170d1200/orjson-3.11.9-cp314-cp314-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ea5c46eb2d3af39e806b986f4b09d5c2706a1f5afde3cbf7544ce6616127173c", size = 129592, upload-time = "2026-05-06T15:10:50.13Z" }, - { url = "https://files.pythonhosted.org/packages/b6/30/ed9860412a3603ceb3c5955bfd72d28b9d0e7ba6ed81add14f83d7114236/orjson-3.11.9-cp314-cp314-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f5d89a2ed90731df3be64bab0aa44f78bff39fdc9d71c291f4a8023aa46425b7", size = 140491, upload-time = "2026-05-06T15:10:51.582Z" }, - { url = "https://files.pythonhosted.org/packages/d0/17/adc514dea7ac7c505527febf884934b815d34f0c7b8693c1a8b39c5c4a57/orjson-3.11.9-cp314-cp314-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:25e4aed0312d292c09f61af25bba34e0b2c88546041472b09088c39a4d828af1", size = 127309, upload-time = "2026-05-06T15:10:53.329Z" }, - { url = "https://files.pythonhosted.org/packages/76/3e/c0b690253f0b82d86e99949af13533363acfb5432ecb5d53dd5b3bce9c34/orjson-3.11.9-cp314-cp314-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:aaea64f3f467d22e70eeed68bdccb3bc4f83f650446c4a03c59f2cba28a108db", size = 134030, upload-time = "2026-05-06T15:10:54.988Z" }, - { url = "https://files.pythonhosted.org/packages/c1/7a/bc82a0bb25e9faaf92dc4d9ef002732efc09737706af83e346788641d4a7/orjson-3.11.9-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:a028425d1b440c5d92a6be1e1a020739dfe67ea87d96c6dbe828c1b30041728b", size = 141482, upload-time = "2026-05-06T15:10:56.663Z" }, - { url = "https://files.pythonhosted.org/packages/01/55/e69188b939f77d5d32a9833745ace31ea5ccae3ab613a1ec185d3cd2c4fb/orjson-3.11.9-cp314-cp314-musllinux_1_2_armv7l.whl", hash = "sha256:5b192c6cf397e4455b11523c5cf2b18ed084c1bbd61b6c0926344d2129481972", size = 415178, upload-time = "2026-05-06T15:10:58.446Z" }, - { url = "https://files.pythonhosted.org/packages/2e/1a/b8a5a7ac527e80b9cb11d51e3f6689b709279183264b9ec5c7bc680bb8b5/orjson-3.11.9-cp314-cp314-musllinux_1_2_i686.whl", hash = "sha256:ea407d4ccf5891d667d045fecae97a7a1e5e87b3b97f97ae1803c2e741130be0", size = 148089, upload-time = "2026-05-06T15:11:00.441Z" }, - { url = "https://files.pythonhosted.org/packages/97/4e/00503f64204bf859b37213a63927028f30fb6268cd8677fb0a5ad48155e1/orjson-3.11.9-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:5f63aaf97afd9f6dec5b1a68e1b8da12bfccb4cb9a9a65c3e0b6c847849e7586", size = 136921, upload-time = "2026-05-06T15:11:02.176Z" }, - { url = "https://files.pythonhosted.org/packages/0d/ba/a23b82a0a8d0ed7bed4e5f5035aae751cad4ff6a1e8d2ecd14d8860f5929/orjson-3.11.9-cp314-cp314-win32.whl", hash = "sha256:e30ab17845bb9fa54ccf67fa4f9f5282652d54faa6d17452f47d0f369d038673", size = 131638, upload-time = "2026-05-06T15:11:03.696Z" }, - { url = "https://files.pythonhosted.org/packages/f3/c3/0c6798456bade745c75c452342dabacce5798196483e77e643be1f53877d/orjson-3.11.9-cp314-cp314-win_amd64.whl", hash = "sha256:32ef5f4283a3be81913947d19608eacb7c6608026851123790cd9cc8982af34b", size = 127078, upload-time = "2026-05-06T15:11:05.123Z" }, - { url = "https://files.pythonhosted.org/packages/16/21/5a3f1e8913103b703a436a5664238e5b965ec392b555fe68943ea3691e6b/orjson-3.11.9-cp314-cp314-win_arm64.whl", hash = "sha256:eebdbdeef0094e4f5aefa20dcd4eb2368ab5e7a3b4edea27f1e7b2892e009cf9", size = 126687, upload-time = "2026-05-06T15:11:06.602Z" }, -] - [[package]] name = "packageurl-python" version = "0.17.6" @@ -1967,7 +1944,7 @@ wheels = [ [[package]] name = "taskbroker-client" -version = "0.19.3" +version = "0.20.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "confluent-kafka" }, @@ -1975,7 +1952,6 @@ dependencies = [ { name = "grpcio" }, { name = "grpcio-health-checking" }, { name = "msgpack" }, - { name = "orjson" }, { name = "protobuf" }, { name = "redis" }, { name = "redis-py-cluster" }, @@ -1984,9 +1960,9 @@ dependencies = [ { name = "sentry-sdk", extra = ["http2"] }, { name = "zstandard" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/f2/04/6cb05381207feb41f5257c3964feb939b23308e9d08133fe15c39ccc9b5d/taskbroker_client-0.19.3.tar.gz", hash = "sha256:8e3c8f159a832d919c6fc06c1363d37019a37fbf329f2414f120b246d8089ab5", size = 39250, upload-time = "2026-06-18T19:51:05.012Z" } +sdist = { url = "https://files.pythonhosted.org/packages/f1/d0/f431bddc5439bc4380f974a0fb6bb48039ef9de2db9ebd263d27a21aee00/taskbroker_client-0.20.1.tar.gz", hash = "sha256:72c96ced88d699bf110f5fa91483a55a2facd5862a1b889036e4a6683a222504", size = 38598, upload-time = "2026-06-22T16:30:05.21Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/35/d0/2ca1e4d268f9876514e06979dffc92f61a9c8d93f6bf3c8c1ebb5877e2d3/taskbroker_client-0.19.3-py3-none-any.whl", hash = "sha256:4359b5906902e254d8c3efa1492dc249d4ed0678146e87b14c82769742da433f", size = 50155, upload-time = "2026-06-18T19:51:03.943Z" }, + { url = "https://files.pythonhosted.org/packages/05/54/bfa8cb55b7befbae37de85d1895b11583cb1e967966c0bee9c69f0a6a596/taskbroker_client-0.20.1-py3-none-any.whl", hash = "sha256:56c9de6d8d2f2a483516cebe27c27704a4613c96335b124ecdbf485c363a77d8", size = 49540, upload-time = "2026-06-22T16:30:04.147Z" }, ] [[package]]