Skip to content

[fix][cpp] Clear stale consumer connection after reconnect subscribe failure#577

Merged
shibd merged 1 commit into
apache:mainfrom
shibd:codex/fix-consumer-reconnect-state
May 18, 2026
Merged

[fix][cpp] Clear stale consumer connection after reconnect subscribe failure#577
shibd merged 1 commit into
apache:mainfrom
shibd:codex/fix-consumer-reconnect-state

Conversation

@shibd
Copy link
Copy Markdown
Member

@shibd shibd commented May 18, 2026

Motivation

A consumer can be created successfully and consume messages for some time. Later, after a broker restart, namespace bundle unload, or topic ownership change, the client reconnects and sends a new broker-side SUBSCRIBE command for the existing consumer.

Before this fix, if that reconnect SUBSCRIBE failed, the local consumer could get stuck with isConnected() == true even though the broker no longer had an active consumer for the subscription.

Failure path

Initial subscribe succeeds; consumer is running
             |
             v
Broker restart / topic unload / ownership change
             |
             v
Old connection is closed or broker sends CLOSE_CONSUMER
             |
             v
HandlerBase::handleDisconnection() or ConsumerImpl::disconnectConsumer()
    -> resetCnx()
    -> scheduleReconnection()
             |
             v
lookup + TCP connect succeeds
             |
             v
ConsumerImpl::connectionOpened(cnx)
    -> setCnx(cnx)                 <-- connection_ is set before SUBSCRIBE
    -> cnx->registerConsumer()
    -> sendRequestWithId(SUBSCRIBE)
             |
             v
broker rejects SUBSCRIBE, for example:
    "Client is not authorized to subscribe"
    ResultUnknownError or ResultAuthorizationError
             |
             v
ConsumerImpl::handleCreateConsumer(cnx, result)
    consumerCreatedPromise_.isComplete() == true
    -> handleResult = ResultRetryable
    -> previously: connection_ was not cleared
             |
             v
HandlerBase connectionOpened listener
    isResultRetryable(ResultRetryable) == true
    -> scheduleReconnection()      <-- appears to retry
             |
             v
timer fires; HandlerBase::grabCnx()
    getCnx().lock() != nullptr     <-- stale connection_ is still set
    -> "Ignoring reconnection request since we're already connected"
    -> reconnectionPending_ = false
    -> no further reconnect attempt
             |
             v
ConsumerImpl::isConnected()
    = !getCnx().expired() && state_ == Ready
    = true && true
    = true                         <-- false positive

The key issue is that connectionOpened() sets connection_ before the broker confirms the SUBSCRIBE. If the SUBSCRIBE fails for an already-created consumer, the reconnect path returns ResultRetryable, but the stale connection_ must also be cleared. Otherwise the next reconnect attempt is skipped by grabCnx() because it sees a non-null connection.

Changes

  • Clear the stale consumer connection when an already-created consumer fails to re-subscribe during reconnect.
  • Add a regression test that simulates a reconnect SUBSCRIBE rejection and verifies the stale connection is cleared and isConnected() returns false.
  • Expose the internal handleCreateConsumer() path through PulsarFriend for the regression test.

Validation

  • ./build-support/docker-format.sh
  • Attempted ./build/tests/pulsar-tests --gtest_filter=ConsumerTest.testIsConnectedFalsePositiveAfterSubscribeRejectedOnReconnect, but the local Pulsar service was not running on localhost:6650, so the test timed out during initial subscribe before reaching the regression path.

@shibd shibd self-assigned this May 18, 2026
@shibd shibd requested review from BewareMyPower and Copilot May 18, 2026 01:50
@shibd shibd marked this pull request as ready for review May 18, 2026 01:50
@shibd shibd added the bug Something isn't working label May 18, 2026
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes a bug where a C++ Pulsar consumer could end up permanently disconnected (while isConnected() falsely returns true) after a reconnect-time SUBSCRIBE rejection (e.g. authorization error). Previously handleCreateConsumer() returned ResultRetryable but left the stale connection_ set, causing the subsequent grabCnx() to short-circuit. The fix calls resetCnx() on the retryable-after-creation branch so reconnect can proceed.

Changes:

  • Call resetCnx() in ConsumerImpl::handleCreateConsumer() when a reconnect SUBSCRIBE fails after the consumer was already initially created.
  • Add a regression test in ConsumerTest.cc simulating a reconnect SUBSCRIBE rejection.
  • Expose handleCreateConsumer() to tests via PulsarFriend::consumerHandleCreateConsumer().

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

File Description
lib/ConsumerImpl.cc Adds resetCnx() call so a failed reconnect SUBSCRIBE clears the stale connection, allowing the reconnect timer to retry.
tests/PulsarFriend.h Adds a static helper that exposes ConsumerImpl::handleCreateConsumer() for testing.
tests/ConsumerTest.cc Adds regression test verifying connection_ is cleared and isConnected() returns false after reconnect SUBSCRIBE rejection.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread lib/ConsumerImpl.cc
@shibd shibd merged commit 7d1002a into apache:main May 18, 2026
14 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants