]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/include/seastar/rpc/rpc.hh
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / include / seastar / rpc / rpc.hh
index c9d20e3cf27d01e9680f5f7bbca85758cc0dd62d..c92cddec43d1f83e628602eb4e9b475fc81c9ad9 100644 (file)
@@ -25,8 +25,8 @@
 #include <unordered_set>
 #include <list>
 #include <seastar/core/future.hh>
+#include <seastar/core/seastar.hh>
 #include <seastar/net/api.hh>
-#include <seastar/core/reactor.hh>
 #include <seastar/core/iostream.hh>
 #include <seastar/core/shared_ptr.hh>
 #include <seastar/core/condition-variable.hh>
 #include <seastar/core/weak_ptr.hh>
 #include <seastar/core/scheduling.hh>
 #include <seastar/util/backtrace.hh>
+#include <seastar/util/log.hh>
 
 namespace seastar {
 
 namespace rpc {
 
+/// \defgroup rpc rpc - remote procedure call framework
+///
+/// \brief
+/// rpc is a framework that can be used to define client-server communication
+/// protocols.
+/// For a high-level description of the RPC features see
+/// [doc/rpc.md](./md_rpc.html),
+/// [doc/rpc-streaming.md](./md_rpc-streaming.html) and
+/// [doc/rpc-compression.md](./md_rpc-compression.html)
+///
+/// The entry point for setting up an rpc protocol is
+/// seastar::rpc::protocol.
+
 using id_type = int64_t;
 
 using rpc_semaphore = basic_semaphore<semaphore_default_exception_factory, rpc_clock_type>;
 using resource_permit = semaphore_units<semaphore_default_exception_factory, rpc_clock_type>;
 
-struct SerializerConcept {
-    // For each serializable type T, implement
-    class T;
-    template <typename Output>
-    friend void write(const SerializerConcept&, Output& output, const T& data);
-    template <typename Input>
-    friend T read(const SerializerConcept&, Input& input, type<T> type_tag);  // type_tag used to disambiguate
-    // Input and Output expose void read(char*, size_t) and write(const char*, size_t).
-};
-
 static constexpr char rpc_magic[] = "SSTARRPC";
 
+/// \addtogroup rpc
+/// @{
+
 /// Specifies resource isolation for a connection.
 struct isolation_config {
     /// Specifies a scheduling group under which the connection (and all its
@@ -68,6 +75,8 @@ struct isolation_config {
 };
 
 /// Default isolation configuration - run everything in the default scheduling group.
+///
+/// In the scheduling_group that the protocol::server was created in.
 isolation_config default_isolate_connection(sstring isolation_cookie);
 
 /// \brief Resource limits for an RPC server
@@ -91,7 +100,7 @@ struct resource_limits {
 };
 
 struct client_options {
-    compat::optional<net::tcp_keepalive_params> keepalive;
+    std::optional<net::tcp_keepalive_params> keepalive;
     bool tcp_nodelay = true;
     bool reuseaddr = false;
     compressor::factory* compressor_factory = nullptr;
@@ -103,6 +112,8 @@ struct client_options {
     sstring isolation_cookie;
 };
 
+/// @}
+
 // RPC call that passes stream connection id as a parameter
 // may arrive to a different shard from where the stream connection
 // was opened, so the connection id is not known to a server that handles
@@ -124,13 +135,25 @@ public:
     friend std::ostream& operator<<(std::ostream&, const streaming_domain_type&);
 };
 
+/// \addtogroup rpc
+/// @{
+
+class server;
+
 struct server_options {
     compressor::factory* compressor_factory = nullptr;
     bool tcp_nodelay = true;
-    compat::optional<streaming_domain_type> streaming_domain;
+    std::optional<streaming_domain_type> streaming_domain;
     server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_;
+    // optional filter function. If set, will be called with remote 
+    // (connecting) address.    
+    // Returning false will refuse the incoming connection. 
+    // Returning true will allow the mechanism to proceed.
+    std::function<bool(const socket_address&)> filter_connection = {};
 };
 
+/// @}
+
 inline
 size_t
 estimate_request_size(const resource_limits& lim, size_t serialized_size) {
@@ -159,23 +182,48 @@ struct signature;
 
 class logger {
     std::function<void(const sstring&)> _logger;
+    ::seastar::logger* _seastar_logger = nullptr;
 
+    // _seastar_logger will always be used first if it's available
     void log(const sstring& str) const {
-        if (_logger) {
+        if (_seastar_logger) {
+            // default level for log messages is `info`
+            _seastar_logger->info("{}", str);
+        } else if (_logger) {
             _logger(str);
         }
     }
 
+    // _seastar_logger will always be used first if it's available
+    template <typename... Args>
+    void log(log_level level, const char* fmt, Args&&... args) const {
+        if (_seastar_logger) {
+            _seastar_logger->log(level, fmt, std::forward<Args>(args)...);
+        // If the log level is at least `info`, fall back to legacy logging without explicit level.
+        // Ignore less severe levels in order not to spam user's log with messages during transition,
+        // i.e. when the user still only defines a level-less logger.
+        } else if (_logger && level <= log_level::info) {
+            _logger(format(fmt, std::forward<Args>(args)...));
+        }
+    }
+
 public:
     void set(std::function<void(const sstring&)> l) {
         _logger = std::move(l);
     }
 
+    void set(::seastar::logger* logger) {
+        _seastar_logger = logger;
+    }
+
     void operator()(const client_info& info, id_type msg_id, const sstring& str) const;
+    void operator()(const client_info& info, id_type msg_id, log_level level, std::string_view str) const;
 
     void operator()(const client_info& info, const sstring& str) const;
+    void operator()(const client_info& info, log_level level, std::string_view str) const;
 
     void operator()(const socket_address& addr, const sstring& str) const;
+    void operator()(const socket_address& addr, log_level level, std::string_view str) const;
 };
 
 class connection {
@@ -194,11 +242,11 @@ protected:
     struct outgoing_entry {
         timer<rpc_clock_type> t;
         snd_buf buf;
-        compat::optional<promise<>> p = promise<>();
+        std::optional<promise<>> p = promise<>();
         cancellable* pcancel = nullptr;
         outgoing_entry(snd_buf b) : buf(std::move(b)) {}
-        outgoing_entry(outgoing_entry&& o) : t(std::move(o.t)), buf(std::move(o.buf)), p(std::move(o.p)), pcancel(o.pcancel) {
-            o.p = compat::nullopt;
+        outgoing_entry(outgoing_entry&& o) noexcept : t(std::move(o.t)), buf(std::move(o.buf)), p(std::move(o.p)), pcancel(o.pcancel) {
+            o.p = std::nullopt;
         }
         ~outgoing_entry() {
             if (p) {
@@ -244,7 +292,7 @@ protected:
 
     template<outgoing_queue_type QueueType> void send_loop();
     future<> stop_send_loop();
-    future<compat::optional<rcv_buf>>  read_stream_frame_compressed(input_stream<char>& in);
+    future<std::optional<rcv_buf>>  read_stream_frame_compressed(input_stream<char>& in);
     bool stream_check_twoway_closed() {
         return _sink_closed && _source_closed;
     }
@@ -262,10 +310,10 @@ public:
     future<> send_negotiation_frame(feature_map features);
     // functions below are public because they are used by external heavily templated functions
     // and I am not smart enough to know how to define them as friends
-    future<> send(snd_buf buf, compat::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
+    future<> send(snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
     bool error() { return _error; }
     void abort();
-    future<> stop();
+    future<> stop() noexcept;
     future<> stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs);
     future<> close_sink() {
         _sink_closed = true;
@@ -312,14 +360,28 @@ public:
     friend class source_impl;
 };
 
+struct deferred_snd_buf {
+    promise<> pr;
+    snd_buf data;
+};
+
 // send data Out...
 template<typename Serializer, typename... Out>
 class sink_impl : public sink<Out...>::impl {
+    // Used on the shard *this lives on.
+    alignas (cache_line_size) uint64_t _next_seq_num = 1;
+
+    // Used on the shard the _conn lives on.
+    struct alignas (cache_line_size) {
+        uint64_t last_seq_num = 0;
+        std::map<uint64_t, deferred_snd_buf> out_of_order_bufs;
+    } _remote_state;
 public:
     sink_impl(xshard_connection_ptr con) : sink<Out...>::impl(std::move(con)) { this->_con->get()->_sink_closed = false; }
     future<> operator()(const Out&... args) override;
     future<> close() override;
     future<> flush() override;
+    ~sink_impl() override;
 };
 
 // receive data In...
@@ -327,7 +389,7 @@ template<typename Serializer, typename... In>
 class source_impl : public source<In...>::impl {
 public:
     source_impl(xshard_connection_ptr con) : source<In...>::impl(std::move(con)) { this->_con->get()->_source_closed = false; }
-    future<compat::optional<std::tuple<In...>>> operator()() override;
+    future<std::optional<std::tuple<In...>>> operator()() override;
 };
 
 class client : public rpc::connection, public weakly_referencable<client> {
@@ -367,17 +429,17 @@ public:
     };
 private:
     std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding;
-    socket_address _server_addr;
+    socket_address _server_addr, _local_addr;
     client_options _options;
-    compat::optional<shared_promise<>> _client_negotiated = shared_promise<>();
+    std::optional<shared_promise<>> _client_negotiated = shared_promise<>();
     weak_ptr<client> _parent; // for stream clients
 
 private:
     future<> negotiate_protocol(input_stream<char>& in);
     void negotiate(feature_map server_features);
-    future<std::tuple<int64_t, compat::optional<rcv_buf>>>
+    future<std::tuple<int64_t, std::optional<rcv_buf>>>
     read_response_frame(input_stream<char>& in);
-    future<std::tuple<int64_t, compat::optional<rcv_buf>>>
+    future<std::tuple<int64_t, std::optional<rcv_buf>>>
     read_response_frame_compressed(input_stream<char>& in);
     void send_loop() {
         if (is_stream()) {
@@ -390,6 +452,8 @@ public:
     /**
      * Create client object which will attempt to connect to the remote address.
      *
+     * @param l \ref seastar::logger to use for logging error messages
+     * @param s an optional connection serializer
      * @param addr the remote address identifying this client
      * @param local the local address of this client
      */
@@ -400,6 +464,8 @@ public:
      * Create client object which will attempt to connect to the remote address using the
      * specified seastar::socket.
      *
+     * @param l \ref seastar::logger to use for logging error messages
+     * @param s an optional connection serializer
      * @param addr the remote address identifying this client
      * @param local the local address of this client
      * @param socket the socket object use to connect to the remote address
@@ -412,9 +478,9 @@ public:
         return _stats;
     }
     auto next_message_id() { return _message_id++; }
-    void wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, compat::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
+    void wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
     void wait_timed_out(id_type id);
-    future<> stop();
+    future<> stop() noexcept;
     void abort_all_streams();
     void deregister_this_stream();
     socket_address peer_address() const override {
@@ -436,7 +502,7 @@ public:
             client_options o = _options;
             o.stream_parent = this->get_connection_id();
             o.send_timeout_data = false;
-            auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr);
+            auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr, _local_addr);
             c->_parent = this->weak_from_this();
             c->_is_stream = true;
             return c->await_connection().then([c, this] {
@@ -448,7 +514,7 @@ public:
     }
     template<typename Serializer, typename... Out>
     future<sink<Out...>> make_stream_sink() {
-        return make_stream_sink<Serializer, Out...>(engine().net().socket());
+        return make_stream_sink<Serializer, Out...>(make_socket());
     }
 };
 
@@ -463,10 +529,10 @@ public:
         server& _server;
         client_info _info;
         connection_id _parent_id = invalid_connection_id;
-        compat::optional<isolation_config> _isolation_config;
+        std::optional<isolation_config> _isolation_config;
     private:
         future<> negotiate_protocol(input_stream<char>& in);
-        future<std::tuple<compat::optional<uint64_t>, uint64_t, int64_t, compat::optional<rcv_buf>>>
+        future<std::tuple<std::optional<uint64_t>, uint64_t, int64_t, std::optional<rcv_buf>>>
         read_request_frame_compressed(input_stream<char>& in);
         future<feature_map> negotiate(feature_map requested);
         void send_loop() {
@@ -476,11 +542,11 @@ public:
                 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::response>();
             }
         }
-        future<> send_unknown_verb_reply(compat::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
+        future<> send_unknown_verb_reply(std::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
     public:
         connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* seralizer, connection_id id);
         future<> process();
-        future<> respond(int64_t msg_id, snd_buf&& data, compat::optional<rpc_clock_type::time_point> timeout);
+        future<> respond(int64_t msg_id, snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout);
         client_info& info() { return _info; }
         const client_info& info() const { return _info; }
         stats get_stats() const {
@@ -496,7 +562,7 @@ public:
             return _info.addr;
         }
         // Resources will be released when this goes out of scope
-        future<resource_permit> wait_for_resources(size_t memory_consumed,  compat::optional<rpc_clock_type::time_point> timeout) {
+        future<resource_permit> wait_for_resources(size_t memory_consumed,  std::optional<rpc_clock_type::time_point> timeout) {
             if (timeout) {
                 return get_units(_server._resources_available, memory_consumed, *timeout);
             } else {
@@ -516,7 +582,7 @@ public:
     };
 private:
     protocol_base* _proto;
-    api_v2::server_socket _ss;
+    server_socket _ss;
     resource_limits _limits;
     rpc_semaphore _resources_available;
     std::unordered_map<connection_id, shared_ptr<connection>> _conns;
@@ -545,7 +611,7 @@ public:
     friend client;
 };
 
-using rpc_handler_func = std::function<future<> (shared_ptr<server::connection>, compat::optional<rpc_clock_type::time_point> timeout, int64_t msgid,
+using rpc_handler_func = std::function<future<> (shared_ptr<server::connection>, std::optional<rpc_clock_type::time_point> timeout, int64_t msgid,
                                                  rcv_buf data)>;
 
 struct rpc_handler {
@@ -565,12 +631,102 @@ protected:
     virtual void put_handler(rpc_handler*) = 0;
 };
 
-// MsgType is a type that holds type of a message. The type should be hashable
-// and serializable. It is preferable to use enum for message types, but
-// do not forget to provide hash function for it
+/// \addtogroup rpc
+/// @{
+
+/// Defines a protocol for communication between a server and a client.
+///
+/// A protocol is defined by a `Serializer` and a `MsgType`. The `Serializer` is
+/// responsible for serializing and unserializing all types used as arguments and
+/// return types used in the protocol. The `Serializer` is expected to define a
+/// `read()` and `write()` method for each such type `T` as follows:
+///
+///     template <typename Output>
+///     void write(const serializer&, Output& output, const T& data);
+///
+///     template <typename Input>
+///     T read(const serializer&, Input& input, type<T> type_tag);  // type_tag used to disambiguate
+///
+/// Where `Input` and `Output` have a `void read(char*, size_t)` and
+/// `write(const char*, size_t)` respectively.
+/// `MsgType` defines the type to be used as the message id, the id which is
+/// used to identify different messages used in the protocol. These are also
+/// often referred to as "verbs". The client will use the message id, to
+/// specify the remote method (verb) to invoke on the server. The server uses
+/// the message id to dispatch the incoming call to the right handler.
+/// `MsgType` should be hashable and serializable. It is preferable to use enum
+/// for message types, but do not forget to provide hash function for it.
+///
+/// Use register_handler() on the server to define the available verbs and the
+/// code to be executed when they are invoked by clients. Use make_client() on
+/// the client to create a matching callable that can be used to invoke the
+/// verb on the server and wait for its result. Note that register_handler()
+/// also returns a client, that can be used to invoke the registered verb on
+/// another node (given that the other node has the same verb). This is useful
+/// for symmetric protocols, where two or more nodes all have servers as well as
+/// connect to the other nodes as clients.
+///
+/// Use protocol::server to listen for and accept incoming connections on the
+/// server and protocol::client to establish connections to the server.
+/// Note that registering the available verbs can be done before/after
+/// listening for connections, but best to ensure that by the time incoming
+/// requests are to be expected, all the verbs are set-up.
+///
+/// ## Configuration
+///
+/// TODO
+///
+/// ## Isolation
+///
+/// RPC supports isolating verb handlers from each other. There are two ways to
+/// achieve this: per-handler isolation (the old way) and per-connection
+/// isolation (the new way). If no isolation is configured, all handlers will be
+/// executed in the context of the scheduling_group in which the
+/// protocol::server was created.
+///
+/// Per-handler isolation (the old way) can be configured by using the
+/// register_handler() overload which takes a scheduling_group. When invoked,
+/// the body of the handler will be executed from the context of the configured
+/// scheduling_group.
+///
+/// Per-connection isolation (the new way) is a more flexible mechanism that
+/// requires user application provided logic to determine how connections are
+/// isolated. This mechanism has two parts, the server and the client part.
+/// The client configures isolation by setting client_options::isolation_cookie.
+/// This cookie is an opaque (to the RPC layer) string that is to be interpreted
+/// on the server using user application provided logic. The application
+/// provides this logic to the server by setting
+/// resource_limits::isolate_connection to an appropriate handler function, that
+/// interprets the opaque cookie and resolves it to an isolation_config. The
+/// scheduling_group in the former will be used not just to execute all verb
+/// handlers, but also the connection loop itself, hence providing better
+/// isolation.
+///
+/// There a few gotchas related to mixing the two isolation mechanisms. This can
+/// happen when the application is updated and one of the client/server is
+/// still using the old/new mechanism. In general per-connection isolation
+/// overrides the per-handler one. If both are set up, the former will determine
+/// the scheduling_group context for the handlers. If the client is not
+/// configured to send an isolation cookie, the server's
+/// resource_limits::isolate_connection will not be invoked and the server will
+/// fall back to per-handler isolation if configured. If the client is
+/// configured to send an isolation cookie but the server doesn't have a
+/// resource_limits::isolate_connection configured, it will use
+/// default_isolate_connection() to interpret the cookie. Note that this still
+/// overrides the per-handler isolation if any is configured. If the server is
+/// so old that it doesn't have the per-connection isolation feature at all, it
+/// will of course just use the per-handler one, if configured.
+///
+/// ## Compatibility
+///
+/// TODO
+///
+/// \tparam Serializer the serializer for the protocol.
+/// \tparam MsgType the type to be used as the message id or verb id.
 template<typename Serializer, typename MsgType = uint32_t>
-class protocol : public protocol_base {
+class protocol final : public protocol_base {
 public:
+    /// Represents the listening port and all accepted connections.
     class server : public rpc::server {
     public:
         server(protocol& proto, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
@@ -582,6 +738,7 @@ public:
         server(protocol& proto, server_options opts, server_socket socket, resource_limits memory_limit = resource_limits()) :
             rpc::server(&proto, opts, std::move(socket), memory_limit) {}
     };
+    /// Represents a client side connection.
     class client : public rpc::client {
     public:
         /*
@@ -617,27 +774,80 @@ private:
 
 public:
     protocol(Serializer&& serializer) : _serializer(std::forward<Serializer>(serializer)) {}
+
+    /// Creates a callable that can be used to invoke the verb on the remote.
+    ///
+    /// \tparam Func The signature of the verb. Has to be either the same or
+    ///     compatible with the one passed to register_handler on the server.
+    /// \param t the verb to invoke on the remote.
+    ///
+    /// \returns a callable whose signature is derived from Func as follows:
+    ///     given `Func == Ret(Args...)` the returned callable has the following
+    ///     signature: `future<Ret>(protocol::client&, Args...)`.
     template<typename Func>
     auto make_client(MsgType t);
 
-    // returns a function which type depends on Func
-    // if Func == Ret(Args...) then return function is
-    // future<Ret>(protocol::client&, Args...)
+    /// Register a handler to be called when this verb is invoked.
+    ///
+    /// \tparam Func the type of the handler for the verb. This determines the
+    ///     signature of the verb.
+    /// \param t the verb to register the handler for.
+    /// \param func the callable to be called when the verb is invoked by the
+    ///     remote.
+    ///
+    /// \returns a client, a callable that can be used to invoke the verb. See
+    ///     make_client(). The client can be discarded, in fact this is what
+    ///     most callers will do as real clients will live on a remote node, not
+    ///     on the one where handlers are registered.
     template<typename Func>
     auto register_handler(MsgType t, Func&& func);
 
-    // returns a function which type depends on Func
-    // if Func == Ret(Args...) then return function is
-    // future<Ret>(protocol::client&, Args...)
+    /// Register a handler to be called when this verb is invoked.
+    ///
+    /// \tparam Func the type of the handler for the verb. This determines the
+    ///     signature of the verb.
+    /// \param t the verb to register the handler for.
+    /// \param sg the scheduling group that will be used to invoke the handler
+    ///     in. This can be used to execute different verbs in different
+    ///     scheduling groups. Note that there is a newer mechanism to determine
+    ///     the scheduling groups a handler will run it per invocation, see
+    ///     isolation_config.
+    /// \param func the callable to be called when the verb is invoked by the
+    ///     remote.
+    ///
+    /// \returns a client, a callable that can be used to invoke the verb. See
+    ///     make_client(). The client can be discarded, in fact this is what
+    ///     most callers will do as real clients will live on a remote node, not
+    ///     on the one where handlers are registered.
     template <typename Func>
     auto register_handler(MsgType t, scheduling_group sg, Func&& func);
 
+    /// Unregister the handler for the verb.
+    ///
+    /// Waits for all currently running handlers, then unregisters the handler.
+    /// Future attempts to invoke the verb will fail. This becomes effective
+    /// immediately after calling this function.
+    ///
+    /// \param t the verb to unregister the handler for.
+    ///
+    /// \returns a future that becomes available once all currently running
+    ///     handlers finished.
     future<> unregister_handler(MsgType t);
 
+    /// Set a logger function to be used to log messages.
+    ///
+    /// \deprecated use the logger overload set_logger(::seastar::logger*)
+    /// instead.
+    [[deprecated("Use set_logger(::seastar::logger*) instead")]]
     void set_logger(std::function<void(const sstring&)> logger) {
         _logger.set(std::move(logger));
     }
 
+    /// Set a logger to be used to log messages.
+    void set_logger(::seastar::logger* logger) {
+        _logger.set(logger);
+    }
+
     const logger& get_logger() const {
         return _logger;
     }
@@ -646,7 +856,15 @@ public:
         return make_shared<rpc::server::connection>(server, std::move(fd), std::move(addr), _logger, &_serializer, id);
     }
 
-    bool has_handler(uint64_t msg_id);
+    bool has_handler(MsgType msg_id);
+
+    /// Checks if any there are handlers registered.
+    /// Debugging helper, should only be used for debugging and not relied on.
+    ///
+    /// \returns true if there are, false if there are no registered handlers.
+    bool has_handlers() const noexcept {
+        return !_handlers.empty();
+    }
 
 private:
     rpc_handler* get_handler(uint64_t msg_id) override;
@@ -662,6 +880,9 @@ private:
         }
     }
 };
+
+/// @}
+
 }
 
 }