Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 104 additions & 131 deletions lib/ClientConnection.cc

Large diffs are not rendered by default.

101 changes: 35 additions & 66 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <cstdint>
#include <future>
#include <optional>

#ifdef USE_ASIO
#include <asio/bind_executor.hpp>
#include <asio/io_context.hpp>
Expand All @@ -52,8 +53,10 @@

#include "AsioTimer.h"
#include "Commands.h"
#include "ExecutorService.h"
#include "GetLastMessageIdResponse.h"
#include "LookupDataResult.h"
#include "PendingRequest.h"
#include "SharedBuffer.h"
#include "TimeUtils.h"
#include "UtilAllocator.h"
Expand All @@ -66,9 +69,6 @@ class PulsarFriend;

using TcpResolverPtr = std::shared_ptr<ASIO::ip::tcp::resolver>;

class ExecutorService;
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;

class ConnectionPool;
class ClientConnection;
typedef std::shared_ptr<ClientConnection> ClientConnectionPtr;
Expand Down Expand Up @@ -225,47 +225,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
void handleKeepAliveTimeout(const ASIO_ERROR& ec);

private:
struct PendingRequestData {
Promise<Result, ResponseData> promise;
DeadlineTimerPtr timer;
std::shared_ptr<std::atomic_bool> hasGotResponse{std::make_shared<std::atomic_bool>(false)};

void fail(Result result) {
cancelTimer(*timer);
promise.setFailed(result);
}
};

struct LookupRequestData {
LookupDataResultPromisePtr promise;
DeadlineTimerPtr timer;

void fail(Result result) {
cancelTimer(*timer);
promise->setFailed(result);
}
};

struct LastMessageIdRequestData {
GetLastMessageIdResponsePromisePtr promise;
DeadlineTimerPtr timer;

void fail(Result result) {
cancelTimer(*timer);
promise->setFailed(result);
}
};

struct GetSchemaRequest {
Promise<Result, SchemaInfo> promise;
DeadlineTimerPtr timer;

void fail(Result result) {
cancelTimer(*timer);
promise.setFailed(result);
}
};

/*
* handler for connectAsync
* creates a ConnectionPtr which has a valid ClientConnection object
Expand Down Expand Up @@ -303,12 +262,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
void newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType,
const LookupDataResultPromisePtr& promise);

void handleRequestTimeout(const ASIO_ERROR& ec, const PendingRequestData& pendingRequestData);

void handleLookupTimeout(const ASIO_ERROR&, const LookupRequestData&);

void handleGetLastMessageIdTimeout(const ASIO_ERROR&, const LastMessageIdRequestData& data);

template <typename Handler>
inline AllocHandler<Handler> customAllocReadHandler(Handler h) {
return AllocHandler<Handler>(readHandlerAllocator_, h);
Expand Down Expand Up @@ -385,33 +338,49 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
const std::chrono::milliseconds connectTimeout_;
const DeadlineTimerPtr connectTimer_;

typedef std::map<long, PendingRequestData> PendingRequestsMap;
PendingRequestsMap pendingRequests_;
template <typename T>
using RequestMap = std::unordered_map<uint64_t, PendingRequestPtr<T>>;

typedef std::map<long, LookupRequestData> PendingLookupRequestsMap;
PendingLookupRequestsMap pendingLookupRequests_;
RequestMap<ResponseData> pendingRequests_;
RequestMap<LookupDataResultPtr> pendingLookupRequests_;
RequestMap<GetLastMessageIdResponse> pendingGetLastMessageIdRequests_;
RequestMap<NamespaceTopicsPtr> pendingGetNamespaceTopicsRequests_;
RequestMap<SchemaInfo> pendingGetSchemaRequests_;

typedef std::map<long, ProducerImplWeakPtr> ProducersMap;
typedef std::unordered_map<long, ProducerImplWeakPtr> ProducersMap;
ProducersMap producers_;

typedef std::map<long, ConsumerImplWeakPtr> ConsumersMap;
typedef std::unordered_map<long, ConsumerImplWeakPtr> ConsumersMap;
ConsumersMap consumers_;

typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl>> PendingConsumerStatsMap;
PendingConsumerStatsMap pendingConsumerStatsMap_;

typedef std::map<long, LastMessageIdRequestData> PendingGetLastMessageIdRequestsMap;
PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_;

typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;

typedef std::unordered_map<uint64_t, GetSchemaRequest> PendingGetSchemaMap;
PendingGetSchemaMap pendingGetSchemaRequests_;

mutable std::mutex mutex_;
typedef std::unique_lock<std::mutex> Lock;

// Note: this method must be called when holding `mutex_`
template <typename T, typename OnTimeout>
auto insertRequest(RequestMap<T>& pendingRequests, uint64_t requestId, OnTimeout onTimeout) {
auto request = std::make_shared<PendingRequest<T>>(
executor_->createTimer(operationsTimeout_),
[this, self{shared_from_this()}, requestId, onTimeout{std::move(onTimeout)},
&pendingRequests]() mutable {
{
std::lock_guard lock{mutex_};
if (auto it = pendingRequests.find(requestId); it != pendingRequests.end()) {
pendingRequests.erase(it);
}
}
onTimeout();
});
auto [iterator, inserted] = pendingRequests.emplace(requestId, request);
if (inserted) {
request->initialize();
} // else: the request id is duplicated
return iterator->second;
}

// Pending buffers to write on the socket
std::deque<std::any> pendingWriteBuffers_;
int pendingWriteOperations_ = 0;
Expand All @@ -435,7 +404,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien

void startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests);
uint32_t maxPendingLookupRequest_;
uint32_t numOfPendingLookupRequest_ = 0;
std::atomic_uint32_t numOfPendingLookupRequest_{0};

bool isTlsAllowInsecureConnection_ = false;

Expand Down
9 changes: 9 additions & 0 deletions lib/ExecutorService.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
#include <asio/ip/tcp.hpp>
#include <asio/post.hpp>
#include <asio/ssl.hpp>
#include <asio/steady_timer.hpp>
#else
#include <boost/asio/dispatch.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/steady_timer.hpp>
#endif
#include <chrono>
#include <condition_variable>
Expand Down Expand Up @@ -68,6 +70,13 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut
// throws std::runtime_error if failed
DeadlineTimerPtr createDeadlineTimer();

template <typename Duration>
ASIO::steady_timer createTimer(const Duration &duration) {
auto timer = ASIO::steady_timer(io_context_);
timer.expires_after(duration);
return timer;
}

// Execute the task in the event loop thread asynchronously, i.e. the task will be put in the event loop
// queue and executed later.
template <typename T>
Expand Down
20 changes: 20 additions & 0 deletions lib/MockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,26 @@ class MockServer : public std::enable_shared_from_this<MockServer> {
proto::CommandConsumerStatsResponse response;
response.set_request_id(requestId);
connection->handleConsumerStatsResponse(response);
} else if (request == "LOOKUP") {
proto::CommandLookupTopicResponse response;
response.set_request_id(requestId);
response.set_response(proto::CommandLookupTopicResponse_LookupType_Connect);
response.set_brokerserviceurl("pulsar://localhost:6650");
connection->handleLookupTopicRespose(response);
} else if (request == "GET_LAST_MESSAGE_ID") {
proto::CommandGetLastMessageIdResponse response;
response.set_request_id(requestId);
response.mutable_last_message_id();
connection->handleGetLastMessageIdResponse(response);
} else if (request == "GET_TOPICS_OF_NAMESPACE") {
proto::CommandGetTopicsOfNamespaceResponse response;
response.set_request_id(requestId);
connection->handleGetTopicOfNamespaceResponse(response);
} else if (request == "GET_SCHEMA") {
proto::CommandGetSchemaResponse response;
response.set_request_id(requestId);
response.mutable_schema()->set_type(proto::Schema_Type_String);
connection->handleGetSchemaResponse(response);
} else {
proto::CommandSuccess success;
success.set_request_id(requestId);
Expand Down
76 changes: 76 additions & 0 deletions lib/PendingRequest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once

#include <pulsar/Result.h>

#include <atomic>
#include <functional>
#include <memory>

#include "AsioDefines.h"
#include "AsioTimer.h"
#include "Future.h"

namespace pulsar {

template <typename T>
class PendingRequest : public std::enable_shared_from_this<PendingRequest<T>> {
public:
PendingRequest(ASIO::steady_timer timer, std::function<void()> timeoutCallback)
: timer_(std::move(timer)), timeoutCallback_(std::move(timeoutCallback)) {}

void initialize() {
timer_.async_wait([this, weakSelf{this->weak_from_this()}](const auto& error) {
auto self = weakSelf.lock();
if (!self || error || timeoutDisabled_.load(std::memory_order_acquire)) {
return;
}
timeoutCallback_();
promise_.setFailed(ResultTimeout);
});
}

void complete(const T& value) {
promise_.setValue(value);
cancelTimer(timer_);
}

void fail(Result result) {
promise_.setFailed(result);
cancelTimer(timer_);
}

void disableTimeout() { timeoutDisabled_.store(true, std::memory_order_release); }

auto getFuture() const { return promise_.getFuture(); }

~PendingRequest() { cancelTimer(timer_); }

private:
ASIO::steady_timer timer_;
Promise<Result, T> promise_;
std::function<void()> timeoutCallback_;
std::atomic_bool timeoutDisabled_{false};
};

template <typename T>
using PendingRequestPtr = std::shared_ptr<PendingRequest<T>>;

} // namespace pulsar
Loading
Loading