]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/include/seastar/rpc/rpc_impl.hh
import 15.2.0 Octopus source
[ceph.git] / ceph / src / seastar / include / seastar / rpc / rpc_impl.hh
index 5bb17b517b2392b55c6ee08c1d8c1d4c4e66fe1a..a494f95d46d6820b5e3d28a0f095e04a21621e72 100644 (file)
@@ -30,6 +30,7 @@
 #include <boost/range/numeric.hpp>
 #include <boost/range/adaptor/transformed.hpp>
 #include <seastar/net/packet-data-source.hh>
+#include <seastar/core/print.hh>
 
 namespace seastar {
 
@@ -206,6 +207,9 @@ struct serialize_helper<true> {
     }
 };
 
+template <typename Serializer, typename Output, typename... T>
+inline void do_marshall(Serializer& serializer, Output& out, const T&... args);
+
 template <typename Serializer, typename Output>
 struct marshall_one {
     template <typename T> struct helper {
@@ -233,6 +237,14 @@ struct marshall_one {
             put_connection_id(arg.get_id(), out);
         }
     };
+    template <typename... T> struct helper<tuple<T...>> {
+        static void doit(Serializer& serializer, Output& out, const tuple<T...>& arg) {
+            auto do_do_marshall = [&serializer, &out] (const auto&... args) {
+                do_marshall(serializer, out, args...);
+            };
+            apply(do_do_marshall, arg);
+        }
+    };
 };
 
 template <typename Serializer, typename Output, typename... T>
@@ -303,6 +315,11 @@ struct unmarshal_one {
             return source<T...>(make_shared<source_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
         }
     };
+    template <typename... T> struct helper<tuple<T...>> {
+        static tuple<T...> doit(connection& c, Input& in) {
+            return do_unmarshall<Serializer, Input, T...>(c, in);
+        }
+    };
 };
 
 template <typename Serializer, typename Input, typename T0, typename... Trest>
@@ -422,6 +439,15 @@ inline auto wait_for_reply(no_wait_type, compat::optional<rpc_clock_type::time_p
     return make_ready_future<>();
 }
 
+// Convert a relative timeout (a duration) to an absolute one (time_point).
+// Do the calculation safely so that a very large duration will be capped by
+// time_point::max, instead of wrapping around to ancient history.
+inline rpc_clock_type::time_point
+relative_timeout_to_absolute(rpc_clock_type::duration relative) {
+    rpc_clock_type::time_point now = rpc_clock_type::now();
+    return now + std::min(relative, rpc_clock_type::time_point::max() - now);
+}
+
 // Returns lambda that can be used to send rpc messages.
 // The lambda gets client connection and rpc parameters as arguments, marshalls them sends
 // to a server and waits for a reply. After receiving reply it unmarshalls it and signal completion
@@ -459,7 +485,7 @@ auto send_helper(MsgType xt, signature<Ret (InArgs...)> xsig) {
             return send(dst, timeout, nullptr, args...);
         }
         auto operator()(rpc::client& dst, rpc_clock_type::duration timeout, const InArgs&... args) {
-            return send(dst, rpc_clock_type::now() + timeout, nullptr, args...);
+            return send(dst, relative_timeout_to_absolute(timeout), nullptr, args...);
         }
         auto operator()(rpc::client& dst, cancellable& cancel, const InArgs&... args) {
             return send(dst, {}, &cancel, args...);
@@ -543,7 +569,8 @@ auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo wci
         if (memory_consumed > client->max_request_size()) {
             auto err = format("request size {:d} large than memory limit {:d}", memory_consumed, client->max_request_size());
             client->get_logger()(client->peer_address(), err);
-            with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, err = std::move(err)] {
+            // FIXME: future is discarded
+            (void)with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, err = std::move(err)] {
                 return reply<Serializer>(wait_style(), futurize<Ret>::make_exception_future(std::runtime_error(err.c_str())), msg_id, client, timeout);
             });
             return make_ready_future();
@@ -551,11 +578,19 @@ auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo wci
         // note: apply is executed asynchronously with regards to networking so we cannot chain futures here by doing "return apply()"
         auto f = client->wait_for_resources(memory_consumed, timeout).then([client, timeout, msg_id, data = std::move(data), &func] (auto permit) mutable {
             try {
-                with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, data = std::move(data), permit = std::move(permit), &func] () mutable {
-                    auto args = unmarshall<Serializer, InArgs...>(*client, std::move(data));
-                    return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit)] (futurize_t<Ret> ret) mutable {
-                        return reply<Serializer>(wait_style(), std::move(ret), msg_id, client, timeout).then([permit = std::move(permit)] {});
-                    });
+                // FIXME: future is discarded
+                (void)with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, data = std::move(data), permit = std::move(permit), &func] () mutable {
+                    try {
+                        auto args = unmarshall<Serializer, InArgs...>(*client, std::move(data));
+                        return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit)] (futurize_t<Ret> ret) mutable {
+                            return reply<Serializer>(wait_style(), std::move(ret), msg_id, client, timeout).handle_exception([permit = std::move(permit), client, msg_id] (std::exception_ptr eptr) {
+                                client->get_logger()(client->info(), msg_id, format("got exception while processing a message: {}", eptr));
+                            });
+                        });
+                    } catch (...) {
+                        client->get_logger()(client->info(), msg_id, format("got exception while processing a message: {}", std::current_exception()));
+                        return make_ready_future();
+                    }
                 });
             } catch (gate_closed_exception&) {/* ignore */ }
         });
@@ -564,7 +599,7 @@ auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo wci
             f = f.handle_exception_type([] (semaphore_timed_out&) { /* ignore */ });
         }
 
-        return std::move(f);
+        return f;
     };
 }
 
@@ -640,6 +675,46 @@ auto protocol<Serializer, MsgType>::register_handler(MsgType t, Func&& func) {
     return register_handler(t, scheduling_group(), std::forward<Func>(func));
 }
 
+template<typename Serializer, typename MsgType>
+future<> protocol<Serializer, MsgType>::unregister_handler(MsgType t) {
+    auto it = _handlers.find(t);
+    if (it != _handlers.end()) {
+        return it->second.use_gate.close().finally([this, t] {
+            _handlers.erase(t);
+        });
+    }
+    return make_ready_future<>();
+}
+
+template<typename Serializer, typename MsgType>
+bool protocol<Serializer, MsgType>::has_handler(uint64_t msg_id) {
+    auto it = _handlers.find(MsgType(msg_id));
+    if (it == _handlers.end()) {
+        return false;
+    }
+    return !it->second.use_gate.is_closed();
+}
+
+template<typename Serializer, typename MsgType>
+rpc_handler* protocol<Serializer, MsgType>::get_handler(uint64_t msg_id) {
+    rpc_handler* h = nullptr;
+    auto it = _handlers.find(MsgType(msg_id));
+    if (it != _handlers.end()) {
+        try {
+            it->second.use_gate.enter();
+            h = &it->second;
+        } catch (gate_closed_exception&) {
+            // unregistered, just ignore
+        }
+    }
+    return h;
+}
+
+template<typename Serializer, typename MsgType>
+void protocol<Serializer, MsgType>::put_handler(rpc_handler* h) {
+    h->use_gate.leave();
+}
+
 template<typename T> T make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<T>> org);
 
 template<typename Serializer, typename... Out>
@@ -657,7 +732,8 @@ future<> sink_impl<Serializer, Out...>::operator()(const Out&... args) {
         if (this->_ex) {
             return make_exception_future(this->_ex);
         }
-        smp::submit_to(this->_con->get_owner_shard(), [this, data = std::move(data)] () mutable {
+        // FIXME: future is discarded
+        (void)smp::submit_to(this->_con->get_owner_shard(), [this, data = std::move(data)] () mutable {
             connection* con = this->_con->get();
             if (con->error()) {
                 return make_exception_future(closed_error());
@@ -677,6 +753,17 @@ future<> sink_impl<Serializer, Out...>::operator()(const Out&... args) {
     });
 }
 
+template<typename Serializer, typename... Out>
+future<> sink_impl<Serializer, Out...>::flush() {
+    // wait until everything is sent out before returning.
+    return with_semaphore(this->_sem, max_stream_buffers_memory, [this] {
+        if (this->_ex) {
+            return make_exception_future(this->_ex);
+        }
+        return make_ready_future();
+    });
+}
+
 template<typename Serializer, typename... Out>
 future<> sink_impl<Serializer, Out...>::close() {
     return with_semaphore(this->_sem, max_stream_buffers_memory, [this] {
@@ -718,6 +805,9 @@ future<compat::optional<std::tuple<In...>>> source_impl<Serializer, In...>::oper
     // refill buffers from remote cpu
     return smp::submit_to(this->_con->get_owner_shard(), [this] () -> future<> {
         connection* con = this->_con->get();
+        if (con->_source_closed) {
+            return make_exception_future<>(stream_closed());
+        }
         return con->stream_receive(this->_bufs).then_wrapped([this, con] (future<>&& f) {
             if (f.failed()) {
                 return con->close_source().then_wrapped([ex = f.get_exception()] (future<> f){