From 29d28e767c006d90225c8f3aaf1208530c9cc48d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 19 Mar 2026 22:24:28 +0800 Subject: [PATCH 1/4] Fix hasMessageAvailable will return true after seeking to a timestamp newer than the last message --- lib/ConsumerImpl.cc | 29 ++++++++++++++++++----------- tests/ReaderTest.cc | 29 ++++++++++++++++++++++------- 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 7cb48212..c2289001 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -1642,26 +1642,33 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c callback(result, {}); return; } - auto handleResponse = [self, response, callback] { + bool lastSeekIsByTimestamp = false; + { + LockGuard lock{self->mutex_}; + if (self->lastSeekArg_.has_value() && + std::holds_alternative(self->lastSeekArg_.value())) { + lastSeekIsByTimestamp = true; + } + } + auto handleResponse = [self, lastSeekIsByTimestamp, response, callback] { if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) { // We only care about comparing ledger ids and entry ids as mark delete position // doesn't have other ids such as batch index auto compareResult = compareLedgerAndEntryId(response.getMarkDeletePosition(), response.getLastMessageId()); - callback(ResultOk, self->config_.isStartMessageIdInclusive() ? compareResult <= 0 - : compareResult < 0); + // When the consumer has sought by timestamp that is later than the last message, the + // mark-delete position will still be the same with the last message id's position. But + // broker won't dispatch messages even if startMessageId is inclusive, so we should return + // false in this case. + if (lastSeekIsByTimestamp || !self->config_.isStartMessageIdInclusive()) { + callback(ResultOk, compareResult < 0); + } else { + callback(ResultOk, compareResult <= 0); + } } else { callback(ResultOk, false); } }; - bool lastSeekIsByTimestamp = false; - { - LockGuard lock{self->mutex_}; - if (self->lastSeekArg_.has_value() && - std::holds_alternative(self->lastSeekArg_.value())) { - lastSeekIsByTimestamp = true; - } - } if (self->config_.isStartMessageIdInclusive() && !lastSeekIsByTimestamp) { self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) { if (result != ResultOk) { diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index 3462b554..8404d441 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -38,6 +38,7 @@ #include "lib/Latch.h" #include "lib/LogUtils.h" #include "lib/ReaderImpl.h" +#include "lib/TimeUtils.h" DECLARE_LOG_OBJECT() using namespace pulsar; @@ -865,13 +866,7 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) { } ASSERT_EQ(ResultOk, reader.seek(MessageId::latest())); - // After seek-to-end the broker may close the consumer and trigger reconnect; allow a short - // delay for hasMessageAvailable to become false (avoids flakiness when reconnect completes). - for (int i = 0; i < 50; i++) { - ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); - if (!hasMessageAvailable) break; - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } + ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); ASSERT_FALSE(hasMessageAvailable); producer.send(MessageBuilder().setContent("msg-2").build()); @@ -983,6 +978,26 @@ TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) { assertStartMessageId(false, secondMsgId); } +TEST_P(ReaderSeekTest, testSeekToEndByTimestamp) { + auto topic = "test-seek-to-end-by-timestamp-" + std::to_string(time(nullptr)); + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + + ReaderConfiguration readerConf; + readerConf.setStartMessageIdInclusive(GetParam()); + + Reader reader; + ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), readerConf, reader)); + + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build())); + auto now = TimeUtils::currentTimeMillis() + 1000; + ASSERT_EQ(ResultOk, reader.seek(now)); + + bool hasMessageAvailable; + ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); + ASSERT_FALSE(hasMessageAvailable); +} + // Regression test for segfault when Reader is used with messageListenerThreads=0. // Verifies ExecutorServiceProvider(0) does not cause undefined behavior and // ConsumerImpl::messageReceived does not dereference null listenerExecutor_. From 4a86cf9e16d2b66d5e6514a9584be98318faaa99 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 20 Mar 2026 09:47:55 +0800 Subject: [PATCH 2/4] address comments --- lib/ConsumerImpl.cc | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index c2289001..4a624527 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -1656,10 +1656,8 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c // doesn't have other ids such as batch index auto compareResult = compareLedgerAndEntryId(response.getMarkDeletePosition(), response.getLastMessageId()); - // When the consumer has sought by timestamp that is later than the last message, the - // mark-delete position will still be the same with the last message id's position. But - // broker won't dispatch messages even if startMessageId is inclusive, so we should return - // false in this case. + // When the consumer has sought by timestamp, broker will ignore the + // startMessageIdInclusive config, so the compare should still be exclusive if (lastSeekIsByTimestamp || !self->config_.isStartMessageIdInclusive()) { callback(ResultOk, compareResult < 0); } else { From 910367ecabde170aa1a28c44cee78cf0f38d1a70 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 20 Mar 2026 11:22:52 +0800 Subject: [PATCH 3/4] improve tests --- tests/ReaderTest.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index 8404d441..20b18cb4 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -24,8 +24,10 @@ #include #include +#include #include #include +#include #include #include #include @@ -38,7 +40,6 @@ #include "lib/Latch.h" #include "lib/LogUtils.h" #include "lib/ReaderImpl.h" -#include "lib/TimeUtils.h" DECLARE_LOG_OBJECT() using namespace pulsar; @@ -990,7 +991,7 @@ TEST_P(ReaderSeekTest, testSeekToEndByTimestamp) { ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), readerConf, reader)); ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build())); - auto now = TimeUtils::currentTimeMillis() + 1000; + auto now = std::numeric_limits::max(); ASSERT_EQ(ResultOk, reader.seek(now)); bool hasMessageAvailable; From 11a67a8b40d8e336095163045066849064907f7f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 20 Mar 2026 11:42:31 +0800 Subject: [PATCH 4/4] fix tests --- tests/ReaderTest.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index 20b18cb4..af833ce6 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -991,7 +991,9 @@ TEST_P(ReaderSeekTest, testSeekToEndByTimestamp) { ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), readerConf, reader)); ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build())); - auto now = std::numeric_limits::max(); + // Server side (Java) uses signal 64 bits integers to represent the timestamp, so use max int64_t here to + // seek to the end of topic. + auto now = std::numeric_limits::max(); ASSERT_EQ(ResultOk, reader.seek(now)); bool hasMessageAvailable;