]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/include/seastar/rpc/rpc.hh
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / include / seastar / rpc / rpc.hh
index c92cddec43d1f83e628602eb4e9b475fc81c9ad9..f3fff61daa1a58c5caddfee696dcda6bbffefd80 100644 (file)
@@ -24,6 +24,8 @@
 #include <unordered_map>
 #include <unordered_set>
 #include <list>
+#include <variant>
+#include <boost/intrusive/list.hpp>
 #include <seastar/core/future.hh>
 #include <seastar/core/seastar.hh>
 #include <seastar/net/api.hh>
@@ -40,6 +42,8 @@
 #include <seastar/util/backtrace.hh>
 #include <seastar/util/log.hh>
 
+namespace bi = boost::intrusive;
+
 namespace seastar {
 
 namespace rpc {
@@ -96,7 +100,10 @@ struct resource_limits {
     size_t max_memory = rpc_semaphore::max_counter(); ///< Maximum amount of memory that may be consumed by all requests
     /// Configures isolation for a connection based on its isolation cookie. May throw,
     /// in which case the connection will be terminated.
-    std::function<isolation_config (sstring isolation_cookie)> isolate_connection = default_isolate_connection;
+    using syncronous_isolation_function = std::function<isolation_config (sstring isolation_cookie)>;
+    using asyncronous_isolation_function = std::function<future<isolation_config> (sstring isolation_cookie)>;
+    using isolation_function_alternatives = std::variant<syncronous_isolation_function, asyncronous_isolation_function>;
+    isolation_function_alternatives isolate_connection = default_isolate_connection;
 };
 
 struct client_options {
@@ -233,36 +240,45 @@ protected:
     output_stream<char> _write_buf;
     bool _error = false;
     bool _connected = false;
+    std::optional<shared_promise<>> _negotiated = shared_promise<>();
     promise<> _stopped;
     stats _stats;
     const logger& _logger;
     // The owner of the pointer below is an instance of rpc::protocol<typename Serializer> class.
     // The type of the pointer is erased here, but the original type is Serializer
     void* _serializer;
-    struct outgoing_entry {
+    struct outgoing_entry : public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
         timer<rpc_clock_type> t;
         snd_buf buf;
-        std::optional<promise<>> p = promise<>();
+        promise<> done;
         cancellable* pcancel = nullptr;
         outgoing_entry(snd_buf b) : buf(std::move(b)) {}
-        outgoing_entry(outgoing_entry&& o) noexcept : t(std::move(o.t)), buf(std::move(o.buf)), p(std::move(o.p)), pcancel(o.pcancel) {
-            o.p = std::nullopt;
+
+        outgoing_entry(outgoing_entry&&) = delete;
+        outgoing_entry(const outgoing_entry&) = delete;
+
+        void uncancellable() {
+            t.cancel();
+            if (pcancel) {
+                pcancel->cancel_send = std::function<void()>();
+            }
         }
+
         ~outgoing_entry() {
-            if (p) {
-                if (pcancel) {
-                    pcancel->cancel_send = std::function<void()>();
-                    pcancel->send_back_pointer = nullptr;
-                }
-                p->set_value();
+            if (pcancel) {
+                pcancel->cancel_send = std::function<void()>();
+                pcancel->send_back_pointer = nullptr;
             }
         }
+
+        using container_t = bi::list<outgoing_entry, bi::constant_time_size<false>>;
     };
-    friend outgoing_entry;
-    std::list<outgoing_entry> _outgoing_queue;
-    condition_variable _outgoing_queue_cond;
-    future<> _send_loop_stopped = make_ready_future<>();
+    void withdraw(outgoing_entry::container_t::iterator it, std::exception_ptr ex = nullptr);
+    future<> _outgoing_queue_ready = _negotiated->get_shared_future();
+    outgoing_entry::container_t _outgoing_queue;
+    size_t _outgoing_queue_size = 0;
     std::unique_ptr<compressor> _compressor;
+    bool _propagate_timeout = false;
     bool _timeout_negotiated = false;
     // stream related fields
     bool _is_stream = false;
@@ -277,23 +293,23 @@ protected:
     // if it is not ready it means the sink is been closed
     future<bool> _sink_closed_future = make_ready_future<bool>(false);
 
-    bool is_stream() {
+    size_t outgoing_queue_length() const noexcept {
+        return _outgoing_queue_size;
+    }
+
+    void set_negotiated() noexcept;
+
+    bool is_stream() const noexcept {
         return _is_stream;
     }
 
     snd_buf compress(snd_buf buf);
     future<> send_buffer(snd_buf buf);
 
-    enum class outgoing_queue_type {
-        request,
-        response,
-        stream = response
-    };
-
-    template<outgoing_queue_type QueueType> void send_loop();
-    future<> stop_send_loop();
+    future<> send_entry(outgoing_entry& d);
+    future<> stop_send_loop(std::exception_ptr ex);
     future<std::optional<rcv_buf>>  read_stream_frame_compressed(input_stream<char>& in);
-    bool stream_check_twoway_closed() {
+    bool stream_check_twoway_closed() const noexcept {
         return _sink_closed && _source_closed;
     }
     future<> stream_close();
@@ -311,7 +327,7 @@ public:
     // functions below are public because they are used by external heavily templated functions
     // and I am not smart enough to know how to define them as friends
     future<> send(snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
-    bool error() { return _error; }
+    bool error() const noexcept { return _error; }
     void abort();
     future<> stop() noexcept;
     future<> stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs);
@@ -322,7 +338,7 @@ public:
         }
         return make_ready_future();
     }
-    bool sink_closed() {
+    bool sink_closed() const noexcept {
         return _sink_closed;
     }
     future<> close_source() {
@@ -332,14 +348,14 @@ public:
         }
         return make_ready_future();
     }
-    connection_id get_connection_id() const {
+    connection_id get_connection_id() const noexcept {
         return _id;
     }
     xshard_connection_ptr get_stream(connection_id id) const;
     void register_stream(connection_id id, xshard_connection_ptr c);
     virtual socket_address peer_address() const = 0;
 
-    const logger& get_logger() const {
+    const logger& get_logger() const noexcept {
         return _logger;
     }
 
@@ -358,6 +374,14 @@ public:
     friend class sink_impl;
     template<typename Serializer, typename... In>
     friend class source_impl;
+
+    void suspend_for_testing(promise<>& p) {
+        _outgoing_queue_ready.get();
+        auto dummy = std::make_unique<outgoing_entry>(snd_buf());
+        _outgoing_queue.push_back(*dummy);
+        _outgoing_queue_ready = dummy->done.get_future();
+        (void)p.get_future().then([dummy = std::move(dummy)] { dummy->done.set_value(); });
+    }
 };
 
 struct deferred_snd_buf {
@@ -431,7 +455,6 @@ private:
     std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding;
     socket_address _server_addr, _local_addr;
     client_options _options;
-    std::optional<shared_promise<>> _client_negotiated = shared_promise<>();
     weak_ptr<client> _parent; // for stream clients
 
 private:
@@ -441,13 +464,6 @@ private:
     read_response_frame(input_stream<char>& in);
     future<std::tuple<int64_t, std::optional<rcv_buf>>>
     read_response_frame_compressed(input_stream<char>& in);
-    void send_loop() {
-        if (is_stream()) {
-            rpc::connection::send_loop<rpc::connection::outgoing_queue_type::stream>();
-        } else {
-            rpc::connection::send_loop<rpc::connection::outgoing_queue_type::request>();
-        }
-    }
 public:
     /**
      * Create client object which will attempt to connect to the remote address.
@@ -487,10 +503,10 @@ public:
         return _server_addr;
     }
     future<> await_connection() {
-        if (!_client_negotiated) {
+        if (!_negotiated) {
             return make_ready_future<>();
         } else {
-            return _client_negotiated->get_shared_future();
+            return _negotiated->get_shared_future();
         }
     }
     template<typename Serializer, typename... Out>
@@ -509,6 +525,12 @@ public:
                 xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast<rpc::connection>(c)));
                 this->register_stream(c->get_connection_id(), s);
                 return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(std::move(s)));
+            }).handle_exception([c] (std::exception_ptr eptr) {
+                // If await_connection fails we need to stop the client
+                // before destroying it.
+                return c->stop().then([eptr, c] {
+                    return make_exception_future<sink<Out...>>(eptr);
+                });
             });
         });
     }
@@ -535,13 +557,6 @@ public:
         future<std::tuple<std::optional<uint64_t>, uint64_t, int64_t, std::optional<rcv_buf>>>
         read_request_frame_compressed(input_stream<char>& in);
         future<feature_map> negotiate(feature_map requested);
-        void send_loop() {
-            if (is_stream()) {
-                rpc::connection::send_loop<rpc::connection::outgoing_queue_type::stream>();
-            } else {
-                rpc::connection::send_loop<rpc::connection::outgoing_queue_type::response>();
-            }
-        }
         future<> send_unknown_verb_reply(std::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
     public:
         connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* seralizer, connection_id id);
@@ -551,7 +566,7 @@ public:
         const client_info& info() const { return _info; }
         stats get_stats() const {
             stats res = _stats;
-            res.pending = _outgoing_queue.size();
+            res.pending = outgoing_queue_length();
             return res;
         }
 
@@ -733,7 +748,7 @@ public:
             rpc::server(&proto, addr, memory_limit) {}
         server(protocol& proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
             rpc::server(&proto, opts, addr, memory_limit) {}
-        server(protocol& proto, server_socket socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{}) :
+        server(protocol& proto, server_socket socket, resource_limits memory_limit = resource_limits(), server_options = server_options{}) :
             rpc::server(&proto, std::move(socket), memory_limit) {}
         server(protocol& proto, server_options opts, server_socket socket, resource_limits memory_limit = resource_limits()) :
             rpc::server(&proto, opts, std::move(socket), memory_limit) {}