]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/include/seastar/rpc/rpc.hh
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / include / seastar / rpc / rpc.hh
index 4a3c8adaf3837b6f7425f24202d2f6c2d1def94e..c92cddec43d1f83e628602eb4e9b475fc81c9ad9 100644 (file)
@@ -25,8 +25,8 @@
 #include <unordered_set>
 #include <list>
 #include <seastar/core/future.hh>
+#include <seastar/core/seastar.hh>
 #include <seastar/net/api.hh>
-#include <seastar/core/reactor.hh>
 #include <seastar/core/iostream.hh>
 #include <seastar/core/shared_ptr.hh>
 #include <seastar/core/condition-variable.hh>
 #include <seastar/core/queue.hh>
 #include <seastar/core/weak_ptr.hh>
 #include <seastar/core/scheduling.hh>
+#include <seastar/util/backtrace.hh>
+#include <seastar/util/log.hh>
 
 namespace seastar {
 
 namespace rpc {
 
+/// \defgroup rpc rpc - remote procedure call framework
+///
+/// \brief
+/// rpc is a framework that can be used to define client-server communication
+/// protocols.
+/// For a high-level description of the RPC features see
+/// [doc/rpc.md](./md_rpc.html),
+/// [doc/rpc-streaming.md](./md_rpc-streaming.html) and
+/// [doc/rpc-compression.md](./md_rpc-compression.html)
+///
+/// The entry point for setting up an rpc protocol is
+/// seastar::rpc::protocol.
+
 using id_type = int64_t;
 
 using rpc_semaphore = basic_semaphore<semaphore_default_exception_factory, rpc_clock_type>;
 using resource_permit = semaphore_units<semaphore_default_exception_factory, rpc_clock_type>;
 
-struct SerializerConcept {
-    // For each serializable type T, implement
-    class T;
-    template <typename Output>
-    friend void write(const SerializerConcept&, Output& output, const T& data);
-    template <typename Input>
-    friend T read(const SerializerConcept&, Input& input, type<T> type_tag);  // type_tag used to disambiguate
-    // Input and Output expose void read(char*, size_t) and write(const char*, size_t).
-};
-
 static constexpr char rpc_magic[] = "SSTARRPC";
 
+/// \addtogroup rpc
+/// @{
+
 /// Specifies resource isolation for a connection.
 struct isolation_config {
     /// Specifies a scheduling group under which the connection (and all its
@@ -67,6 +75,8 @@ struct isolation_config {
 };
 
 /// Default isolation configuration - run everything in the default scheduling group.
+///
+/// In the scheduling_group that the protocol::server was created in.
 isolation_config default_isolate_connection(sstring isolation_cookie);
 
 /// \brief Resource limits for an RPC server
@@ -90,8 +100,9 @@ struct resource_limits {
 };
 
 struct client_options {
-    compat::optional<net::tcp_keepalive_params> keepalive;
+    std::optional<net::tcp_keepalive_params> keepalive;
     bool tcp_nodelay = true;
+    bool reuseaddr = false;
     compressor::factory* compressor_factory = nullptr;
     bool send_timeout_data = true;
     connection_id stream_parent = invalid_connection_id;
@@ -101,6 +112,8 @@ struct client_options {
     sstring isolation_cookie;
 };
 
+/// @}
+
 // RPC call that passes stream connection id as a parameter
 // may arrive to a different shard from where the stream connection
 // was opened, so the connection id is not known to a server that handles
@@ -122,13 +135,25 @@ public:
     friend std::ostream& operator<<(std::ostream&, const streaming_domain_type&);
 };
 
+/// \addtogroup rpc
+/// @{
+
+class server;
+
 struct server_options {
     compressor::factory* compressor_factory = nullptr;
     bool tcp_nodelay = true;
-    compat::optional<streaming_domain_type> streaming_domain;
+    std::optional<streaming_domain_type> streaming_domain;
     server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_;
+    // optional filter function. If set, will be called with remote 
+    // (connecting) address.    
+    // Returning false will refuse the incoming connection. 
+    // Returning true will allow the mechanism to proceed.
+    std::function<bool(const socket_address&)> filter_connection = {};
 };
 
+/// @}
+
 inline
 size_t
 estimate_request_size(const resource_limits& lim, size_t serialized_size) {
@@ -157,29 +182,48 @@ struct signature;
 
 class logger {
     std::function<void(const sstring&)> _logger;
+    ::seastar::logger* _seastar_logger = nullptr;
 
+    // _seastar_logger will always be used first if it's available
     void log(const sstring& str) const {
-        if (_logger) {
+        if (_seastar_logger) {
+            // default level for log messages is `info`
+            _seastar_logger->info("{}", str);
+        } else if (_logger) {
             _logger(str);
         }
     }
 
+    // _seastar_logger will always be used first if it's available
+    template <typename... Args>
+    void log(log_level level, const char* fmt, Args&&... args) const {
+        if (_seastar_logger) {
+            _seastar_logger->log(level, fmt, std::forward<Args>(args)...);
+        // If the log level is at least `info`, fall back to legacy logging without explicit level.
+        // Ignore less severe levels in order not to spam user's log with messages during transition,
+        // i.e. when the user still only defines a level-less logger.
+        } else if (_logger && level <= log_level::info) {
+            _logger(format(fmt, std::forward<Args>(args)...));
+        }
+    }
+
 public:
     void set(std::function<void(const sstring&)> l) {
         _logger = std::move(l);
     }
 
-    void operator()(const client_info& info, id_type msg_id, const sstring& str) const {
-        log(to_sstring("client ") + inet_ntoa(info.addr.as_posix_sockaddr_in().sin_addr) + " msg_id " + to_sstring(msg_id) + ": " + str);
+    void set(::seastar::logger* logger) {
+        _seastar_logger = logger;
     }
 
-    void operator()(const client_info& info, const sstring& str) const {
-        log(to_sstring("client ") + inet_ntoa(info.addr.as_posix_sockaddr_in().sin_addr) + ": " + str);
-    }
+    void operator()(const client_info& info, id_type msg_id, const sstring& str) const;
+    void operator()(const client_info& info, id_type msg_id, log_level level, std::string_view str) const;
 
-    void operator()(ipv4_addr addr, const sstring& str) const {
-        log(to_sstring("client ") + inet_ntoa(in_addr{net::ntoh(addr.ip)}) + ": " + str);
-    }
+    void operator()(const client_info& info, const sstring& str) const;
+    void operator()(const client_info& info, log_level level, std::string_view str) const;
+
+    void operator()(const socket_address& addr, const sstring& str) const;
+    void operator()(const socket_address& addr, log_level level, std::string_view str) const;
 };
 
 class connection {
@@ -198,11 +242,11 @@ protected:
     struct outgoing_entry {
         timer<rpc_clock_type> t;
         snd_buf buf;
-        compat::optional<promise<>> p = promise<>();
+        std::optional<promise<>> p = promise<>();
         cancellable* pcancel = nullptr;
         outgoing_entry(snd_buf b) : buf(std::move(b)) {}
-        outgoing_entry(outgoing_entry&& o) : t(std::move(o.t)), buf(std::move(o.buf)), p(std::move(o.p)), pcancel(o.pcancel) {
-            o.p = compat::nullopt;
+        outgoing_entry(outgoing_entry&& o) noexcept : t(std::move(o.t)), buf(std::move(o.buf)), p(std::move(o.p)), pcancel(o.pcancel) {
+            o.p = std::nullopt;
         }
         ~outgoing_entry() {
             if (p) {
@@ -227,8 +271,8 @@ protected:
     std::unordered_map<connection_id, xshard_connection_ptr> _streams;
     queue<rcv_buf> _stream_queue = queue<rcv_buf>(max_queued_stream_buffers);
     semaphore _stream_sem = semaphore(max_stream_buffers_memory);
-    bool _sink_closed = false;
-    bool _source_closed = false;
+    bool _sink_closed = true;
+    bool _source_closed = true;
     // the future holds if sink is already closed
     // if it is not ready it means the sink is been closed
     future<bool> _sink_closed_future = make_ready_future<bool>(false);
@@ -248,7 +292,7 @@ protected:
 
     template<outgoing_queue_type QueueType> void send_loop();
     future<> stop_send_loop();
-    future<compat::optional<rcv_buf>>  read_stream_frame_compressed(input_stream<char>& in);
+    future<std::optional<rcv_buf>>  read_stream_frame_compressed(input_stream<char>& in);
     bool stream_check_twoway_closed() {
         return _sink_closed && _source_closed;
     }
@@ -266,10 +310,10 @@ public:
     future<> send_negotiation_frame(feature_map features);
     // functions below are public because they are used by external heavily templated functions
     // and I am not smart enough to know how to define them as friends
-    future<> send(snd_buf buf, compat::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
+    future<> send(snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
     bool error() { return _error; }
     void abort();
-    future<> stop();
+    future<> stop() noexcept;
     future<> stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs);
     future<> close_sink() {
         _sink_closed = true;
@@ -293,7 +337,7 @@ public:
     }
     xshard_connection_ptr get_stream(connection_id id) const;
     void register_stream(connection_id id, xshard_connection_ptr c);
-    virtual ipv4_addr peer_address() const = 0;
+    virtual socket_address peer_address() const = 0;
 
     const logger& get_logger() const {
         return _logger;
@@ -304,29 +348,48 @@ public:
         return *static_cast<Serializer*>(_serializer);
     }
 
-    template <typename FrameType, typename Info>
-    typename FrameType::return_type read_frame(const Info& info, input_stream<char>& in);
+    template <typename FrameType>
+    typename FrameType::return_type read_frame(socket_address info, input_stream<char>& in);
 
-    template <typename FrameType, typename Info>
-    typename FrameType::return_type read_frame_compressed(const Info& info, std::unique_ptr<compressor>& compressor, input_stream<char>& in);
+    template <typename FrameType>
+    typename FrameType::return_type read_frame_compressed(socket_address info, std::unique_ptr<compressor>& compressor, input_stream<char>& in);
     friend class client;
+    template<typename Serializer, typename... Out>
+    friend class sink_impl;
+    template<typename Serializer, typename... In>
+    friend class source_impl;
+};
+
+struct deferred_snd_buf {
+    promise<> pr;
+    snd_buf data;
 };
 
 // send data Out...
 template<typename Serializer, typename... Out>
 class sink_impl : public sink<Out...>::impl {
+    // Used on the shard *this lives on.
+    alignas (cache_line_size) uint64_t _next_seq_num = 1;
+
+    // Used on the shard the _conn lives on.
+    struct alignas (cache_line_size) {
+        uint64_t last_seq_num = 0;
+        std::map<uint64_t, deferred_snd_buf> out_of_order_bufs;
+    } _remote_state;
 public:
-    sink_impl(xshard_connection_ptr con) : sink<Out...>::impl(std::move(con)) {}
+    sink_impl(xshard_connection_ptr con) : sink<Out...>::impl(std::move(con)) { this->_con->get()->_sink_closed = false; }
     future<> operator()(const Out&... args) override;
     future<> close() override;
+    future<> flush() override;
+    ~sink_impl() override;
 };
 
 // receive data In...
 template<typename Serializer, typename... In>
 class source_impl : public source<In...>::impl {
 public:
-    source_impl(xshard_connection_ptr con) : source<In...>::impl(std::move(con)) {}
-    future<compat::optional<std::tuple<In...>>> operator()() override;
+    source_impl(xshard_connection_ptr con) : source<In...>::impl(std::move(con)) { this->_con->get()->_source_closed = false; }
+    future<std::optional<std::tuple<In...>>> operator()() override;
 };
 
 class client : public rpc::connection, public weakly_referencable<client> {
@@ -366,17 +429,17 @@ public:
     };
 private:
     std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding;
-    ipv4_addr _server_addr;
+    socket_address _server_addr, _local_addr;
     client_options _options;
-    compat::optional<shared_promise<>> _client_negotiated = shared_promise<>();
+    std::optional<shared_promise<>> _client_negotiated = shared_promise<>();
     weak_ptr<client> _parent; // for stream clients
 
 private:
     future<> negotiate_protocol(input_stream<char>& in);
     void negotiate(feature_map server_features);
-    future<int64_t, compat::optional<rcv_buf>>
+    future<std::tuple<int64_t, std::optional<rcv_buf>>>
     read_response_frame(input_stream<char>& in);
-    future<int64_t, compat::optional<rcv_buf>>
+    future<std::tuple<int64_t, std::optional<rcv_buf>>>
     read_response_frame_compressed(input_stream<char>& in);
     void send_loop() {
         if (is_stream()) {
@@ -389,34 +452,38 @@ public:
     /**
      * Create client object which will attempt to connect to the remote address.
      *
+     * @param l \ref seastar::logger to use for logging error messages
+     * @param s an optional connection serializer
      * @param addr the remote address identifying this client
      * @param local the local address of this client
      */
-    client(const logger& l, void* s, ipv4_addr addr, ipv4_addr local = ipv4_addr());
-    client(const logger& l, void* s, client_options options, ipv4_addr addr, ipv4_addr local = ipv4_addr());
+    client(const logger& l, void* s, const socket_address& addr, const socket_address& local = {});
+    client(const logger& l, void* s, client_options options, const socket_address& addr, const socket_address& local = {});
 
      /**
      * Create client object which will attempt to connect to the remote address using the
      * specified seastar::socket.
      *
+     * @param l \ref seastar::logger to use for logging error messages
+     * @param s an optional connection serializer
      * @param addr the remote address identifying this client
      * @param local the local address of this client
      * @param socket the socket object use to connect to the remote address
      */
-    client(const logger& l, void* s, socket socket, ipv4_addr addr, ipv4_addr local = ipv4_addr());
-    client(const logger& l, void* s, client_options options, socket socket, ipv4_addr addr, ipv4_addr local = ipv4_addr());
+    client(const logger& l, void* s, socket socket, const socket_address& addr, const socket_address& local = {});
+    client(const logger& l, void* s, client_options options, socket socket, const socket_address& addr, const socket_address& local = {});
 
     stats get_stats() const;
     stats& get_stats_internal() {
         return _stats;
     }
     auto next_message_id() { return _message_id++; }
-    void wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, compat::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
+    void wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
     void wait_timed_out(id_type id);
-    future<> stop();
+    future<> stop() noexcept;
     void abort_all_streams();
     void deregister_this_stream();
-    ipv4_addr peer_address() const override {
+    socket_address peer_address() const override {
         return _server_addr;
     }
     future<> await_connection() {
@@ -435,7 +502,7 @@ public:
             client_options o = _options;
             o.stream_parent = this->get_connection_id();
             o.send_timeout_data = false;
-            auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr);
+            auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr, _local_addr);
             c->_parent = this->weak_from_this();
             c->_is_stream = true;
             return c->await_connection().then([c, this] {
@@ -447,7 +514,7 @@ public:
     }
     template<typename Serializer, typename... Out>
     future<sink<Out...>> make_stream_sink() {
-        return make_stream_sink<Serializer, Out...>(engine().net().socket());
+        return make_stream_sink<Serializer, Out...>(make_socket());
     }
 };
 
@@ -462,10 +529,10 @@ public:
         server& _server;
         client_info _info;
         connection_id _parent_id = invalid_connection_id;
-        compat::optional<isolation_config> _isolation_config;
+        std::optional<isolation_config> _isolation_config;
     private:
         future<> negotiate_protocol(input_stream<char>& in);
-        future<compat::optional<uint64_t>, uint64_t, int64_t, compat::optional<rcv_buf>>
+        future<std::tuple<std::optional<uint64_t>, uint64_t, int64_t, std::optional<rcv_buf>>>
         read_request_frame_compressed(input_stream<char>& in);
         future<feature_map> negotiate(feature_map requested);
         void send_loop() {
@@ -475,10 +542,11 @@ public:
                 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::response>();
             }
         }
+        future<> send_unknown_verb_reply(std::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
     public:
         connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* seralizer, connection_id id);
         future<> process();
-        future<> respond(int64_t msg_id, snd_buf&& data, compat::optional<rpc_clock_type::time_point> timeout);
+        future<> respond(int64_t msg_id, snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout);
         client_info& info() { return _info; }
         const client_info& info() const { return _info; }
         stats get_stats() const {
@@ -490,11 +558,11 @@ public:
         stats& get_stats_internal() {
             return _stats;
         }
-        ipv4_addr peer_address() const override {
-            return ipv4_addr(_info.addr);
+        socket_address peer_address() const override {
+            return _info.addr;
         }
         // Resources will be released when this goes out of scope
-        future<resource_permit> wait_for_resources(size_t memory_consumed,  compat::optional<rpc_clock_type::time_point> timeout) {
+        future<resource_permit> wait_for_resources(size_t memory_consumed,  std::optional<rpc_clock_type::time_point> timeout) {
             if (timeout) {
                 return get_units(_server._resources_available, memory_consumed, *timeout);
             } else {
@@ -524,8 +592,8 @@ private:
     uint64_t _next_client_id = 1;
 
 public:
-    server(protocol_base* proto, ipv4_addr addr, resource_limits memory_limit = resource_limits());
-    server(protocol_base* proto, server_options opts, ipv4_addr addr, resource_limits memory_limit = resource_limits());
+    server(protocol_base* proto, const socket_address& addr, resource_limits memory_limit = resource_limits());
+    server(protocol_base* proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits());
     server(protocol_base* proto, server_socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{});
     server(protocol_base* proto, server_options opts, server_socket, resource_limits memory_limit = resource_limits());
     void accept();
@@ -543,38 +611,134 @@ public:
     friend client;
 };
 
-using rpc_handler_func = std::function<future<> (shared_ptr<server::connection>, compat::optional<rpc_clock_type::time_point> timeout, int64_t msgid,
+using rpc_handler_func = std::function<future<> (shared_ptr<server::connection>, std::optional<rpc_clock_type::time_point> timeout, int64_t msgid,
                                                  rcv_buf data)>;
 
 struct rpc_handler {
     scheduling_group sg;
     rpc_handler_func func;
+    gate use_gate;
 };
 
 class protocol_base {
 public:
     virtual ~protocol_base() {};
     virtual shared_ptr<server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) = 0;
+protected:
+    friend class server;
+
     virtual rpc_handler* get_handler(uint64_t msg_id) = 0;
+    virtual void put_handler(rpc_handler*) = 0;
 };
 
-// MsgType is a type that holds type of a message. The type should be hashable
-// and serializable. It is preferable to use enum for message types, but
-// do not forget to provide hash function for it
+/// \addtogroup rpc
+/// @{
+
+/// Defines a protocol for communication between a server and a client.
+///
+/// A protocol is defined by a `Serializer` and a `MsgType`. The `Serializer` is
+/// responsible for serializing and unserializing all types used as arguments and
+/// return types used in the protocol. The `Serializer` is expected to define a
+/// `read()` and `write()` method for each such type `T` as follows:
+///
+///     template <typename Output>
+///     void write(const serializer&, Output& output, const T& data);
+///
+///     template <typename Input>
+///     T read(const serializer&, Input& input, type<T> type_tag);  // type_tag used to disambiguate
+///
+/// Where `Input` and `Output` have a `void read(char*, size_t)` and
+/// `write(const char*, size_t)` respectively.
+/// `MsgType` defines the type to be used as the message id, the id which is
+/// used to identify different messages used in the protocol. These are also
+/// often referred to as "verbs". The client will use the message id, to
+/// specify the remote method (verb) to invoke on the server. The server uses
+/// the message id to dispatch the incoming call to the right handler.
+/// `MsgType` should be hashable and serializable. It is preferable to use enum
+/// for message types, but do not forget to provide hash function for it.
+///
+/// Use register_handler() on the server to define the available verbs and the
+/// code to be executed when they are invoked by clients. Use make_client() on
+/// the client to create a matching callable that can be used to invoke the
+/// verb on the server and wait for its result. Note that register_handler()
+/// also returns a client, that can be used to invoke the registered verb on
+/// another node (given that the other node has the same verb). This is useful
+/// for symmetric protocols, where two or more nodes all have servers as well as
+/// connect to the other nodes as clients.
+///
+/// Use protocol::server to listen for and accept incoming connections on the
+/// server and protocol::client to establish connections to the server.
+/// Note that registering the available verbs can be done before/after
+/// listening for connections, but best to ensure that by the time incoming
+/// requests are to be expected, all the verbs are set-up.
+///
+/// ## Configuration
+///
+/// TODO
+///
+/// ## Isolation
+///
+/// RPC supports isolating verb handlers from each other. There are two ways to
+/// achieve this: per-handler isolation (the old way) and per-connection
+/// isolation (the new way). If no isolation is configured, all handlers will be
+/// executed in the context of the scheduling_group in which the
+/// protocol::server was created.
+///
+/// Per-handler isolation (the old way) can be configured by using the
+/// register_handler() overload which takes a scheduling_group. When invoked,
+/// the body of the handler will be executed from the context of the configured
+/// scheduling_group.
+///
+/// Per-connection isolation (the new way) is a more flexible mechanism that
+/// requires user application provided logic to determine how connections are
+/// isolated. This mechanism has two parts, the server and the client part.
+/// The client configures isolation by setting client_options::isolation_cookie.
+/// This cookie is an opaque (to the RPC layer) string that is to be interpreted
+/// on the server using user application provided logic. The application
+/// provides this logic to the server by setting
+/// resource_limits::isolate_connection to an appropriate handler function, that
+/// interprets the opaque cookie and resolves it to an isolation_config. The
+/// scheduling_group in the former will be used not just to execute all verb
+/// handlers, but also the connection loop itself, hence providing better
+/// isolation.
+///
+/// There a few gotchas related to mixing the two isolation mechanisms. This can
+/// happen when the application is updated and one of the client/server is
+/// still using the old/new mechanism. In general per-connection isolation
+/// overrides the per-handler one. If both are set up, the former will determine
+/// the scheduling_group context for the handlers. If the client is not
+/// configured to send an isolation cookie, the server's
+/// resource_limits::isolate_connection will not be invoked and the server will
+/// fall back to per-handler isolation if configured. If the client is
+/// configured to send an isolation cookie but the server doesn't have a
+/// resource_limits::isolate_connection configured, it will use
+/// default_isolate_connection() to interpret the cookie. Note that this still
+/// overrides the per-handler isolation if any is configured. If the server is
+/// so old that it doesn't have the per-connection isolation feature at all, it
+/// will of course just use the per-handler one, if configured.
+///
+/// ## Compatibility
+///
+/// TODO
+///
+/// \tparam Serializer the serializer for the protocol.
+/// \tparam MsgType the type to be used as the message id or verb id.
 template<typename Serializer, typename MsgType = uint32_t>
-class protocol : public protocol_base {
+class protocol final : public protocol_base {
 public:
+    /// Represents the listening port and all accepted connections.
     class server : public rpc::server {
     public:
-        server(protocol& proto, ipv4_addr addr, resource_limits memory_limit = resource_limits()) :
+        server(protocol& proto, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
             rpc::server(&proto, addr, memory_limit) {}
-        server(protocol& proto, server_options opts, ipv4_addr addr, resource_limits memory_limit = resource_limits()) :
+        server(protocol& proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
             rpc::server(&proto, opts, addr, memory_limit) {}
         server(protocol& proto, server_socket socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{}) :
             rpc::server(&proto, std::move(socket), memory_limit) {}
         server(protocol& proto, server_options opts, server_socket socket, resource_limits memory_limit = resource_limits()) :
             rpc::server(&proto, opts, std::move(socket), memory_limit) {}
     };
+    /// Represents a client side connection.
     class client : public rpc::client {
     public:
         /*
@@ -583,9 +747,9 @@ public:
          * @param addr the remote address identifying this client
          * @param local the local address of this client
          */
-        client(protocol& p, ipv4_addr addr, ipv4_addr local = ipv4_addr()) :
+        client(protocol& p, const socket_address& addr, const socket_address& local = {}) :
             rpc::client(p.get_logger(), &p._serializer, addr, local) {}
-        client(protocol& p, client_options options, ipv4_addr addr, ipv4_addr local = ipv4_addr()) :
+        client(protocol& p, client_options options, const socket_address& addr, const socket_address& local = {}) :
             rpc::client(p.get_logger(), &p._serializer, options, addr, local) {}
 
         /**
@@ -596,9 +760,9 @@ public:
          * @param local the local address of this client
          * @param socket the socket object use to connect to the remote address
          */
-        client(protocol& p, socket socket, ipv4_addr addr, ipv4_addr local = ipv4_addr()) :
+        client(protocol& p, socket socket, const socket_address& addr, const socket_address& local = {}) :
             rpc::client(p.get_logger(), &p._serializer, std::move(socket), addr, local) {}
-        client(protocol& p, client_options options, socket socket, ipv4_addr addr, ipv4_addr local = ipv4_addr()) :
+        client(protocol& p, client_options options, socket socket, const socket_address& addr, const socket_address& local = {}) :
             rpc::client(p.get_logger(), &p._serializer, options, std::move(socket), addr, local) {}
     };
 
@@ -610,29 +774,80 @@ private:
 
 public:
     protocol(Serializer&& serializer) : _serializer(std::forward<Serializer>(serializer)) {}
+
+    /// Creates a callable that can be used to invoke the verb on the remote.
+    ///
+    /// \tparam Func The signature of the verb. Has to be either the same or
+    ///     compatible with the one passed to register_handler on the server.
+    /// \param t the verb to invoke on the remote.
+    ///
+    /// \returns a callable whose signature is derived from Func as follows:
+    ///     given `Func == Ret(Args...)` the returned callable has the following
+    ///     signature: `future<Ret>(protocol::client&, Args...)`.
     template<typename Func>
     auto make_client(MsgType t);
 
-    // returns a function which type depends on Func
-    // if Func == Ret(Args...) then return function is
-    // future<Ret>(protocol::client&, Args...)
+    /// Register a handler to be called when this verb is invoked.
+    ///
+    /// \tparam Func the type of the handler for the verb. This determines the
+    ///     signature of the verb.
+    /// \param t the verb to register the handler for.
+    /// \param func the callable to be called when the verb is invoked by the
+    ///     remote.
+    ///
+    /// \returns a client, a callable that can be used to invoke the verb. See
+    ///     make_client(). The client can be discarded, in fact this is what
+    ///     most callers will do as real clients will live on a remote node, not
+    ///     on the one where handlers are registered.
     template<typename Func>
     auto register_handler(MsgType t, Func&& func);
 
-    // returns a function which type depends on Func
-    // if Func == Ret(Args...) then return function is
-    // future<Ret>(protocol::client&, Args...)
+    /// Register a handler to be called when this verb is invoked.
+    ///
+    /// \tparam Func the type of the handler for the verb. This determines the
+    ///     signature of the verb.
+    /// \param t the verb to register the handler for.
+    /// \param sg the scheduling group that will be used to invoke the handler
+    ///     in. This can be used to execute different verbs in different
+    ///     scheduling groups. Note that there is a newer mechanism to determine
+    ///     the scheduling groups a handler will run it per invocation, see
+    ///     isolation_config.
+    /// \param func the callable to be called when the verb is invoked by the
+    ///     remote.
+    ///
+    /// \returns a client, a callable that can be used to invoke the verb. See
+    ///     make_client(). The client can be discarded, in fact this is what
+    ///     most callers will do as real clients will live on a remote node, not
+    ///     on the one where handlers are registered.
     template <typename Func>
     auto register_handler(MsgType t, scheduling_group sg, Func&& func);
 
-    void unregister_handler(MsgType t) {
-        _handlers.erase(t);
-    }
+    /// Unregister the handler for the verb.
+    ///
+    /// Waits for all currently running handlers, then unregisters the handler.
+    /// Future attempts to invoke the verb will fail. This becomes effective
+    /// immediately after calling this function.
+    ///
+    /// \param t the verb to unregister the handler for.
+    ///
+    /// \returns a future that becomes available once all currently running
+    ///     handlers finished.
+    future<> unregister_handler(MsgType t);
 
+    /// Set a logger function to be used to log messages.
+    ///
+    /// \deprecated use the logger overload set_logger(::seastar::logger*)
+    /// instead.
+    [[deprecated("Use set_logger(::seastar::logger*) instead")]]
     void set_logger(std::function<void(const sstring&)> logger) {
         _logger.set(std::move(logger));
     }
 
+    /// Set a logger to be used to log messages.
+    void set_logger(::seastar::logger* logger) {
+        _logger.set(logger);
+    }
+
     const logger& get_logger() const {
         return _logger;
     }
@@ -641,23 +856,33 @@ public:
         return make_shared<rpc::server::connection>(server, std::move(fd), std::move(addr), _logger, &_serializer, id);
     }
 
-    rpc_handler* get_handler(uint64_t msg_id) override {
-        auto it = _handlers.find(MsgType(msg_id));
-        if (it != _handlers.end()) {
-            return &it->second;
-        } else {
-            return nullptr;
-        }
+    bool has_handler(MsgType msg_id);
+
+    /// Checks if any there are handlers registered.
+    /// Debugging helper, should only be used for debugging and not relied on.
+    ///
+    /// \returns true if there are, false if there are no registered handlers.
+    bool has_handlers() const noexcept {
+        return !_handlers.empty();
     }
 
 private:
+    rpc_handler* get_handler(uint64_t msg_id) override;
+    void put_handler(rpc_handler*) override;
+
     template<typename Ret, typename... In>
     auto make_client(signature<Ret(In...)> sig, MsgType t);
 
     void register_receiver(MsgType t, rpc_handler&& handler) {
-        _handlers.emplace(t, std::move(handler));
+        auto r = _handlers.emplace(t, std::move(handler));
+        if (!r.second) {
+            throw_with_backtrace<std::runtime_error>("registered handler already exists");
+        }
     }
 };
+
+/// @}
+
 }
 
 }