Skip to content

fix: make AIOKafkaProducer.start() idempotent to prevent busy loop#1143

Open
gloryfromca wants to merge 1 commit intoaio-libs:masterfrom
gloryfromca:bug_start_multiple_times
Open

fix: make AIOKafkaProducer.start() idempotent to prevent busy loop#1143
gloryfromca wants to merge 1 commit intoaio-libs:masterfrom
gloryfromca:bug_start_multiple_times

Conversation

@gloryfromca
Copy link
Copy Markdown

When start() is called multiple times on the same producer instance (e.g., by multiple listeners sharing the same producer), each call would create a new sender task. Multiple sender tasks sharing the same message accumulator causes them to wake each other up via drain_by_nodes(), resulting in an infinite busy loop.

Changes:

  • Add _started flag to track producer startup state
  • Return early if start() is called on an already-started producer
  • Reset _started flag on startup failure to allow retry

This ensures only one sender task is created per producer instance.

Changes

Fixes #

Checklist

  • I think the code is well written
  • Unit tests for the changes exist
  • Documentation reflects the changes
  • Add a new news fragment into the CHANGES folder
    • name it <issue_id>.<type> (e.g. 588.bugfix)
    • if you don't have an issue_id change it to the pr id after creating the PR
    • ensure type is one of the following:
      • .feature: Signifying a new feature.
      • .bugfix: Signifying a bug fix.
      • .doc: Signifying a documentation improvement.
      • .removal: Signifying a deprecation or removal of public API.
      • .misc: A ticket has been closed, but it is not of interest to users.
    • Make sure to use full sentences with correct case and punctuation, for example: Fix issue with non-ascii contents in doctest text files.

When start() is called multiple times on the same producer instance
(e.g., by multiple listeners sharing the same producer), each call
would create a new sender task. Multiple sender tasks sharing the
same message accumulator causes them to wake each other up via
drain_by_nodes(), resulting in an infinite busy loop.

Changes:
- Add _started flag to track producer startup state
- Return early if start() is called on an already-started producer
- Reset _started flag on startup failure to allow retry

This ensures only one sender task is created per producer instance.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant