From 9a8acfe7e3b04d62d8a060ff4a719b7201f47e42 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Mon, 18 May 2026 09:48:52 +0800 Subject: [PATCH] Fix consumer reconnect state after subscribe failure --- lib/ConsumerImpl.cc | 3 ++- tests/ConsumerTest.cc | 36 ++++++++++++++++++++++++++++++++++++ tests/PulsarFriend.h | 5 +++++ 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index d2684845..85f49946 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -371,8 +371,9 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result } if (consumerCreatedPromise_.isComplete()) { - // Consumer had already been initially created, we need to retry connecting in any case + // Clear the connection set before SUBSCRIBE so the next reconnect is not skipped. LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result)); + resetCnx(); handleResult = ResultRetryable; } else { // Consumer was not yet created, retry to connect to broker if it's possible diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 795613e0..5de5c755 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -1596,4 +1596,40 @@ TEST(ConsumerTest, testCloseAfterSeek) { anotherClient.close(); } +TEST(ConsumerTest, testIsConnectedFalsePositiveAfterSubscribeRejectedOnReconnect) { + // A reconnect SUBSCRIBE failure happens after the initial subscribe has already completed. + const std::string topic = + "persistent://public/default/test-false-positive-" + std::to_string(time(nullptr)); + Client client(lookupUrl); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer)); + ASSERT_TRUE(consumer.isConnected()) << "Precondition: consumer should be connected"; + + auto& consumerImpl = PulsarFriend::getConsumerImpl(consumer); + + // Capture the current live connection. + auto cnx = consumerImpl.getCnx().lock(); + ASSERT_TRUE(cnx != nullptr) << "Precondition: cnx should be non-null"; + LOG_INFO("Step 1 passed: consumer subscribed, cnx=" << cnx); + + // Simulate the broker rejecting the SUBSCRIBE command during reconnect. + Result handleResult = PulsarFriend::consumerHandleCreateConsumer(consumer, cnx, ResultAuthorizationError); + LOG_INFO("Step 2: handleCreateConsumer returned " << handleResult); + EXPECT_EQ(ResultRetryable, handleResult) + << "handleCreateConsumer should return ResultRetryable for an already-created consumer"; + + // The failed SUBSCRIBE must clear the connection set before SUBSCRIBE. + auto cnxAfter = consumerImpl.getCnx().lock(); + LOG_INFO("Step 3: cnx after handleCreateConsumer failure = " << cnxAfter); + LOG_INFO("Step 3: isConnected() = " << consumer.isConnected()); + + EXPECT_EQ(nullptr, cnxAfter) + << "After fix: connection_ must be cleared by resetCnx() so grabCnx() can retry"; + EXPECT_FALSE(consumer.isConnected()) + << "After fix: isConnected() must return false after SUBSCRIBE rejection"; + + consumer.close(); + client.close(); +} + } // namespace pulsar diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index 8017e0ba..02447437 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -102,6 +102,11 @@ class PulsarFriend { return *consumerImpl; } + static Result consumerHandleCreateConsumer(Consumer consumer, const ClientConnectionPtr& cnx, + Result result) { + return getConsumerImpl(consumer).handleCreateConsumer(cnx, result); + } + static std::shared_ptr getConsumerImplPtr(Consumer consumer) { return std::static_pointer_cast(consumer.impl_); }