buffer task-started events that arrive before task-sent #31
Current Aviator status
This PR was merged using Aviator.
Code Review
This pull request introduces a buffer to handle out-of-order Celery events where a 'task-started' event arrives before its corresponding 'task-sent' event. By storing these 'orphan' started events in an OrderedDict, the system can correctly calculate queue wait times when the 'task-sent' event eventually arrives, preventing potential memory leaks in the in-flight cache. Feedback suggests increasing the TTL for these orphan events to 60 seconds to better accommodate clock skew between workers and the monitor. Additionally, it is recommended to implement thread synchronization (locks) when accessing shared state like the orphan buffer and in-flight cache to avoid race conditions between the event receiver and the pruning threads.
_WORKER_HEARTBEAT_TTL_SEC = 120
_PRUNE_INTERVAL_SEC = 30
_ORPHAN_STARTED_TTL_SEC = 10
A 10-second TTL might be too aggressive given that the pruning logic in _prune compares the worker-generated event timestamp with the monitor's local time. If a worker's clock is behind the monitor's clock by more than 10 seconds, its task-started events will be pruned immediately upon the next _prune cycle, defeating the purpose of the buffer. Since the cache size is already capped at 10,000 items (which has a negligible memory footprint), consider increasing this TTL to 60 or 120 seconds to better account for clock skew and broker latency.
Suggested change:
- _ORPHAN_STARTED_TTL_SEC = 10
+ _ORPHAN_STARTED_TTL_SEC = 60
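To make the clock-skew concern concrete, here is a minimal sketch of the expiry check the comment describes. The helper name `is_expired` and the 15-second skew are assumptions for illustration; the actual `_prune` implementation is not shown in this review.

```python
def is_expired(event_ts: float, monitor_now: float, ttl_sec: float) -> bool:
    """Return True when the event looks older than ttl_sec by the monitor's clock."""
    return monitor_now - event_ts > ttl_sec


# Hypothetical scenario: the worker's clock runs 15 s behind the monitor's.
monitor_now = 1_000.0
worker_skew_sec = 15.0
# The worker stamps the event with its own "now", which the monitor sees as 15 s old.
event_ts = monitor_now - worker_skew_sec

expired_with_10s_ttl = is_expired(event_ts, monitor_now, ttl_sec=10)
expired_with_60s_ttl = is_expired(event_ts, monitor_now, ttl_sec=60)
print(expired_with_10s_ttl, expired_with_60s_ttl)  # True False
```

With a 10-second TTL the freshly emitted event is already considered stale on the next prune cycle, while a 60-second TTL leaves room for the skew.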
while len(self._orphan_started) > _ORPHAN_STARTED_CACHE_SIZE:
    self._orphan_started.popitem(last=False)
While OrderedDict is generally thread-safe for single operations in CPython, this compound check-and-pop logic is not atomic. Since _record_task_started is called from the event receiver thread and _prune runs in a separate timer thread, there is a small race condition where the size could change between the len() check and popitem(). Given the existing patterns in this class, this might be acceptable, but for better robustness, consider wrapping accesses to shared state like _orphan_started and _in_flight with a threading.Lock.
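One way the locking suggestion could look is sketched below. `OrphanBuffer` and its method names are hypothetical and not taken from the PR; the point is only that the size check and the eviction happen under the same `threading.Lock`, so a receiver thread and a pruning thread cannot interleave between `len()` and `popitem()`.

```python
import threading
from collections import OrderedDict
from typing import Optional


class OrphanBuffer:
    """Bounded, lock-protected buffer of task_id -> started timestamp (hypothetical)."""

    def __init__(self, max_size: int) -> None:
        self._lock = threading.Lock()
        self._items: "OrderedDict[str, float]" = OrderedDict()
        self._max_size = max_size

    def add(self, task_id: str, started_ts: float) -> None:
        with self._lock:
            self._items[task_id] = started_ts
            self._items.move_to_end(task_id)
            # Check-and-evict is atomic because the lock is held throughout.
            while len(self._items) > self._max_size:
                self._items.popitem(last=False)

    def pop(self, task_id: str) -> Optional[float]:
        with self._lock:
            return self._items.pop(task_id, None)

    def prune(self, now: float, ttl_sec: float) -> int:
        """Drop entries older than ttl_sec and return how many were removed."""
        with self._lock:
            stale = [k for k, ts in self._items.items() if now - ts > ttl_sec]
            for key in stale:
                del self._items[key]
            return len(stale)

    def __len__(self) -> int:
        with self._lock:
            return len(self._items)


# Usage: several writer threads never push the buffer past its cap.
buf = OrphanBuffer(max_size=100)
threads = [
    threading.Thread(
        target=lambda n=n: [buf.add(f"t{n}-{i}", float(i)) for i in range(1000)]
    )
    for n in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(buf))  # 100
```

A single coarse lock is the simplest choice here; if the in-flight cache is touched in the same code paths, sharing one lock across both structures avoids lock-ordering questions.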
Celery's task-sent and task-started events ride independent broker connections with no ordering guarantee, so a fast pickup can flip their order. Those tasks were lingering in the in-flight cache until its TTL expired and silently dropping queue_wait samples.
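The buffering idea can be sketched roughly as follows. This is an illustration under assumptions, not the PR's code: the class `QueueWaitTracker`, its method names, the constant name, and the clamp to zero are all hypothetical, and pruning by TTL is left out for brevity.

```python
from collections import OrderedDict

ORPHAN_CACHE_SIZE = 10_000  # cap mentioned in the review; the name is hypothetical


class QueueWaitTracker:
    """Pairs task-sent and task-started events regardless of arrival order."""

    def __init__(self) -> None:
        self.in_flight = {}                  # task_id -> sent timestamp
        self.orphan_started = OrderedDict()  # task_id -> started timestamp
        self.queue_wait_samples = []

    def on_task_sent(self, task_id: str, sent_ts: float) -> None:
        started_ts = self.orphan_started.pop(task_id, None)
        if started_ts is not None:
            # task-started arrived first: record the sample and skip in-flight.
            self.queue_wait_samples.append(max(0.0, started_ts - sent_ts))
            return
        self.in_flight[task_id] = sent_ts

    def on_task_started(self, task_id: str, started_ts: float) -> None:
        sent_ts = self.in_flight.pop(task_id, None)
        if sent_ts is not None:
            self.queue_wait_samples.append(max(0.0, started_ts - sent_ts))
            return
        # No task-sent seen yet: park the event until it shows up.
        self.orphan_started[task_id] = started_ts
        while len(self.orphan_started) > ORPHAN_CACHE_SIZE:
            self.orphan_started.popitem(last=False)


# Usage: the started event arrives before the sent event for the same task.
tracker = QueueWaitTracker()
tracker.on_task_started("task-a", 10.5)
tracker.on_task_sent("task-a", 10.0)
print(tracker.queue_wait_samples, tracker.in_flight)  # [0.5] {}
```

In the flipped-order case the sample is still recorded and nothing is left behind in `in_flight`, which is the behavior the description says was previously missing.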