]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/include/seastar/rpc/rpc_impl.hh
import 15.2.0 Octopus source
[ceph.git] / ceph / src / seastar / include / seastar / rpc / rpc_impl.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21 #pragma once
22
23 #include <seastar/core/function_traits.hh>
24 #include <seastar/core/apply.hh>
25 #include <seastar/core/shared_ptr.hh>
26 #include <seastar/core/sstring.hh>
27 #include <seastar/core/future-util.hh>
28 #include <seastar/util/is_smart_ptr.hh>
29 #include <seastar/core/simple-stream.hh>
30 #include <boost/range/numeric.hpp>
31 #include <boost/range/adaptor/transformed.hpp>
32 #include <seastar/net/packet-data-source.hh>
33 #include <seastar/core/print.hh>
34
35 namespace seastar {
36
37 namespace rpc {
38
39 enum class exception_type : uint32_t {
40 USER = 0,
41 UNKNOWN_VERB = 1,
42 };
43
44 template<typename T>
45 struct remove_optional {
46 using type = T;
47 };
48
49 template<typename T>
50 struct remove_optional<optional<T>> {
51 using type = T;
52 };
53
54 struct wait_type {}; // opposite of no_wait_type
55
56 // tags to tell whether we want a const client_info& parameter
57 struct do_want_client_info {};
58 struct dont_want_client_info {};
59
60 // tags to tell whether we want a opt_time_point parameter
61 struct do_want_time_point {};
62 struct dont_want_time_point {};
63
64 // General case
65 template <typename Ret, typename... In>
66 struct signature<Ret (In...)> {
67 using ret_type = Ret;
68 using arg_types = std::tuple<In...>;
69 using clean = signature;
70 using want_client_info = dont_want_client_info;
71 using want_time_point = dont_want_time_point;
72 };
73
74 // Specialize 'clean' for handlers that receive client_info
75 template <typename Ret, typename... In>
76 struct signature<Ret (const client_info&, In...)> {
77 using ret_type = Ret;
78 using arg_types = std::tuple<In...>;
79 using clean = signature<Ret (In...)>;
80 using want_client_info = do_want_client_info;
81 using want_time_point = dont_want_time_point;
82 };
83
84 template <typename Ret, typename... In>
85 struct signature<Ret (client_info&, In...)> {
86 using ret_type = Ret;
87 using arg_types = std::tuple<In...>;
88 using clean = signature<Ret (In...)>;
89 using want_client_info = do_want_client_info;
90 using want_time_point = dont_want_time_point;
91 };
92
93 // Specialize 'clean' for handlers that receive client_info and opt_time_point
94 template <typename Ret, typename... In>
95 struct signature<Ret (const client_info&, opt_time_point, In...)> {
96 using ret_type = Ret;
97 using arg_types = std::tuple<In...>;
98 using clean = signature<Ret (In...)>;
99 using want_client_info = do_want_client_info;
100 using want_time_point = do_want_time_point;
101 };
102
103 template <typename Ret, typename... In>
104 struct signature<Ret (client_info&, opt_time_point, In...)> {
105 using ret_type = Ret;
106 using arg_types = std::tuple<In...>;
107 using clean = signature<Ret (In...)>;
108 using want_client_info = do_want_client_info;
109 using want_time_point = do_want_time_point;
110 };
111
112 // Specialize 'clean' for handlers that receive opt_time_point
113 template <typename Ret, typename... In>
114 struct signature<Ret (opt_time_point, In...)> {
115 using ret_type = Ret;
116 using arg_types = std::tuple<In...>;
117 using clean = signature<Ret (In...)>;
118 using want_client_info = dont_want_client_info;
119 using want_time_point = do_want_time_point;
120 };
121
122 template <typename T>
123 struct wait_signature {
124 using type = wait_type;
125 using cleaned_type = T;
126 };
127
128 template <typename... T>
129 struct wait_signature<future<T...>> {
130 using type = wait_type;
131 using cleaned_type = future<T...>;
132 };
133
134 template <>
135 struct wait_signature<no_wait_type> {
136 using type = no_wait_type;
137 using cleaned_type = void;
138 };
139
140 template <>
141 struct wait_signature<future<no_wait_type>> {
142 using type = no_wait_type;
143 using cleaned_type = future<>;
144 };
145
146 template <typename T>
147 using wait_signature_t = typename wait_signature<T>::type;
148
149 template <typename... In>
150 inline
151 std::tuple<In...>
152 maybe_add_client_info(dont_want_client_info, client_info& ci, std::tuple<In...>&& args) {
153 return std::move(args);
154 }
155
156 template <typename... In>
157 inline
158 std::tuple<std::reference_wrapper<client_info>, In...>
159 maybe_add_client_info(do_want_client_info, client_info& ci, std::tuple<In...>&& args) {
160 return std::tuple_cat(std::make_tuple(std::ref(ci)), std::move(args));
161 }
162
163 template <typename... In>
164 inline
165 std::tuple<In...>
166 maybe_add_time_point(dont_want_time_point, opt_time_point& otp, std::tuple<In...>&& args) {
167 return std::move(args);
168 }
169
170 template <typename... In>
171 inline
172 std::tuple<opt_time_point, In...>
173 maybe_add_time_point(do_want_time_point, opt_time_point& otp, std::tuple<In...>&& args) {
174 return std::tuple_cat(std::make_tuple(otp), std::move(args));
175 }
176
177 inline sstring serialize_connection_id(const connection_id& id) {
178 sstring p(sstring::initialized_later(), sizeof(id));
179 auto c = p.begin();
180 write_le(c, id.id);
181 return p;
182 }
183
184 inline connection_id deserialize_connection_id(const sstring& s) {
185 connection_id id;
186 auto p = s.c_str();
187 id.id = read_le<decltype(id.id)>(p);
188 return id;
189 }
190
191 template <bool IsSmartPtr>
192 struct serialize_helper;
193
194 template <>
195 struct serialize_helper<false> {
196 template <typename Serializer, typename Output, typename T>
197 static inline void serialize(Serializer& serializer, Output& out, const T& t) {
198 return write(serializer, out, t);
199 }
200 };
201
202 template <>
203 struct serialize_helper<true> {
204 template <typename Serializer, typename Output, typename T>
205 static inline void serialize(Serializer& serializer, Output& out, const T& t) {
206 return write(serializer, out, *t);
207 }
208 };
209
210 template <typename Serializer, typename Output, typename... T>
211 inline void do_marshall(Serializer& serializer, Output& out, const T&... args);
212
213 template <typename Serializer, typename Output>
214 struct marshall_one {
215 template <typename T> struct helper {
216 static void doit(Serializer& serializer, Output& out, const T& arg) {
217 using serialize_helper_type = serialize_helper<is_smart_ptr<typename std::remove_reference<T>::type>::value>;
218 serialize_helper_type::serialize(serializer, out, arg);
219 }
220 };
221 template<typename T> struct helper<std::reference_wrapper<const T>> {
222 static void doit(Serializer& serializer, Output& out, const std::reference_wrapper<const T>& arg) {
223 helper<T>::doit(serializer, out, arg.get());
224 }
225 };
226 static void put_connection_id(const connection_id& cid, Output& out) {
227 sstring id = serialize_connection_id(cid);
228 out.write(id.c_str(), id.size());
229 }
230 template <typename... T> struct helper<sink<T...>> {
231 static void doit(Serializer& serializer, Output& out, const sink<T...>& arg) {
232 put_connection_id(arg.get_id(), out);
233 }
234 };
235 template <typename... T> struct helper<source<T...>> {
236 static void doit(Serializer& serializer, Output& out, const source<T...>& arg) {
237 put_connection_id(arg.get_id(), out);
238 }
239 };
240 template <typename... T> struct helper<tuple<T...>> {
241 static void doit(Serializer& serializer, Output& out, const tuple<T...>& arg) {
242 auto do_do_marshall = [&serializer, &out] (const auto&... args) {
243 do_marshall(serializer, out, args...);
244 };
245 apply(do_do_marshall, arg);
246 }
247 };
248 };
249
250 template <typename Serializer, typename Output, typename... T>
251 inline void do_marshall(Serializer& serializer, Output& out, const T&... args) {
252 // C++ guarantees that brace-initialization expressions are evaluted in order
253 (void)std::initializer_list<int>{(marshall_one<Serializer, Output>::template helper<T>::doit(serializer, out, args), 1)...};
254 }
255
256 static inline memory_output_stream<snd_buf::iterator> make_serializer_stream(snd_buf& output) {
257 auto* b = compat::get_if<temporary_buffer<char>>(&output.bufs);
258 if (b) {
259 return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::simple(b->get_write(), b->size()));
260 } else {
261 auto& ar = compat::get<std::vector<temporary_buffer<char>>>(output.bufs);
262 return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::fragmented(ar.begin(), output.size));
263 }
264 }
265
266 template <typename Serializer, typename... T>
267 inline snd_buf marshall(Serializer& serializer, size_t head_space, const T&... args) {
268 measuring_output_stream measure;
269 do_marshall(serializer, measure, args...);
270 snd_buf ret(measure.size() + head_space);
271 auto out = make_serializer_stream(ret);
272 out.skip(head_space);
273 do_marshall(serializer, out, args...);
274 return ret;
275 }
276
277 template <typename Serializer, typename Input>
278 inline std::tuple<> do_unmarshall(connection& c, Input& in) {
279 return std::make_tuple();
280 }
281
282 template<typename Serializer, typename Input>
283 struct unmarshal_one {
284 template<typename T> struct helper {
285 static T doit(connection& c, Input& in) {
286 return read(c.serializer<Serializer>(), in, type<T>());
287 }
288 };
289 template<typename T> struct helper<optional<T>> {
290 static optional<T> doit(connection& c, Input& in) {
291 if (in.size()) {
292 return optional<T>(read(c.serializer<Serializer>(), in, type<typename remove_optional<T>::type>()));
293 } else {
294 return optional<T>();
295 }
296 }
297 };
298 template<typename T> struct helper<std::reference_wrapper<const T>> {
299 static T doit(connection& c, Input& in) {
300 return helper<T>::doit(c, in);
301 }
302 };
303 static connection_id get_connection_id(Input& in) {
304 sstring id(sstring::initialized_later(), sizeof(connection_id));
305 in.read(id.begin(), sizeof(connection_id));
306 return deserialize_connection_id(id);
307 }
308 template<typename... T> struct helper<sink<T...>> {
309 static sink<T...> doit(connection& c, Input& in) {
310 return sink<T...>(make_shared<sink_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
311 }
312 };
313 template<typename... T> struct helper<source<T...>> {
314 static source<T...> doit(connection& c, Input& in) {
315 return source<T...>(make_shared<source_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
316 }
317 };
318 template <typename... T> struct helper<tuple<T...>> {
319 static tuple<T...> doit(connection& c, Input& in) {
320 return do_unmarshall<Serializer, Input, T...>(c, in);
321 }
322 };
323 };
324
325 template <typename Serializer, typename Input, typename T0, typename... Trest>
326 inline std::tuple<T0, Trest...> do_unmarshall(connection& c, Input& in) {
327 // FIXME: something less recursive
328 auto first = std::make_tuple(unmarshal_one<Serializer, Input>::template helper<T0>::doit(c, in));
329 auto rest = do_unmarshall<Serializer, Input, Trest...>(c, in);
330 return std::tuple_cat(std::move(first), std::move(rest));
331 }
332
333 template <typename Serializer, typename... T>
334 inline std::tuple<T...> unmarshall(connection& c, rcv_buf input) {
335 auto in = make_deserializer_stream(input);
336 return do_unmarshall<Serializer, decltype(in), T...>(c, in);
337 }
338
339 inline std::exception_ptr unmarshal_exception(rcv_buf& d) {
340 std::exception_ptr ex;
341 auto data = make_deserializer_stream(d);
342
343 uint32_t v32;
344 data.read(reinterpret_cast<char*>(&v32), 4);
345 exception_type ex_type = exception_type(le_to_cpu(v32));
346 data.read(reinterpret_cast<char*>(&v32), 4);
347 uint32_t ex_len = le_to_cpu(v32);
348
349 switch (ex_type) {
350 case exception_type::USER: {
351 std::string s(ex_len, '\0');
352 data.read(&*s.begin(), ex_len);
353 ex = std::make_exception_ptr(std::runtime_error(std::move(s)));
354 break;
355 }
356 case exception_type::UNKNOWN_VERB: {
357 uint64_t v64;
358 data.read(reinterpret_cast<char*>(&v64), 8);
359 ex = std::make_exception_ptr(unknown_verb_error(le_to_cpu(v64)));
360 break;
361 }
362 default:
363 ex = std::make_exception_ptr(unknown_exception_error());
364 break;
365 }
366 return ex;
367 }
368
369 template <typename Payload, typename... T>
370 struct rcv_reply_base {
371 bool done = false;
372 promise<T...> p;
373 template<typename... V>
374 void set_value(V&&... v) {
375 done = true;
376 p.set_value(std::forward<V>(v)...);
377 }
378 ~rcv_reply_base() {
379 if (!done) {
380 p.set_exception(closed_error());
381 }
382 }
383 };
384
385 template<typename Serializer, typename T>
386 struct rcv_reply : rcv_reply_base<T, T> {
387 inline void get_reply(rpc::client& dst, rcv_buf input) {
388 this->set_value(unmarshall<Serializer, T>(dst, std::move(input)));
389 }
390 };
391
392 template<typename Serializer, typename... T>
393 struct rcv_reply<Serializer, future<T...>> : rcv_reply_base<std::tuple<T...>, T...> {
394 inline void get_reply(rpc::client& dst, rcv_buf input) {
395 this->set_value(unmarshall<Serializer, T...>(dst, std::move(input)));
396 }
397 };
398
399 template<typename Serializer>
400 struct rcv_reply<Serializer, void> : rcv_reply_base<void, void> {
401 inline void get_reply(rpc::client& dst, rcv_buf input) {
402 this->set_value();
403 }
404 };
405
406 template<typename Serializer>
407 struct rcv_reply<Serializer, future<>> : rcv_reply<Serializer, void> {};
408
409 template <typename Serializer, typename Ret, typename... InArgs>
410 inline auto wait_for_reply(wait_type, compat::optional<rpc_clock_type::time_point> timeout, cancellable* cancel, rpc::client& dst, id_type msg_id,
411 signature<Ret (InArgs...)> sig) {
412 using reply_type = rcv_reply<Serializer, Ret>;
413 auto lambda = [] (reply_type& r, rpc::client& dst, id_type msg_id, rcv_buf data) mutable {
414 if (msg_id >= 0) {
415 dst.get_stats_internal().replied++;
416 return r.get_reply(dst, std::move(data));
417 } else {
418 dst.get_stats_internal().exception_received++;
419 r.done = true;
420 r.p.set_exception(unmarshal_exception(data));
421 }
422 };
423 using handler_type = typename rpc::client::template reply_handler<reply_type, decltype(lambda)>;
424 auto r = std::make_unique<handler_type>(std::move(lambda));
425 auto fut = r->reply.p.get_future();
426 dst.wait_for_reply(msg_id, std::move(r), timeout, cancel);
427 return fut;
428 }
429
430 template<typename Serializer, typename... InArgs>
431 inline auto wait_for_reply(no_wait_type, compat::optional<rpc_clock_type::time_point>, cancellable* cancel, rpc::client& dst, id_type msg_id,
432 signature<no_wait_type (InArgs...)> sig) { // no_wait overload
433 return make_ready_future<>();
434 }
435
436 template<typename Serializer, typename... InArgs>
437 inline auto wait_for_reply(no_wait_type, compat::optional<rpc_clock_type::time_point>, cancellable* cancel, rpc::client& dst, id_type msg_id,
438 signature<future<no_wait_type> (InArgs...)> sig) { // future<no_wait> overload
439 return make_ready_future<>();
440 }
441
442 // Convert a relative timeout (a duration) to an absolute one (time_point).
443 // Do the calculation safely so that a very large duration will be capped by
444 // time_point::max, instead of wrapping around to ancient history.
445 inline rpc_clock_type::time_point
446 relative_timeout_to_absolute(rpc_clock_type::duration relative) {
447 rpc_clock_type::time_point now = rpc_clock_type::now();
448 return now + std::min(relative, rpc_clock_type::time_point::max() - now);
449 }
450
451 // Returns lambda that can be used to send rpc messages.
452 // The lambda gets client connection and rpc parameters as arguments, marshalls them sends
453 // to a server and waits for a reply. After receiving reply it unmarshalls it and signal completion
454 // to a caller.
455 template<typename Serializer, typename MsgType, typename Ret, typename... InArgs>
456 auto send_helper(MsgType xt, signature<Ret (InArgs...)> xsig) {
457 struct shelper {
458 MsgType t;
459 signature<Ret (InArgs...)> sig;
460 auto send(rpc::client& dst, compat::optional<rpc_clock_type::time_point> timeout, cancellable* cancel, const InArgs&... args) {
461 if (dst.error()) {
462 using cleaned_ret_type = typename wait_signature<Ret>::cleaned_type;
463 return futurize<cleaned_ret_type>::make_exception_future(closed_error());
464 }
465
466 // send message
467 auto msg_id = dst.next_message_id();
468 snd_buf data = marshall(dst.template serializer<Serializer>(), 28, args...);
469 static_assert(snd_buf::chunk_size >= 28, "send buffer chunk size is too small");
470 auto p = data.front().get_write() + 8; // 8 extra bytes for expiration timer
471 write_le<uint64_t>(p, uint64_t(t));
472 write_le<int64_t>(p + 8, msg_id);
473 write_le<uint32_t>(p + 16, data.size - 28);
474
475 // prepare reply handler, if return type is now_wait_type this does nothing, since no reply will be sent
476 using wait = wait_signature_t<Ret>;
477 return when_all(dst.send(std::move(data), timeout, cancel), wait_for_reply<Serializer>(wait(), timeout, cancel, dst, msg_id, sig)).then([] (auto r) {
478 return std::move(std::get<1>(r)); // return future of wait_for_reply
479 });
480 }
481 auto operator()(rpc::client& dst, const InArgs&... args) {
482 return send(dst, {}, nullptr, args...);
483 }
484 auto operator()(rpc::client& dst, rpc_clock_type::time_point timeout, const InArgs&... args) {
485 return send(dst, timeout, nullptr, args...);
486 }
487 auto operator()(rpc::client& dst, rpc_clock_type::duration timeout, const InArgs&... args) {
488 return send(dst, relative_timeout_to_absolute(timeout), nullptr, args...);
489 }
490 auto operator()(rpc::client& dst, cancellable& cancel, const InArgs&... args) {
491 return send(dst, {}, &cancel, args...);
492 }
493
494 };
495 return shelper{xt, xsig};
496 }
497
498 template<typename Serializer, typename... RetTypes>
499 inline future<> reply(wait_type, future<RetTypes...>&& ret, int64_t msg_id, shared_ptr<server::connection> client,
500 compat::optional<rpc_clock_type::time_point> timeout) {
501 if (!client->error()) {
502 snd_buf data;
503 try {
504 data = apply(marshall<Serializer, const RetTypes&...>,
505 std::tuple_cat(std::make_tuple(std::ref(client->template serializer<Serializer>()), 12), std::move(ret.get())));
506 } catch (std::exception& ex) {
507 uint32_t len = std::strlen(ex.what());
508 data = snd_buf(20 + len);
509 auto os = make_serializer_stream(data);
510 os.skip(12);
511 uint32_t v32 = cpu_to_le(uint32_t(exception_type::USER));
512 os.write(reinterpret_cast<char*>(&v32), sizeof(v32));
513 v32 = cpu_to_le(len);
514 os.write(reinterpret_cast<char*>(&v32), sizeof(v32));
515 os.write(ex.what(), len);
516 msg_id = -msg_id;
517 }
518
519 return client->respond(msg_id, std::move(data), timeout);
520 } else {
521 ret.ignore_ready_future();
522 return make_ready_future<>();
523 }
524 }
525
526 // specialization for no_wait_type which does not send a reply
527 template<typename Serializer>
528 inline future<> reply(no_wait_type, future<no_wait_type>&& r, int64_t msgid, shared_ptr<server::connection> client, compat::optional<rpc_clock_type::time_point> timeout) {
529 try {
530 r.get();
531 } catch (std::exception& ex) {
532 client->get_logger()(client->info(), msgid, to_sstring("exception \"") + ex.what() + "\" in no_wait handler ignored");
533 }
534 return make_ready_future<>();
535 }
536
537 template<typename Ret, typename... InArgs, typename WantClientInfo, typename WantTimePoint, typename Func, typename ArgsTuple>
538 inline futurize_t<Ret> apply(Func& func, client_info& info, opt_time_point time_point, WantClientInfo wci, WantTimePoint wtp, signature<Ret (InArgs...)> sig, ArgsTuple&& args) {
539 using futurator = futurize<Ret>;
540 try {
541 return futurator::apply(func, maybe_add_client_info(wci, info, maybe_add_time_point(wtp, time_point, std::forward<ArgsTuple>(args))));
542 } catch (std::runtime_error& ex) {
543 return futurator::make_exception_future(std::current_exception());
544 }
545 }
546
547 // lref_to_cref is a helper that encapsulates lvalue reference in std::ref() or does nothing otherwise
548 template<typename T>
549 auto lref_to_cref(T&& x) {
550 return std::move(x);
551 }
552
553 template<typename T>
554 auto lref_to_cref(T& x) {
555 return std::ref(x);
556 }
557
558 // Creates lambda to handle RPC message on a server.
559 // The lambda unmarshalls all parameters, calls a handler, marshall return values and sends them back to a client
560 template <typename Serializer, typename Func, typename Ret, typename... InArgs, typename WantClientInfo, typename WantTimePoint>
561 auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo wci, WantTimePoint wtp) {
562 using signature = decltype(sig);
563 using wait_style = wait_signature_t<Ret>;
564 return [func = lref_to_cref(std::forward<Func>(func))](shared_ptr<server::connection> client,
565 compat::optional<rpc_clock_type::time_point> timeout,
566 int64_t msg_id,
567 rcv_buf data) mutable {
568 auto memory_consumed = client->estimate_request_size(data.size);
569 if (memory_consumed > client->max_request_size()) {
570 auto err = format("request size {:d} large than memory limit {:d}", memory_consumed, client->max_request_size());
571 client->get_logger()(client->peer_address(), err);
572 // FIXME: future is discarded
573 (void)with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, err = std::move(err)] {
574 return reply<Serializer>(wait_style(), futurize<Ret>::make_exception_future(std::runtime_error(err.c_str())), msg_id, client, timeout);
575 });
576 return make_ready_future();
577 }
578 // note: apply is executed asynchronously with regards to networking so we cannot chain futures here by doing "return apply()"
579 auto f = client->wait_for_resources(memory_consumed, timeout).then([client, timeout, msg_id, data = std::move(data), &func] (auto permit) mutable {
580 try {
581 // FIXME: future is discarded
582 (void)with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, data = std::move(data), permit = std::move(permit), &func] () mutable {
583 try {
584 auto args = unmarshall<Serializer, InArgs...>(*client, std::move(data));
585 return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit)] (futurize_t<Ret> ret) mutable {
586 return reply<Serializer>(wait_style(), std::move(ret), msg_id, client, timeout).handle_exception([permit = std::move(permit), client, msg_id] (std::exception_ptr eptr) {
587 client->get_logger()(client->info(), msg_id, format("got exception while processing a message: {}", eptr));
588 });
589 });
590 } catch (...) {
591 client->get_logger()(client->info(), msg_id, format("got exception while processing a message: {}", std::current_exception()));
592 return make_ready_future();
593 }
594 });
595 } catch (gate_closed_exception&) {/* ignore */ }
596 });
597
598 if (timeout) {
599 f = f.handle_exception_type([] (semaphore_timed_out&) { /* ignore */ });
600 }
601
602 return f;
603 };
604 }
605
606 // helper to create copy constructible lambda from non copy constructible one. std::function<> works only with former kind.
607 template<typename Func>
608 auto make_copyable_function(Func&& func, std::enable_if_t<!std::is_copy_constructible<std::decay_t<Func>>::value, void*> = nullptr) {
609 auto p = make_lw_shared<typename std::decay_t<Func>>(std::forward<Func>(func));
610 return [p] (auto&&... args) { return (*p)( std::forward<decltype(args)>(args)... ); };
611 }
612
613 template<typename Func>
614 auto make_copyable_function(Func&& func, std::enable_if_t<std::is_copy_constructible<std::decay_t<Func>>::value, void*> = nullptr) {
615 return std::forward<Func>(func);
616 }
617
618 // This class is used to calculate client side rpc function signature.
619 // Return type is converted from a smart pointer to a type it points to.
620 // rpc::optional are converted to non optional type.
621 //
622 // Examples:
623 // std::unique_ptr<int>(int, rpc::optional<long>) -> int(int, long)
624 // double(float) -> double(float)
625 template<typename Ret, typename... In>
626 class client_function_type {
627 template<typename T, bool IsSmartPtr>
628 struct drop_smart_ptr_impl;
629 template<typename T>
630 struct drop_smart_ptr_impl<T, true> {
631 using type = typename T::element_type;
632 };
633 template<typename T>
634 struct drop_smart_ptr_impl<T, false> {
635 using type = T;
636 };
637 template<typename T>
638 using drop_smart_ptr = drop_smart_ptr_impl<T, is_smart_ptr<T>::value>;
639
640 // if return type is smart ptr take a type it points to instead
641 using return_type = typename drop_smart_ptr<Ret>::type;
642 public:
643 using type = return_type(typename remove_optional<In>::type...);
644 };
645
646 template<typename Serializer, typename MsgType>
647 template<typename Ret, typename... In>
648 auto protocol<Serializer, MsgType>::make_client(signature<Ret(In...)> clear_sig, MsgType t) {
649 using sig_type = signature<typename client_function_type<Ret, In...>::type>;
650 return send_helper<Serializer>(t, sig_type());
651 }
652
653 template<typename Serializer, typename MsgType>
654 template<typename Func>
655 auto protocol<Serializer, MsgType>::make_client(MsgType t) {
656 return make_client(typename signature<typename function_traits<Func>::signature>::clean(), t);
657 }
658
659 template<typename Serializer, typename MsgType>
660 template<typename Func>
661 auto protocol<Serializer, MsgType>::register_handler(MsgType t, scheduling_group sg, Func&& func) {
662 using sig_type = signature<typename function_traits<Func>::signature>;
663 using clean_sig_type = typename sig_type::clean;
664 using want_client_info = typename sig_type::want_client_info;
665 using want_time_point = typename sig_type::want_time_point;
666 auto recv = recv_helper<Serializer>(clean_sig_type(), std::forward<Func>(func),
667 want_client_info(), want_time_point());
668 register_receiver(t, rpc_handler{sg, make_copyable_function(std::move(recv))});
669 return make_client(clean_sig_type(), t);
670 }
671
672 template<typename Serializer, typename MsgType>
673 template<typename Func>
674 auto protocol<Serializer, MsgType>::register_handler(MsgType t, Func&& func) {
675 return register_handler(t, scheduling_group(), std::forward<Func>(func));
676 }
677
678 template<typename Serializer, typename MsgType>
679 future<> protocol<Serializer, MsgType>::unregister_handler(MsgType t) {
680 auto it = _handlers.find(t);
681 if (it != _handlers.end()) {
682 return it->second.use_gate.close().finally([this, t] {
683 _handlers.erase(t);
684 });
685 }
686 return make_ready_future<>();
687 }
688
689 template<typename Serializer, typename MsgType>
690 bool protocol<Serializer, MsgType>::has_handler(uint64_t msg_id) {
691 auto it = _handlers.find(MsgType(msg_id));
692 if (it == _handlers.end()) {
693 return false;
694 }
695 return !it->second.use_gate.is_closed();
696 }
697
698 template<typename Serializer, typename MsgType>
699 rpc_handler* protocol<Serializer, MsgType>::get_handler(uint64_t msg_id) {
700 rpc_handler* h = nullptr;
701 auto it = _handlers.find(MsgType(msg_id));
702 if (it != _handlers.end()) {
703 try {
704 it->second.use_gate.enter();
705 h = &it->second;
706 } catch (gate_closed_exception&) {
707 // unregistered, just ignore
708 }
709 }
710 return h;
711 }
712
713 template<typename Serializer, typename MsgType>
714 void protocol<Serializer, MsgType>::put_handler(rpc_handler* h) {
715 h->use_gate.leave();
716 }
717
718 template<typename T> T make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<T>> org);
719
720 template<typename Serializer, typename... Out>
721 future<> sink_impl<Serializer, Out...>::operator()(const Out&... args) {
722 // note that we use remote serializer pointer, so if serailizer needs a state
723 // it should have per-cpu one
724 snd_buf data = marshall(this->_con->get()->template serializer<Serializer>(), 4, args...);
725 static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small");
726 auto p = data.front().get_write();
727 write_le<uint32_t>(p, data.size - 4);
728 // we do not want to dead lock on huge packets, so let them in
729 // but only one at a time
730 auto size = std::min(size_t(data.size), max_stream_buffers_memory);
731 return get_units(this->_sem, size).then([this, data = make_foreign(std::make_unique<snd_buf>(std::move(data)))] (semaphore_units<> su) mutable {
732 if (this->_ex) {
733 return make_exception_future(this->_ex);
734 }
735 // FIXME: future is discarded
736 (void)smp::submit_to(this->_con->get_owner_shard(), [this, data = std::move(data)] () mutable {
737 connection* con = this->_con->get();
738 if (con->error()) {
739 return make_exception_future(closed_error());
740 }
741 if(con->sink_closed()) {
742 return make_exception_future(stream_closed());
743 }
744 return con->send(make_shard_local_buffer_copy(std::move(data)), {}, nullptr);
745 }).then_wrapped([su = std::move(su), this] (future<> f) {
746 if (f.failed() && !this->_ex) { // first error is the interesting one
747 this->_ex = f.get_exception();
748 } else {
749 f.ignore_ready_future();
750 }
751 });
752 return make_ready_future<>();
753 });
754 }
755
756 template<typename Serializer, typename... Out>
757 future<> sink_impl<Serializer, Out...>::flush() {
758 // wait until everything is sent out before returning.
759 return with_semaphore(this->_sem, max_stream_buffers_memory, [this] {
760 if (this->_ex) {
761 return make_exception_future(this->_ex);
762 }
763 return make_ready_future();
764 });
765 }
766
767 template<typename Serializer, typename... Out>
768 future<> sink_impl<Serializer, Out...>::close() {
769 return with_semaphore(this->_sem, max_stream_buffers_memory, [this] {
770 return smp::submit_to(this->_con->get_owner_shard(), [this] {
771 connection* con = this->_con->get();
772 if (con->sink_closed()) { // double close, should not happen!
773 return make_exception_future(stream_closed());
774 }
775 future<> f = make_ready_future<>();
776 if (!con->error() && !this->_ex) {
777 snd_buf data = marshall(con->template serializer<Serializer>(), 4);
778 static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small");
779 auto p = data.front().get_write();
780 write_le<uint32_t>(p, -1U); // max len fragment marks an end of a stream
781 f = con->send(std::move(data), {}, nullptr);
782 } else {
783 f = this->_ex ? make_exception_future(this->_ex) : make_exception_future(closed_error());
784 }
785 return f.finally([con] { return con->close_sink(); });
786 });
787 });
788 }
789
790 template<typename Serializer, typename... In>
791 future<compat::optional<std::tuple<In...>>> source_impl<Serializer, In...>::operator()() {
792 auto process_one_buffer = [this] {
793 foreign_ptr<std::unique_ptr<rcv_buf>> buf = std::move(this->_bufs.front());
794 this->_bufs.pop_front();
795 return seastar::apply([] (In&&... args) {
796 auto ret = compat::make_optional(std::make_tuple(std::move(args)...));
797 return make_ready_future<compat::optional<std::tuple<In...>>>(std::move(ret));
798 }, unmarshall<Serializer, In...>(*this->_con->get(), make_shard_local_buffer_copy(std::move(buf))));
799 };
800
801 if (!this->_bufs.empty()) {
802 return process_one_buffer();
803 }
804
805 // refill buffers from remote cpu
806 return smp::submit_to(this->_con->get_owner_shard(), [this] () -> future<> {
807 connection* con = this->_con->get();
808 if (con->_source_closed) {
809 return make_exception_future<>(stream_closed());
810 }
811 return con->stream_receive(this->_bufs).then_wrapped([this, con] (future<>&& f) {
812 if (f.failed()) {
813 return con->close_source().then_wrapped([ex = f.get_exception()] (future<> f){
814 f.ignore_ready_future();
815 return make_exception_future<>(ex);
816 });
817 }
818 if (this->_bufs.empty()) { // nothing to read -> eof
819 return con->close_source().then_wrapped([] (future<> f) {
820 f.ignore_ready_future();
821 return make_ready_future<>();
822 });
823 }
824 return make_ready_future<>();
825 });
826 }).then([this, process_one_buffer] () {
827 if (this->_bufs.empty()) {
828 return make_ready_future<compat::optional<std::tuple<In...>>>(compat::nullopt);
829 } else {
830 return process_one_buffer();
831 }
832 });
833 }
834
835 template<typename... Out>
836 connection_id sink<Out...>::get_id() const {
837 return _impl->_con->get()->get_connection_id();
838 }
839
840 template<typename... In>
841 connection_id source<In...>::get_id() const {
842 return _impl->_con->get()->get_connection_id();
843 }
844
845 template<typename... In>
846 template<typename Serializer, typename... Out>
847 sink<Out...> source<In...>::make_sink() {
848 return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(_impl->_con));
849 }
850
851 }
852
853 }
854
855 namespace std {
856 template<>
857 struct hash<seastar::rpc::streaming_domain_type> {
858 size_t operator()(const seastar::rpc::streaming_domain_type& domain) const {
859 size_t h = 0;
860 boost::hash_combine(h, std::hash<uint64_t>{}(domain._id));
861 return h;
862 }
863 };
864 }
865
866