]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/include/seastar/rpc/rpc_impl.hh
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / include / seastar / rpc / rpc_impl.hh
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21#pragma once
22
23#include <seastar/core/function_traits.hh>
11fdf7f2
TL
24#include <seastar/core/shared_ptr.hh>
25#include <seastar/core/sstring.hh>
f67539c2 26#include <seastar/core/when_all.hh>
11fdf7f2
TL
27#include <seastar/util/is_smart_ptr.hh>
28#include <seastar/core/simple-stream.hh>
29#include <boost/range/numeric.hpp>
30#include <boost/range/adaptor/transformed.hpp>
31#include <seastar/net/packet-data-source.hh>
9f95a23c 32#include <seastar/core/print.hh>
11fdf7f2
TL
33
34namespace seastar {
35
36namespace rpc {
37
// Kind of failure carried in an exception reply. The numeric value is what
// unmarshal_exception() reads back (as a little-endian uint32) to decide how
// to decode the rest of the payload.
enum class exception_type : uint32_t {
    // payload is a length-prefixed message string
    USER = 0,
    // payload is a 64-bit value handed to unknown_verb_error
    UNKNOWN_VERB = 1,
};
42
43template<typename T>
44struct remove_optional {
45 using type = T;
46};
47
48template<typename T>
49struct remove_optional<optional<T>> {
50 using type = T;
51};
52
// Tag: the caller waits for a reply (the counterpart of no_wait_type).
struct wait_type {};

// Tags selecting whether a handler receives a client_info parameter.
struct do_want_client_info {};
struct dont_want_client_info {};

// Tags selecting whether a handler receives an opt_time_point parameter.
struct do_want_time_point {};
struct dont_want_time_point {};
62
63// General case
64template <typename Ret, typename... In>
65struct signature<Ret (In...)> {
66 using ret_type = Ret;
67 using arg_types = std::tuple<In...>;
68 using clean = signature;
69 using want_client_info = dont_want_client_info;
70 using want_time_point = dont_want_time_point;
71};
72
73// Specialize 'clean' for handlers that receive client_info
74template <typename Ret, typename... In>
75struct signature<Ret (const client_info&, In...)> {
76 using ret_type = Ret;
77 using arg_types = std::tuple<In...>;
78 using clean = signature<Ret (In...)>;
79 using want_client_info = do_want_client_info;
80 using want_time_point = dont_want_time_point;
81};
82
83template <typename Ret, typename... In>
84struct signature<Ret (client_info&, In...)> {
85 using ret_type = Ret;
86 using arg_types = std::tuple<In...>;
87 using clean = signature<Ret (In...)>;
88 using want_client_info = do_want_client_info;
89 using want_time_point = dont_want_time_point;
90};
91
92// Specialize 'clean' for handlers that receive client_info and opt_time_point
93template <typename Ret, typename... In>
94struct signature<Ret (const client_info&, opt_time_point, In...)> {
95 using ret_type = Ret;
96 using arg_types = std::tuple<In...>;
97 using clean = signature<Ret (In...)>;
98 using want_client_info = do_want_client_info;
99 using want_time_point = do_want_time_point;
100};
101
102template <typename Ret, typename... In>
103struct signature<Ret (client_info&, opt_time_point, In...)> {
104 using ret_type = Ret;
105 using arg_types = std::tuple<In...>;
106 using clean = signature<Ret (In...)>;
107 using want_client_info = do_want_client_info;
108 using want_time_point = do_want_time_point;
109};
110
111// Specialize 'clean' for handlers that receive opt_time_point
112template <typename Ret, typename... In>
113struct signature<Ret (opt_time_point, In...)> {
114 using ret_type = Ret;
115 using arg_types = std::tuple<In...>;
116 using clean = signature<Ret (In...)>;
117 using want_client_info = dont_want_client_info;
118 using want_time_point = do_want_time_point;
119};
120
121template <typename T>
122struct wait_signature {
123 using type = wait_type;
124 using cleaned_type = T;
125};
126
127template <typename... T>
128struct wait_signature<future<T...>> {
129 using type = wait_type;
130 using cleaned_type = future<T...>;
131};
132
133template <>
134struct wait_signature<no_wait_type> {
135 using type = no_wait_type;
136 using cleaned_type = void;
137};
138
139template <>
140struct wait_signature<future<no_wait_type>> {
141 using type = no_wait_type;
142 using cleaned_type = future<>;
143};
144
145template <typename T>
146using wait_signature_t = typename wait_signature<T>::type;
147
148template <typename... In>
149inline
150std::tuple<In...>
151maybe_add_client_info(dont_want_client_info, client_info& ci, std::tuple<In...>&& args) {
152 return std::move(args);
153}
154
155template <typename... In>
156inline
157std::tuple<std::reference_wrapper<client_info>, In...>
158maybe_add_client_info(do_want_client_info, client_info& ci, std::tuple<In...>&& args) {
159 return std::tuple_cat(std::make_tuple(std::ref(ci)), std::move(args));
160}
161
162template <typename... In>
163inline
164std::tuple<In...>
165maybe_add_time_point(dont_want_time_point, opt_time_point& otp, std::tuple<In...>&& args) {
166 return std::move(args);
167}
168
169template <typename... In>
170inline
171std::tuple<opt_time_point, In...>
172maybe_add_time_point(do_want_time_point, opt_time_point& otp, std::tuple<In...>&& args) {
173 return std::tuple_cat(std::make_tuple(otp), std::move(args));
174}
175
176inline sstring serialize_connection_id(const connection_id& id) {
f67539c2
TL
177 sstring p = uninitialized_string(sizeof(id));
178 auto c = p.data();
11fdf7f2
TL
179 write_le(c, id.id);
180 return p;
181}
182
183inline connection_id deserialize_connection_id(const sstring& s) {
184 connection_id id;
185 auto p = s.c_str();
186 id.id = read_le<decltype(id.id)>(p);
187 return id;
188}
189
// serialize_helper<IsSmartPtr>::serialize forwards a value to write():
// as-is when IsSmartPtr is false, dereferenced when it is true.
template <bool IsSmartPtr>
struct serialize_helper;

// Not a smart pointer: serialize the value itself.
template <>
struct serialize_helper<false> {
    template <typename Serializer, typename Output, typename T>
    static inline void serialize(Serializer& serializer, Output& out, const T& value) {
        return write(serializer, out, value);
    }
};

// Smart pointer: serialize the pointee.
template <>
struct serialize_helper<true> {
    template <typename Serializer, typename Output, typename T>
    static inline void serialize(Serializer& serializer, Output& out, const T& ptr) {
        return write(serializer, out, *ptr);
    }
};
208
9f95a23c
TL
// Forward declaration: marshall_one's tuple helper calls do_marshall, which
// is defined after marshall_one.
template <typename Serializer, typename Output, typename... T>
inline void do_marshall(Serializer& serializer,
                        Output& out,
                        const T&... args);
211
11fdf7f2
TL
212template <typename Serializer, typename Output>
213struct marshall_one {
214 template <typename T> struct helper {
215 static void doit(Serializer& serializer, Output& out, const T& arg) {
216 using serialize_helper_type = serialize_helper<is_smart_ptr<typename std::remove_reference<T>::type>::value>;
217 serialize_helper_type::serialize(serializer, out, arg);
218 }
219 };
220 template<typename T> struct helper<std::reference_wrapper<const T>> {
221 static void doit(Serializer& serializer, Output& out, const std::reference_wrapper<const T>& arg) {
222 helper<T>::doit(serializer, out, arg.get());
223 }
224 };
225 static void put_connection_id(const connection_id& cid, Output& out) {
226 sstring id = serialize_connection_id(cid);
227 out.write(id.c_str(), id.size());
228 }
229 template <typename... T> struct helper<sink<T...>> {
230 static void doit(Serializer& serializer, Output& out, const sink<T...>& arg) {
231 put_connection_id(arg.get_id(), out);
232 }
233 };
234 template <typename... T> struct helper<source<T...>> {
235 static void doit(Serializer& serializer, Output& out, const source<T...>& arg) {
236 put_connection_id(arg.get_id(), out);
237 }
238 };
9f95a23c
TL
239 template <typename... T> struct helper<tuple<T...>> {
240 static void doit(Serializer& serializer, Output& out, const tuple<T...>& arg) {
241 auto do_do_marshall = [&serializer, &out] (const auto&... args) {
242 do_marshall(serializer, out, args...);
243 };
f67539c2 244 std::apply(do_do_marshall, arg);
9f95a23c
TL
245 }
246 };
11fdf7f2
TL
247};
248
249template <typename Serializer, typename Output, typename... T>
250inline void do_marshall(Serializer& serializer, Output& out, const T&... args) {
251 // C++ guarantees that brace-initialization expressions are evaluted in order
252 (void)std::initializer_list<int>{(marshall_one<Serializer, Output>::template helper<T>::doit(serializer, out, args), 1)...};
253}
254
255static inline memory_output_stream<snd_buf::iterator> make_serializer_stream(snd_buf& output) {
f67539c2 256 auto* b = std::get_if<temporary_buffer<char>>(&output.bufs);
11fdf7f2
TL
257 if (b) {
258 return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::simple(b->get_write(), b->size()));
259 } else {
f67539c2 260 auto& ar = std::get<std::vector<temporary_buffer<char>>>(output.bufs);
11fdf7f2
TL
261 return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::fragmented(ar.begin(), output.size));
262 }
263}
264
265template <typename Serializer, typename... T>
266inline snd_buf marshall(Serializer& serializer, size_t head_space, const T&... args) {
267 measuring_output_stream measure;
268 do_marshall(serializer, measure, args...);
269 snd_buf ret(measure.size() + head_space);
270 auto out = make_serializer_stream(ret);
271 out.skip(head_space);
272 do_marshall(serializer, out, args...);
273 return ret;
274}
275
276template <typename Serializer, typename Input>
277inline std::tuple<> do_unmarshall(connection& c, Input& in) {
278 return std::make_tuple();
279}
280
281template<typename Serializer, typename Input>
282struct unmarshal_one {
283 template<typename T> struct helper {
284 static T doit(connection& c, Input& in) {
285 return read(c.serializer<Serializer>(), in, type<T>());
286 }
287 };
288 template<typename T> struct helper<optional<T>> {
289 static optional<T> doit(connection& c, Input& in) {
290 if (in.size()) {
291 return optional<T>(read(c.serializer<Serializer>(), in, type<typename remove_optional<T>::type>()));
292 } else {
293 return optional<T>();
294 }
295 }
296 };
297 template<typename T> struct helper<std::reference_wrapper<const T>> {
298 static T doit(connection& c, Input& in) {
299 return helper<T>::doit(c, in);
300 }
301 };
302 static connection_id get_connection_id(Input& in) {
f67539c2
TL
303 sstring id = uninitialized_string(sizeof(connection_id));
304 in.read(id.data(), sizeof(connection_id));
11fdf7f2
TL
305 return deserialize_connection_id(id);
306 }
307 template<typename... T> struct helper<sink<T...>> {
308 static sink<T...> doit(connection& c, Input& in) {
309 return sink<T...>(make_shared<sink_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
310 }
311 };
312 template<typename... T> struct helper<source<T...>> {
313 static source<T...> doit(connection& c, Input& in) {
314 return source<T...>(make_shared<source_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
315 }
316 };
9f95a23c
TL
317 template <typename... T> struct helper<tuple<T...>> {
318 static tuple<T...> doit(connection& c, Input& in) {
319 return do_unmarshall<Serializer, Input, T...>(c, in);
320 }
321 };
11fdf7f2
TL
322};
323
324template <typename Serializer, typename Input, typename T0, typename... Trest>
325inline std::tuple<T0, Trest...> do_unmarshall(connection& c, Input& in) {
326 // FIXME: something less recursive
327 auto first = std::make_tuple(unmarshal_one<Serializer, Input>::template helper<T0>::doit(c, in));
328 auto rest = do_unmarshall<Serializer, Input, Trest...>(c, in);
329 return std::tuple_cat(std::move(first), std::move(rest));
330}
331
332template <typename Serializer, typename... T>
333inline std::tuple<T...> unmarshall(connection& c, rcv_buf input) {
334 auto in = make_deserializer_stream(input);
335 return do_unmarshall<Serializer, decltype(in), T...>(c, in);
336}
337
338inline std::exception_ptr unmarshal_exception(rcv_buf& d) {
339 std::exception_ptr ex;
340 auto data = make_deserializer_stream(d);
341
342 uint32_t v32;
343 data.read(reinterpret_cast<char*>(&v32), 4);
344 exception_type ex_type = exception_type(le_to_cpu(v32));
345 data.read(reinterpret_cast<char*>(&v32), 4);
346 uint32_t ex_len = le_to_cpu(v32);
347
348 switch (ex_type) {
349 case exception_type::USER: {
350 std::string s(ex_len, '\0');
351 data.read(&*s.begin(), ex_len);
352 ex = std::make_exception_ptr(std::runtime_error(std::move(s)));
353 break;
354 }
355 case exception_type::UNKNOWN_VERB: {
356 uint64_t v64;
357 data.read(reinterpret_cast<char*>(&v64), 8);
358 ex = std::make_exception_ptr(unknown_verb_error(le_to_cpu(v64)));
359 break;
360 }
361 default:
362 ex = std::make_exception_ptr(unknown_exception_error());
363 break;
364 }
365 return ex;
366}
367
368template <typename Payload, typename... T>
369struct rcv_reply_base {
370 bool done = false;
371 promise<T...> p;
372 template<typename... V>
373 void set_value(V&&... v) {
374 done = true;
f67539c2 375 p.set_value(internal::untuple(std::forward<V>(v))...);
11fdf7f2
TL
376 }
377 ~rcv_reply_base() {
378 if (!done) {
379 p.set_exception(closed_error());
380 }
381 }
382};
383
384template<typename Serializer, typename T>
385struct rcv_reply : rcv_reply_base<T, T> {
386 inline void get_reply(rpc::client& dst, rcv_buf input) {
387 this->set_value(unmarshall<Serializer, T>(dst, std::move(input)));
388 }
389};
390
391template<typename Serializer, typename... T>
392struct rcv_reply<Serializer, future<T...>> : rcv_reply_base<std::tuple<T...>, T...> {
393 inline void get_reply(rpc::client& dst, rcv_buf input) {
394 this->set_value(unmarshall<Serializer, T...>(dst, std::move(input)));
395 }
396};
397
398template<typename Serializer>
399struct rcv_reply<Serializer, void> : rcv_reply_base<void, void> {
400 inline void get_reply(rpc::client& dst, rcv_buf input) {
401 this->set_value();
402 }
403};
404
405template<typename Serializer>
406struct rcv_reply<Serializer, future<>> : rcv_reply<Serializer, void> {};
407
408template <typename Serializer, typename Ret, typename... InArgs>
f67539c2 409inline auto wait_for_reply(wait_type, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel, rpc::client& dst, id_type msg_id,
11fdf7f2
TL
410 signature<Ret (InArgs...)> sig) {
411 using reply_type = rcv_reply<Serializer, Ret>;
412 auto lambda = [] (reply_type& r, rpc::client& dst, id_type msg_id, rcv_buf data) mutable {
413 if (msg_id >= 0) {
414 dst.get_stats_internal().replied++;
415 return r.get_reply(dst, std::move(data));
416 } else {
417 dst.get_stats_internal().exception_received++;
418 r.done = true;
419 r.p.set_exception(unmarshal_exception(data));
420 }
421 };
422 using handler_type = typename rpc::client::template reply_handler<reply_type, decltype(lambda)>;
423 auto r = std::make_unique<handler_type>(std::move(lambda));
424 auto fut = r->reply.p.get_future();
425 dst.wait_for_reply(msg_id, std::move(r), timeout, cancel);
426 return fut;
427}
428
429template<typename Serializer, typename... InArgs>
f67539c2 430inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, cancellable* cancel, rpc::client& dst, id_type msg_id,
11fdf7f2
TL
431 signature<no_wait_type (InArgs...)> sig) { // no_wait overload
432 return make_ready_future<>();
433}
434
435template<typename Serializer, typename... InArgs>
f67539c2 436inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, cancellable* cancel, rpc::client& dst, id_type msg_id,
11fdf7f2
TL
437 signature<future<no_wait_type> (InArgs...)> sig) { // future<no_wait> overload
438 return make_ready_future<>();
439}
440
9f95a23c
TL
441// Convert a relative timeout (a duration) to an absolute one (time_point).
442// Do the calculation safely so that a very large duration will be capped by
443// time_point::max, instead of wrapping around to ancient history.
444inline rpc_clock_type::time_point
445relative_timeout_to_absolute(rpc_clock_type::duration relative) {
446 rpc_clock_type::time_point now = rpc_clock_type::now();
447 return now + std::min(relative, rpc_clock_type::time_point::max() - now);
448}
449
11fdf7f2
TL
450// Returns lambda that can be used to send rpc messages.
451// The lambda gets client connection and rpc parameters as arguments, marshalls them sends
452// to a server and waits for a reply. After receiving reply it unmarshalls it and signal completion
453// to a caller.
454template<typename Serializer, typename MsgType, typename Ret, typename... InArgs>
455auto send_helper(MsgType xt, signature<Ret (InArgs...)> xsig) {
456 struct shelper {
457 MsgType t;
458 signature<Ret (InArgs...)> sig;
f67539c2 459 auto send(rpc::client& dst, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel, const InArgs&... args) {
11fdf7f2
TL
460 if (dst.error()) {
461 using cleaned_ret_type = typename wait_signature<Ret>::cleaned_type;
462 return futurize<cleaned_ret_type>::make_exception_future(closed_error());
463 }
464
465 // send message
466 auto msg_id = dst.next_message_id();
467 snd_buf data = marshall(dst.template serializer<Serializer>(), 28, args...);
468 static_assert(snd_buf::chunk_size >= 28, "send buffer chunk size is too small");
469 auto p = data.front().get_write() + 8; // 8 extra bytes for expiration timer
470 write_le<uint64_t>(p, uint64_t(t));
471 write_le<int64_t>(p + 8, msg_id);
472 write_le<uint32_t>(p + 16, data.size - 28);
473
474 // prepare reply handler, if return type is now_wait_type this does nothing, since no reply will be sent
475 using wait = wait_signature_t<Ret>;
476 return when_all(dst.send(std::move(data), timeout, cancel), wait_for_reply<Serializer>(wait(), timeout, cancel, dst, msg_id, sig)).then([] (auto r) {
477 return std::move(std::get<1>(r)); // return future of wait_for_reply
478 });
479 }
480 auto operator()(rpc::client& dst, const InArgs&... args) {
481 return send(dst, {}, nullptr, args...);
482 }
483 auto operator()(rpc::client& dst, rpc_clock_type::time_point timeout, const InArgs&... args) {
484 return send(dst, timeout, nullptr, args...);
485 }
486 auto operator()(rpc::client& dst, rpc_clock_type::duration timeout, const InArgs&... args) {
9f95a23c 487 return send(dst, relative_timeout_to_absolute(timeout), nullptr, args...);
11fdf7f2
TL
488 }
489 auto operator()(rpc::client& dst, cancellable& cancel, const InArgs&... args) {
490 return send(dst, {}, &cancel, args...);
491 }
492
493 };
494 return shelper{xt, xsig};
495}
496
f67539c2
TL
497template<typename Serializer, typename SEASTAR_ELLIPSIS RetTypes>
498inline future<> reply(wait_type, future<RetTypes SEASTAR_ELLIPSIS>&& ret, int64_t msg_id, shared_ptr<server::connection> client,
499 std::optional<rpc_clock_type::time_point> timeout) {
11fdf7f2
TL
500 if (!client->error()) {
501 snd_buf data;
502 try {
f67539c2
TL
503#if SEASTAR_API_LEVEL < 6
504 if constexpr (sizeof...(RetTypes) == 0) {
505#else
506 if constexpr (std::is_void_v<RetTypes>) {
507#endif
508 ret.get();
509 data = std::invoke(marshall<Serializer>, std::ref(client->template serializer<Serializer>()), 12);
510 } else {
511 data = std::invoke(marshall<Serializer, const RetTypes& SEASTAR_ELLIPSIS>, std::ref(client->template serializer<Serializer>()), 12, std::move(ret.get0()));
512 }
11fdf7f2
TL
513 } catch (std::exception& ex) {
514 uint32_t len = std::strlen(ex.what());
515 data = snd_buf(20 + len);
516 auto os = make_serializer_stream(data);
517 os.skip(12);
518 uint32_t v32 = cpu_to_le(uint32_t(exception_type::USER));
519 os.write(reinterpret_cast<char*>(&v32), sizeof(v32));
520 v32 = cpu_to_le(len);
521 os.write(reinterpret_cast<char*>(&v32), sizeof(v32));
522 os.write(ex.what(), len);
523 msg_id = -msg_id;
524 }
525
526 return client->respond(msg_id, std::move(data), timeout);
527 } else {
528 ret.ignore_ready_future();
529 return make_ready_future<>();
530 }
531}
532
533// specialization for no_wait_type which does not send a reply
534template<typename Serializer>
f67539c2 535inline future<> reply(no_wait_type, future<no_wait_type>&& r, int64_t msgid, shared_ptr<server::connection> client, std::optional<rpc_clock_type::time_point> timeout) {
11fdf7f2
TL
536 try {
537 r.get();
538 } catch (std::exception& ex) {
539 client->get_logger()(client->info(), msgid, to_sstring("exception \"") + ex.what() + "\" in no_wait handler ignored");
540 }
541 return make_ready_future<>();
542}
543
544template<typename Ret, typename... InArgs, typename WantClientInfo, typename WantTimePoint, typename Func, typename ArgsTuple>
545inline futurize_t<Ret> apply(Func& func, client_info& info, opt_time_point time_point, WantClientInfo wci, WantTimePoint wtp, signature<Ret (InArgs...)> sig, ArgsTuple&& args) {
546 using futurator = futurize<Ret>;
547 try {
548 return futurator::apply(func, maybe_add_client_info(wci, info, maybe_add_time_point(wtp, time_point, std::forward<ArgsTuple>(args))));
549 } catch (std::runtime_error& ex) {
550 return futurator::make_exception_future(std::current_exception());
551 }
552}
553
// lref_to_cref wraps an lvalue in std::ref() and passes an rvalue through by
// value.

// Rvalue: the lvalue overload below is the better match for every lvalue, so
// only rvalues arrive here and forwarding moves them into the result.
template <typename T>
auto lref_to_cref(T&& x) {
    return std::forward<T>(x);
}

// Lvalue: hand back a reference wrapper instead of a copy.
template <typename T>
auto lref_to_cref(T& x) {
    return std::ref(x);
}
564
565// Creates lambda to handle RPC message on a server.
566// The lambda unmarshalls all parameters, calls a handler, marshall return values and sends them back to a client
567template <typename Serializer, typename Func, typename Ret, typename... InArgs, typename WantClientInfo, typename WantTimePoint>
568auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo wci, WantTimePoint wtp) {
569 using signature = decltype(sig);
570 using wait_style = wait_signature_t<Ret>;
571 return [func = lref_to_cref(std::forward<Func>(func))](shared_ptr<server::connection> client,
f67539c2 572 std::optional<rpc_clock_type::time_point> timeout,
11fdf7f2
TL
573 int64_t msg_id,
574 rcv_buf data) mutable {
575 auto memory_consumed = client->estimate_request_size(data.size);
576 if (memory_consumed > client->max_request_size()) {
577 auto err = format("request size {:d} large than memory limit {:d}", memory_consumed, client->max_request_size());
578 client->get_logger()(client->peer_address(), err);
9f95a23c 579 // FIXME: future is discarded
f67539c2
TL
580 (void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, err = std::move(err)] {
581 return reply<Serializer>(wait_style(), futurize<Ret>::make_exception_future(std::runtime_error(err.c_str())), msg_id, client, timeout).handle_exception([client, msg_id] (std::exception_ptr eptr) {
582 client->get_logger()(client->info(), msg_id, format("got exception while processing an oversized message: {}", eptr));
583 });
584 }).handle_exception_type([] (gate_closed_exception&) {/* ignore */});
11fdf7f2
TL
585 return make_ready_future();
586 }
587 // note: apply is executed asynchronously with regards to networking so we cannot chain futures here by doing "return apply()"
588 auto f = client->wait_for_resources(memory_consumed, timeout).then([client, timeout, msg_id, data = std::move(data), &func] (auto permit) mutable {
9f95a23c 589 // FIXME: future is discarded
f67539c2 590 (void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, data = std::move(data), permit = std::move(permit), &func] () mutable {
9f95a23c
TL
591 try {
592 auto args = unmarshall<Serializer, InArgs...>(*client, std::move(data));
593 return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit)] (futurize_t<Ret> ret) mutable {
594 return reply<Serializer>(wait_style(), std::move(ret), msg_id, client, timeout).handle_exception([permit = std::move(permit), client, msg_id] (std::exception_ptr eptr) {
595 client->get_logger()(client->info(), msg_id, format("got exception while processing a message: {}", eptr));
596 });
597 });
598 } catch (...) {
599 client->get_logger()(client->info(), msg_id, format("got exception while processing a message: {}", std::current_exception()));
600 return make_ready_future();
601 }
f67539c2 602 }).handle_exception_type([] (gate_closed_exception&) {/* ignore */});
11fdf7f2
TL
603 });
604
605 if (timeout) {
606 f = f.handle_exception_type([] (semaphore_timed_out&) { /* ignore */ });
607 }
608
9f95a23c 609 return f;
11fdf7f2
TL
610 };
611}
612
613// helper to create copy constructible lambda from non copy constructible one. std::function<> works only with former kind.
614template<typename Func>
615auto make_copyable_function(Func&& func, std::enable_if_t<!std::is_copy_constructible<std::decay_t<Func>>::value, void*> = nullptr) {
616 auto p = make_lw_shared<typename std::decay_t<Func>>(std::forward<Func>(func));
617 return [p] (auto&&... args) { return (*p)( std::forward<decltype(args)>(args)... ); };
618}
619
620template<typename Func>
621auto make_copyable_function(Func&& func, std::enable_if_t<std::is_copy_constructible<std::decay_t<Func>>::value, void*> = nullptr) {
622 return std::forward<Func>(func);
623}
624
625// This class is used to calculate client side rpc function signature.
626// Return type is converted from a smart pointer to a type it points to.
627// rpc::optional are converted to non optional type.
628//
629// Examples:
630// std::unique_ptr<int>(int, rpc::optional<long>) -> int(int, long)
631// double(float) -> double(float)
632template<typename Ret, typename... In>
633class client_function_type {
634 template<typename T, bool IsSmartPtr>
635 struct drop_smart_ptr_impl;
636 template<typename T>
637 struct drop_smart_ptr_impl<T, true> {
638 using type = typename T::element_type;
639 };
640 template<typename T>
641 struct drop_smart_ptr_impl<T, false> {
642 using type = T;
643 };
644 template<typename T>
645 using drop_smart_ptr = drop_smart_ptr_impl<T, is_smart_ptr<T>::value>;
646
647 // if return type is smart ptr take a type it points to instead
648 using return_type = typename drop_smart_ptr<Ret>::type;
649public:
650 using type = return_type(typename remove_optional<In>::type...);
651};
652
653template<typename Serializer, typename MsgType>
654template<typename Ret, typename... In>
655auto protocol<Serializer, MsgType>::make_client(signature<Ret(In...)> clear_sig, MsgType t) {
656 using sig_type = signature<typename client_function_type<Ret, In...>::type>;
657 return send_helper<Serializer>(t, sig_type());
658}
659
660template<typename Serializer, typename MsgType>
661template<typename Func>
662auto protocol<Serializer, MsgType>::make_client(MsgType t) {
663 return make_client(typename signature<typename function_traits<Func>::signature>::clean(), t);
664}
665
666template<typename Serializer, typename MsgType>
667template<typename Func>
668auto protocol<Serializer, MsgType>::register_handler(MsgType t, scheduling_group sg, Func&& func) {
669 using sig_type = signature<typename function_traits<Func>::signature>;
670 using clean_sig_type = typename sig_type::clean;
671 using want_client_info = typename sig_type::want_client_info;
672 using want_time_point = typename sig_type::want_time_point;
673 auto recv = recv_helper<Serializer>(clean_sig_type(), std::forward<Func>(func),
674 want_client_info(), want_time_point());
675 register_receiver(t, rpc_handler{sg, make_copyable_function(std::move(recv))});
676 return make_client(clean_sig_type(), t);
677}
678
679template<typename Serializer, typename MsgType>
680template<typename Func>
681auto protocol<Serializer, MsgType>::register_handler(MsgType t, Func&& func) {
682 return register_handler(t, scheduling_group(), std::forward<Func>(func));
683}
684
9f95a23c
TL
685template<typename Serializer, typename MsgType>
686future<> protocol<Serializer, MsgType>::unregister_handler(MsgType t) {
687 auto it = _handlers.find(t);
688 if (it != _handlers.end()) {
689 return it->second.use_gate.close().finally([this, t] {
690 _handlers.erase(t);
691 });
692 }
693 return make_ready_future<>();
694}
695
696template<typename Serializer, typename MsgType>
f67539c2
TL
697bool protocol<Serializer, MsgType>::has_handler(MsgType msg_id) {
698 auto it = _handlers.find(msg_id);
9f95a23c
TL
699 if (it == _handlers.end()) {
700 return false;
701 }
702 return !it->second.use_gate.is_closed();
703}
704
705template<typename Serializer, typename MsgType>
706rpc_handler* protocol<Serializer, MsgType>::get_handler(uint64_t msg_id) {
707 rpc_handler* h = nullptr;
708 auto it = _handlers.find(MsgType(msg_id));
709 if (it != _handlers.end()) {
710 try {
711 it->second.use_gate.enter();
712 h = &it->second;
713 } catch (gate_closed_exception&) {
714 // unregistered, just ignore
715 }
716 }
717 return h;
718}
719
720template<typename Serializer, typename MsgType>
721void protocol<Serializer, MsgType>::put_handler(rpc_handler* h) {
722 h->use_gate.leave();
723}
724
11fdf7f2
TL
725template<typename T> T make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<T>> org);
726
727template<typename Serializer, typename... Out>
728future<> sink_impl<Serializer, Out...>::operator()(const Out&... args) {
729 // note that we use remote serializer pointer, so if serailizer needs a state
730 // it should have per-cpu one
731 snd_buf data = marshall(this->_con->get()->template serializer<Serializer>(), 4, args...);
732 static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small");
733 auto p = data.front().get_write();
734 write_le<uint32_t>(p, data.size - 4);
735 // we do not want to dead lock on huge packets, so let them in
736 // but only one at a time
737 auto size = std::min(size_t(data.size), max_stream_buffers_memory);
20effc67
TL
738 const auto seq_num = _next_seq_num++;
739 return get_units(this->_sem, size).then([this, data = make_foreign(std::make_unique<snd_buf>(std::move(data))), seq_num] (semaphore_units<> su) mutable {
11fdf7f2
TL
740 if (this->_ex) {
741 return make_exception_future(this->_ex);
742 }
f67539c2
TL
743 // It is OK to discard this future. The user is required to
744 // wait for it when closing.
20effc67 745 (void)smp::submit_to(this->_con->get_owner_shard(), [this, data = std::move(data), seq_num] () mutable {
11fdf7f2
TL
746 connection* con = this->_con->get();
747 if (con->error()) {
748 return make_exception_future(closed_error());
749 }
750 if(con->sink_closed()) {
751 return make_exception_future(stream_closed());
752 }
20effc67
TL
753
754 auto& last_seq_num = _remote_state.last_seq_num;
755 auto& out_of_order_bufs = _remote_state.out_of_order_bufs;
756
757 auto local_data = make_shard_local_buffer_copy(std::move(data));
758 const auto seq_num_diff = seq_num - last_seq_num;
759 if (seq_num_diff > 1) {
760 auto [it, _] = out_of_order_bufs.emplace(seq_num, deferred_snd_buf{promise<>{}, std::move(local_data)});
761 return it->second.pr.get_future();
762 }
763
764 last_seq_num = seq_num;
765 auto ret_fut = con->send(std::move(local_data), {}, nullptr);
766 while (!out_of_order_bufs.empty() && out_of_order_bufs.begin()->first == (last_seq_num + 1)) {
767 auto it = out_of_order_bufs.begin();
768 last_seq_num = it->first;
769 auto fut = con->send(std::move(it->second.data), {}, nullptr);
770 fut.forward_to(std::move(it->second.pr));
771 out_of_order_bufs.erase(it);
772 }
773 return ret_fut;
11fdf7f2
TL
774 }).then_wrapped([su = std::move(su), this] (future<> f) {
775 if (f.failed() && !this->_ex) { // first error is the interesting one
776 this->_ex = f.get_exception();
777 } else {
778 f.ignore_ready_future();
779 }
780 });
781 return make_ready_future<>();
782 });
783}
784
9f95a23c
TL
785template<typename Serializer, typename... Out>
786future<> sink_impl<Serializer, Out...>::flush() {
787 // wait until everything is sent out before returning.
788 return with_semaphore(this->_sem, max_stream_buffers_memory, [this] {
789 if (this->_ex) {
790 return make_exception_future(this->_ex);
791 }
792 return make_ready_future();
793 });
794}
795
11fdf7f2
TL
796template<typename Serializer, typename... Out>
797future<> sink_impl<Serializer, Out...>::close() {
798 return with_semaphore(this->_sem, max_stream_buffers_memory, [this] {
799 return smp::submit_to(this->_con->get_owner_shard(), [this] {
800 connection* con = this->_con->get();
801 if (con->sink_closed()) { // double close, should not happen!
802 return make_exception_future(stream_closed());
803 }
804 future<> f = make_ready_future<>();
805 if (!con->error() && !this->_ex) {
806 snd_buf data = marshall(con->template serializer<Serializer>(), 4);
807 static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small");
808 auto p = data.front().get_write();
809 write_le<uint32_t>(p, -1U); // max len fragment marks an end of a stream
810 f = con->send(std::move(data), {}, nullptr);
811 } else {
812 f = this->_ex ? make_exception_future(this->_ex) : make_exception_future(closed_error());
813 }
814 return f.finally([con] { return con->close_sink(); });
815 });
816 });
817}
818
f67539c2
TL
819template<typename Serializer, typename... Out>
820sink_impl<Serializer, Out...>::~sink_impl() {
821 // A failure to close might leave some continuations running after
822 // this is destroyed, leading to use-after-free bugs.
823 assert(this->_con->get()->sink_closed());
824}
825
11fdf7f2 826template<typename Serializer, typename... In>
f67539c2 827future<std::optional<std::tuple<In...>>> source_impl<Serializer, In...>::operator()() {
11fdf7f2
TL
828 auto process_one_buffer = [this] {
829 foreign_ptr<std::unique_ptr<rcv_buf>> buf = std::move(this->_bufs.front());
830 this->_bufs.pop_front();
f67539c2
TL
831 return std::apply([] (In&&... args) {
832 auto ret = std::make_optional(std::make_tuple(std::move(args)...));
833 return make_ready_future<std::optional<std::tuple<In...>>>(std::move(ret));
11fdf7f2
TL
834 }, unmarshall<Serializer, In...>(*this->_con->get(), make_shard_local_buffer_copy(std::move(buf))));
835 };
836
837 if (!this->_bufs.empty()) {
838 return process_one_buffer();
839 }
840
841 // refill buffers from remote cpu
842 return smp::submit_to(this->_con->get_owner_shard(), [this] () -> future<> {
843 connection* con = this->_con->get();
9f95a23c
TL
844 if (con->_source_closed) {
845 return make_exception_future<>(stream_closed());
846 }
11fdf7f2
TL
847 return con->stream_receive(this->_bufs).then_wrapped([this, con] (future<>&& f) {
848 if (f.failed()) {
849 return con->close_source().then_wrapped([ex = f.get_exception()] (future<> f){
850 f.ignore_ready_future();
851 return make_exception_future<>(ex);
852 });
853 }
854 if (this->_bufs.empty()) { // nothing to read -> eof
855 return con->close_source().then_wrapped([] (future<> f) {
856 f.ignore_ready_future();
857 return make_ready_future<>();
858 });
859 }
860 return make_ready_future<>();
861 });
862 }).then([this, process_one_buffer] () {
863 if (this->_bufs.empty()) {
f67539c2 864 return make_ready_future<std::optional<std::tuple<In...>>>(std::nullopt);
11fdf7f2
TL
865 } else {
866 return process_one_buffer();
867 }
868 });
869}
870
871template<typename... Out>
872connection_id sink<Out...>::get_id() const {
873 return _impl->_con->get()->get_connection_id();
874}
875
876template<typename... In>
877connection_id source<In...>::get_id() const {
878 return _impl->_con->get()->get_connection_id();
879}
880
881template<typename... In>
882template<typename Serializer, typename... Out>
883sink<Out...> source<In...>::make_sink() {
884 return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(_impl->_con));
885}
886
887}
888
889}
890
891namespace std {
892template<>
893struct hash<seastar::rpc::streaming_domain_type> {
894 size_t operator()(const seastar::rpc::streaming_domain_type& domain) const {
895 size_t h = 0;
896 boost::hash_combine(h, std::hash<uint64_t>{}(domain._id));
897 return h;
898 }
899};
900}
901
902