#include <unordered_set>
#include <list>
#include <seastar/core/future.hh>
+#include <seastar/core/seastar.hh>
#include <seastar/net/api.hh>
-#include <seastar/core/reactor.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/queue.hh>
#include <seastar/core/weak_ptr.hh>
#include <seastar/core/scheduling.hh>
+#include <seastar/util/backtrace.hh>
+#include <seastar/util/log.hh>
namespace seastar {
namespace rpc {
+/// \defgroup rpc rpc - remote procedure call framework
+///
+/// \brief
+/// rpc is a framework that can be used to define client-server communication
+/// protocols.
+/// For a high-level description of the RPC features see
+/// [doc/rpc.md](./md_rpc.html),
+/// [doc/rpc-streaming.md](./md_rpc-streaming.html) and
+/// [doc/rpc-compression.md](./md_rpc-compression.html)
+///
+/// The entry point for setting up an rpc protocol is
+/// seastar::rpc::protocol.
+
using id_type = int64_t;
using rpc_semaphore = basic_semaphore<semaphore_default_exception_factory, rpc_clock_type>;
using resource_permit = semaphore_units<semaphore_default_exception_factory, rpc_clock_type>;
-struct SerializerConcept {
- // For each serializable type T, implement
- class T;
- template <typename Output>
- friend void write(const SerializerConcept&, Output& output, const T& data);
- template <typename Input>
- friend T read(const SerializerConcept&, Input& input, type<T> type_tag); // type_tag used to disambiguate
- // Input and Output expose void read(char*, size_t) and write(const char*, size_t).
-};
-
static constexpr char rpc_magic[] = "SSTARRPC";
+/// \addtogroup rpc
+/// @{
+
/// Specifies resource isolation for a connection.
struct isolation_config {
/// Specifies a scheduling group under which the connection (and all its
};
/// Default isolation configuration - run everything in the default scheduling group.
+///
+/// In the scheduling_group that the protocol::server was created in.
isolation_config default_isolate_connection(sstring isolation_cookie);
/// \brief Resource limits for an RPC server
};
struct client_options {
- compat::optional<net::tcp_keepalive_params> keepalive;
+ std::optional<net::tcp_keepalive_params> keepalive;
bool tcp_nodelay = true;
+ bool reuseaddr = false;
compressor::factory* compressor_factory = nullptr;
bool send_timeout_data = true;
connection_id stream_parent = invalid_connection_id;
sstring isolation_cookie;
};
+/// @}
+
// RPC call that passes stream connection id as a parameter
// may arrive to a different shard from where the stream connection
// was opened, so the connection id is not known to a server that handles
friend std::ostream& operator<<(std::ostream&, const streaming_domain_type&);
};
+/// \addtogroup rpc
+/// @{
+
+class server;
+
struct server_options {
compressor::factory* compressor_factory = nullptr;
bool tcp_nodelay = true;
- compat::optional<streaming_domain_type> streaming_domain;
+ std::optional<streaming_domain_type> streaming_domain;
server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_;
+ // optional filter function. If set, will be called with remote
+ // (connecting) address.
+ // Returning false will refuse the incoming connection.
+ // Returning true will allow the mechanism to proceed.
+ std::function<bool(const socket_address&)> filter_connection = {};
};
+/// @}
+
inline
size_t
estimate_request_size(const resource_limits& lim, size_t serialized_size) {
class logger {
std::function<void(const sstring&)> _logger;
+ ::seastar::logger* _seastar_logger = nullptr;
+ // _seastar_logger will always be used first if it's available
void log(const sstring& str) const {
- if (_logger) {
+ if (_seastar_logger) {
+ // default level for log messages is `info`
+ _seastar_logger->info("{}", str);
+ } else if (_logger) {
_logger(str);
}
}
+ // _seastar_logger will always be used first if it's available
+ template <typename... Args>
+ void log(log_level level, const char* fmt, Args&&... args) const {
+ if (_seastar_logger) {
+ _seastar_logger->log(level, fmt, std::forward<Args>(args)...);
+ // If the log level is at least `info`, fall back to legacy logging without explicit level.
+ // Ignore less severe levels in order not to spam user's log with messages during transition,
+ // i.e. when the user still only defines a level-less logger.
+ } else if (_logger && level <= log_level::info) {
+ _logger(format(fmt, std::forward<Args>(args)...));
+ }
+ }
+
public:
void set(std::function<void(const sstring&)> l) {
_logger = std::move(l);
}
- void operator()(const client_info& info, id_type msg_id, const sstring& str) const {
- log(to_sstring("client ") + inet_ntoa(info.addr.as_posix_sockaddr_in().sin_addr) + " msg_id " + to_sstring(msg_id) + ": " + str);
+ void set(::seastar::logger* logger) {
+ _seastar_logger = logger;
}
- void operator()(const client_info& info, const sstring& str) const {
- log(to_sstring("client ") + inet_ntoa(info.addr.as_posix_sockaddr_in().sin_addr) + ": " + str);
- }
+ void operator()(const client_info& info, id_type msg_id, const sstring& str) const;
+ void operator()(const client_info& info, id_type msg_id, log_level level, std::string_view str) const;
- void operator()(ipv4_addr addr, const sstring& str) const {
- log(to_sstring("client ") + inet_ntoa(in_addr{net::ntoh(addr.ip)}) + ": " + str);
- }
+ void operator()(const client_info& info, const sstring& str) const;
+ void operator()(const client_info& info, log_level level, std::string_view str) const;
+
+ void operator()(const socket_address& addr, const sstring& str) const;
+ void operator()(const socket_address& addr, log_level level, std::string_view str) const;
};
class connection {
struct outgoing_entry {
timer<rpc_clock_type> t;
snd_buf buf;
- compat::optional<promise<>> p = promise<>();
+ std::optional<promise<>> p = promise<>();
cancellable* pcancel = nullptr;
outgoing_entry(snd_buf b) : buf(std::move(b)) {}
- outgoing_entry(outgoing_entry&& o) : t(std::move(o.t)), buf(std::move(o.buf)), p(std::move(o.p)), pcancel(o.pcancel) {
- o.p = compat::nullopt;
+ outgoing_entry(outgoing_entry&& o) noexcept : t(std::move(o.t)), buf(std::move(o.buf)), p(std::move(o.p)), pcancel(o.pcancel) {
+ o.p = std::nullopt;
}
~outgoing_entry() {
if (p) {
std::unordered_map<connection_id, xshard_connection_ptr> _streams;
queue<rcv_buf> _stream_queue = queue<rcv_buf>(max_queued_stream_buffers);
semaphore _stream_sem = semaphore(max_stream_buffers_memory);
- bool _sink_closed = false;
- bool _source_closed = false;
+ bool _sink_closed = true;
+ bool _source_closed = true;
// the future holds if sink is already closed
// if it is not ready it means the sink is been closed
future<bool> _sink_closed_future = make_ready_future<bool>(false);
template<outgoing_queue_type QueueType> void send_loop();
future<> stop_send_loop();
- future<compat::optional<rcv_buf>> read_stream_frame_compressed(input_stream<char>& in);
+ future<std::optional<rcv_buf>> read_stream_frame_compressed(input_stream<char>& in);
bool stream_check_twoway_closed() {
return _sink_closed && _source_closed;
}
future<> send_negotiation_frame(feature_map features);
// functions below are public because they are used by external heavily templated functions
// and I am not smart enough to know how to define them as friends
- future<> send(snd_buf buf, compat::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
+ future<> send(snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
bool error() { return _error; }
void abort();
- future<> stop();
+ future<> stop() noexcept;
future<> stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs);
future<> close_sink() {
_sink_closed = true;
}
xshard_connection_ptr get_stream(connection_id id) const;
void register_stream(connection_id id, xshard_connection_ptr c);
- virtual ipv4_addr peer_address() const = 0;
+ virtual socket_address peer_address() const = 0;
const logger& get_logger() const {
return _logger;
return *static_cast<Serializer*>(_serializer);
}
- template <typename FrameType, typename Info>
- typename FrameType::return_type read_frame(const Info& info, input_stream<char>& in);
+ template <typename FrameType>
+ typename FrameType::return_type read_frame(socket_address info, input_stream<char>& in);
- template <typename FrameType, typename Info>
- typename FrameType::return_type read_frame_compressed(const Info& info, std::unique_ptr<compressor>& compressor, input_stream<char>& in);
+ template <typename FrameType>
+ typename FrameType::return_type read_frame_compressed(socket_address info, std::unique_ptr<compressor>& compressor, input_stream<char>& in);
friend class client;
+ template<typename Serializer, typename... Out>
+ friend class sink_impl;
+ template<typename Serializer, typename... In>
+ friend class source_impl;
+};
+
+struct deferred_snd_buf {
+ promise<> pr;
+ snd_buf data;
};
// send data Out...
template<typename Serializer, typename... Out>
class sink_impl : public sink<Out...>::impl {
+ // Used on the shard *this lives on.
+ alignas (cache_line_size) uint64_t _next_seq_num = 1;
+
+ // Used on the shard the _conn lives on.
+ struct alignas (cache_line_size) {
+ uint64_t last_seq_num = 0;
+ std::map<uint64_t, deferred_snd_buf> out_of_order_bufs;
+ } _remote_state;
public:
- sink_impl(xshard_connection_ptr con) : sink<Out...>::impl(std::move(con)) {}
+ sink_impl(xshard_connection_ptr con) : sink<Out...>::impl(std::move(con)) { this->_con->get()->_sink_closed = false; }
future<> operator()(const Out&... args) override;
future<> close() override;
+ future<> flush() override;
+ ~sink_impl() override;
};
// receive data In...
template<typename Serializer, typename... In>
class source_impl : public source<In...>::impl {
public:
- source_impl(xshard_connection_ptr con) : source<In...>::impl(std::move(con)) {}
- future<compat::optional<std::tuple<In...>>> operator()() override;
+ source_impl(xshard_connection_ptr con) : source<In...>::impl(std::move(con)) { this->_con->get()->_source_closed = false; }
+ future<std::optional<std::tuple<In...>>> operator()() override;
};
class client : public rpc::connection, public weakly_referencable<client> {
};
private:
std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding;
- ipv4_addr _server_addr;
+ socket_address _server_addr, _local_addr;
client_options _options;
- compat::optional<shared_promise<>> _client_negotiated = shared_promise<>();
+ std::optional<shared_promise<>> _client_negotiated = shared_promise<>();
weak_ptr<client> _parent; // for stream clients
private:
future<> negotiate_protocol(input_stream<char>& in);
void negotiate(feature_map server_features);
- future<int64_t, compat::optional<rcv_buf>>
+ future<std::tuple<int64_t, std::optional<rcv_buf>>>
read_response_frame(input_stream<char>& in);
- future<int64_t, compat::optional<rcv_buf>>
+ future<std::tuple<int64_t, std::optional<rcv_buf>>>
read_response_frame_compressed(input_stream<char>& in);
void send_loop() {
if (is_stream()) {
/**
* Create client object which will attempt to connect to the remote address.
*
+ * @param l \ref seastar::logger to use for logging error messages
+ * @param s an optional connection serializer
* @param addr the remote address identifying this client
* @param local the local address of this client
*/
- client(const logger& l, void* s, ipv4_addr addr, ipv4_addr local = ipv4_addr());
- client(const logger& l, void* s, client_options options, ipv4_addr addr, ipv4_addr local = ipv4_addr());
+ client(const logger& l, void* s, const socket_address& addr, const socket_address& local = {});
+ client(const logger& l, void* s, client_options options, const socket_address& addr, const socket_address& local = {});
/**
* Create client object which will attempt to connect to the remote address using the
* specified seastar::socket.
*
+ * @param l \ref seastar::logger to use for logging error messages
+ * @param s an optional connection serializer
* @param addr the remote address identifying this client
* @param local the local address of this client
* @param socket the socket object use to connect to the remote address
*/
- client(const logger& l, void* s, socket socket, ipv4_addr addr, ipv4_addr local = ipv4_addr());
- client(const logger& l, void* s, client_options options, socket socket, ipv4_addr addr, ipv4_addr local = ipv4_addr());
+ client(const logger& l, void* s, socket socket, const socket_address& addr, const socket_address& local = {});
+ client(const logger& l, void* s, client_options options, socket socket, const socket_address& addr, const socket_address& local = {});
stats get_stats() const;
stats& get_stats_internal() {
return _stats;
}
auto next_message_id() { return _message_id++; }
- void wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, compat::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
+ void wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
void wait_timed_out(id_type id);
- future<> stop();
+ future<> stop() noexcept;
void abort_all_streams();
void deregister_this_stream();
- ipv4_addr peer_address() const override {
+ socket_address peer_address() const override {
return _server_addr;
}
future<> await_connection() {
client_options o = _options;
o.stream_parent = this->get_connection_id();
o.send_timeout_data = false;
- auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr);
+ auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr, _local_addr);
c->_parent = this->weak_from_this();
c->_is_stream = true;
return c->await_connection().then([c, this] {
}
template<typename Serializer, typename... Out>
future<sink<Out...>> make_stream_sink() {
- return make_stream_sink<Serializer, Out...>(engine().net().socket());
+ return make_stream_sink<Serializer, Out...>(make_socket());
}
};
server& _server;
client_info _info;
connection_id _parent_id = invalid_connection_id;
- compat::optional<isolation_config> _isolation_config;
+ std::optional<isolation_config> _isolation_config;
private:
future<> negotiate_protocol(input_stream<char>& in);
- future<compat::optional<uint64_t>, uint64_t, int64_t, compat::optional<rcv_buf>>
+ future<std::tuple<std::optional<uint64_t>, uint64_t, int64_t, std::optional<rcv_buf>>>
read_request_frame_compressed(input_stream<char>& in);
future<feature_map> negotiate(feature_map requested);
void send_loop() {
rpc::connection::send_loop<rpc::connection::outgoing_queue_type::response>();
}
}
+ future<> send_unknown_verb_reply(std::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
public:
connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* seralizer, connection_id id);
future<> process();
- future<> respond(int64_t msg_id, snd_buf&& data, compat::optional<rpc_clock_type::time_point> timeout);
+ future<> respond(int64_t msg_id, snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout);
client_info& info() { return _info; }
const client_info& info() const { return _info; }
stats get_stats() const {
stats& get_stats_internal() {
return _stats;
}
- ipv4_addr peer_address() const override {
- return ipv4_addr(_info.addr);
+ socket_address peer_address() const override {
+ return _info.addr;
}
// Resources will be released when this goes out of scope
- future<resource_permit> wait_for_resources(size_t memory_consumed, compat::optional<rpc_clock_type::time_point> timeout) {
+ future<resource_permit> wait_for_resources(size_t memory_consumed, std::optional<rpc_clock_type::time_point> timeout) {
if (timeout) {
return get_units(_server._resources_available, memory_consumed, *timeout);
} else {
uint64_t _next_client_id = 1;
public:
- server(protocol_base* proto, ipv4_addr addr, resource_limits memory_limit = resource_limits());
- server(protocol_base* proto, server_options opts, ipv4_addr addr, resource_limits memory_limit = resource_limits());
+ server(protocol_base* proto, const socket_address& addr, resource_limits memory_limit = resource_limits());
+ server(protocol_base* proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits());
server(protocol_base* proto, server_socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{});
server(protocol_base* proto, server_options opts, server_socket, resource_limits memory_limit = resource_limits());
void accept();
friend client;
};
-using rpc_handler_func = std::function<future<> (shared_ptr<server::connection>, compat::optional<rpc_clock_type::time_point> timeout, int64_t msgid,
+using rpc_handler_func = std::function<future<> (shared_ptr<server::connection>, std::optional<rpc_clock_type::time_point> timeout, int64_t msgid,
rcv_buf data)>;
struct rpc_handler {
scheduling_group sg;
rpc_handler_func func;
+ gate use_gate;
};
class protocol_base {
public:
virtual ~protocol_base() {};
virtual shared_ptr<server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) = 0;
+protected:
+ friend class server;
+
virtual rpc_handler* get_handler(uint64_t msg_id) = 0;
+ virtual void put_handler(rpc_handler*) = 0;
};
-// MsgType is a type that holds type of a message. The type should be hashable
-// and serializable. It is preferable to use enum for message types, but
-// do not forget to provide hash function for it
+/// \addtogroup rpc
+/// @{
+
+/// Defines a protocol for communication between a server and a client.
+///
+/// A protocol is defined by a `Serializer` and a `MsgType`. The `Serializer` is
+/// responsible for serializing and unserializing all types used as arguments and
+/// return types used in the protocol. The `Serializer` is expected to define a
+/// `read()` and `write()` method for each such type `T` as follows:
+///
+/// template <typename Output>
+/// void write(const serializer&, Output& output, const T& data);
+///
+/// template <typename Input>
+/// T read(const serializer&, Input& input, type<T> type_tag); // type_tag used to disambiguate
+///
+/// Where `Input` and `Output` have a `void read(char*, size_t)` and
+/// `write(const char*, size_t)` respectively.
+/// `MsgType` defines the type to be used as the message id, the id which is
+/// used to identify different messages used in the protocol. These are also
+/// often referred to as "verbs". The client will use the message id, to
+/// specify the remote method (verb) to invoke on the server. The server uses
+/// the message id to dispatch the incoming call to the right handler.
+/// `MsgType` should be hashable and serializable. It is preferable to use enum
+/// for message types, but do not forget to provide hash function for it.
+///
+/// Use register_handler() on the server to define the available verbs and the
+/// code to be executed when they are invoked by clients. Use make_client() on
+/// the client to create a matching callable that can be used to invoke the
+/// verb on the server and wait for its result. Note that register_handler()
+/// also returns a client, that can be used to invoke the registered verb on
+/// another node (given that the other node has the same verb). This is useful
+/// for symmetric protocols, where two or more nodes all have servers as well as
+/// connect to the other nodes as clients.
+///
+/// Use protocol::server to listen for and accept incoming connections on the
+/// server and protocol::client to establish connections to the server.
+/// Note that registering the available verbs can be done before/after
+/// listening for connections, but best to ensure that by the time incoming
+/// requests are to be expected, all the verbs are set-up.
+///
+/// ## Configuration
+///
+/// TODO
+///
+/// ## Isolation
+///
+/// RPC supports isolating verb handlers from each other. There are two ways to
+/// achieve this: per-handler isolation (the old way) and per-connection
+/// isolation (the new way). If no isolation is configured, all handlers will be
+/// executed in the context of the scheduling_group in which the
+/// protocol::server was created.
+///
+/// Per-handler isolation (the old way) can be configured by using the
+/// register_handler() overload which takes a scheduling_group. When invoked,
+/// the body of the handler will be executed from the context of the configured
+/// scheduling_group.
+///
+/// Per-connection isolation (the new way) is a more flexible mechanism that
+/// requires user application provided logic to determine how connections are
+/// isolated. This mechanism has two parts, the server and the client part.
+/// The client configures isolation by setting client_options::isolation_cookie.
+/// This cookie is an opaque (to the RPC layer) string that is to be interpreted
+/// on the server using user application provided logic. The application
+/// provides this logic to the server by setting
+/// resource_limits::isolate_connection to an appropriate handler function, that
+/// interprets the opaque cookie and resolves it to an isolation_config. The
+/// scheduling_group in the former will be used not just to execute all verb
+/// handlers, but also the connection loop itself, hence providing better
+/// isolation.
+///
+/// There a few gotchas related to mixing the two isolation mechanisms. This can
+/// happen when the application is updated and one of the client/server is
+/// still using the old/new mechanism. In general per-connection isolation
+/// overrides the per-handler one. If both are set up, the former will determine
+/// the scheduling_group context for the handlers. If the client is not
+/// configured to send an isolation cookie, the server's
+/// resource_limits::isolate_connection will not be invoked and the server will
+/// fall back to per-handler isolation if configured. If the client is
+/// configured to send an isolation cookie but the server doesn't have a
+/// resource_limits::isolate_connection configured, it will use
+/// default_isolate_connection() to interpret the cookie. Note that this still
+/// overrides the per-handler isolation if any is configured. If the server is
+/// so old that it doesn't have the per-connection isolation feature at all, it
+/// will of course just use the per-handler one, if configured.
+///
+/// ## Compatibility
+///
+/// TODO
+///
+/// \tparam Serializer the serializer for the protocol.
+/// \tparam MsgType the type to be used as the message id or verb id.
template<typename Serializer, typename MsgType = uint32_t>
-class protocol : public protocol_base {
+class protocol final : public protocol_base {
public:
+ /// Represents the listening port and all accepted connections.
class server : public rpc::server {
public:
- server(protocol& proto, ipv4_addr addr, resource_limits memory_limit = resource_limits()) :
+ server(protocol& proto, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
rpc::server(&proto, addr, memory_limit) {}
- server(protocol& proto, server_options opts, ipv4_addr addr, resource_limits memory_limit = resource_limits()) :
+ server(protocol& proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
rpc::server(&proto, opts, addr, memory_limit) {}
server(protocol& proto, server_socket socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{}) :
rpc::server(&proto, std::move(socket), memory_limit) {}
server(protocol& proto, server_options opts, server_socket socket, resource_limits memory_limit = resource_limits()) :
rpc::server(&proto, opts, std::move(socket), memory_limit) {}
};
+ /// Represents a client side connection.
class client : public rpc::client {
public:
/*
* @param addr the remote address identifying this client
* @param local the local address of this client
*/
- client(protocol& p, ipv4_addr addr, ipv4_addr local = ipv4_addr()) :
+ client(protocol& p, const socket_address& addr, const socket_address& local = {}) :
rpc::client(p.get_logger(), &p._serializer, addr, local) {}
- client(protocol& p, client_options options, ipv4_addr addr, ipv4_addr local = ipv4_addr()) :
+ client(protocol& p, client_options options, const socket_address& addr, const socket_address& local = {}) :
rpc::client(p.get_logger(), &p._serializer, options, addr, local) {}
/**
* @param local the local address of this client
* @param socket the socket object use to connect to the remote address
*/
- client(protocol& p, socket socket, ipv4_addr addr, ipv4_addr local = ipv4_addr()) :
+ client(protocol& p, socket socket, const socket_address& addr, const socket_address& local = {}) :
rpc::client(p.get_logger(), &p._serializer, std::move(socket), addr, local) {}
- client(protocol& p, client_options options, socket socket, ipv4_addr addr, ipv4_addr local = ipv4_addr()) :
+ client(protocol& p, client_options options, socket socket, const socket_address& addr, const socket_address& local = {}) :
rpc::client(p.get_logger(), &p._serializer, options, std::move(socket), addr, local) {}
};
public:
protocol(Serializer&& serializer) : _serializer(std::forward<Serializer>(serializer)) {}
+
+ /// Creates a callable that can be used to invoke the verb on the remote.
+ ///
+ /// \tparam Func The signature of the verb. Has to be either the same or
+ /// compatible with the one passed to register_handler on the server.
+ /// \param t the verb to invoke on the remote.
+ ///
+ /// \returns a callable whose signature is derived from Func as follows:
+ /// given `Func == Ret(Args...)` the returned callable has the following
+ /// signature: `future<Ret>(protocol::client&, Args...)`.
template<typename Func>
auto make_client(MsgType t);
- // returns a function which type depends on Func
- // if Func == Ret(Args...) then return function is
- // future<Ret>(protocol::client&, Args...)
+ /// Register a handler to be called when this verb is invoked.
+ ///
+ /// \tparam Func the type of the handler for the verb. This determines the
+ /// signature of the verb.
+ /// \param t the verb to register the handler for.
+ /// \param func the callable to be called when the verb is invoked by the
+ /// remote.
+ ///
+ /// \returns a client, a callable that can be used to invoke the verb. See
+ /// make_client(). The client can be discarded, in fact this is what
+ /// most callers will do as real clients will live on a remote node, not
+ /// on the one where handlers are registered.
template<typename Func>
auto register_handler(MsgType t, Func&& func);
- // returns a function which type depends on Func
- // if Func == Ret(Args...) then return function is
- // future<Ret>(protocol::client&, Args...)
+ /// Register a handler to be called when this verb is invoked.
+ ///
+ /// \tparam Func the type of the handler for the verb. This determines the
+ /// signature of the verb.
+ /// \param t the verb to register the handler for.
+ /// \param sg the scheduling group that will be used to invoke the handler
+ /// in. This can be used to execute different verbs in different
+ /// scheduling groups. Note that there is a newer mechanism to determine
+ /// the scheduling groups a handler will run it per invocation, see
+ /// isolation_config.
+ /// \param func the callable to be called when the verb is invoked by the
+ /// remote.
+ ///
+ /// \returns a client, a callable that can be used to invoke the verb. See
+ /// make_client(). The client can be discarded, in fact this is what
+ /// most callers will do as real clients will live on a remote node, not
+ /// on the one where handlers are registered.
template <typename Func>
auto register_handler(MsgType t, scheduling_group sg, Func&& func);
- void unregister_handler(MsgType t) {
- _handlers.erase(t);
- }
+ /// Unregister the handler for the verb.
+ ///
+ /// Waits for all currently running handlers, then unregisters the handler.
+ /// Future attempts to invoke the verb will fail. This becomes effective
+ /// immediately after calling this function.
+ ///
+ /// \param t the verb to unregister the handler for.
+ ///
+ /// \returns a future that becomes available once all currently running
+ /// handlers finished.
+ future<> unregister_handler(MsgType t);
+ /// Set a logger function to be used to log messages.
+ ///
+ /// \deprecated use the logger overload set_logger(::seastar::logger*)
+ /// instead.
+ [[deprecated("Use set_logger(::seastar::logger*) instead")]]
void set_logger(std::function<void(const sstring&)> logger) {
_logger.set(std::move(logger));
}
+ /// Set a logger to be used to log messages.
+ void set_logger(::seastar::logger* logger) {
+ _logger.set(logger);
+ }
+
const logger& get_logger() const {
return _logger;
}
return make_shared<rpc::server::connection>(server, std::move(fd), std::move(addr), _logger, &_serializer, id);
}
- rpc_handler* get_handler(uint64_t msg_id) override {
- auto it = _handlers.find(MsgType(msg_id));
- if (it != _handlers.end()) {
- return &it->second;
- } else {
- return nullptr;
- }
+ bool has_handler(MsgType msg_id);
+
+ /// Checks if any there are handlers registered.
+ /// Debugging helper, should only be used for debugging and not relied on.
+ ///
+ /// \returns true if there are, false if there are no registered handlers.
+ bool has_handlers() const noexcept {
+ return !_handlers.empty();
}
private:
+ rpc_handler* get_handler(uint64_t msg_id) override;
+ void put_handler(rpc_handler*) override;
+
template<typename Ret, typename... In>
auto make_client(signature<Ret(In...)> sig, MsgType t);
void register_receiver(MsgType t, rpc_handler&& handler) {
- _handlers.emplace(t, std::move(handler));
+ auto r = _handlers.emplace(t, std::move(handler));
+ if (!r.second) {
+ throw_with_backtrace<std::runtime_error>("registered handler already exists");
+ }
}
};
+
+/// @}
+
}
}