]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/include/seastar/rpc/rpc_impl.hh
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / seastar / include / seastar / rpc / rpc_impl.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21 #pragma once
22
23 #include <seastar/core/function_traits.hh>
24 #include <seastar/core/apply.hh>
25 #include <seastar/core/shared_ptr.hh>
26 #include <seastar/core/sstring.hh>
27 #include <seastar/core/future-util.hh>
28 #include <seastar/util/is_smart_ptr.hh>
29 #include <seastar/core/simple-stream.hh>
30 #include <boost/range/numeric.hpp>
31 #include <boost/range/adaptor/transformed.hpp>
32 #include <seastar/net/packet-data-source.hh>
33
34 namespace seastar {
35
36 namespace rpc {
37
38 enum class exception_type : uint32_t {
39 USER = 0,
40 UNKNOWN_VERB = 1,
41 };
42
43 template<typename T>
44 struct remove_optional {
45 using type = T;
46 };
47
48 template<typename T>
49 struct remove_optional<optional<T>> {
50 using type = T;
51 };
52
53 struct wait_type {}; // opposite of no_wait_type
54
55 // tags to tell whether we want a const client_info& parameter
56 struct do_want_client_info {};
57 struct dont_want_client_info {};
58
59 // tags to tell whether we want a opt_time_point parameter
60 struct do_want_time_point {};
61 struct dont_want_time_point {};
62
63 // General case
64 template <typename Ret, typename... In>
65 struct signature<Ret (In...)> {
66 using ret_type = Ret;
67 using arg_types = std::tuple<In...>;
68 using clean = signature;
69 using want_client_info = dont_want_client_info;
70 using want_time_point = dont_want_time_point;
71 };
72
73 // Specialize 'clean' for handlers that receive client_info
74 template <typename Ret, typename... In>
75 struct signature<Ret (const client_info&, In...)> {
76 using ret_type = Ret;
77 using arg_types = std::tuple<In...>;
78 using clean = signature<Ret (In...)>;
79 using want_client_info = do_want_client_info;
80 using want_time_point = dont_want_time_point;
81 };
82
83 template <typename Ret, typename... In>
84 struct signature<Ret (client_info&, In...)> {
85 using ret_type = Ret;
86 using arg_types = std::tuple<In...>;
87 using clean = signature<Ret (In...)>;
88 using want_client_info = do_want_client_info;
89 using want_time_point = dont_want_time_point;
90 };
91
92 // Specialize 'clean' for handlers that receive client_info and opt_time_point
93 template <typename Ret, typename... In>
94 struct signature<Ret (const client_info&, opt_time_point, In...)> {
95 using ret_type = Ret;
96 using arg_types = std::tuple<In...>;
97 using clean = signature<Ret (In...)>;
98 using want_client_info = do_want_client_info;
99 using want_time_point = do_want_time_point;
100 };
101
102 template <typename Ret, typename... In>
103 struct signature<Ret (client_info&, opt_time_point, In...)> {
104 using ret_type = Ret;
105 using arg_types = std::tuple<In...>;
106 using clean = signature<Ret (In...)>;
107 using want_client_info = do_want_client_info;
108 using want_time_point = do_want_time_point;
109 };
110
111 // Specialize 'clean' for handlers that receive opt_time_point
112 template <typename Ret, typename... In>
113 struct signature<Ret (opt_time_point, In...)> {
114 using ret_type = Ret;
115 using arg_types = std::tuple<In...>;
116 using clean = signature<Ret (In...)>;
117 using want_client_info = dont_want_client_info;
118 using want_time_point = do_want_time_point;
119 };
120
121 template <typename T>
122 struct wait_signature {
123 using type = wait_type;
124 using cleaned_type = T;
125 };
126
127 template <typename... T>
128 struct wait_signature<future<T...>> {
129 using type = wait_type;
130 using cleaned_type = future<T...>;
131 };
132
133 template <>
134 struct wait_signature<no_wait_type> {
135 using type = no_wait_type;
136 using cleaned_type = void;
137 };
138
139 template <>
140 struct wait_signature<future<no_wait_type>> {
141 using type = no_wait_type;
142 using cleaned_type = future<>;
143 };
144
145 template <typename T>
146 using wait_signature_t = typename wait_signature<T>::type;
147
148 template <typename... In>
149 inline
150 std::tuple<In...>
151 maybe_add_client_info(dont_want_client_info, client_info& ci, std::tuple<In...>&& args) {
152 return std::move(args);
153 }
154
155 template <typename... In>
156 inline
157 std::tuple<std::reference_wrapper<client_info>, In...>
158 maybe_add_client_info(do_want_client_info, client_info& ci, std::tuple<In...>&& args) {
159 return std::tuple_cat(std::make_tuple(std::ref(ci)), std::move(args));
160 }
161
162 template <typename... In>
163 inline
164 std::tuple<In...>
165 maybe_add_time_point(dont_want_time_point, opt_time_point& otp, std::tuple<In...>&& args) {
166 return std::move(args);
167 }
168
169 template <typename... In>
170 inline
171 std::tuple<opt_time_point, In...>
172 maybe_add_time_point(do_want_time_point, opt_time_point& otp, std::tuple<In...>&& args) {
173 return std::tuple_cat(std::make_tuple(otp), std::move(args));
174 }
175
176 inline sstring serialize_connection_id(const connection_id& id) {
177 sstring p(sstring::initialized_later(), sizeof(id));
178 auto c = p.begin();
179 write_le(c, id.id);
180 return p;
181 }
182
183 inline connection_id deserialize_connection_id(const sstring& s) {
184 connection_id id;
185 auto p = s.c_str();
186 id.id = read_le<decltype(id.id)>(p);
187 return id;
188 }
189
190 template <bool IsSmartPtr>
191 struct serialize_helper;
192
193 template <>
194 struct serialize_helper<false> {
195 template <typename Serializer, typename Output, typename T>
196 static inline void serialize(Serializer& serializer, Output& out, const T& t) {
197 return write(serializer, out, t);
198 }
199 };
200
201 template <>
202 struct serialize_helper<true> {
203 template <typename Serializer, typename Output, typename T>
204 static inline void serialize(Serializer& serializer, Output& out, const T& t) {
205 return write(serializer, out, *t);
206 }
207 };
208
209 template <typename Serializer, typename Output>
210 struct marshall_one {
211 template <typename T> struct helper {
212 static void doit(Serializer& serializer, Output& out, const T& arg) {
213 using serialize_helper_type = serialize_helper<is_smart_ptr<typename std::remove_reference<T>::type>::value>;
214 serialize_helper_type::serialize(serializer, out, arg);
215 }
216 };
217 template<typename T> struct helper<std::reference_wrapper<const T>> {
218 static void doit(Serializer& serializer, Output& out, const std::reference_wrapper<const T>& arg) {
219 helper<T>::doit(serializer, out, arg.get());
220 }
221 };
222 static void put_connection_id(const connection_id& cid, Output& out) {
223 sstring id = serialize_connection_id(cid);
224 out.write(id.c_str(), id.size());
225 }
226 template <typename... T> struct helper<sink<T...>> {
227 static void doit(Serializer& serializer, Output& out, const sink<T...>& arg) {
228 put_connection_id(arg.get_id(), out);
229 }
230 };
231 template <typename... T> struct helper<source<T...>> {
232 static void doit(Serializer& serializer, Output& out, const source<T...>& arg) {
233 put_connection_id(arg.get_id(), out);
234 }
235 };
236 };
237
238 template <typename Serializer, typename Output, typename... T>
239 inline void do_marshall(Serializer& serializer, Output& out, const T&... args) {
240 // C++ guarantees that brace-initialization expressions are evaluted in order
241 (void)std::initializer_list<int>{(marshall_one<Serializer, Output>::template helper<T>::doit(serializer, out, args), 1)...};
242 }
243
244 static inline memory_output_stream<snd_buf::iterator> make_serializer_stream(snd_buf& output) {
245 auto* b = compat::get_if<temporary_buffer<char>>(&output.bufs);
246 if (b) {
247 return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::simple(b->get_write(), b->size()));
248 } else {
249 auto& ar = compat::get<std::vector<temporary_buffer<char>>>(output.bufs);
250 return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::fragmented(ar.begin(), output.size));
251 }
252 }
253
254 template <typename Serializer, typename... T>
255 inline snd_buf marshall(Serializer& serializer, size_t head_space, const T&... args) {
256 measuring_output_stream measure;
257 do_marshall(serializer, measure, args...);
258 snd_buf ret(measure.size() + head_space);
259 auto out = make_serializer_stream(ret);
260 out.skip(head_space);
261 do_marshall(serializer, out, args...);
262 return ret;
263 }
264
265 template <typename Serializer, typename Input>
266 inline std::tuple<> do_unmarshall(connection& c, Input& in) {
267 return std::make_tuple();
268 }
269
270 template<typename Serializer, typename Input>
271 struct unmarshal_one {
272 template<typename T> struct helper {
273 static T doit(connection& c, Input& in) {
274 return read(c.serializer<Serializer>(), in, type<T>());
275 }
276 };
277 template<typename T> struct helper<optional<T>> {
278 static optional<T> doit(connection& c, Input& in) {
279 if (in.size()) {
280 return optional<T>(read(c.serializer<Serializer>(), in, type<typename remove_optional<T>::type>()));
281 } else {
282 return optional<T>();
283 }
284 }
285 };
286 template<typename T> struct helper<std::reference_wrapper<const T>> {
287 static T doit(connection& c, Input& in) {
288 return helper<T>::doit(c, in);
289 }
290 };
291 static connection_id get_connection_id(Input& in) {
292 sstring id(sstring::initialized_later(), sizeof(connection_id));
293 in.read(id.begin(), sizeof(connection_id));
294 return deserialize_connection_id(id);
295 }
296 template<typename... T> struct helper<sink<T...>> {
297 static sink<T...> doit(connection& c, Input& in) {
298 return sink<T...>(make_shared<sink_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
299 }
300 };
301 template<typename... T> struct helper<source<T...>> {
302 static source<T...> doit(connection& c, Input& in) {
303 return source<T...>(make_shared<source_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
304 }
305 };
306 };
307
308 template <typename Serializer, typename Input, typename T0, typename... Trest>
309 inline std::tuple<T0, Trest...> do_unmarshall(connection& c, Input& in) {
310 // FIXME: something less recursive
311 auto first = std::make_tuple(unmarshal_one<Serializer, Input>::template helper<T0>::doit(c, in));
312 auto rest = do_unmarshall<Serializer, Input, Trest...>(c, in);
313 return std::tuple_cat(std::move(first), std::move(rest));
314 }
315
316 template <typename Serializer, typename... T>
317 inline std::tuple<T...> unmarshall(connection& c, rcv_buf input) {
318 auto in = make_deserializer_stream(input);
319 return do_unmarshall<Serializer, decltype(in), T...>(c, in);
320 }
321
322 inline std::exception_ptr unmarshal_exception(rcv_buf& d) {
323 std::exception_ptr ex;
324 auto data = make_deserializer_stream(d);
325
326 uint32_t v32;
327 data.read(reinterpret_cast<char*>(&v32), 4);
328 exception_type ex_type = exception_type(le_to_cpu(v32));
329 data.read(reinterpret_cast<char*>(&v32), 4);
330 uint32_t ex_len = le_to_cpu(v32);
331
332 switch (ex_type) {
333 case exception_type::USER: {
334 std::string s(ex_len, '\0');
335 data.read(&*s.begin(), ex_len);
336 ex = std::make_exception_ptr(std::runtime_error(std::move(s)));
337 break;
338 }
339 case exception_type::UNKNOWN_VERB: {
340 uint64_t v64;
341 data.read(reinterpret_cast<char*>(&v64), 8);
342 ex = std::make_exception_ptr(unknown_verb_error(le_to_cpu(v64)));
343 break;
344 }
345 default:
346 ex = std::make_exception_ptr(unknown_exception_error());
347 break;
348 }
349 return ex;
350 }
351
352 template <typename Payload, typename... T>
353 struct rcv_reply_base {
354 bool done = false;
355 promise<T...> p;
356 template<typename... V>
357 void set_value(V&&... v) {
358 done = true;
359 p.set_value(std::forward<V>(v)...);
360 }
361 ~rcv_reply_base() {
362 if (!done) {
363 p.set_exception(closed_error());
364 }
365 }
366 };
367
368 template<typename Serializer, typename T>
369 struct rcv_reply : rcv_reply_base<T, T> {
370 inline void get_reply(rpc::client& dst, rcv_buf input) {
371 this->set_value(unmarshall<Serializer, T>(dst, std::move(input)));
372 }
373 };
374
375 template<typename Serializer, typename... T>
376 struct rcv_reply<Serializer, future<T...>> : rcv_reply_base<std::tuple<T...>, T...> {
377 inline void get_reply(rpc::client& dst, rcv_buf input) {
378 this->set_value(unmarshall<Serializer, T...>(dst, std::move(input)));
379 }
380 };
381
382 template<typename Serializer>
383 struct rcv_reply<Serializer, void> : rcv_reply_base<void, void> {
384 inline void get_reply(rpc::client& dst, rcv_buf input) {
385 this->set_value();
386 }
387 };
388
389 template<typename Serializer>
390 struct rcv_reply<Serializer, future<>> : rcv_reply<Serializer, void> {};
391
392 template <typename Serializer, typename Ret, typename... InArgs>
393 inline auto wait_for_reply(wait_type, compat::optional<rpc_clock_type::time_point> timeout, cancellable* cancel, rpc::client& dst, id_type msg_id,
394 signature<Ret (InArgs...)> sig) {
395 using reply_type = rcv_reply<Serializer, Ret>;
396 auto lambda = [] (reply_type& r, rpc::client& dst, id_type msg_id, rcv_buf data) mutable {
397 if (msg_id >= 0) {
398 dst.get_stats_internal().replied++;
399 return r.get_reply(dst, std::move(data));
400 } else {
401 dst.get_stats_internal().exception_received++;
402 r.done = true;
403 r.p.set_exception(unmarshal_exception(data));
404 }
405 };
406 using handler_type = typename rpc::client::template reply_handler<reply_type, decltype(lambda)>;
407 auto r = std::make_unique<handler_type>(std::move(lambda));
408 auto fut = r->reply.p.get_future();
409 dst.wait_for_reply(msg_id, std::move(r), timeout, cancel);
410 return fut;
411 }
412
413 template<typename Serializer, typename... InArgs>
414 inline auto wait_for_reply(no_wait_type, compat::optional<rpc_clock_type::time_point>, cancellable* cancel, rpc::client& dst, id_type msg_id,
415 signature<no_wait_type (InArgs...)> sig) { // no_wait overload
416 return make_ready_future<>();
417 }
418
419 template<typename Serializer, typename... InArgs>
420 inline auto wait_for_reply(no_wait_type, compat::optional<rpc_clock_type::time_point>, cancellable* cancel, rpc::client& dst, id_type msg_id,
421 signature<future<no_wait_type> (InArgs...)> sig) { // future<no_wait> overload
422 return make_ready_future<>();
423 }
424
425 // Returns lambda that can be used to send rpc messages.
426 // The lambda gets client connection and rpc parameters as arguments, marshalls them sends
427 // to a server and waits for a reply. After receiving reply it unmarshalls it and signal completion
428 // to a caller.
429 template<typename Serializer, typename MsgType, typename Ret, typename... InArgs>
430 auto send_helper(MsgType xt, signature<Ret (InArgs...)> xsig) {
431 struct shelper {
432 MsgType t;
433 signature<Ret (InArgs...)> sig;
434 auto send(rpc::client& dst, compat::optional<rpc_clock_type::time_point> timeout, cancellable* cancel, const InArgs&... args) {
435 if (dst.error()) {
436 using cleaned_ret_type = typename wait_signature<Ret>::cleaned_type;
437 return futurize<cleaned_ret_type>::make_exception_future(closed_error());
438 }
439
440 // send message
441 auto msg_id = dst.next_message_id();
442 snd_buf data = marshall(dst.template serializer<Serializer>(), 28, args...);
443 static_assert(snd_buf::chunk_size >= 28, "send buffer chunk size is too small");
444 auto p = data.front().get_write() + 8; // 8 extra bytes for expiration timer
445 write_le<uint64_t>(p, uint64_t(t));
446 write_le<int64_t>(p + 8, msg_id);
447 write_le<uint32_t>(p + 16, data.size - 28);
448
449 // prepare reply handler, if return type is now_wait_type this does nothing, since no reply will be sent
450 using wait = wait_signature_t<Ret>;
451 return when_all(dst.send(std::move(data), timeout, cancel), wait_for_reply<Serializer>(wait(), timeout, cancel, dst, msg_id, sig)).then([] (auto r) {
452 return std::move(std::get<1>(r)); // return future of wait_for_reply
453 });
454 }
455 auto operator()(rpc::client& dst, const InArgs&... args) {
456 return send(dst, {}, nullptr, args...);
457 }
458 auto operator()(rpc::client& dst, rpc_clock_type::time_point timeout, const InArgs&... args) {
459 return send(dst, timeout, nullptr, args...);
460 }
461 auto operator()(rpc::client& dst, rpc_clock_type::duration timeout, const InArgs&... args) {
462 return send(dst, rpc_clock_type::now() + timeout, nullptr, args...);
463 }
464 auto operator()(rpc::client& dst, cancellable& cancel, const InArgs&... args) {
465 return send(dst, {}, &cancel, args...);
466 }
467
468 };
469 return shelper{xt, xsig};
470 }
471
472 template<typename Serializer, typename... RetTypes>
473 inline future<> reply(wait_type, future<RetTypes...>&& ret, int64_t msg_id, shared_ptr<server::connection> client,
474 compat::optional<rpc_clock_type::time_point> timeout) {
475 if (!client->error()) {
476 snd_buf data;
477 try {
478 data = apply(marshall<Serializer, const RetTypes&...>,
479 std::tuple_cat(std::make_tuple(std::ref(client->template serializer<Serializer>()), 12), std::move(ret.get())));
480 } catch (std::exception& ex) {
481 uint32_t len = std::strlen(ex.what());
482 data = snd_buf(20 + len);
483 auto os = make_serializer_stream(data);
484 os.skip(12);
485 uint32_t v32 = cpu_to_le(uint32_t(exception_type::USER));
486 os.write(reinterpret_cast<char*>(&v32), sizeof(v32));
487 v32 = cpu_to_le(len);
488 os.write(reinterpret_cast<char*>(&v32), sizeof(v32));
489 os.write(ex.what(), len);
490 msg_id = -msg_id;
491 }
492
493 return client->respond(msg_id, std::move(data), timeout);
494 } else {
495 ret.ignore_ready_future();
496 return make_ready_future<>();
497 }
498 }
499
500 // specialization for no_wait_type which does not send a reply
501 template<typename Serializer>
502 inline future<> reply(no_wait_type, future<no_wait_type>&& r, int64_t msgid, shared_ptr<server::connection> client, compat::optional<rpc_clock_type::time_point> timeout) {
503 try {
504 r.get();
505 } catch (std::exception& ex) {
506 client->get_logger()(client->info(), msgid, to_sstring("exception \"") + ex.what() + "\" in no_wait handler ignored");
507 }
508 return make_ready_future<>();
509 }
510
511 template<typename Ret, typename... InArgs, typename WantClientInfo, typename WantTimePoint, typename Func, typename ArgsTuple>
512 inline futurize_t<Ret> apply(Func& func, client_info& info, opt_time_point time_point, WantClientInfo wci, WantTimePoint wtp, signature<Ret (InArgs...)> sig, ArgsTuple&& args) {
513 using futurator = futurize<Ret>;
514 try {
515 return futurator::apply(func, maybe_add_client_info(wci, info, maybe_add_time_point(wtp, time_point, std::forward<ArgsTuple>(args))));
516 } catch (std::runtime_error& ex) {
517 return futurator::make_exception_future(std::current_exception());
518 }
519 }
520
521 // lref_to_cref is a helper that encapsulates lvalue reference in std::ref() or does nothing otherwise
522 template<typename T>
523 auto lref_to_cref(T&& x) {
524 return std::move(x);
525 }
526
527 template<typename T>
528 auto lref_to_cref(T& x) {
529 return std::ref(x);
530 }
531
532 // Creates lambda to handle RPC message on a server.
533 // The lambda unmarshalls all parameters, calls a handler, marshall return values and sends them back to a client
534 template <typename Serializer, typename Func, typename Ret, typename... InArgs, typename WantClientInfo, typename WantTimePoint>
535 auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo wci, WantTimePoint wtp) {
536 using signature = decltype(sig);
537 using wait_style = wait_signature_t<Ret>;
538 return [func = lref_to_cref(std::forward<Func>(func))](shared_ptr<server::connection> client,
539 compat::optional<rpc_clock_type::time_point> timeout,
540 int64_t msg_id,
541 rcv_buf data) mutable {
542 auto memory_consumed = client->estimate_request_size(data.size);
543 if (memory_consumed > client->max_request_size()) {
544 auto err = format("request size {:d} large than memory limit {:d}", memory_consumed, client->max_request_size());
545 client->get_logger()(client->peer_address(), err);
546 with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, err = std::move(err)] {
547 return reply<Serializer>(wait_style(), futurize<Ret>::make_exception_future(std::runtime_error(err.c_str())), msg_id, client, timeout);
548 });
549 return make_ready_future();
550 }
551 // note: apply is executed asynchronously with regards to networking so we cannot chain futures here by doing "return apply()"
552 auto f = client->wait_for_resources(memory_consumed, timeout).then([client, timeout, msg_id, data = std::move(data), &func] (auto permit) mutable {
553 try {
554 with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, data = std::move(data), permit = std::move(permit), &func] () mutable {
555 auto args = unmarshall<Serializer, InArgs...>(*client, std::move(data));
556 return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit)] (futurize_t<Ret> ret) mutable {
557 return reply<Serializer>(wait_style(), std::move(ret), msg_id, client, timeout).then([permit = std::move(permit)] {});
558 });
559 });
560 } catch (gate_closed_exception&) {/* ignore */ }
561 });
562
563 if (timeout) {
564 f = f.handle_exception_type([] (semaphore_timed_out&) { /* ignore */ });
565 }
566
567 return std::move(f);
568 };
569 }
570
571 // helper to create copy constructible lambda from non copy constructible one. std::function<> works only with former kind.
572 template<typename Func>
573 auto make_copyable_function(Func&& func, std::enable_if_t<!std::is_copy_constructible<std::decay_t<Func>>::value, void*> = nullptr) {
574 auto p = make_lw_shared<typename std::decay_t<Func>>(std::forward<Func>(func));
575 return [p] (auto&&... args) { return (*p)( std::forward<decltype(args)>(args)... ); };
576 }
577
578 template<typename Func>
579 auto make_copyable_function(Func&& func, std::enable_if_t<std::is_copy_constructible<std::decay_t<Func>>::value, void*> = nullptr) {
580 return std::forward<Func>(func);
581 }
582
583 // This class is used to calculate client side rpc function signature.
584 // Return type is converted from a smart pointer to a type it points to.
585 // rpc::optional are converted to non optional type.
586 //
587 // Examples:
588 // std::unique_ptr<int>(int, rpc::optional<long>) -> int(int, long)
589 // double(float) -> double(float)
590 template<typename Ret, typename... In>
591 class client_function_type {
592 template<typename T, bool IsSmartPtr>
593 struct drop_smart_ptr_impl;
594 template<typename T>
595 struct drop_smart_ptr_impl<T, true> {
596 using type = typename T::element_type;
597 };
598 template<typename T>
599 struct drop_smart_ptr_impl<T, false> {
600 using type = T;
601 };
602 template<typename T>
603 using drop_smart_ptr = drop_smart_ptr_impl<T, is_smart_ptr<T>::value>;
604
605 // if return type is smart ptr take a type it points to instead
606 using return_type = typename drop_smart_ptr<Ret>::type;
607 public:
608 using type = return_type(typename remove_optional<In>::type...);
609 };
610
611 template<typename Serializer, typename MsgType>
612 template<typename Ret, typename... In>
613 auto protocol<Serializer, MsgType>::make_client(signature<Ret(In...)> clear_sig, MsgType t) {
614 using sig_type = signature<typename client_function_type<Ret, In...>::type>;
615 return send_helper<Serializer>(t, sig_type());
616 }
617
618 template<typename Serializer, typename MsgType>
619 template<typename Func>
620 auto protocol<Serializer, MsgType>::make_client(MsgType t) {
621 return make_client(typename signature<typename function_traits<Func>::signature>::clean(), t);
622 }
623
624 template<typename Serializer, typename MsgType>
625 template<typename Func>
626 auto protocol<Serializer, MsgType>::register_handler(MsgType t, scheduling_group sg, Func&& func) {
627 using sig_type = signature<typename function_traits<Func>::signature>;
628 using clean_sig_type = typename sig_type::clean;
629 using want_client_info = typename sig_type::want_client_info;
630 using want_time_point = typename sig_type::want_time_point;
631 auto recv = recv_helper<Serializer>(clean_sig_type(), std::forward<Func>(func),
632 want_client_info(), want_time_point());
633 register_receiver(t, rpc_handler{sg, make_copyable_function(std::move(recv))});
634 return make_client(clean_sig_type(), t);
635 }
636
637 template<typename Serializer, typename MsgType>
638 template<typename Func>
639 auto protocol<Serializer, MsgType>::register_handler(MsgType t, Func&& func) {
640 return register_handler(t, scheduling_group(), std::forward<Func>(func));
641 }
642
643 template<typename T> T make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<T>> org);
644
645 template<typename Serializer, typename... Out>
646 future<> sink_impl<Serializer, Out...>::operator()(const Out&... args) {
647 // note that we use remote serializer pointer, so if serailizer needs a state
648 // it should have per-cpu one
649 snd_buf data = marshall(this->_con->get()->template serializer<Serializer>(), 4, args...);
650 static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small");
651 auto p = data.front().get_write();
652 write_le<uint32_t>(p, data.size - 4);
653 // we do not want to dead lock on huge packets, so let them in
654 // but only one at a time
655 auto size = std::min(size_t(data.size), max_stream_buffers_memory);
656 return get_units(this->_sem, size).then([this, data = make_foreign(std::make_unique<snd_buf>(std::move(data)))] (semaphore_units<> su) mutable {
657 if (this->_ex) {
658 return make_exception_future(this->_ex);
659 }
660 smp::submit_to(this->_con->get_owner_shard(), [this, data = std::move(data)] () mutable {
661 connection* con = this->_con->get();
662 if (con->error()) {
663 return make_exception_future(closed_error());
664 }
665 if(con->sink_closed()) {
666 return make_exception_future(stream_closed());
667 }
668 return con->send(make_shard_local_buffer_copy(std::move(data)), {}, nullptr);
669 }).then_wrapped([su = std::move(su), this] (future<> f) {
670 if (f.failed() && !this->_ex) { // first error is the interesting one
671 this->_ex = f.get_exception();
672 } else {
673 f.ignore_ready_future();
674 }
675 });
676 return make_ready_future<>();
677 });
678 }
679
680 template<typename Serializer, typename... Out>
681 future<> sink_impl<Serializer, Out...>::close() {
682 return with_semaphore(this->_sem, max_stream_buffers_memory, [this] {
683 return smp::submit_to(this->_con->get_owner_shard(), [this] {
684 connection* con = this->_con->get();
685 if (con->sink_closed()) { // double close, should not happen!
686 return make_exception_future(stream_closed());
687 }
688 future<> f = make_ready_future<>();
689 if (!con->error() && !this->_ex) {
690 snd_buf data = marshall(con->template serializer<Serializer>(), 4);
691 static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small");
692 auto p = data.front().get_write();
693 write_le<uint32_t>(p, -1U); // max len fragment marks an end of a stream
694 f = con->send(std::move(data), {}, nullptr);
695 } else {
696 f = this->_ex ? make_exception_future(this->_ex) : make_exception_future(closed_error());
697 }
698 return f.finally([con] { return con->close_sink(); });
699 });
700 });
701 }
702
703 template<typename Serializer, typename... In>
704 future<compat::optional<std::tuple<In...>>> source_impl<Serializer, In...>::operator()() {
705 auto process_one_buffer = [this] {
706 foreign_ptr<std::unique_ptr<rcv_buf>> buf = std::move(this->_bufs.front());
707 this->_bufs.pop_front();
708 return seastar::apply([] (In&&... args) {
709 auto ret = compat::make_optional(std::make_tuple(std::move(args)...));
710 return make_ready_future<compat::optional<std::tuple<In...>>>(std::move(ret));
711 }, unmarshall<Serializer, In...>(*this->_con->get(), make_shard_local_buffer_copy(std::move(buf))));
712 };
713
714 if (!this->_bufs.empty()) {
715 return process_one_buffer();
716 }
717
718 // refill buffers from remote cpu
719 return smp::submit_to(this->_con->get_owner_shard(), [this] () -> future<> {
720 connection* con = this->_con->get();
721 return con->stream_receive(this->_bufs).then_wrapped([this, con] (future<>&& f) {
722 if (f.failed()) {
723 return con->close_source().then_wrapped([ex = f.get_exception()] (future<> f){
724 f.ignore_ready_future();
725 return make_exception_future<>(ex);
726 });
727 }
728 if (this->_bufs.empty()) { // nothing to read -> eof
729 return con->close_source().then_wrapped([] (future<> f) {
730 f.ignore_ready_future();
731 return make_ready_future<>();
732 });
733 }
734 return make_ready_future<>();
735 });
736 }).then([this, process_one_buffer] () {
737 if (this->_bufs.empty()) {
738 return make_ready_future<compat::optional<std::tuple<In...>>>(compat::nullopt);
739 } else {
740 return process_one_buffer();
741 }
742 });
743 }
744
745 template<typename... Out>
746 connection_id sink<Out...>::get_id() const {
747 return _impl->_con->get()->get_connection_id();
748 }
749
750 template<typename... In>
751 connection_id source<In...>::get_id() const {
752 return _impl->_con->get()->get_connection_id();
753 }
754
755 template<typename... In>
756 template<typename Serializer, typename... Out>
757 sink<Out...> source<In...>::make_sink() {
758 return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(_impl->_con));
759 }
760
761 }
762
763 }
764
765 namespace std {
766 template<>
767 struct hash<seastar::rpc::streaming_domain_type> {
768 size_t operator()(const seastar::rpc::streaming_domain_type& domain) const {
769 size_t h = 0;
770 boost::hash_combine(h, std::hash<uint64_t>{}(domain._id));
771 return h;
772 }
773 };
774 }
775
776