From f28c32fa11e9a2c290c4291e4bdd866281abecb1 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Sun, 8 Mar 2026 18:45:18 +0800 Subject: [PATCH 1/6] fix Flaky test: ReaderSeekTest.testHasMessageAvailableAfterSeekToEnd --- lib/ConsumerImpl.cc | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 757b6e84..c728352d 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -1804,7 +1804,13 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c if (result == ResultOk) { LockGuard lock(mutex_); if (getCnx().expired() || reconnectionPending_) { - // It's during reconnection, complete the seek future after connection is established + // It's during reconnection, complete the seek future after connection is established. + // Clear local state now so hasMessageAvailable() does not see stale prefetched messages. + ackGroupingTrackerPtr_->flushAndClean(); + incomingMessages_.clear(); + if (lastSeekArg_.has_value() && std::holds_alternative(lastSeekArg_.value())) { + startMessageId_ = std::get(lastSeekArg_.value()); + } seekStatus_ = SeekStatus::COMPLETED; LOG_INFO(getName() << "Delay the seek future until the reconnection is done"); } else { From fdd0cbb2d130de8e9d1d4261726b8c84869f9a21 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Mon, 16 Mar 2026 16:21:21 +0800 Subject: [PATCH 2/6] fix ci timout --- .github/workflows/ci-pr-validation.yaml | 50 ++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml index d209b533..f841e058 100644 --- a/.github/workflows/ci-pr-validation.yaml +++ b/.github/workflows/ci-pr-validation.yaml @@ -99,10 +99,25 @@ jobs: key: vcpkg-${{ runner.os }}-${{ hashFiles('vcpkg.json') }} restore-keys: vcpkg-${{ runner.os }}- + - name: Restore vcpkg downloads cache + uses: actions/cache@v4 + with: + path: vcpkg/downloads + key: vcpkg-downloads-${{ runner.os }}-${{ hashFiles('vcpkg.json') }} + restore-keys: vcpkg-downloads-${{ runner.os }}- + - name: Build the project run: | - cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DCMAKE_EXPORT_COMPILE_COMMANDS=ON - cmake --build build -j8 + for attempt in 1 2 3; do + echo "Build attempt $attempt/3" + if cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DCMAKE_EXPORT_COMPILE_COMMANDS=ON && cmake --build build -j8; then + exit 0 + fi + echo "Attempt $attempt failed (e.g. vcpkg download 502), retrying in 90s..." + sleep 90 + done + echo "All build attempts failed" + exit 1 - name: Tidy check run: | @@ -137,10 +152,25 @@ jobs: key: vcpkg-${{ runner.os }}-${{ hashFiles('vcpkg.json') }} restore-keys: vcpkg-${{ runner.os }}- + - name: Restore vcpkg downloads cache + uses: actions/cache@v4 + with: + path: vcpkg/downloads + key: vcpkg-downloads-${{ runner.os }}-${{ hashFiles('vcpkg.json') }} + restore-keys: vcpkg-downloads-${{ runner.os }}- + - name: Build core libraries run: | - cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=OFF - cmake --build build -j8 + for attempt in 1 2 3; do + echo "Build attempt $attempt/3" + if cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=OFF && cmake --build build -j8; then + exit 0 + fi + echo "Attempt $attempt failed (e.g. vcpkg download 502), retrying in 90s..." + sleep 90 + done + echo "All build attempts failed" + exit 1 - name: Check formatting run: | @@ -164,8 +194,16 @@ jobs: - name: Build with Boost.Asio run: | - cmake -B build-boost-asio -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON - cmake --build build-boost-asio -j8 + for attempt in 1 2 3; do + echo "Build Boost.Asio attempt $attempt/3" + if cmake -B build-boost-asio -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON && cmake --build build-boost-asio -j8; then + exit 0 + fi + echo "Attempt $attempt failed (e.g. vcpkg download 502), retrying in 90s..." + sleep 90 + done + echo "All build attempts failed" + exit 1 - name: Build perf tools run: | From 0370bac6f90a8bed3fdc2b4ab1794ac9a68f3d7a Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 17 Mar 2026 09:28:49 +0800 Subject: [PATCH 3/6] fix --- lib/ConsumerImpl.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index b6a0de7e..08f60e14 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -334,11 +334,11 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result return ResultAlreadyClosed; } - mutexLock.unlock(); - LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString()); incomingMessages_.clear(); possibleSendToDeadLetterTopicMessages_.clear(); backoff_.reset(); + mutexLock.unlock(); + LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString()); if (!messageListener_ && config_.getReceiverQueueSize() == 0) { // Complicated logic since we don't have a isLocked() function for mutex if (waitingForZeroQueueSizeMessage) { From 277e8e80475d8fade5f0a4b37e430582013bb7cb Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 17 Mar 2026 09:32:31 +0800 Subject: [PATCH 4/6] del useless code --- lib/ConsumerImpl.cc | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 08f60e14..045f3662 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -1819,13 +1819,9 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c if (result == ResultOk) { LockGuard lock(mutex_); if (getCnx().expired() || reconnectionPending_) { - // It's during reconnection, complete the seek future after connection is established. - // Clear local state now so hasMessageAvailable() does not see stale prefetched messages. - ackGroupingTrackerPtr_->flushAndClean(); - incomingMessages_.clear(); - if (lastSeekArg_.has_value() && std::holds_alternative(lastSeekArg_.value())) { - startMessageId_ = std::get(lastSeekArg_.value()); - } + // Reconnection path: delay the seek callback until connectionOpened. clearReceiveQueue() + // and handleCreateConsumer() (which clears incomingMessages_ under the lock) run before + // the seek callback is invoked, so hasMessageAvailable() after seek sees cleared state. seekStatus_ = SeekStatus::COMPLETED; LOG_INFO(getName() << "Delay the seek future until the reconnection is done"); } else { From 6c2c9b4853e795410de8cfe7e1201dda59fc3fa2 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Sat, 21 Mar 2026 21:28:21 +0800 Subject: [PATCH 5/6] fix --- lib/ConsumerImpl.cc | 11 ++++++----- tests/ConsumerSeekTest.cc | 25 +++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 045f3662..1c4f8b89 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -334,11 +334,11 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result return ResultAlreadyClosed; } + mutexLock.unlock(); + LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString()); incomingMessages_.clear(); possibleSendToDeadLetterTopicMessages_.clear(); backoff_.reset(); - mutexLock.unlock(); - LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString()); if (!messageListener_ && config_.getReceiverQueueSize() == 0) { // Complicated logic since we don't have a isLocked() function for mutex if (waitingForZeroQueueSizeMessage) { @@ -1846,9 +1846,10 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c LockGuard lock{mutex_}; seekStatus_ = SeekStatus::NOT_STARTED; lastSeekArg_ = previousLastSeekArg; - executor_->postWork([self, callback{std::exchange(seekCallback_, std::nullopt).value()}]() { - callback(ResultOk); - }); + executor_->postWork( + [self, callback{std::exchange(seekCallback_, std::nullopt).value()}, result]() { + callback(result); + }); } }); } diff --git a/tests/ConsumerSeekTest.cc b/tests/ConsumerSeekTest.cc index f66c27d7..d208fc63 100644 --- a/tests/ConsumerSeekTest.cc +++ b/tests/ConsumerSeekTest.cc @@ -258,6 +258,31 @@ TEST_F(ConsumerSeekTest, testReconnectionSlow) { client.close(); } +TEST_F(ConsumerSeekTest, testSeekFailureIsPropagated) { + using namespace std::chrono_literals; + + Client client(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(1)); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe("testSeekFailureIsPropagated", "sub", consumer)); + + auto connection = *PulsarFriend::getConnections(client).begin(); + auto mockServer = std::make_shared(connection); + connection->attachMockServer(mockServer); + mockServer->setRequestDelay({{"SEEK", 5000}}); + + std::promise promise; + auto future = promise.get_future(); + consumer.seekAsync(MessageId::earliest(), [&promise](Result result) { promise.set_value(result); }); + + // Cancel the mocked SEEK success so request completes with timeout. + ASSERT_GE(mockServer->close(), 1); + + ASSERT_EQ(future.wait_for(5s), std::future_status::ready); + ASSERT_EQ(future.get(), ResultTimeout); + + client.close(); +} + INSTANTIATE_TEST_SUITE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false)); } // namespace pulsar From 9b05df124d61e7e05592cc3834f0362cc7282062 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Sat, 21 Mar 2026 21:29:39 +0800 Subject: [PATCH 6/6] format --- lib/ConsumerImpl.cc | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 1c4f8b89..c6de404f 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -1846,10 +1846,8 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c LockGuard lock{mutex_}; seekStatus_ = SeekStatus::NOT_STARTED; lastSeekArg_ = previousLastSeekArg; - executor_->postWork( - [self, callback{std::exchange(seekCallback_, std::nullopt).value()}, result]() { - callback(result); - }); + executor_->postWork([self, callback{std::exchange(seekCallback_, std::nullopt).value()}, + result]() { callback(result); }); } }); }