From 1ae4386dc4110a3d64739314e487ba989efba75d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 17 Mar 2026 22:08:30 +0800 Subject: [PATCH 1/7] Fix connection leak by request timers not cancelled in time --- lib/ClientConnection.cc | 311 +++++++++++++++------------------------- lib/ClientConnection.h | 78 +++------- lib/ExecutorService.h | 9 ++ lib/PendingRequest.h | 77 ++++++++++ 4 files changed, 215 insertions(+), 260 deletions(-) create mode 100644 lib/PendingRequest.h diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index cc7e1f67..7ac6643d 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -200,7 +200,6 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: connectTimer_(executor_->createDeadlineTimer()), outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)), keepAliveIntervalInSeconds_(clientConfiguration.getKeepAliveIntervalInSeconds()), - consumerStatsRequestTimer_(executor_->createDeadlineTimer()), maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()), clientVersion_(clientVersion), pool_(pool), @@ -336,49 +335,6 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC lock.unlock(); connectPromise_.setValue(shared_from_this()); - - if (serverProtocolVersion_ >= proto::v8) { - startConsumerStatsTimer(std::vector()); - } -} - -void ClientConnection::startConsumerStatsTimer(std::vector consumerStatsRequests) { - std::vector> consumerStatsPromises; - Lock lock(mutex_); - - for (int i = 0; i < consumerStatsRequests.size(); i++) { - PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.find(consumerStatsRequests[i]); - if (it != pendingConsumerStatsMap_.end()) { - LOG_DEBUG(cnxString() << " removing request_id " << it->first - << " from the pendingConsumerStatsMap_"); - consumerStatsPromises.push_back(it->second); - pendingConsumerStatsMap_.erase(it); - } else { - LOG_DEBUG(cnxString() << "request_id " << it->first << " already fulfilled - not removing it"); - } - } - - consumerStatsRequests.clear(); - for (PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.begin(); - it != pendingConsumerStatsMap_.end(); ++it) { - consumerStatsRequests.push_back(it->first); - } - - // If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero - // Check if we have a timer still before we set the request timer to pop again. - if (consumerStatsRequestTimer_) { - consumerStatsRequestTimer_->expires_after(operationsTimeout_); - consumerStatsRequestTimer_->async_wait( - [this, self{shared_from_this()}, consumerStatsRequests](const ASIO_ERROR& err) { - handleConsumerStatsTimeout(err, consumerStatsRequests); - }); - } - lock.unlock(); - // Complex logic since promises need to be fulfilled outside the lock - for (int i = 0; i < consumerStatsPromises.size(); i++) { - consumerStatsPromises[i].setFailed(ResultTimeout); - LOG_WARN(cnxString() << " Operation timedout, didn't get response from broker"); - } } /// The number of unacknowledged probes to send before considering the connection dead and notifying the @@ -996,21 +952,29 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) { Future ClientConnection::newConsumerStats(uint64_t consumerId, uint64_t requestId) { Lock lock(mutex_); - Promise promise; if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString() << " Client is not connected to the broker"); - promise.setFailed(ResultNotConnected); - return promise.getFuture(); + auto request = + std::make_shared(executor_->createTimer(operationsTimeout_), [] {}); + request->fail(ResultNotConnected); + return request->getFuture(); } - pendingConsumerStatsMap_.insert(std::make_pair(requestId, promise)); + + auto request = std::make_shared( + executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId]() { + LOG_WARN(cnxString << "ConsumerStats request timeout to broker, req_id: " << requestId); + }); + pendingConsumerStatsMap_.emplace(requestId, request); + request->initialize(); lock.unlock(); + if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && mockServer_->sendRequest("CONSUMER_STATS", requestId)) { - return promise.getFuture(); + return request->getFuture(); } sendCommand(Commands::newConsumerStats(consumerId, requestId)); - return promise.getFuture(); + return request->getFuture(); } void ClientConnection::newTopicLookup(const std::string& topicName, bool authoritative, @@ -1029,8 +993,6 @@ void ClientConnection::newPartitionedMetadataLookup(const std::string& topicName void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType, const LookupDataResultPromisePtr& promise) { Lock lock(mutex_); - std::shared_ptr lookupDataResult; - lookupDataResult = std::make_shared(); if (isClosed()) { lock.unlock(); promise->setFailed(ResultNotConnected); @@ -1040,16 +1002,22 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, co promise->setFailed(ResultTooManyLookupRequestException); return; } - LookupRequestData requestData; - requestData.promise = promise; - requestData.timer = executor_->createDeadlineTimer(); - requestData.timer->expires_after(operationsTimeout_); - requestData.timer->async_wait([this, self{shared_from_this()}, requestData](const ASIO_ERROR& ec) { - handleLookupTimeout(ec, requestData); + + auto request = std::make_shared( + executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId, requestType]() { + LOG_WARN(cnxString << requestType << " request timeout to broker, req_id: " << requestId); + }); + request->getFuture().addListener([promise](Result result, const LookupDataResultPtr& lookupDataResult) { + if (result == ResultOk) { + promise->setValue(lookupDataResult); + } else { + promise->setFailed(result); + } }); - pendingLookupRequests_.insert(std::make_pair(requestId, requestData)); + pendingLookupRequests_.emplace(requestId, request); numOfPendingLookupRequest_++; + request->initialize(); lock.unlock(); LOG_DEBUG(cnxString() << "Inserted lookup request " << requestType << " (req_id: " << requestId << ")"); sendCommand(cmd); @@ -1159,21 +1127,21 @@ Future ClientConnection::sendRequestWithId(const SharedBuf if (isClosed()) { lock.unlock(); - Promise promise; LOG_DEBUG(cnxString() << "Fail " << requestType << "(req_id: " << requestId << ") to a closed connection"); - promise.setFailed(ResultNotConnected); - return promise.getFuture(); + auto request = std::make_shared(executor_->createTimer(operationsTimeout_), [] {}); + request->fail(ResultNotConnected); + return request->getFuture(); } - PendingRequestData requestData; - requestData.timer = executor_->createDeadlineTimer(); - requestData.timer->expires_after(operationsTimeout_); - requestData.timer->async_wait([this, self{shared_from_this()}, requestData](const ASIO_ERROR& ec) { - handleRequestTimeout(ec, requestData); - }); - - pendingRequests_.insert(std::make_pair(requestId, requestData)); + auto request = std::make_shared( + executor_->createTimer(operationsTimeout_), + [cnxString = cnxString(), physicalAddress = physicalAddress_, requestId, requestType]() { + LOG_WARN(cnxString << "Network request timeout to broker, remote: " << physicalAddress + << ", req_id: " << requestId << ", request: " << requestType); + }); + pendingRequests_.emplace(requestId, request); + request->initialize(); lock.unlock(); LOG_DEBUG(cnxString() << "Inserted request " << requestType << " (req_id: " << requestId << ")"); @@ -1187,31 +1155,7 @@ Future ClientConnection::sendRequestWithId(const SharedBuf } else { sendCommand(cmd); } - return requestData.promise.getFuture(); -} - -void ClientConnection::handleRequestTimeout(const ASIO_ERROR& ec, - const PendingRequestData& pendingRequestData) { - if (!ec && !pendingRequestData.hasGotResponse->load()) { - LOG_WARN(cnxString() << "Network request timeout to broker, remote: " << physicalAddress_); - pendingRequestData.promise.setFailed(ResultTimeout); - } -} - -void ClientConnection::handleLookupTimeout(const ASIO_ERROR& ec, - const LookupRequestData& pendingRequestData) { - if (!ec) { - LOG_WARN(cnxString() << "Lookup request timeout to broker, remote: " << physicalAddress_); - pendingRequestData.promise->setFailed(ResultTimeout); - } -} - -void ClientConnection::handleGetLastMessageIdTimeout(const ASIO_ERROR& ec, - const ClientConnection::LastMessageIdRequestData& data) { - if (!ec) { - LOG_WARN(cnxString() << "GetLastMessageId request timeout to broker, remote: " << physicalAddress_); - data.promise->setFailed(ResultTimeout); - } + return request->getFuture(); } void ClientConnection::handleKeepAliveTimeout(const ASIO_ERROR& ec) { @@ -1240,15 +1184,6 @@ void ClientConnection::handleKeepAliveTimeout(const ASIO_ERROR& ec) { } } -void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec, - const std::vector& consumerStatsRequests) { - if (ec) { - LOG_DEBUG(cnxString() << " Ignoring timer cancelled event, code[" << ec << "]"); - return; - } - startConsumerStatsTimer(consumerStatsRequests); -} - const std::future& ClientConnection::close(Result result, bool switchCluster) { Lock lock(mutex_); if (closeFuture_) { @@ -1286,11 +1221,6 @@ const std::future& ClientConnection::close(Result result, bool switchClust keepAliveTimer_.reset(); } - if (consumerStatsRequestTimer_) { - cancelTimer(*consumerStatsRequestTimer_); - consumerStatsRequestTimer_.reset(); - } - cancelTimer(*connectTimer_); lock.unlock(); int refCount = weak_from_this().use_count(); @@ -1344,25 +1274,25 @@ const std::future& ClientConnection::close(Result result, bool switchClust connectPromise_.setFailed(result); - // Fail all pending requests, all these type are map whose value type contains the Promise object + // Fail all pending requests after releasing the lock. for (auto& kv : pendingRequests) { - kv.second.fail(result); + kv.second->fail(result); } for (auto& kv : pendingLookupRequests) { - kv.second.fail(result); + kv.second->fail(result); } for (auto& kv : pendingConsumerStatsMap) { LOG_ERROR(cnxString() << " Closing Client Connection, please try again later"); - kv.second.setFailed(result); + kv.second->fail(result); } for (auto& kv : pendingGetLastMessageIdRequests) { - kv.second.fail(result); + kv.second->fail(result); } for (auto& kv : pendingGetNamespaceTopicsRequests) { - kv.second.setFailed(result); + kv.second->fail(result); } for (auto& kv : pendingGetSchemaRequests) { - kv.second.fail(result); + kv.second->fail(result); } return *closeFuture_; } @@ -1406,77 +1336,70 @@ Commands::ChecksumType ClientConnection::getChecksumType() const { Future ClientConnection::newGetLastMessageId(uint64_t consumerId, uint64_t requestId) { Lock lock(mutex_); - auto promise = std::make_shared(); if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString() << " Client is not connected to the broker"); + auto promise = std::make_shared(); promise->setFailed(ResultNotConnected); return promise->getFuture(); } - LastMessageIdRequestData requestData; - requestData.promise = promise; - requestData.timer = executor_->createDeadlineTimer(); - requestData.timer->expires_after(operationsTimeout_); - requestData.timer->async_wait([this, self{shared_from_this()}, requestData](const ASIO_ERROR& ec) { - handleGetLastMessageIdTimeout(ec, requestData); - }); - pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, requestData)); + auto request = std::make_shared( + executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId]() { + LOG_WARN(cnxString << "GetLastMessageId request timeout to broker, req_id: " << requestId); + }); + pendingGetLastMessageIdRequests_.emplace(requestId, request); + request->initialize(); lock.unlock(); sendCommand(Commands::newGetLastMessageId(consumerId, requestId)); - return promise->getFuture(); + return request->getFuture(); } Future ClientConnection::newGetTopicsOfNamespace( const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId) { Lock lock(mutex_); - Promise promise; if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString() << "Client is not connected to the broker"); - promise.setFailed(ResultNotConnected); - return promise.getFuture(); + auto request = + std::make_shared(executor_->createTimer(operationsTimeout_), [] {}); + request->fail(ResultNotConnected); + return request->getFuture(); } - pendingGetNamespaceTopicsRequests_.insert(std::make_pair(requestId, promise)); + auto request = std::make_shared( + executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId]() { + LOG_WARN(cnxString << "GetTopicsOfNamespace request timeout to broker, req_id: " << requestId); + }); + pendingGetNamespaceTopicsRequests_.emplace(requestId, request); + request->initialize(); lock.unlock(); sendCommand(Commands::newGetTopicsOfNamespace(nsName, mode, requestId)); - return promise.getFuture(); + return request->getFuture(); } Future ClientConnection::newGetSchema(const std::string& topicName, const std::string& version, uint64_t requestId) { Lock lock(mutex_); - Promise promise; if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString() << "Client is not connected to the broker"); - promise.setFailed(ResultNotConnected); - return promise.getFuture(); + auto request = std::make_shared(executor_->createTimer(operationsTimeout_), [] {}); + request->fail(ResultNotConnected); + return request->getFuture(); } - auto timer = executor_->createDeadlineTimer(); - pendingGetSchemaRequests_.emplace(requestId, GetSchemaRequest{promise, timer}); + auto request = std::make_shared( + executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId]() { + LOG_WARN(cnxString << "GetSchema request timeout to broker, req_id: " << requestId); + }); + pendingGetSchemaRequests_.emplace(requestId, request); + request->initialize(); lock.unlock(); - timer->expires_after(operationsTimeout_); - timer->async_wait([this, self{shared_from_this()}, requestId](const ASIO_ERROR& ec) { - if (ec) { - return; - } - Lock lock(mutex_); - auto it = pendingGetSchemaRequests_.find(requestId); - if (it != pendingGetSchemaRequests_.end()) { - auto promise = std::move(it->second.promise); - pendingGetSchemaRequests_.erase(it); - lock.unlock(); - promise.setFailed(ResultTimeout); - } - }); - sendCommand(Commands::newGetSchema(topicName, version, requestId)); - return promise.getFuture(); + return request->getFuture(); } void ClientConnection::checkServerError(ServerError error, const std::string& message) { @@ -1541,12 +1464,11 @@ void ClientConnection::handleSuccess(const proto::CommandSuccess& success) { Lock lock(mutex_); auto it = pendingRequests_.find(success.request_id()); if (it != pendingRequests_.end()) { - PendingRequestData requestData = it->second; + auto request = std::move(it->second); pendingRequests_.erase(it); lock.unlock(); - requestData.promise.setValue({}); - cancelTimer(*requestData.timer); + request->complete({}); } } @@ -1558,9 +1480,7 @@ void ClientConnection::handlePartitionedMetadataResponse( Lock lock(mutex_); auto it = pendingLookupRequests_.find(partitionMetadataResponse.request_id()); if (it != pendingLookupRequests_.end()) { - cancelTimer(*it->second.timer); - - LookupDataResultPromisePtr lookupDataPromise = it->second.promise; + auto request = std::move(it->second); pendingLookupRequests_.erase(it); numOfPendingLookupRequest_--; lock.unlock(); @@ -1574,17 +1494,17 @@ void ClientConnection::handlePartitionedMetadataResponse( << " error: " << partitionMetadataResponse.error() << " msg: " << partitionMetadataResponse.message()); checkServerError(partitionMetadataResponse.error(), partitionMetadataResponse.message()); - lookupDataPromise->setFailed( + request->fail( getResult(partitionMetadataResponse.error(), partitionMetadataResponse.message())); } else { LOG_ERROR(cnxString() << "Failed partition-metadata lookup req_id: " << partitionMetadataResponse.request_id() << " with empty response: "); - lookupDataPromise->setFailed(ResultConnectError); + request->fail(ResultConnectError); } } else { LookupDataResultPtr lookupResultPtr = std::make_shared(); lookupResultPtr->setPartitions(partitionMetadataResponse.partitions()); - lookupDataPromise->setValue(lookupResultPtr); + request->complete(lookupResultPtr); } } else { @@ -1600,7 +1520,7 @@ void ClientConnection::handleConsumerStatsResponse( Lock lock(mutex_); auto it = pendingConsumerStatsMap_.find(consumerStatsResponse.request_id()); if (it != pendingConsumerStatsMap_.end()) { - Promise consumerStatsPromise = it->second; + auto request = std::move(it->second); pendingConsumerStatsMap_.erase(it); lock.unlock(); @@ -1609,7 +1529,7 @@ void ClientConnection::handleConsumerStatsResponse( LOG_ERROR(cnxString() << " Failed to get consumer stats - " << consumerStatsResponse.error_message()); } - consumerStatsPromise.setFailed( + request->fail( getResult(consumerStatsResponse.error_code(), consumerStatsResponse.error_message())); } else { LOG_DEBUG(cnxString() << "ConsumerStatsResponse command - Received consumer stats " @@ -1622,7 +1542,7 @@ void ClientConnection::handleConsumerStatsResponse( consumerStatsResponse.blockedconsumeronunackedmsgs(), consumerStatsResponse.address(), consumerStatsResponse.connectedsince(), consumerStatsResponse.type(), consumerStatsResponse.msgrateexpired(), consumerStatsResponse.msgbacklog()); - consumerStatsPromise.setValue(brokerStats); + request->complete(brokerStats); } } else { LOG_WARN("ConsumerStatsResponse command - Received unknown request id from server: " @@ -1635,8 +1555,7 @@ void ClientConnection::handleLookupTopicRespose( Lock lock(mutex_); auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id()); if (it != pendingLookupRequests_.end()) { - cancelTimer(*it->second.timer); - LookupDataResultPromisePtr lookupDataPromise = it->second.promise; + auto request = std::move(it->second); pendingLookupRequests_.erase(it); numOfPendingLookupRequest_--; lock.unlock(); @@ -1648,12 +1567,11 @@ void ClientConnection::handleLookupTopicRespose( << " error: " << lookupTopicResponse.error() << " msg: " << lookupTopicResponse.message()); checkServerError(lookupTopicResponse.error(), lookupTopicResponse.message()); - lookupDataPromise->setFailed( - getResult(lookupTopicResponse.error(), lookupTopicResponse.message())); + request->fail(getResult(lookupTopicResponse.error(), lookupTopicResponse.message())); } else { LOG_ERROR(cnxString() << "Failed lookup req_id: " << lookupTopicResponse.request_id() << " with empty response: "); - lookupDataPromise->setFailed(ResultConnectError); + request->fail(ResultConnectError); } } else { LOG_DEBUG(cnxString() << "Received lookup response from server. req_id: " @@ -1676,7 +1594,7 @@ void ClientConnection::handleLookupTopicRespose( lookupResultPtr->setRedirect(lookupTopicResponse.response() == proto::CommandLookupTopicResponse::Redirect); lookupResultPtr->setShouldProxyThroughServiceUrl(lookupTopicResponse.proxy_through_service_url()); - lookupDataPromise->setValue(lookupResultPtr); + request->complete(lookupResultPtr); } } else { @@ -1692,12 +1610,12 @@ void ClientConnection::handleProducerSuccess(const proto::CommandProducerSuccess Lock lock(mutex_); auto it = pendingRequests_.find(producerSuccess.request_id()); if (it != pendingRequests_.end()) { - PendingRequestData requestData = it->second; + auto request = it->second; if (!producerSuccess.producer_ready()) { LOG_INFO(cnxString() << " Producer " << producerSuccess.producer_name() << " has been queued up at broker. req_id: " << producerSuccess.request_id()); - requestData.hasGotResponse->store(true); + request->disableTimeout(); lock.unlock(); } else { pendingRequests_.erase(it); @@ -1713,8 +1631,7 @@ void ClientConnection::handleProducerSuccess(const proto::CommandProducerSuccess } else { data.topicEpoch = std::nullopt; } - requestData.promise.setValue(data); - cancelTimer(*requestData.timer); + request->complete(data); } } } @@ -1729,30 +1646,28 @@ void ClientConnection::handleError(const proto::CommandError& error) { auto it = pendingRequests_.find(error.request_id()); if (it != pendingRequests_.end()) { - PendingRequestData requestData = it->second; + auto request = std::move(it->second); pendingRequests_.erase(it); lock.unlock(); - requestData.promise.setFailed(result); - cancelTimer(*requestData.timer); + request->fail(result); } else { - PendingGetLastMessageIdRequestsMap::iterator it = - pendingGetLastMessageIdRequests_.find(error.request_id()); + auto it = pendingGetLastMessageIdRequests_.find(error.request_id()); if (it != pendingGetLastMessageIdRequests_.end()) { - auto getLastMessageIdPromise = it->second.promise; + auto request = std::move(it->second); pendingGetLastMessageIdRequests_.erase(it); lock.unlock(); - getLastMessageIdPromise->setFailed(result); + request->fail(result); } else { PendingGetNamespaceTopicsMap::iterator it = pendingGetNamespaceTopicsRequests_.find(error.request_id()); if (it != pendingGetNamespaceTopicsRequests_.end()) { - Promise getNamespaceTopicsPromise = it->second; + auto request = std::move(it->second); pendingGetNamespaceTopicsRequests_.erase(it); lock.unlock(); - getNamespaceTopicsPromise.setFailed(result); + request->fail(result); } else { lock.unlock(); } @@ -1904,16 +1819,15 @@ void ClientConnection::handleGetLastMessageIdResponse( auto it = pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id()); if (it != pendingGetLastMessageIdRequests_.end()) { - auto getLastMessageIdPromise = it->second.promise; + auto request = std::move(it->second); pendingGetLastMessageIdRequests_.erase(it); lock.unlock(); if (getLastMessageIdResponse.has_consumer_mark_delete_position()) { - getLastMessageIdPromise->setValue( - {toMessageId(getLastMessageIdResponse.last_message_id()), - toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())}); + request->complete({toMessageId(getLastMessageIdResponse.last_message_id()), + toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())}); } else { - getLastMessageIdPromise->setValue({toMessageId(getLastMessageIdResponse.last_message_id())}); + request->complete({toMessageId(getLastMessageIdResponse.last_message_id())}); } } else { lock.unlock(); @@ -1931,7 +1845,7 @@ void ClientConnection::handleGetTopicOfNamespaceResponse( auto it = pendingGetNamespaceTopicsRequests_.find(response.request_id()); if (it != pendingGetNamespaceTopicsRequests_.end()) { - Promise getTopicsPromise = it->second; + auto request = std::move(it->second); pendingGetNamespaceTopicsRequests_.erase(it); lock.unlock(); @@ -1953,7 +1867,7 @@ void ClientConnection::handleGetTopicOfNamespaceResponse( NamespaceTopicsPtr topicsPtr = std::make_shared>(topicSet.begin(), topicSet.end()); - getTopicsPromise.setValue(topicsPtr); + request->complete(topicsPtr); } else { lock.unlock(); LOG_WARN( @@ -1968,7 +1882,7 @@ void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResp Lock lock(mutex_); auto it = pendingGetSchemaRequests_.find(response.request_id()); if (it != pendingGetSchemaRequests_.end()) { - Promise getSchemaPromise = it->second.promise; + auto request = std::move(it->second); pendingGetSchemaRequests_.erase(it); lock.unlock(); @@ -1981,7 +1895,7 @@ void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResp : "") << " -- req_id: " << response.request_id()); } - getSchemaPromise.setFailed(result); + request->fail(result); return; } @@ -1992,7 +1906,7 @@ void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResp properties[kv->key()] = kv->value(); } SchemaInfo schemaInfo(static_cast(schema.type()), "", schema.schema_data(), properties); - getSchemaPromise.setValue(schemaInfo); + request->complete(schemaInfo); } else { lock.unlock(); LOG_WARN( @@ -2013,24 +1927,23 @@ void ClientConnection::handleAckResponse(const proto::CommandAckResponse& respon return; } - auto promise = it->second.promise; + auto request = std::move(it->second); pendingRequests_.erase(it); lock.unlock(); if (response.has_error()) { - promise.setFailed(getResult(response.error(), "")); + request->fail(getResult(response.error(), "")); } else { - promise.setValue({}); + request->complete({}); } } void ClientConnection::unsafeRemovePendingRequest(long requestId) { auto it = pendingRequests_.find(requestId); if (it != pendingRequests_.end()) { - it->second.promise.setFailed(ResultDisconnected); - cancelTimer(*it->second.timer); - + auto request = std::move(it->second); pendingRequests_.erase(it); + request->fail(ResultDisconnected); } } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 75e4bca8..59dce4d5 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -28,6 +28,8 @@ #include #include #include + +#include "lib/PendingRequest.h" #ifdef USE_ASIO #include #include @@ -225,47 +227,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this promise; - DeadlineTimerPtr timer; - std::shared_ptr hasGotResponse{std::make_shared(false)}; - - void fail(Result result) { - cancelTimer(*timer); - promise.setFailed(result); - } - }; - - struct LookupRequestData { - LookupDataResultPromisePtr promise; - DeadlineTimerPtr timer; - - void fail(Result result) { - cancelTimer(*timer); - promise->setFailed(result); - } - }; - - struct LastMessageIdRequestData { - GetLastMessageIdResponsePromisePtr promise; - DeadlineTimerPtr timer; - - void fail(Result result) { - cancelTimer(*timer); - promise->setFailed(result); - } - }; - - struct GetSchemaRequest { - Promise promise; - DeadlineTimerPtr timer; - - void fail(Result result) { - cancelTimer(*timer); - promise.setFailed(result); - } - }; - /* * handler for connectAsync * creates a ConnectionPtr which has a valid ClientConnection object @@ -303,12 +264,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this inline AllocHandler customAllocReadHandler(Handler h) { return AllocHandler(readHandlerAllocator_, h); @@ -385,28 +340,34 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this PendingRequestsMap; + using Request = PendingRequest; + typedef std::unordered_map> PendingRequestsMap; PendingRequestsMap pendingRequests_; - typedef std::map PendingLookupRequestsMap; + using LookupRequest = PendingRequest; + typedef std::unordered_map> PendingLookupRequestsMap; PendingLookupRequestsMap pendingLookupRequests_; - typedef std::map ProducersMap; + typedef std::unordered_map ProducersMap; ProducersMap producers_; - typedef std::map ConsumersMap; + typedef std::unordered_map ConsumersMap; ConsumersMap consumers_; - typedef std::map> PendingConsumerStatsMap; + using ConsumerStatsRequest = PendingRequest; + typedef std::unordered_map> PendingConsumerStatsMap; PendingConsumerStatsMap pendingConsumerStatsMap_; - typedef std::map PendingGetLastMessageIdRequestsMap; - PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_; + using GetLastMessageId = PendingRequest; + using PendingGetLastMessageIdMap = std::unordered_map>; + PendingGetLastMessageIdMap pendingGetLastMessageIdRequests_; - typedef std::map> PendingGetNamespaceTopicsMap; + using GetTopicsOfNamespace = PendingRequest; + typedef std::unordered_map> PendingGetNamespaceTopicsMap; PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_; - typedef std::unordered_map PendingGetSchemaMap; + using GetSchema = PendingRequest; + typedef std::unordered_map> PendingGetSchemaMap; PendingGetSchemaMap pendingGetSchemaRequests_; mutable std::mutex mutex_; @@ -426,14 +387,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this mockServer_; - - void handleConsumerStatsTimeout(const ASIO_ERROR& ec, const std::vector& consumerStatsRequests); - - void startConsumerStatsTimer(std::vector consumerStatsRequests); uint32_t maxPendingLookupRequest_; uint32_t numOfPendingLookupRequest_ = 0; diff --git a/lib/ExecutorService.h b/lib/ExecutorService.h index 80659d4b..4a36396c 100644 --- a/lib/ExecutorService.h +++ b/lib/ExecutorService.h @@ -28,12 +28,14 @@ #include #include #include +#include #else #include #include #include #include #include +#include #endif #include #include @@ -68,6 +70,13 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this + ASIO::steady_timer createTimer(const Duration &duration) { + auto timer = ASIO::steady_timer(io_context_); + timer.expires_after(duration); + return timer; + } + // Execute the task in the event loop thread asynchronously, i.e. the task will be put in the event loop // queue and executed later. template diff --git a/lib/PendingRequest.h b/lib/PendingRequest.h new file mode 100644 index 00000000..bb908139 --- /dev/null +++ b/lib/PendingRequest.h @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include + +#include +#include +#include +#include + +#include "AsioDefines.h" +#include "AsioTimer.h" +#include "Future.h" + +namespace pulsar { + +template +class PendingRequest : public std::enable_shared_from_this> { + public: + PendingRequest(ASIO::steady_timer timer, std::function timeoutCallback) + : timer_(std::move(timer)), timeoutCallback_(std::move(timeoutCallback)) {} + + void initialize() { + timer_.async_wait([this, weakSelf{this->weak_from_this()}](const auto& error) { + auto self = weakSelf.lock(); + if (!self || error || timeoutDisabled_.load(std::memory_order_acquire)) { + return; + } + timeoutCallback_(); + promise_.setFailed(ResultTimeout); + }); + } + + void complete(const T& value) { + promise_.setValue(value); + cancelTimer(timer_); + } + + void fail(Result result) { + promise_.setFailed(result); + cancelTimer(timer_); + } + + void disableTimeout() { timeoutDisabled_.store(true, std::memory_order_release); } + + auto getFuture() const { return promise_.getFuture(); } + + ~PendingRequest() { cancelTimer(timer_); } + + private: + ASIO::steady_timer timer_; + Promise promise_; + std::function timeoutCallback_; + std::atomic_bool timeoutDisabled_{false}; +}; + +template +using PendingRequestPtr = std::shared_ptr>; + +} // namespace pulsar From 348b5c2c15e648b393aca2ef6361546a408d16b0 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 17 Mar 2026 22:11:48 +0800 Subject: [PATCH 2/7] fix --- lib/ClientConnection.cc | 9 +++++++++ lib/PendingRequest.h | 1 - 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 7ac6643d..a4062b0b 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -960,6 +960,15 @@ Future ClientConnection::newConsumerStats(uint6 request->fail(ResultNotConnected); return request->getFuture(); } + if (serverProtocolVersion_ < proto::v8) { + lock.unlock(); + LOG_ERROR(cnxString() << "ConsumerStats is not supported since server protobuf version " + << serverProtocolVersion_ << " is older than proto::v8"); + auto request = + std::make_shared(executor_->createTimer(operationsTimeout_), [] {}); + request->fail(ResultUnsupportedVersionError); + return request->getFuture(); + } auto request = std::make_shared( executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId]() { diff --git a/lib/PendingRequest.h b/lib/PendingRequest.h index bb908139..465073f6 100644 --- a/lib/PendingRequest.h +++ b/lib/PendingRequest.h @@ -20,7 +20,6 @@ #include -#include #include #include #include From 5b0b64537075725f195dbc4373a6e7204502baaa Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 17 Mar 2026 22:15:42 +0800 Subject: [PATCH 3/7] fix include style --- lib/ClientConnection.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 59dce4d5..ad979e5d 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -29,7 +29,6 @@ #include #include -#include "lib/PendingRequest.h" #ifdef USE_ASIO #include #include @@ -56,6 +55,7 @@ #include "Commands.h" #include "GetLastMessageIdResponse.h" #include "LookupDataResult.h" +#include "PendingRequest.h" #include "SharedBuffer.h" #include "TimeUtils.h" #include "UtilAllocator.h" From 86e864bac26653809ce7374ca44ab6db9f4f3156 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 19 Mar 2026 09:24:56 +0800 Subject: [PATCH 4/7] revert consumer stats changes and speed up tests --- lib/ClientConnection.cc | 158 ++++++++++++++++++++++++++++------------ lib/ClientConnection.h | 43 ++++++++++- lib/MockServer.h | 20 +++++ lib/PendingRequest.h | 8 +- tests/ClientTest.cc | 70 ++++++++++++++++++ tests/PulsarFriend.h | 35 +++++++++ 6 files changed, 284 insertions(+), 50 deletions(-) diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index a4062b0b..8efa3313 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -200,6 +200,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: connectTimer_(executor_->createDeadlineTimer()), outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)), keepAliveIntervalInSeconds_(clientConfiguration.getKeepAliveIntervalInSeconds()), + consumerStatsRequestTimer_(executor_->createDeadlineTimer()), maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()), clientVersion_(clientVersion), pool_(pool), @@ -335,6 +336,49 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC lock.unlock(); connectPromise_.setValue(shared_from_this()); + + if (serverProtocolVersion_ >= proto::v8) { + startConsumerStatsTimer(std::vector()); + } +} + +void ClientConnection::startConsumerStatsTimer(std::vector consumerStatsRequests) { + std::vector> consumerStatsPromises; + Lock lock(mutex_); + + for (int i = 0; i < consumerStatsRequests.size(); i++) { + PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.find(consumerStatsRequests[i]); + if (it != pendingConsumerStatsMap_.end()) { + LOG_DEBUG(cnxString() << " removing request_id " << it->first + << " from the pendingConsumerStatsMap_"); + consumerStatsPromises.push_back(it->second); + pendingConsumerStatsMap_.erase(it); + } else { + LOG_DEBUG(cnxString() << "request_id " << it->first << " already fulfilled - not removing it"); + } + } + + consumerStatsRequests.clear(); + for (PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.begin(); + it != pendingConsumerStatsMap_.end(); ++it) { + consumerStatsRequests.push_back(it->first); + } + + // If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero + // Check if we have a timer still before we set the request timer to pop again. + if (consumerStatsRequestTimer_) { + consumerStatsRequestTimer_->expires_after(operationsTimeout_); + consumerStatsRequestTimer_->async_wait( + [this, self{shared_from_this()}, consumerStatsRequests](const ASIO_ERROR& err) { + handleConsumerStatsTimeout(err, consumerStatsRequests); + }); + } + lock.unlock(); + // Complex logic since promises need to be fulfilled outside the lock + for (int i = 0; i < consumerStatsPromises.size(); i++) { + consumerStatsPromises[i].setFailed(ResultTimeout); + LOG_WARN(cnxString() << " Operation timedout, didn't get response from broker"); + } } /// The number of unacknowledged probes to send before considering the connection dead and notifying the @@ -952,38 +996,22 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) { Future ClientConnection::newConsumerStats(uint64_t consumerId, uint64_t requestId) { Lock lock(mutex_); + Promise promise; if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString() << " Client is not connected to the broker"); - auto request = - std::make_shared(executor_->createTimer(operationsTimeout_), [] {}); - request->fail(ResultNotConnected); - return request->getFuture(); - } - if (serverProtocolVersion_ < proto::v8) { - lock.unlock(); - LOG_ERROR(cnxString() << "ConsumerStats is not supported since server protobuf version " - << serverProtocolVersion_ << " is older than proto::v8"); - auto request = - std::make_shared(executor_->createTimer(operationsTimeout_), [] {}); - request->fail(ResultUnsupportedVersionError); - return request->getFuture(); + promise.setFailed(ResultNotConnected); + return promise.getFuture(); } - - auto request = std::make_shared( - executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId]() { - LOG_WARN(cnxString << "ConsumerStats request timeout to broker, req_id: " << requestId); - }); - pendingConsumerStatsMap_.emplace(requestId, request); - request->initialize(); + pendingConsumerStatsMap_.insert(std::make_pair(requestId, promise)); lock.unlock(); if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && mockServer_->sendRequest("CONSUMER_STATS", requestId)) { - return request->getFuture(); + return promise.getFuture(); } sendCommand(Commands::newConsumerStats(consumerId, requestId)); - return request->getFuture(); + return promise.getFuture(); } void ClientConnection::newTopicLookup(const std::string& topicName, bool authoritative, @@ -1013,9 +1041,13 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, co } auto request = std::make_shared( - executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId, requestType]() { - LOG_WARN(cnxString << requestType << " request timeout to broker, req_id: " << requestId); - }); + executor_->createTimer(operationsTimeout_), + makePendingRequestTimeoutHandler( + pendingLookupRequests_, requestId, + [cnxString = cnxString(), requestId, requestType]() { + LOG_WARN(cnxString << requestType << " request timeout to broker, req_id: " << requestId); + }, + [](ClientConnection& connection) { connection.numOfPendingLookupRequest_--; })); request->getFuture().addListener([promise](Result result, const LookupDataResultPtr& lookupDataResult) { if (result == ResultOk) { promise->setValue(lookupDataResult); @@ -1029,6 +1061,10 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, co request->initialize(); lock.unlock(); LOG_DEBUG(cnxString() << "Inserted lookup request " << requestType << " (req_id: " << requestId << ")"); + if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && + mockServer_->sendRequest(requestType, requestId)) { + return; + } sendCommand(cmd); } @@ -1138,17 +1174,20 @@ Future ClientConnection::sendRequestWithId(const SharedBuf lock.unlock(); LOG_DEBUG(cnxString() << "Fail " << requestType << "(req_id: " << requestId << ") to a closed connection"); - auto request = std::make_shared(executor_->createTimer(operationsTimeout_), [] {}); + auto request = + std::make_shared(executor_->createTimer(operationsTimeout_), [] { return false; }); request->fail(ResultNotConnected); return request->getFuture(); } auto request = std::make_shared( executor_->createTimer(operationsTimeout_), - [cnxString = cnxString(), physicalAddress = physicalAddress_, requestId, requestType]() { - LOG_WARN(cnxString << "Network request timeout to broker, remote: " << physicalAddress - << ", req_id: " << requestId << ", request: " << requestType); - }); + makePendingRequestTimeoutHandler( + pendingRequests_, requestId, + [cnxString = cnxString(), physicalAddress = physicalAddress_, requestId, requestType]() { + LOG_WARN(cnxString << "Network request timeout to broker, remote: " << physicalAddress + << ", req_id: " << requestId << ", request: " << requestType); + })); pendingRequests_.emplace(requestId, request); request->initialize(); lock.unlock(); @@ -1193,6 +1232,15 @@ void ClientConnection::handleKeepAliveTimeout(const ASIO_ERROR& ec) { } } +void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec, + const std::vector& consumerStatsRequests) { + if (ec) { + LOG_DEBUG(cnxString() << " Ignoring timer cancelled event, code[" << ec << "]"); + return; + } + startConsumerStatsTimer(consumerStatsRequests); +} + const std::future& ClientConnection::close(Result result, bool switchCluster) { Lock lock(mutex_); if (closeFuture_) { @@ -1292,7 +1340,7 @@ const std::future& ClientConnection::close(Result result, bool switchClust } for (auto& kv : pendingConsumerStatsMap) { LOG_ERROR(cnxString() << " Closing Client Connection, please try again later"); - kv.second->fail(result); + kv.second.setFailed(result); } for (auto& kv : pendingGetLastMessageIdRequests) { kv.second->fail(result); @@ -1354,12 +1402,18 @@ Future ClientConnection::newGetLastMessageId(u } auto request = std::make_shared( - executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId]() { - LOG_WARN(cnxString << "GetLastMessageId request timeout to broker, req_id: " << requestId); - }); + executor_->createTimer(operationsTimeout_), + makePendingRequestTimeoutHandler( + pendingGetLastMessageIdRequests_, requestId, [cnxString = cnxString(), requestId]() { + LOG_WARN(cnxString << "GetLastMessageId request timeout to broker, req_id: " << requestId); + })); pendingGetLastMessageIdRequests_.emplace(requestId, request); request->initialize(); lock.unlock(); + if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && + mockServer_->sendRequest("GET_LAST_MESSAGE_ID", requestId)) { + return request->getFuture(); + } sendCommand(Commands::newGetLastMessageId(consumerId, requestId)); return request->getFuture(); } @@ -1370,19 +1424,26 @@ Future ClientConnection::newGetTopicsOfNamespace( if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString() << "Client is not connected to the broker"); - auto request = - std::make_shared(executor_->createTimer(operationsTimeout_), [] {}); + auto request = std::make_shared(executor_->createTimer(operationsTimeout_), + [] { return false; }); request->fail(ResultNotConnected); return request->getFuture(); } auto request = std::make_shared( - executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId]() { - LOG_WARN(cnxString << "GetTopicsOfNamespace request timeout to broker, req_id: " << requestId); - }); + executor_->createTimer(operationsTimeout_), + makePendingRequestTimeoutHandler( + pendingGetNamespaceTopicsRequests_, requestId, [cnxString = cnxString(), requestId]() { + LOG_WARN(cnxString << "GetTopicsOfNamespace request timeout to broker, req_id: " + << requestId); + })); pendingGetNamespaceTopicsRequests_.emplace(requestId, request); request->initialize(); lock.unlock(); + if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && + mockServer_->sendRequest("GET_TOPICS_OF_NAMESPACE", requestId)) { + return request->getFuture(); + } sendCommand(Commands::newGetTopicsOfNamespace(nsName, mode, requestId)); return request->getFuture(); } @@ -1394,19 +1455,26 @@ Future ClientConnection::newGetSchema(const std::string& top if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString() << "Client is not connected to the broker"); - auto request = std::make_shared(executor_->createTimer(operationsTimeout_), [] {}); + auto request = + std::make_shared(executor_->createTimer(operationsTimeout_), [] { return false; }); request->fail(ResultNotConnected); return request->getFuture(); } auto request = std::make_shared( - executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId]() { - LOG_WARN(cnxString << "GetSchema request timeout to broker, req_id: " << requestId); - }); + executor_->createTimer(operationsTimeout_), + makePendingRequestTimeoutHandler( + pendingGetSchemaRequests_, requestId, [cnxString = cnxString(), requestId]() { + LOG_WARN(cnxString << "GetSchema request timeout to broker, req_id: " << requestId); + })); pendingGetSchemaRequests_.emplace(requestId, request); request->initialize(); lock.unlock(); + if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && + mockServer_->sendRequest("GET_SCHEMA", requestId)) { + return request->getFuture(); + } sendCommand(Commands::newGetSchema(topicName, version, requestId)); return request->getFuture(); } @@ -1538,7 +1606,7 @@ void ClientConnection::handleConsumerStatsResponse( LOG_ERROR(cnxString() << " Failed to get consumer stats - " << consumerStatsResponse.error_message()); } - request->fail( + request.setFailed( getResult(consumerStatsResponse.error_code(), consumerStatsResponse.error_message())); } else { LOG_DEBUG(cnxString() << "ConsumerStatsResponse command - Received consumer stats " @@ -1551,7 +1619,7 @@ void ClientConnection::handleConsumerStatsResponse( consumerStatsResponse.blockedconsumeronunackedmsgs(), consumerStatsResponse.address(), consumerStatsResponse.connectedsince(), consumerStatsResponse.type(), consumerStatsResponse.msgrateexpired(), consumerStatsResponse.msgbacklog()); - request->complete(brokerStats); + request.setValue(brokerStats); } } else { LOG_WARN("ConsumerStatsResponse command - Received unknown request id from server: " diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index ad979e5d..c62bc216 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -354,8 +354,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this ConsumersMap; ConsumersMap consumers_; - using ConsumerStatsRequest = PendingRequest; - typedef std::unordered_map> PendingConsumerStatsMap; + typedef std::map> PendingConsumerStatsMap; PendingConsumerStatsMap pendingConsumerStatsMap_; using GetLastMessageId = PendingRequest; @@ -373,6 +372,41 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this Lock; + template + std::function makePendingRequestTimeoutHandler(RequestMap& pendingRequests, + const typename RequestMap::key_type& requestId, + OnTimeout onTimeout, OnCleanup onCleanup) { + auto weakSelf = weak_from_this(); + return [weakSelf, pendingRequestsPtr = &pendingRequests, requestId, onTimeout = std::move(onTimeout), + onCleanup = std::move(onCleanup)]() mutable { + auto self = weakSelf.lock(); + if (!self) { + return false; + } + + { + Lock lock(self->mutex_); + auto it = pendingRequestsPtr->find(requestId); + if (it == pendingRequestsPtr->end()) { + return false; + } + pendingRequestsPtr->erase(it); + onCleanup(*self); + } + + onTimeout(); + return true; + }; + } + + template + std::function makePendingRequestTimeoutHandler(RequestMap& pendingRequests, + const typename RequestMap::key_type& requestId, + OnTimeout onTimeout) { + return makePendingRequestTimeoutHandler(pendingRequests, requestId, std::move(onTimeout), + [](ClientConnection&) {}); + } + // Pending buffers to write on the socket std::deque pendingWriteBuffers_; int pendingWriteOperations_ = 0; @@ -387,9 +421,14 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this mockServer_; + + void handleConsumerStatsTimeout(const ASIO_ERROR& ec, const std::vector& consumerStatsRequests); + + void startConsumerStatsTimer(std::vector consumerStatsRequests); uint32_t maxPendingLookupRequest_; uint32_t numOfPendingLookupRequest_ = 0; diff --git a/lib/MockServer.h b/lib/MockServer.h index 2d830fc7..6f8d1390 100644 --- a/lib/MockServer.h +++ b/lib/MockServer.h @@ -81,6 +81,26 @@ class MockServer : public std::enable_shared_from_this { proto::CommandConsumerStatsResponse response; response.set_request_id(requestId); connection->handleConsumerStatsResponse(response); + } else if (request == "LOOKUP") { + proto::CommandLookupTopicResponse response; + response.set_request_id(requestId); + response.set_response(proto::CommandLookupTopicResponse_LookupType_Connect); + response.set_brokerserviceurl("pulsar://localhost:6650"); + connection->handleLookupTopicRespose(response); + } else if (request == "GET_LAST_MESSAGE_ID") { + proto::CommandGetLastMessageIdResponse response; + response.set_request_id(requestId); + response.mutable_last_message_id(); + connection->handleGetLastMessageIdResponse(response); + } else if (request == "GET_TOPICS_OF_NAMESPACE") { + proto::CommandGetTopicsOfNamespaceResponse response; + response.set_request_id(requestId); + connection->handleGetTopicOfNamespaceResponse(response); + } else if (request == "GET_SCHEMA") { + proto::CommandGetSchemaResponse response; + response.set_request_id(requestId); + response.mutable_schema()->set_type(proto::Schema_Type_String); + connection->handleGetSchemaResponse(response); } else { proto::CommandSuccess success; success.set_request_id(requestId); diff --git a/lib/PendingRequest.h b/lib/PendingRequest.h index 465073f6..f73611f2 100644 --- a/lib/PendingRequest.h +++ b/lib/PendingRequest.h @@ -33,7 +33,7 @@ namespace pulsar { template class PendingRequest : public std::enable_shared_from_this> { public: - PendingRequest(ASIO::steady_timer timer, std::function timeoutCallback) + PendingRequest(ASIO::steady_timer timer, std::function timeoutCallback) : timer_(std::move(timer)), timeoutCallback_(std::move(timeoutCallback)) {} void initialize() { @@ -42,7 +42,9 @@ class PendingRequest : public std::enable_shared_from_this> { if (!self || error || timeoutDisabled_.load(std::memory_order_acquire)) { return; } - timeoutCallback_(); + if (!timeoutCallback_()) { + return; + } promise_.setFailed(ResultTimeout); }); } @@ -66,7 +68,7 @@ class PendingRequest : public std::enable_shared_from_this> { private: ASIO::steady_timer timer_; Promise promise_; - std::function timeoutCallback_; + std::function timeoutCallback_; std::atomic_bool timeoutDisabled_{false}; }; diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 6bd6cc8a..6787a7d2 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -18,6 +18,7 @@ */ #include #include +#include #include #include @@ -33,8 +34,13 @@ #include "PulsarFriend.h" #include "WaitUtils.h" #include "lib/AsioDefines.h" +#include "lib/AtomicSharedPtr.h" +#include "lib/BrokerConsumerStatsImpl.h" #include "lib/ClientConnection.h" +#include "lib/ConnectionPool.h" +#include "lib/ExecutorService.h" #include "lib/LogUtils.h" +#include "lib/MockServer.h" #include "lib/checksum/ChecksumProvider.h" #include "lib/stats/ProducerStatsImpl.h" @@ -231,6 +237,70 @@ TEST(ClientTest, testConnectTimeoutAfterTcpConnected) { server->stop(); } +TEST(ClientTest, testTimedOutPendingRequestsAreErasedFromConnectionMaps) { + const auto suffix = std::to_string(std::chrono::steady_clock::now().time_since_epoch().count()); + ClientConfiguration conf; + conf.setOperationTimeoutSeconds(1); + + auto executorProvider = std::make_shared(1); + AtomicSharedPtr serviceInfo; + serviceInfo.store(std::make_shared(lookupUrl)); + ConnectionPool pool(serviceInfo, conf, executorProvider, ""); + auto connection = std::make_shared(lookupUrl, lookupUrl, *serviceInfo.load(), + executorProvider->get(), conf, "", pool, 0); + PulsarFriend::setServerProtocolVersion(*connection, 8); + + long requestIdGenerator = 0; + auto mockServer = std::make_shared(connection); + connection->attachMockServer(mockServer); + mockServer->setRequestDelay({{"TEST_PENDING_REQUEST", 1200}, + {"LOOKUP", 1200}, + {"GET_LAST_MESSAGE_ID", 1200}, + {"GET_TOPICS_OF_NAMESPACE", 1200}, + {"GET_SCHEMA", 1200}}); + + auto pingFuture = + connection->sendRequestWithId(Commands::newPing(), requestIdGenerator++, "TEST_PENDING_REQUEST"); + + auto lookupPromise = std::make_shared(); + auto lookupFuture = lookupPromise->getFuture(); + connection->newTopicLookup("persistent://public/default/testTimedOutPendingRequests-" + suffix, false, "", + requestIdGenerator++, lookupPromise); + + auto lastMessageIdFuture = connection->newGetLastMessageId(0, requestIdGenerator++); + + auto getTopicsOfNamespaceFuture = connection->newGetTopicsOfNamespace( + "public/default", CommandGetTopicsOfNamespace_Mode_PERSISTENT, requestIdGenerator++); + + auto getSchemaFuture = connection->newGetSchema( + "persistent://public/default/testTimedOutPendingRequests-" + suffix, "", requestIdGenerator++); + + ResponseData responseData; + ASSERT_EQ(ResultTimeout, pingFuture.get(responseData)); + ASSERT_EQ(0u, PulsarFriend::getPendingRequests(*connection)); + + LookupDataResultPtr lookupData; + ASSERT_EQ(ResultTimeout, lookupFuture.get(lookupData)); + ASSERT_EQ(0u, PulsarFriend::getPendingLookupRequests(*connection)); + ASSERT_EQ(0u, PulsarFriend::getNumOfPendingLookupRequests(*connection)); + + GetLastMessageIdResponse lastMessageIdResponse; + ASSERT_EQ(ResultTimeout, lastMessageIdFuture.get(lastMessageIdResponse)); + ASSERT_EQ(0u, PulsarFriend::getPendingGetLastMessageIdRequests(*connection)); + + NamespaceTopicsPtr topics; + ASSERT_EQ(ResultTimeout, getTopicsOfNamespaceFuture.get(topics)); + ASSERT_EQ(0u, PulsarFriend::getPendingGetTopicsOfNamespaceRequests(*connection)); + + SchemaInfo schemaInfo; + ASSERT_EQ(ResultTimeout, getSchemaFuture.get(schemaInfo)); + ASSERT_EQ(0u, PulsarFriend::getPendingGetSchemaRequests(*connection)); + + mockServer->close(); + connection->close(ResultDisconnected).wait(); + executorProvider->close(); +} + TEST(ClientTest, testGetNumberOfReferences) { Client client("pulsar://localhost:6650"); diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index 1f351d16..3296953b 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -167,6 +167,41 @@ class PulsarFriend { return cnx.pendingConsumerStatsMap_.size(); } + static size_t getPendingRequests(const ClientConnection& cnx) { + std::lock_guard lock(cnx.mutex_); + return cnx.pendingRequests_.size(); + } + + static size_t getPendingLookupRequests(const ClientConnection& cnx) { + std::lock_guard lock(cnx.mutex_); + return cnx.pendingLookupRequests_.size(); + } + + static size_t getNumOfPendingLookupRequests(const ClientConnection& cnx) { + std::lock_guard lock(cnx.mutex_); + return cnx.numOfPendingLookupRequest_; + } + + static size_t getPendingGetLastMessageIdRequests(const ClientConnection& cnx) { + std::lock_guard lock(cnx.mutex_); + return cnx.pendingGetLastMessageIdRequests_.size(); + } + + static size_t getPendingGetTopicsOfNamespaceRequests(const ClientConnection& cnx) { + std::lock_guard lock(cnx.mutex_); + return cnx.pendingGetNamespaceTopicsRequests_.size(); + } + + static size_t getPendingGetSchemaRequests(const ClientConnection& cnx) { + std::lock_guard lock(cnx.mutex_); + return cnx.pendingGetSchemaRequests_.size(); + } + + static void setServerProtocolVersion(ClientConnection& cnx, int serverProtocolVersion) { + std::lock_guard lock(cnx.mutex_); + cnx.serverProtocolVersion_ = serverProtocolVersion; + } + static void setNegativeAckEnabled(Consumer consumer, bool enabled) { consumer.impl_->setNegativeAcknowledgeEnabledForTesting(enabled); } From 30d27f1b49cc43137ebb3e781859117c113fe697 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 19 Mar 2026 11:03:35 +0800 Subject: [PATCH 5/7] abstract a common method to insert a request and add tests --- lib/ClientConnection.cc | 102 +++++++++++++++++----------------------- lib/ClientConnection.h | 82 +++++++++++--------------------- lib/PendingRequest.h | 8 ++-- tests/ClientTest.cc | 29 +++++++++++- 4 files changed, 102 insertions(+), 119 deletions(-) diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 8efa3313..d96e50f2 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -33,6 +33,7 @@ #include "ConnectionPool.h" #include "ConsumerImpl.h" #include "ExecutorService.h" +#include "Future.h" #include "LogUtils.h" #include "MockServer.h" #include "OpSendMsg.h" @@ -1005,7 +1006,6 @@ Future ClientConnection::newConsumerStats(uint6 } pendingConsumerStatsMap_.insert(std::make_pair(requestId, promise)); lock.unlock(); - if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && mockServer_->sendRequest("CONSUMER_STATS", requestId)) { return promise.getFuture(); @@ -1040,14 +1040,14 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, co return; } - auto request = std::make_shared( - executor_->createTimer(operationsTimeout_), - makePendingRequestTimeoutHandler( - pendingLookupRequests_, requestId, - [cnxString = cnxString(), requestId, requestType]() { - LOG_WARN(cnxString << requestType << " request timeout to broker, req_id: " << requestId); - }, - [](ClientConnection& connection) { connection.numOfPendingLookupRequest_--; })); + auto request = insertRequest( + pendingLookupRequests_, requestId, [weakSelf{weak_from_this()}, requestId, requestType]() { + if (auto self = weakSelf.lock()) { + LOG_WARN(self->cnxString() + << requestType << " request timeout to broker, req_id: " << requestId); + self->numOfPendingLookupRequest_--; + } + }); request->getFuture().addListener([promise](Result result, const LookupDataResultPtr& lookupDataResult) { if (result == ResultOk) { promise->setValue(lookupDataResult); @@ -1056,9 +1056,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, co } }); - pendingLookupRequests_.emplace(requestId, request); numOfPendingLookupRequest_++; - request->initialize(); lock.unlock(); LOG_DEBUG(cnxString() << "Inserted lookup request " << requestType << " (req_id: " << requestId << ")"); if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && @@ -1174,22 +1172,17 @@ Future ClientConnection::sendRequestWithId(const SharedBuf lock.unlock(); LOG_DEBUG(cnxString() << "Fail " << requestType << "(req_id: " << requestId << ") to a closed connection"); - auto request = - std::make_shared(executor_->createTimer(operationsTimeout_), [] { return false; }); - request->fail(ResultNotConnected); - return request->getFuture(); + Promise promise; + promise.setFailed(ResultNotConnected); + return promise.getFuture(); } - auto request = std::make_shared( - executor_->createTimer(operationsTimeout_), - makePendingRequestTimeoutHandler( - pendingRequests_, requestId, - [cnxString = cnxString(), physicalAddress = physicalAddress_, requestId, requestType]() { - LOG_WARN(cnxString << "Network request timeout to broker, remote: " << physicalAddress - << ", req_id: " << requestId << ", request: " << requestType); - })); - pendingRequests_.emplace(requestId, request); - request->initialize(); + auto request = insertRequest( + pendingRequests_, requestId, + [cnxString{cnxString()}, physicalAddress{physicalAddress_}, requestId, requestType]() { + LOG_WARN(cnxString << "Network request timeout to broker, remote: " << physicalAddress + << ", req_id: " << requestId << ", request: " << requestType); + }); lock.unlock(); LOG_DEBUG(cnxString() << "Inserted request " << requestType << " (req_id: " << requestId << ")"); @@ -1278,6 +1271,11 @@ const std::future& ClientConnection::close(Result result, bool switchClust keepAliveTimer_.reset(); } + if (consumerStatsRequestTimer_) { + cancelTimer(*consumerStatsRequestTimer_); + consumerStatsRequestTimer_.reset(); + } + cancelTimer(*connectTimer_); lock.unlock(); int refCount = weak_from_this().use_count(); @@ -1401,15 +1399,12 @@ Future ClientConnection::newGetLastMessageId(u return promise->getFuture(); } - auto request = std::make_shared( - executor_->createTimer(operationsTimeout_), - makePendingRequestTimeoutHandler( - pendingGetLastMessageIdRequests_, requestId, [cnxString = cnxString(), requestId]() { - LOG_WARN(cnxString << "GetLastMessageId request timeout to broker, req_id: " << requestId); - })); - pendingGetLastMessageIdRequests_.emplace(requestId, request); - request->initialize(); + auto request = + insertRequest(pendingGetLastMessageIdRequests_, requestId, [cnxString = cnxString(), requestId]() { + LOG_WARN(cnxString << "GetLastMessageId request timeout to broker, req_id: " << requestId); + }); lock.unlock(); + if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && mockServer_->sendRequest("GET_LAST_MESSAGE_ID", requestId)) { return request->getFuture(); @@ -1424,21 +1419,16 @@ Future ClientConnection::newGetTopicsOfNamespace( if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString() << "Client is not connected to the broker"); - auto request = std::make_shared(executor_->createTimer(operationsTimeout_), - [] { return false; }); - request->fail(ResultNotConnected); - return request->getFuture(); + Promise promise; + promise.setFailed(ResultNotConnected); + return promise.getFuture(); } - auto request = std::make_shared( - executor_->createTimer(operationsTimeout_), - makePendingRequestTimeoutHandler( - pendingGetNamespaceTopicsRequests_, requestId, [cnxString = cnxString(), requestId]() { - LOG_WARN(cnxString << "GetTopicsOfNamespace request timeout to broker, req_id: " - << requestId); - })); + auto request = + insertRequest(pendingGetNamespaceTopicsRequests_, requestId, [cnxString = cnxString(), requestId]() { + LOG_WARN(cnxString << "GetTopicsOfNamespace request timeout to broker, req_id: " << requestId); + }); pendingGetNamespaceTopicsRequests_.emplace(requestId, request); - request->initialize(); lock.unlock(); if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && mockServer_->sendRequest("GET_TOPICS_OF_NAMESPACE", requestId)) { @@ -1455,20 +1445,15 @@ Future ClientConnection::newGetSchema(const std::string& top if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString() << "Client is not connected to the broker"); - auto request = - std::make_shared(executor_->createTimer(operationsTimeout_), [] { return false; }); - request->fail(ResultNotConnected); - return request->getFuture(); + Promise promise; + promise.setFailed(ResultNotConnected); + return promise.getFuture(); } - auto request = std::make_shared( - executor_->createTimer(operationsTimeout_), - makePendingRequestTimeoutHandler( - pendingGetSchemaRequests_, requestId, [cnxString = cnxString(), requestId]() { - LOG_WARN(cnxString << "GetSchema request timeout to broker, req_id: " << requestId); - })); - pendingGetSchemaRequests_.emplace(requestId, request); - request->initialize(); + auto request = + insertRequest(pendingGetSchemaRequests_, requestId, [cnxString = cnxString(), requestId]() { + LOG_WARN(cnxString << "GetSchema request timeout to broker, req_id: " << requestId); + }); lock.unlock(); if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && @@ -1737,8 +1722,7 @@ void ClientConnection::handleError(const proto::CommandError& error) { request->fail(result); } else { - PendingGetNamespaceTopicsMap::iterator it = - pendingGetNamespaceTopicsRequests_.find(error.request_id()); + auto it = pendingGetNamespaceTopicsRequests_.find(error.request_id()); if (it != pendingGetNamespaceTopicsRequests_.end()) { auto request = std::move(it->second); pendingGetNamespaceTopicsRequests_.erase(it); diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index c62bc216..c8cd86fe 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -53,6 +53,7 @@ #include "AsioTimer.h" #include "Commands.h" +#include "ExecutorService.h" #include "GetLastMessageIdResponse.h" #include "LookupDataResult.h" #include "PendingRequest.h" @@ -68,9 +69,6 @@ class PulsarFriend; using TcpResolverPtr = std::shared_ptr; -class ExecutorService; -using ExecutorServicePtr = std::shared_ptr; - class ConnectionPool; class ClientConnection; typedef std::shared_ptr ClientConnectionPtr; @@ -340,13 +338,14 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this; - typedef std::unordered_map> PendingRequestsMap; - PendingRequestsMap pendingRequests_; + template + using RequestMap = std::unordered_map>; - using LookupRequest = PendingRequest; - typedef std::unordered_map> PendingLookupRequestsMap; - PendingLookupRequestsMap pendingLookupRequests_; + RequestMap pendingRequests_; + RequestMap pendingLookupRequests_; + RequestMap pendingGetLastMessageIdRequests_; + RequestMap pendingGetNamespaceTopicsRequests_; + RequestMap pendingGetSchemaRequests_; typedef std::unordered_map ProducersMap; ProducersMap producers_; @@ -357,54 +356,29 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this> PendingConsumerStatsMap; PendingConsumerStatsMap pendingConsumerStatsMap_; - using GetLastMessageId = PendingRequest; - using PendingGetLastMessageIdMap = std::unordered_map>; - PendingGetLastMessageIdMap pendingGetLastMessageIdRequests_; - - using GetTopicsOfNamespace = PendingRequest; - typedef std::unordered_map> PendingGetNamespaceTopicsMap; - PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_; - - using GetSchema = PendingRequest; - typedef std::unordered_map> PendingGetSchemaMap; - PendingGetSchemaMap pendingGetSchemaRequests_; - mutable std::mutex mutex_; typedef std::unique_lock Lock; - template - std::function makePendingRequestTimeoutHandler(RequestMap& pendingRequests, - const typename RequestMap::key_type& requestId, - OnTimeout onTimeout, OnCleanup onCleanup) { - auto weakSelf = weak_from_this(); - return [weakSelf, pendingRequestsPtr = &pendingRequests, requestId, onTimeout = std::move(onTimeout), - onCleanup = std::move(onCleanup)]() mutable { - auto self = weakSelf.lock(); - if (!self) { - return false; - } - - { - Lock lock(self->mutex_); - auto it = pendingRequestsPtr->find(requestId); - if (it == pendingRequestsPtr->end()) { - return false; + // Note: this method must be called when holding `mutex_` + template + auto insertRequest(RequestMap& pendingRequests, uint64_t requestId, OnTimeout onTimeout) { + auto request = std::make_shared>( + executor_->createTimer(operationsTimeout_), + [this, self{shared_from_this()}, requestId, onTimeout{std::move(onTimeout)}, + &pendingRequests]() mutable { + { + std::lock_guard lock{mutex_}; + if (auto it = pendingRequests.find(requestId); it != pendingRequests.end()) { + pendingRequests.erase(it); + } } - pendingRequestsPtr->erase(it); - onCleanup(*self); - } - - onTimeout(); - return true; - }; - } - - template - std::function makePendingRequestTimeoutHandler(RequestMap& pendingRequests, - const typename RequestMap::key_type& requestId, - OnTimeout onTimeout) { - return makePendingRequestTimeoutHandler(pendingRequests, requestId, std::move(onTimeout), - [](ClientConnection&) {}); + onTimeout(); + }); + auto [iterator, inserted] = pendingRequests.emplace(requestId, request); + if (inserted) { + request->initialize(); + } // else: the request id is duplicated + return iterator->second; } // Pending buffers to write on the socket @@ -430,7 +404,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this consumerStatsRequests); uint32_t maxPendingLookupRequest_; - uint32_t numOfPendingLookupRequest_ = 0; + std::atomic_uint32_t numOfPendingLookupRequest_{0}; bool isTlsAllowInsecureConnection_ = false; diff --git a/lib/PendingRequest.h b/lib/PendingRequest.h index f73611f2..465073f6 100644 --- a/lib/PendingRequest.h +++ b/lib/PendingRequest.h @@ -33,7 +33,7 @@ namespace pulsar { template class PendingRequest : public std::enable_shared_from_this> { public: - PendingRequest(ASIO::steady_timer timer, std::function timeoutCallback) + PendingRequest(ASIO::steady_timer timer, std::function timeoutCallback) : timer_(std::move(timer)), timeoutCallback_(std::move(timeoutCallback)) {} void initialize() { @@ -42,9 +42,7 @@ class PendingRequest : public std::enable_shared_from_this> { if (!self || error || timeoutDisabled_.load(std::memory_order_acquire)) { return; } - if (!timeoutCallback_()) { - return; - } + timeoutCallback_(); promise_.setFailed(ResultTimeout); }); } @@ -68,7 +66,7 @@ class PendingRequest : public std::enable_shared_from_this> { private: ASIO::steady_timer timer_; Promise promise_; - std::function timeoutCallback_; + std::function timeoutCallback_; std::atomic_bool timeoutDisabled_{false}; }; diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 6787a7d2..63ac9d16 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -35,12 +35,12 @@ #include "WaitUtils.h" #include "lib/AsioDefines.h" #include "lib/AtomicSharedPtr.h" -#include "lib/BrokerConsumerStatsImpl.h" #include "lib/ClientConnection.h" #include "lib/ConnectionPool.h" #include "lib/ExecutorService.h" #include "lib/LogUtils.h" #include "lib/MockServer.h" +#include "lib/TimeUtils.h" #include "lib/checksum/ChecksumProvider.h" #include "lib/stats/ProducerStatsImpl.h" @@ -237,6 +237,33 @@ TEST(ClientTest, testConnectTimeoutAfterTcpConnected) { server->stop(); } +TEST(ClientTest, testConnectionNotReferredAfterClose) { + Client client(lookupUrl); + auto topic = "test-connection-not-referred-after-close-" + std::to_string(time(nullptr)); + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + + Reader reader; + ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader)); + + bool available; + ASSERT_EQ(ResultOk, reader.hasMessageAvailable(available)); + ASSERT_FALSE(available); + + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("test").build())); + ASSERT_EQ(ResultOk, reader.hasMessageAvailable(available)); + ASSERT_TRUE(available); + + Message msg; + ASSERT_EQ(ResultOk, reader.readNext(msg)); + ASSERT_EQ("test", msg.getDataAsString()); + + auto start = TimeUtils::currentTimeMillis(); + ASSERT_EQ(ResultOk, client.close()); + auto closeTimeMs = TimeUtils::currentTimeMillis() - start; + ASSERT_LT(closeTimeMs, 3000) << "close time: " << closeTimeMs << " ms"; +} + TEST(ClientTest, testTimedOutPendingRequestsAreErasedFromConnectionMaps) { const auto suffix = std::to_string(std::chrono::steady_clock::now().time_since_epoch().count()); ClientConfiguration conf; From 20f971b67702e9d46678c5d47d857acfa4c848e5 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 19 Mar 2026 12:02:53 +0800 Subject: [PATCH 6/7] remove duplicated emplace --- lib/ClientConnection.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index d96e50f2..03e0d905 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1428,7 +1428,6 @@ Future ClientConnection::newGetTopicsOfNamespace( insertRequest(pendingGetNamespaceTopicsRequests_, requestId, [cnxString = cnxString(), requestId]() { LOG_WARN(cnxString << "GetTopicsOfNamespace request timeout to broker, req_id: " << requestId); }); - pendingGetNamespaceTopicsRequests_.emplace(requestId, request); lock.unlock(); if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && mockServer_->sendRequest("GET_TOPICS_OF_NAMESPACE", requestId)) { From 6906525cb7dd94ecff1cbcd0bda111975e88db44 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 19 Mar 2026 12:11:16 +0800 Subject: [PATCH 7/7] remove unexpected error log --- lib/ClientConnection.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 03e0d905..a135ade7 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1279,7 +1279,7 @@ const std::future& ClientConnection::close(Result result, bool switchClust cancelTimer(*connectTimer_); lock.unlock(); int refCount = weak_from_this().use_count(); - if (!isResultRetryable(result)) { + if (result != ResultAlreadyClosed /* closed by the pool */ && !isResultRetryable(result)) { LOG_ERROR(cnxString() << "Connection closed with " << result << " (refCnt: " << refCount << ")"); } else { LOG_INFO(cnxString() << "Connection disconnected (refCnt: " << refCount << ")");