Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,9 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
}

if (consumerCreatedPromise_.isComplete()) {
// Consumer had already been initially created, we need to retry connecting in any case
// Clear the connection set before SUBSCRIBE so the next reconnect is not skipped.
LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result));
resetCnx();
Comment thread
shibd marked this conversation as resolved.
handleResult = ResultRetryable;
} else {
// Consumer was not yet created, retry to connect to broker if it's possible
Expand Down
36 changes: 36 additions & 0 deletions tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1596,4 +1596,40 @@ TEST(ConsumerTest, testCloseAfterSeek) {
anotherClient.close();
}

TEST(ConsumerTest, testIsConnectedFalsePositiveAfterSubscribeRejectedOnReconnect) {
// A reconnect SUBSCRIBE failure happens after the initial subscribe has already completed.
const std::string topic =
"persistent://public/default/test-false-positive-" + std::to_string(time(nullptr));
Client client(lookupUrl);
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
ASSERT_TRUE(consumer.isConnected()) << "Precondition: consumer should be connected";

auto& consumerImpl = PulsarFriend::getConsumerImpl(consumer);

// Capture the current live connection.
auto cnx = consumerImpl.getCnx().lock();
ASSERT_TRUE(cnx != nullptr) << "Precondition: cnx should be non-null";
LOG_INFO("Step 1 passed: consumer subscribed, cnx=" << cnx);

// Simulate the broker rejecting the SUBSCRIBE command during reconnect.
Result handleResult = PulsarFriend::consumerHandleCreateConsumer(consumer, cnx, ResultAuthorizationError);
LOG_INFO("Step 2: handleCreateConsumer returned " << handleResult);
EXPECT_EQ(ResultRetryable, handleResult)
<< "handleCreateConsumer should return ResultRetryable for an already-created consumer";

// The failed SUBSCRIBE must clear the connection set before SUBSCRIBE.
auto cnxAfter = consumerImpl.getCnx().lock();
LOG_INFO("Step 3: cnx after handleCreateConsumer failure = " << cnxAfter);
LOG_INFO("Step 3: isConnected() = " << consumer.isConnected());

EXPECT_EQ(nullptr, cnxAfter)
<< "After fix: connection_ must be cleared by resetCnx() so grabCnx() can retry";
EXPECT_FALSE(consumer.isConnected())
<< "After fix: isConnected() must return false after SUBSCRIBE rejection";

consumer.close();
client.close();
}

} // namespace pulsar
5 changes: 5 additions & 0 deletions tests/PulsarFriend.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ class PulsarFriend {
return *consumerImpl;
}

static Result consumerHandleCreateConsumer(Consumer consumer, const ClientConnectionPtr& cnx,
Result result) {
return getConsumerImpl(consumer).handleCreateConsumer(cnx, result);
}

static std::shared_ptr<ConsumerImpl> getConsumerImplPtr(Consumer consumer) {
return std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
}
Expand Down
Loading