When Fetcher._proc_fetch_request receives NotLeaderForPartitionError or UnknownTopicOrPartitionError in the per-partition response loop, it calls self._client.force_metadata_update() but does not apply any backoff/sleep before the next fetch cycle.
Normally, the Kafka broker waits up to fetch.max.wait.ms (default 500 ms) before responding to a fetch request when it does not yet have enough data to return. However, when a partition has no leader (e.g. during a broker restart or partition reassignment), the broker returns NotLeaderForPartitionError immediately without entering the wait phase. The response time drops to just the network round trip (~10-20 ms).
Without a backoff on the client side, the fetch loop spins as fast as the network allows, effectively DDoS-ing the Kafka cluster with fetch requests. This causes memory usage to spike in the consumer process.
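To put rough numbers on the difference, here is a small deterministic model of the loop using a virtual clock in integer milliseconds. The 10 s leader-election window, the 15 ms round trip (taken from the ~10-20 ms above) and the 100 ms backoff are illustrative assumptions, not measurements, and `count_fetches` is a hypothetical helper, not part of aiokafka.

```python
def count_fetches(window_ms: int, rtt_ms: int, backoff_ms: int = 0) -> int:
    """Count fetch requests sent during `window_ms` of leaderlessness.

    Each iteration is one request that the broker rejects after a single
    round trip (`rtt_ms`), optionally followed by a client-side sleep of
    `backoff_ms`. Integer milliseconds keep the result deterministic.
    """
    if rtt_ms <= 0:
        raise ValueError("rtt_ms must be positive")
    elapsed = 0
    requests = 0
    while elapsed + rtt_ms <= window_ms:
        elapsed += rtt_ms      # broker answers NotLeaderForPartition at once
        requests += 1
        elapsed += backoff_ms  # 0 means the loop spins with no pause
    return requests


if __name__ == "__main__":
    print(count_fetches(10_000, 15))       # 666 requests: no backoff
    print(count_fetches(10_000, 15, 100))  # 87 requests: 100 ms backoff
```

Under these assumptions, the backoff cuts the per-consumer request count by roughly 7.5x. The spinning case also scales with every consumer and every affected partition, which is what overwhelms the cluster.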
Note that the general KafkaError exception handler at the top of the method (added in #534) does include await asyncio.sleep(self._retry_backoff), but the per-partition error handling branch for NotLeaderForPartitionError inside the response processing loop does not.
Expected behaviour
After receiving NotLeaderForPartitionError or UnknownTopicOrPartitionError, the fetcher should apply a self._retry_backoff sleep (as it already does for a general KafkaError), giving the cluster time to elect new partition leaders without being overwhelmed by requests:
```python
elif error_type in (
    Errors.NotLeaderForPartitionError,
    Errors.UnknownTopicOrPartitionError,
):
    self._client.force_metadata_update()
    await asyncio.sleep(self._retry_backoff)  # missing backoff
```
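A minimal, self-contained sketch of the proposed branch follows, assuming a heavily simplified model of the per-partition loop. `FetcherModel`, `FakeClient`, the stand-in error classes and the injectable `sleep` parameter are all hypothetical test seams, not aiokafka's real API. The `sleep` parameter lets the backoff be observed without actually waiting. `self._retry_backoff` is treated as seconds, matching its use with `asyncio.sleep`.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Optional, Tuple


class NotLeaderForPartitionError(Exception):
    """Hypothetical stand-in for Errors.NotLeaderForPartitionError."""


class UnknownTopicOrPartitionError(Exception):
    """Hypothetical stand-in for Errors.UnknownTopicOrPartitionError."""


class FakeClient:
    """Hypothetical client that only counts metadata refresh requests."""

    def __init__(self) -> None:
        self.metadata_updates = 0

    def force_metadata_update(self) -> None:
        self.metadata_updates += 1


class FetcherModel:
    """Hypothetical, simplified model of the per-partition response loop."""

    def __init__(
        self,
        client: FakeClient,
        retry_backoff: float,
        sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
    ) -> None:
        self._client = client
        self._retry_backoff = retry_backoff  # seconds
        self._sleep = sleep  # injectable so tests need not really wait

    async def proc_fetch_response(
        self, errors_by_partition: Dict[int, Optional[type]]
    ) -> List[int]:
        """Return partitions that fetched OK; back off on leadership errors."""
        ok = []
        for partition, error_type in errors_by_partition.items():
            if error_type is None:
                ok.append(partition)
            elif error_type in (
                NotLeaderForPartitionError,
                UnknownTopicOrPartitionError,
            ):
                self._client.force_metadata_update()
                # The proposed fix: pause before the next fetch cycle.
                await self._sleep(self._retry_backoff)
        return ok


async def demo() -> Tuple[List[int], List[float], int]:
    """Run one response with two leaderless partitions, recording sleeps."""
    delays: List[float] = []

    async def recording_sleep(delay: float) -> None:
        delays.append(delay)

    client = FakeClient()
    fetcher = FetcherModel(client, retry_backoff=0.1, sleep=recording_sleep)
    ok = await fetcher.proc_fetch_response(
        {0: None, 1: NotLeaderForPartitionError, 2: UnknownTopicOrPartitionError}
    )
    return ok, delays, client.metadata_updates


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ([0], [0.1, 0.1], 2)
```

One design point the demo makes visible: because the sleep sits inside the per-partition loop, a response with N leaderless partitions sleeps N times. A possible variant is to set a flag in the branch and sleep once after the loop, so the pause per fetch cycle stays at a single `_retry_backoff`.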