]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright (C) 2015 Cloudius Systems, Ltd. | |
20 | */ | |
21 | #pragma once | |
22 | ||
23 | #include <seastar/core/function_traits.hh> | |
11fdf7f2 TL |
24 | #include <seastar/core/shared_ptr.hh> |
25 | #include <seastar/core/sstring.hh> | |
f67539c2 | 26 | #include <seastar/core/when_all.hh> |
11fdf7f2 TL |
27 | #include <seastar/util/is_smart_ptr.hh> |
28 | #include <seastar/core/simple-stream.hh> | |
29 | #include <boost/range/numeric.hpp> | |
30 | #include <boost/range/adaptor/transformed.hpp> | |
31 | #include <seastar/net/packet-data-source.hh> | |
9f95a23c | 32 | #include <seastar/core/print.hh> |
11fdf7f2 TL |
33 | |
34 | namespace seastar { | |
35 | ||
36 | namespace rpc { | |
37 | ||
38 | enum class exception_type : uint32_t { | |
39 | USER = 0, | |
40 | UNKNOWN_VERB = 1, | |
41 | }; | |
42 | ||
43 | template<typename T> | |
44 | struct remove_optional { | |
45 | using type = T; | |
46 | }; | |
47 | ||
48 | template<typename T> | |
49 | struct remove_optional<optional<T>> { | |
50 | using type = T; | |
51 | }; | |
52 | ||
53 | struct wait_type {}; // opposite of no_wait_type | |
54 | ||
55 | // tags to tell whether we want a const client_info& parameter | |
56 | struct do_want_client_info {}; | |
57 | struct dont_want_client_info {}; | |
58 | ||
59 | // tags to tell whether we want a opt_time_point parameter | |
60 | struct do_want_time_point {}; | |
61 | struct dont_want_time_point {}; | |
62 | ||
63 | // General case | |
64 | template <typename Ret, typename... In> | |
65 | struct signature<Ret (In...)> { | |
66 | using ret_type = Ret; | |
67 | using arg_types = std::tuple<In...>; | |
68 | using clean = signature; | |
69 | using want_client_info = dont_want_client_info; | |
70 | using want_time_point = dont_want_time_point; | |
71 | }; | |
72 | ||
73 | // Specialize 'clean' for handlers that receive client_info | |
74 | template <typename Ret, typename... In> | |
75 | struct signature<Ret (const client_info&, In...)> { | |
76 | using ret_type = Ret; | |
77 | using arg_types = std::tuple<In...>; | |
78 | using clean = signature<Ret (In...)>; | |
79 | using want_client_info = do_want_client_info; | |
80 | using want_time_point = dont_want_time_point; | |
81 | }; | |
82 | ||
83 | template <typename Ret, typename... In> | |
84 | struct signature<Ret (client_info&, In...)> { | |
85 | using ret_type = Ret; | |
86 | using arg_types = std::tuple<In...>; | |
87 | using clean = signature<Ret (In...)>; | |
88 | using want_client_info = do_want_client_info; | |
89 | using want_time_point = dont_want_time_point; | |
90 | }; | |
91 | ||
92 | // Specialize 'clean' for handlers that receive client_info and opt_time_point | |
93 | template <typename Ret, typename... In> | |
94 | struct signature<Ret (const client_info&, opt_time_point, In...)> { | |
95 | using ret_type = Ret; | |
96 | using arg_types = std::tuple<In...>; | |
97 | using clean = signature<Ret (In...)>; | |
98 | using want_client_info = do_want_client_info; | |
99 | using want_time_point = do_want_time_point; | |
100 | }; | |
101 | ||
102 | template <typename Ret, typename... In> | |
103 | struct signature<Ret (client_info&, opt_time_point, In...)> { | |
104 | using ret_type = Ret; | |
105 | using arg_types = std::tuple<In...>; | |
106 | using clean = signature<Ret (In...)>; | |
107 | using want_client_info = do_want_client_info; | |
108 | using want_time_point = do_want_time_point; | |
109 | }; | |
110 | ||
111 | // Specialize 'clean' for handlers that receive opt_time_point | |
112 | template <typename Ret, typename... In> | |
113 | struct signature<Ret (opt_time_point, In...)> { | |
114 | using ret_type = Ret; | |
115 | using arg_types = std::tuple<In...>; | |
116 | using clean = signature<Ret (In...)>; | |
117 | using want_client_info = dont_want_client_info; | |
118 | using want_time_point = do_want_time_point; | |
119 | }; | |
120 | ||
121 | template <typename T> | |
122 | struct wait_signature { | |
123 | using type = wait_type; | |
124 | using cleaned_type = T; | |
125 | }; | |
126 | ||
127 | template <typename... T> | |
128 | struct wait_signature<future<T...>> { | |
129 | using type = wait_type; | |
130 | using cleaned_type = future<T...>; | |
131 | }; | |
132 | ||
133 | template <> | |
134 | struct wait_signature<no_wait_type> { | |
135 | using type = no_wait_type; | |
136 | using cleaned_type = void; | |
137 | }; | |
138 | ||
139 | template <> | |
140 | struct wait_signature<future<no_wait_type>> { | |
141 | using type = no_wait_type; | |
142 | using cleaned_type = future<>; | |
143 | }; | |
144 | ||
145 | template <typename T> | |
146 | using wait_signature_t = typename wait_signature<T>::type; | |
147 | ||
148 | template <typename... In> | |
149 | inline | |
150 | std::tuple<In...> | |
151 | maybe_add_client_info(dont_want_client_info, client_info& ci, std::tuple<In...>&& args) { | |
152 | return std::move(args); | |
153 | } | |
154 | ||
155 | template <typename... In> | |
156 | inline | |
157 | std::tuple<std::reference_wrapper<client_info>, In...> | |
158 | maybe_add_client_info(do_want_client_info, client_info& ci, std::tuple<In...>&& args) { | |
159 | return std::tuple_cat(std::make_tuple(std::ref(ci)), std::move(args)); | |
160 | } | |
161 | ||
162 | template <typename... In> | |
163 | inline | |
164 | std::tuple<In...> | |
165 | maybe_add_time_point(dont_want_time_point, opt_time_point& otp, std::tuple<In...>&& args) { | |
166 | return std::move(args); | |
167 | } | |
168 | ||
169 | template <typename... In> | |
170 | inline | |
171 | std::tuple<opt_time_point, In...> | |
172 | maybe_add_time_point(do_want_time_point, opt_time_point& otp, std::tuple<In...>&& args) { | |
173 | return std::tuple_cat(std::make_tuple(otp), std::move(args)); | |
174 | } | |
175 | ||
176 | inline sstring serialize_connection_id(const connection_id& id) { | |
f67539c2 TL |
177 | sstring p = uninitialized_string(sizeof(id)); |
178 | auto c = p.data(); | |
11fdf7f2 TL |
179 | write_le(c, id.id); |
180 | return p; | |
181 | } | |
182 | ||
183 | inline connection_id deserialize_connection_id(const sstring& s) { | |
184 | connection_id id; | |
185 | auto p = s.c_str(); | |
186 | id.id = read_le<decltype(id.id)>(p); | |
187 | return id; | |
188 | } | |
189 | ||
190 | template <bool IsSmartPtr> | |
191 | struct serialize_helper; | |
192 | ||
193 | template <> | |
194 | struct serialize_helper<false> { | |
195 | template <typename Serializer, typename Output, typename T> | |
196 | static inline void serialize(Serializer& serializer, Output& out, const T& t) { | |
197 | return write(serializer, out, t); | |
198 | } | |
199 | }; | |
200 | ||
201 | template <> | |
202 | struct serialize_helper<true> { | |
203 | template <typename Serializer, typename Output, typename T> | |
204 | static inline void serialize(Serializer& serializer, Output& out, const T& t) { | |
205 | return write(serializer, out, *t); | |
206 | } | |
207 | }; | |
208 | ||
9f95a23c TL |
209 | template <typename Serializer, typename Output, typename... T> |
210 | inline void do_marshall(Serializer& serializer, Output& out, const T&... args); | |
211 | ||
11fdf7f2 TL |
212 | template <typename Serializer, typename Output> |
213 | struct marshall_one { | |
214 | template <typename T> struct helper { | |
215 | static void doit(Serializer& serializer, Output& out, const T& arg) { | |
216 | using serialize_helper_type = serialize_helper<is_smart_ptr<typename std::remove_reference<T>::type>::value>; | |
217 | serialize_helper_type::serialize(serializer, out, arg); | |
218 | } | |
219 | }; | |
220 | template<typename T> struct helper<std::reference_wrapper<const T>> { | |
221 | static void doit(Serializer& serializer, Output& out, const std::reference_wrapper<const T>& arg) { | |
222 | helper<T>::doit(serializer, out, arg.get()); | |
223 | } | |
224 | }; | |
225 | static void put_connection_id(const connection_id& cid, Output& out) { | |
226 | sstring id = serialize_connection_id(cid); | |
227 | out.write(id.c_str(), id.size()); | |
228 | } | |
229 | template <typename... T> struct helper<sink<T...>> { | |
230 | static void doit(Serializer& serializer, Output& out, const sink<T...>& arg) { | |
231 | put_connection_id(arg.get_id(), out); | |
232 | } | |
233 | }; | |
234 | template <typename... T> struct helper<source<T...>> { | |
235 | static void doit(Serializer& serializer, Output& out, const source<T...>& arg) { | |
236 | put_connection_id(arg.get_id(), out); | |
237 | } | |
238 | }; | |
9f95a23c TL |
239 | template <typename... T> struct helper<tuple<T...>> { |
240 | static void doit(Serializer& serializer, Output& out, const tuple<T...>& arg) { | |
241 | auto do_do_marshall = [&serializer, &out] (const auto&... args) { | |
242 | do_marshall(serializer, out, args...); | |
243 | }; | |
f67539c2 | 244 | std::apply(do_do_marshall, arg); |
9f95a23c TL |
245 | } |
246 | }; | |
11fdf7f2 TL |
247 | }; |
248 | ||
249 | template <typename Serializer, typename Output, typename... T> | |
250 | inline void do_marshall(Serializer& serializer, Output& out, const T&... args) { | |
251 | // C++ guarantees that brace-initialization expressions are evaluted in order | |
252 | (void)std::initializer_list<int>{(marshall_one<Serializer, Output>::template helper<T>::doit(serializer, out, args), 1)...}; | |
253 | } | |
254 | ||
255 | static inline memory_output_stream<snd_buf::iterator> make_serializer_stream(snd_buf& output) { | |
f67539c2 | 256 | auto* b = std::get_if<temporary_buffer<char>>(&output.bufs); |
11fdf7f2 TL |
257 | if (b) { |
258 | return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::simple(b->get_write(), b->size())); | |
259 | } else { | |
f67539c2 | 260 | auto& ar = std::get<std::vector<temporary_buffer<char>>>(output.bufs); |
11fdf7f2 TL |
261 | return memory_output_stream<snd_buf::iterator>(memory_output_stream<snd_buf::iterator>::fragmented(ar.begin(), output.size)); |
262 | } | |
263 | } | |
264 | ||
265 | template <typename Serializer, typename... T> | |
266 | inline snd_buf marshall(Serializer& serializer, size_t head_space, const T&... args) { | |
267 | measuring_output_stream measure; | |
268 | do_marshall(serializer, measure, args...); | |
269 | snd_buf ret(measure.size() + head_space); | |
270 | auto out = make_serializer_stream(ret); | |
271 | out.skip(head_space); | |
272 | do_marshall(serializer, out, args...); | |
273 | return ret; | |
274 | } | |
275 | ||
276 | template <typename Serializer, typename Input> | |
277 | inline std::tuple<> do_unmarshall(connection& c, Input& in) { | |
278 | return std::make_tuple(); | |
279 | } | |
280 | ||
281 | template<typename Serializer, typename Input> | |
282 | struct unmarshal_one { | |
283 | template<typename T> struct helper { | |
284 | static T doit(connection& c, Input& in) { | |
285 | return read(c.serializer<Serializer>(), in, type<T>()); | |
286 | } | |
287 | }; | |
288 | template<typename T> struct helper<optional<T>> { | |
289 | static optional<T> doit(connection& c, Input& in) { | |
290 | if (in.size()) { | |
291 | return optional<T>(read(c.serializer<Serializer>(), in, type<typename remove_optional<T>::type>())); | |
292 | } else { | |
293 | return optional<T>(); | |
294 | } | |
295 | } | |
296 | }; | |
297 | template<typename T> struct helper<std::reference_wrapper<const T>> { | |
298 | static T doit(connection& c, Input& in) { | |
299 | return helper<T>::doit(c, in); | |
300 | } | |
301 | }; | |
302 | static connection_id get_connection_id(Input& in) { | |
f67539c2 TL |
303 | sstring id = uninitialized_string(sizeof(connection_id)); |
304 | in.read(id.data(), sizeof(connection_id)); | |
11fdf7f2 TL |
305 | return deserialize_connection_id(id); |
306 | } | |
307 | template<typename... T> struct helper<sink<T...>> { | |
308 | static sink<T...> doit(connection& c, Input& in) { | |
309 | return sink<T...>(make_shared<sink_impl<Serializer, T...>>(c.get_stream(get_connection_id(in)))); | |
310 | } | |
311 | }; | |
312 | template<typename... T> struct helper<source<T...>> { | |
313 | static source<T...> doit(connection& c, Input& in) { | |
314 | return source<T...>(make_shared<source_impl<Serializer, T...>>(c.get_stream(get_connection_id(in)))); | |
315 | } | |
316 | }; | |
9f95a23c TL |
317 | template <typename... T> struct helper<tuple<T...>> { |
318 | static tuple<T...> doit(connection& c, Input& in) { | |
319 | return do_unmarshall<Serializer, Input, T...>(c, in); | |
320 | } | |
321 | }; | |
11fdf7f2 TL |
322 | }; |
323 | ||
324 | template <typename Serializer, typename Input, typename T0, typename... Trest> | |
325 | inline std::tuple<T0, Trest...> do_unmarshall(connection& c, Input& in) { | |
326 | // FIXME: something less recursive | |
327 | auto first = std::make_tuple(unmarshal_one<Serializer, Input>::template helper<T0>::doit(c, in)); | |
328 | auto rest = do_unmarshall<Serializer, Input, Trest...>(c, in); | |
329 | return std::tuple_cat(std::move(first), std::move(rest)); | |
330 | } | |
331 | ||
332 | template <typename Serializer, typename... T> | |
333 | inline std::tuple<T...> unmarshall(connection& c, rcv_buf input) { | |
334 | auto in = make_deserializer_stream(input); | |
335 | return do_unmarshall<Serializer, decltype(in), T...>(c, in); | |
336 | } | |
337 | ||
338 | inline std::exception_ptr unmarshal_exception(rcv_buf& d) { | |
339 | std::exception_ptr ex; | |
340 | auto data = make_deserializer_stream(d); | |
341 | ||
342 | uint32_t v32; | |
343 | data.read(reinterpret_cast<char*>(&v32), 4); | |
344 | exception_type ex_type = exception_type(le_to_cpu(v32)); | |
345 | data.read(reinterpret_cast<char*>(&v32), 4); | |
346 | uint32_t ex_len = le_to_cpu(v32); | |
347 | ||
348 | switch (ex_type) { | |
349 | case exception_type::USER: { | |
350 | std::string s(ex_len, '\0'); | |
351 | data.read(&*s.begin(), ex_len); | |
352 | ex = std::make_exception_ptr(std::runtime_error(std::move(s))); | |
353 | break; | |
354 | } | |
355 | case exception_type::UNKNOWN_VERB: { | |
356 | uint64_t v64; | |
357 | data.read(reinterpret_cast<char*>(&v64), 8); | |
358 | ex = std::make_exception_ptr(unknown_verb_error(le_to_cpu(v64))); | |
359 | break; | |
360 | } | |
361 | default: | |
362 | ex = std::make_exception_ptr(unknown_exception_error()); | |
363 | break; | |
364 | } | |
365 | return ex; | |
366 | } | |
367 | ||
368 | template <typename Payload, typename... T> | |
369 | struct rcv_reply_base { | |
370 | bool done = false; | |
371 | promise<T...> p; | |
372 | template<typename... V> | |
373 | void set_value(V&&... v) { | |
374 | done = true; | |
f67539c2 | 375 | p.set_value(internal::untuple(std::forward<V>(v))...); |
11fdf7f2 TL |
376 | } |
377 | ~rcv_reply_base() { | |
378 | if (!done) { | |
379 | p.set_exception(closed_error()); | |
380 | } | |
381 | } | |
382 | }; | |
383 | ||
384 | template<typename Serializer, typename T> | |
385 | struct rcv_reply : rcv_reply_base<T, T> { | |
386 | inline void get_reply(rpc::client& dst, rcv_buf input) { | |
387 | this->set_value(unmarshall<Serializer, T>(dst, std::move(input))); | |
388 | } | |
389 | }; | |
390 | ||
391 | template<typename Serializer, typename... T> | |
392 | struct rcv_reply<Serializer, future<T...>> : rcv_reply_base<std::tuple<T...>, T...> { | |
393 | inline void get_reply(rpc::client& dst, rcv_buf input) { | |
394 | this->set_value(unmarshall<Serializer, T...>(dst, std::move(input))); | |
395 | } | |
396 | }; | |
397 | ||
398 | template<typename Serializer> | |
399 | struct rcv_reply<Serializer, void> : rcv_reply_base<void, void> { | |
400 | inline void get_reply(rpc::client& dst, rcv_buf input) { | |
401 | this->set_value(); | |
402 | } | |
403 | }; | |
404 | ||
405 | template<typename Serializer> | |
406 | struct rcv_reply<Serializer, future<>> : rcv_reply<Serializer, void> {}; | |
407 | ||
408 | template <typename Serializer, typename Ret, typename... InArgs> | |
f67539c2 | 409 | inline auto wait_for_reply(wait_type, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel, rpc::client& dst, id_type msg_id, |
11fdf7f2 TL |
410 | signature<Ret (InArgs...)> sig) { |
411 | using reply_type = rcv_reply<Serializer, Ret>; | |
412 | auto lambda = [] (reply_type& r, rpc::client& dst, id_type msg_id, rcv_buf data) mutable { | |
413 | if (msg_id >= 0) { | |
414 | dst.get_stats_internal().replied++; | |
415 | return r.get_reply(dst, std::move(data)); | |
416 | } else { | |
417 | dst.get_stats_internal().exception_received++; | |
418 | r.done = true; | |
419 | r.p.set_exception(unmarshal_exception(data)); | |
420 | } | |
421 | }; | |
422 | using handler_type = typename rpc::client::template reply_handler<reply_type, decltype(lambda)>; | |
423 | auto r = std::make_unique<handler_type>(std::move(lambda)); | |
424 | auto fut = r->reply.p.get_future(); | |
425 | dst.wait_for_reply(msg_id, std::move(r), timeout, cancel); | |
426 | return fut; | |
427 | } | |
428 | ||
429 | template<typename Serializer, typename... InArgs> | |
f67539c2 | 430 | inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, cancellable* cancel, rpc::client& dst, id_type msg_id, |
11fdf7f2 TL |
431 | signature<no_wait_type (InArgs...)> sig) { // no_wait overload |
432 | return make_ready_future<>(); | |
433 | } | |
434 | ||
435 | template<typename Serializer, typename... InArgs> | |
f67539c2 | 436 | inline auto wait_for_reply(no_wait_type, std::optional<rpc_clock_type::time_point>, cancellable* cancel, rpc::client& dst, id_type msg_id, |
11fdf7f2 TL |
437 | signature<future<no_wait_type> (InArgs...)> sig) { // future<no_wait> overload |
438 | return make_ready_future<>(); | |
439 | } | |
440 | ||
9f95a23c TL |
441 | // Convert a relative timeout (a duration) to an absolute one (time_point). |
442 | // Do the calculation safely so that a very large duration will be capped by | |
443 | // time_point::max, instead of wrapping around to ancient history. | |
444 | inline rpc_clock_type::time_point | |
445 | relative_timeout_to_absolute(rpc_clock_type::duration relative) { | |
446 | rpc_clock_type::time_point now = rpc_clock_type::now(); | |
447 | return now + std::min(relative, rpc_clock_type::time_point::max() - now); | |
448 | } | |
449 | ||
11fdf7f2 TL |
450 | // Returns lambda that can be used to send rpc messages. |
451 | // The lambda gets client connection and rpc parameters as arguments, marshalls them sends | |
452 | // to a server and waits for a reply. After receiving reply it unmarshalls it and signal completion | |
453 | // to a caller. | |
454 | template<typename Serializer, typename MsgType, typename Ret, typename... InArgs> | |
455 | auto send_helper(MsgType xt, signature<Ret (InArgs...)> xsig) { | |
456 | struct shelper { | |
457 | MsgType t; | |
458 | signature<Ret (InArgs...)> sig; | |
f67539c2 | 459 | auto send(rpc::client& dst, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel, const InArgs&... args) { |
11fdf7f2 TL |
460 | if (dst.error()) { |
461 | using cleaned_ret_type = typename wait_signature<Ret>::cleaned_type; | |
462 | return futurize<cleaned_ret_type>::make_exception_future(closed_error()); | |
463 | } | |
464 | ||
465 | // send message | |
466 | auto msg_id = dst.next_message_id(); | |
467 | snd_buf data = marshall(dst.template serializer<Serializer>(), 28, args...); | |
468 | static_assert(snd_buf::chunk_size >= 28, "send buffer chunk size is too small"); | |
469 | auto p = data.front().get_write() + 8; // 8 extra bytes for expiration timer | |
470 | write_le<uint64_t>(p, uint64_t(t)); | |
471 | write_le<int64_t>(p + 8, msg_id); | |
472 | write_le<uint32_t>(p + 16, data.size - 28); | |
473 | ||
474 | // prepare reply handler, if return type is now_wait_type this does nothing, since no reply will be sent | |
475 | using wait = wait_signature_t<Ret>; | |
476 | return when_all(dst.send(std::move(data), timeout, cancel), wait_for_reply<Serializer>(wait(), timeout, cancel, dst, msg_id, sig)).then([] (auto r) { | |
477 | return std::move(std::get<1>(r)); // return future of wait_for_reply | |
478 | }); | |
479 | } | |
480 | auto operator()(rpc::client& dst, const InArgs&... args) { | |
481 | return send(dst, {}, nullptr, args...); | |
482 | } | |
483 | auto operator()(rpc::client& dst, rpc_clock_type::time_point timeout, const InArgs&... args) { | |
484 | return send(dst, timeout, nullptr, args...); | |
485 | } | |
486 | auto operator()(rpc::client& dst, rpc_clock_type::duration timeout, const InArgs&... args) { | |
9f95a23c | 487 | return send(dst, relative_timeout_to_absolute(timeout), nullptr, args...); |
11fdf7f2 TL |
488 | } |
489 | auto operator()(rpc::client& dst, cancellable& cancel, const InArgs&... args) { | |
490 | return send(dst, {}, &cancel, args...); | |
491 | } | |
492 | ||
493 | }; | |
494 | return shelper{xt, xsig}; | |
495 | } | |
496 | ||
f67539c2 TL |
497 | template<typename Serializer, typename SEASTAR_ELLIPSIS RetTypes> |
498 | inline future<> reply(wait_type, future<RetTypes SEASTAR_ELLIPSIS>&& ret, int64_t msg_id, shared_ptr<server::connection> client, | |
499 | std::optional<rpc_clock_type::time_point> timeout) { | |
11fdf7f2 TL |
500 | if (!client->error()) { |
501 | snd_buf data; | |
502 | try { | |
f67539c2 TL |
503 | #if SEASTAR_API_LEVEL < 6 |
504 | if constexpr (sizeof...(RetTypes) == 0) { | |
505 | #else | |
506 | if constexpr (std::is_void_v<RetTypes>) { | |
507 | #endif | |
508 | ret.get(); | |
509 | data = std::invoke(marshall<Serializer>, std::ref(client->template serializer<Serializer>()), 12); | |
510 | } else { | |
511 | data = std::invoke(marshall<Serializer, const RetTypes& SEASTAR_ELLIPSIS>, std::ref(client->template serializer<Serializer>()), 12, std::move(ret.get0())); | |
512 | } | |
11fdf7f2 TL |
513 | } catch (std::exception& ex) { |
514 | uint32_t len = std::strlen(ex.what()); | |
515 | data = snd_buf(20 + len); | |
516 | auto os = make_serializer_stream(data); | |
517 | os.skip(12); | |
518 | uint32_t v32 = cpu_to_le(uint32_t(exception_type::USER)); | |
519 | os.write(reinterpret_cast<char*>(&v32), sizeof(v32)); | |
520 | v32 = cpu_to_le(len); | |
521 | os.write(reinterpret_cast<char*>(&v32), sizeof(v32)); | |
522 | os.write(ex.what(), len); | |
523 | msg_id = -msg_id; | |
524 | } | |
525 | ||
526 | return client->respond(msg_id, std::move(data), timeout); | |
527 | } else { | |
528 | ret.ignore_ready_future(); | |
529 | return make_ready_future<>(); | |
530 | } | |
531 | } | |
532 | ||
533 | // specialization for no_wait_type which does not send a reply | |
534 | template<typename Serializer> | |
f67539c2 | 535 | inline future<> reply(no_wait_type, future<no_wait_type>&& r, int64_t msgid, shared_ptr<server::connection> client, std::optional<rpc_clock_type::time_point> timeout) { |
11fdf7f2 TL |
536 | try { |
537 | r.get(); | |
538 | } catch (std::exception& ex) { | |
539 | client->get_logger()(client->info(), msgid, to_sstring("exception \"") + ex.what() + "\" in no_wait handler ignored"); | |
540 | } | |
541 | return make_ready_future<>(); | |
542 | } | |
543 | ||
544 | template<typename Ret, typename... InArgs, typename WantClientInfo, typename WantTimePoint, typename Func, typename ArgsTuple> | |
545 | inline futurize_t<Ret> apply(Func& func, client_info& info, opt_time_point time_point, WantClientInfo wci, WantTimePoint wtp, signature<Ret (InArgs...)> sig, ArgsTuple&& args) { | |
546 | using futurator = futurize<Ret>; | |
547 | try { | |
548 | return futurator::apply(func, maybe_add_client_info(wci, info, maybe_add_time_point(wtp, time_point, std::forward<ArgsTuple>(args)))); | |
549 | } catch (std::runtime_error& ex) { | |
550 | return futurator::make_exception_future(std::current_exception()); | |
551 | } | |
552 | } | |
553 | ||
554 | // lref_to_cref is a helper that encapsulates lvalue reference in std::ref() or does nothing otherwise | |
555 | template<typename T> | |
556 | auto lref_to_cref(T&& x) { | |
557 | return std::move(x); | |
558 | } | |
559 | ||
560 | template<typename T> | |
561 | auto lref_to_cref(T& x) { | |
562 | return std::ref(x); | |
563 | } | |
564 | ||
565 | // Creates lambda to handle RPC message on a server. | |
566 | // The lambda unmarshalls all parameters, calls a handler, marshall return values and sends them back to a client | |
567 | template <typename Serializer, typename Func, typename Ret, typename... InArgs, typename WantClientInfo, typename WantTimePoint> | |
568 | auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo wci, WantTimePoint wtp) { | |
569 | using signature = decltype(sig); | |
570 | using wait_style = wait_signature_t<Ret>; | |
571 | return [func = lref_to_cref(std::forward<Func>(func))](shared_ptr<server::connection> client, | |
f67539c2 | 572 | std::optional<rpc_clock_type::time_point> timeout, |
11fdf7f2 TL |
573 | int64_t msg_id, |
574 | rcv_buf data) mutable { | |
575 | auto memory_consumed = client->estimate_request_size(data.size); | |
576 | if (memory_consumed > client->max_request_size()) { | |
577 | auto err = format("request size {:d} large than memory limit {:d}", memory_consumed, client->max_request_size()); | |
578 | client->get_logger()(client->peer_address(), err); | |
9f95a23c | 579 | // FIXME: future is discarded |
f67539c2 TL |
580 | (void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, err = std::move(err)] { |
581 | return reply<Serializer>(wait_style(), futurize<Ret>::make_exception_future(std::runtime_error(err.c_str())), msg_id, client, timeout).handle_exception([client, msg_id] (std::exception_ptr eptr) { | |
582 | client->get_logger()(client->info(), msg_id, format("got exception while processing an oversized message: {}", eptr)); | |
583 | }); | |
584 | }).handle_exception_type([] (gate_closed_exception&) {/* ignore */}); | |
11fdf7f2 TL |
585 | return make_ready_future(); |
586 | } | |
587 | // note: apply is executed asynchronously with regards to networking so we cannot chain futures here by doing "return apply()" | |
588 | auto f = client->wait_for_resources(memory_consumed, timeout).then([client, timeout, msg_id, data = std::move(data), &func] (auto permit) mutable { | |
9f95a23c | 589 | // FIXME: future is discarded |
f67539c2 | 590 | (void)try_with_gate(client->get_server().reply_gate(), [client, timeout, msg_id, data = std::move(data), permit = std::move(permit), &func] () mutable { |
9f95a23c TL |
591 | try { |
592 | auto args = unmarshall<Serializer, InArgs...>(*client, std::move(data)); | |
593 | return apply(func, client->info(), timeout, WantClientInfo(), WantTimePoint(), signature(), std::move(args)).then_wrapped([client, timeout, msg_id, permit = std::move(permit)] (futurize_t<Ret> ret) mutable { | |
594 | return reply<Serializer>(wait_style(), std::move(ret), msg_id, client, timeout).handle_exception([permit = std::move(permit), client, msg_id] (std::exception_ptr eptr) { | |
595 | client->get_logger()(client->info(), msg_id, format("got exception while processing a message: {}", eptr)); | |
596 | }); | |
597 | }); | |
598 | } catch (...) { | |
599 | client->get_logger()(client->info(), msg_id, format("got exception while processing a message: {}", std::current_exception())); | |
600 | return make_ready_future(); | |
601 | } | |
f67539c2 | 602 | }).handle_exception_type([] (gate_closed_exception&) {/* ignore */}); |
11fdf7f2 TL |
603 | }); |
604 | ||
605 | if (timeout) { | |
606 | f = f.handle_exception_type([] (semaphore_timed_out&) { /* ignore */ }); | |
607 | } | |
608 | ||
9f95a23c | 609 | return f; |
11fdf7f2 TL |
610 | }; |
611 | } | |
612 | ||
613 | // helper to create copy constructible lambda from non copy constructible one. std::function<> works only with former kind. | |
614 | template<typename Func> | |
615 | auto make_copyable_function(Func&& func, std::enable_if_t<!std::is_copy_constructible<std::decay_t<Func>>::value, void*> = nullptr) { | |
616 | auto p = make_lw_shared<typename std::decay_t<Func>>(std::forward<Func>(func)); | |
617 | return [p] (auto&&... args) { return (*p)( std::forward<decltype(args)>(args)... ); }; | |
618 | } | |
619 | ||
620 | template<typename Func> | |
621 | auto make_copyable_function(Func&& func, std::enable_if_t<std::is_copy_constructible<std::decay_t<Func>>::value, void*> = nullptr) { | |
622 | return std::forward<Func>(func); | |
623 | } | |
624 | ||
625 | // This class is used to calculate client side rpc function signature. | |
626 | // Return type is converted from a smart pointer to a type it points to. | |
627 | // rpc::optional are converted to non optional type. | |
628 | // | |
629 | // Examples: | |
630 | // std::unique_ptr<int>(int, rpc::optional<long>) -> int(int, long) | |
631 | // double(float) -> double(float) | |
632 | template<typename Ret, typename... In> | |
633 | class client_function_type { | |
634 | template<typename T, bool IsSmartPtr> | |
635 | struct drop_smart_ptr_impl; | |
636 | template<typename T> | |
637 | struct drop_smart_ptr_impl<T, true> { | |
638 | using type = typename T::element_type; | |
639 | }; | |
640 | template<typename T> | |
641 | struct drop_smart_ptr_impl<T, false> { | |
642 | using type = T; | |
643 | }; | |
644 | template<typename T> | |
645 | using drop_smart_ptr = drop_smart_ptr_impl<T, is_smart_ptr<T>::value>; | |
646 | ||
647 | // if return type is smart ptr take a type it points to instead | |
648 | using return_type = typename drop_smart_ptr<Ret>::type; | |
649 | public: | |
650 | using type = return_type(typename remove_optional<In>::type...); | |
651 | }; | |
652 | ||
653 | template<typename Serializer, typename MsgType> | |
654 | template<typename Ret, typename... In> | |
655 | auto protocol<Serializer, MsgType>::make_client(signature<Ret(In...)> clear_sig, MsgType t) { | |
656 | using sig_type = signature<typename client_function_type<Ret, In...>::type>; | |
657 | return send_helper<Serializer>(t, sig_type()); | |
658 | } | |
659 | ||
660 | template<typename Serializer, typename MsgType> | |
661 | template<typename Func> | |
662 | auto protocol<Serializer, MsgType>::make_client(MsgType t) { | |
663 | return make_client(typename signature<typename function_traits<Func>::signature>::clean(), t); | |
664 | } | |
665 | ||
666 | template<typename Serializer, typename MsgType> | |
667 | template<typename Func> | |
668 | auto protocol<Serializer, MsgType>::register_handler(MsgType t, scheduling_group sg, Func&& func) { | |
669 | using sig_type = signature<typename function_traits<Func>::signature>; | |
670 | using clean_sig_type = typename sig_type::clean; | |
671 | using want_client_info = typename sig_type::want_client_info; | |
672 | using want_time_point = typename sig_type::want_time_point; | |
673 | auto recv = recv_helper<Serializer>(clean_sig_type(), std::forward<Func>(func), | |
674 | want_client_info(), want_time_point()); | |
675 | register_receiver(t, rpc_handler{sg, make_copyable_function(std::move(recv))}); | |
676 | return make_client(clean_sig_type(), t); | |
677 | } | |
678 | ||
679 | template<typename Serializer, typename MsgType> | |
680 | template<typename Func> | |
681 | auto protocol<Serializer, MsgType>::register_handler(MsgType t, Func&& func) { | |
682 | return register_handler(t, scheduling_group(), std::forward<Func>(func)); | |
683 | } | |
684 | ||
9f95a23c TL |
685 | template<typename Serializer, typename MsgType> |
686 | future<> protocol<Serializer, MsgType>::unregister_handler(MsgType t) { | |
687 | auto it = _handlers.find(t); | |
688 | if (it != _handlers.end()) { | |
689 | return it->second.use_gate.close().finally([this, t] { | |
690 | _handlers.erase(t); | |
691 | }); | |
692 | } | |
693 | return make_ready_future<>(); | |
694 | } | |
695 | ||
696 | template<typename Serializer, typename MsgType> | |
f67539c2 TL |
697 | bool protocol<Serializer, MsgType>::has_handler(MsgType msg_id) { |
698 | auto it = _handlers.find(msg_id); | |
9f95a23c TL |
699 | if (it == _handlers.end()) { |
700 | return false; | |
701 | } | |
702 | return !it->second.use_gate.is_closed(); | |
703 | } | |
704 | ||
705 | template<typename Serializer, typename MsgType> | |
706 | rpc_handler* protocol<Serializer, MsgType>::get_handler(uint64_t msg_id) { | |
707 | rpc_handler* h = nullptr; | |
708 | auto it = _handlers.find(MsgType(msg_id)); | |
709 | if (it != _handlers.end()) { | |
710 | try { | |
711 | it->second.use_gate.enter(); | |
712 | h = &it->second; | |
713 | } catch (gate_closed_exception&) { | |
714 | // unregistered, just ignore | |
715 | } | |
716 | } | |
717 | return h; | |
718 | } | |
719 | ||
720 | template<typename Serializer, typename MsgType> | |
721 | void protocol<Serializer, MsgType>::put_handler(rpc_handler* h) { | |
722 | h->use_gate.leave(); | |
723 | } | |
724 | ||
11fdf7f2 TL |
725 | template<typename T> T make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<T>> org); |
726 | ||
727 | template<typename Serializer, typename... Out> | |
728 | future<> sink_impl<Serializer, Out...>::operator()(const Out&... args) { | |
729 | // note that we use remote serializer pointer, so if serailizer needs a state | |
730 | // it should have per-cpu one | |
731 | snd_buf data = marshall(this->_con->get()->template serializer<Serializer>(), 4, args...); | |
732 | static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small"); | |
733 | auto p = data.front().get_write(); | |
734 | write_le<uint32_t>(p, data.size - 4); | |
735 | // we do not want to dead lock on huge packets, so let them in | |
736 | // but only one at a time | |
737 | auto size = std::min(size_t(data.size), max_stream_buffers_memory); | |
20effc67 TL |
738 | const auto seq_num = _next_seq_num++; |
739 | return get_units(this->_sem, size).then([this, data = make_foreign(std::make_unique<snd_buf>(std::move(data))), seq_num] (semaphore_units<> su) mutable { | |
11fdf7f2 TL |
740 | if (this->_ex) { |
741 | return make_exception_future(this->_ex); | |
742 | } | |
f67539c2 TL |
743 | // It is OK to discard this future. The user is required to |
744 | // wait for it when closing. | |
20effc67 | 745 | (void)smp::submit_to(this->_con->get_owner_shard(), [this, data = std::move(data), seq_num] () mutable { |
11fdf7f2 TL |
746 | connection* con = this->_con->get(); |
747 | if (con->error()) { | |
748 | return make_exception_future(closed_error()); | |
749 | } | |
750 | if(con->sink_closed()) { | |
751 | return make_exception_future(stream_closed()); | |
752 | } | |
20effc67 TL |
753 | |
754 | auto& last_seq_num = _remote_state.last_seq_num; | |
755 | auto& out_of_order_bufs = _remote_state.out_of_order_bufs; | |
756 | ||
757 | auto local_data = make_shard_local_buffer_copy(std::move(data)); | |
758 | const auto seq_num_diff = seq_num - last_seq_num; | |
759 | if (seq_num_diff > 1) { | |
760 | auto [it, _] = out_of_order_bufs.emplace(seq_num, deferred_snd_buf{promise<>{}, std::move(local_data)}); | |
761 | return it->second.pr.get_future(); | |
762 | } | |
763 | ||
764 | last_seq_num = seq_num; | |
765 | auto ret_fut = con->send(std::move(local_data), {}, nullptr); | |
766 | while (!out_of_order_bufs.empty() && out_of_order_bufs.begin()->first == (last_seq_num + 1)) { | |
767 | auto it = out_of_order_bufs.begin(); | |
768 | last_seq_num = it->first; | |
769 | auto fut = con->send(std::move(it->second.data), {}, nullptr); | |
770 | fut.forward_to(std::move(it->second.pr)); | |
771 | out_of_order_bufs.erase(it); | |
772 | } | |
773 | return ret_fut; | |
11fdf7f2 TL |
774 | }).then_wrapped([su = std::move(su), this] (future<> f) { |
775 | if (f.failed() && !this->_ex) { // first error is the interesting one | |
776 | this->_ex = f.get_exception(); | |
777 | } else { | |
778 | f.ignore_ready_future(); | |
779 | } | |
780 | }); | |
781 | return make_ready_future<>(); | |
782 | }); | |
783 | } | |
784 | ||
9f95a23c TL |
785 | template<typename Serializer, typename... Out> |
786 | future<> sink_impl<Serializer, Out...>::flush() { | |
787 | // wait until everything is sent out before returning. | |
788 | return with_semaphore(this->_sem, max_stream_buffers_memory, [this] { | |
789 | if (this->_ex) { | |
790 | return make_exception_future(this->_ex); | |
791 | } | |
792 | return make_ready_future(); | |
793 | }); | |
794 | } | |
795 | ||
11fdf7f2 TL |
796 | template<typename Serializer, typename... Out> |
797 | future<> sink_impl<Serializer, Out...>::close() { | |
798 | return with_semaphore(this->_sem, max_stream_buffers_memory, [this] { | |
799 | return smp::submit_to(this->_con->get_owner_shard(), [this] { | |
800 | connection* con = this->_con->get(); | |
801 | if (con->sink_closed()) { // double close, should not happen! | |
802 | return make_exception_future(stream_closed()); | |
803 | } | |
804 | future<> f = make_ready_future<>(); | |
805 | if (!con->error() && !this->_ex) { | |
806 | snd_buf data = marshall(con->template serializer<Serializer>(), 4); | |
807 | static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small"); | |
808 | auto p = data.front().get_write(); | |
809 | write_le<uint32_t>(p, -1U); // max len fragment marks an end of a stream | |
810 | f = con->send(std::move(data), {}, nullptr); | |
811 | } else { | |
812 | f = this->_ex ? make_exception_future(this->_ex) : make_exception_future(closed_error()); | |
813 | } | |
814 | return f.finally([con] { return con->close_sink(); }); | |
815 | }); | |
816 | }); | |
817 | } | |
818 | ||
f67539c2 TL |
819 | template<typename Serializer, typename... Out> |
820 | sink_impl<Serializer, Out...>::~sink_impl() { | |
821 | // A failure to close might leave some continuations running after | |
822 | // this is destroyed, leading to use-after-free bugs. | |
823 | assert(this->_con->get()->sink_closed()); | |
824 | } | |
825 | ||
11fdf7f2 | 826 | template<typename Serializer, typename... In> |
f67539c2 | 827 | future<std::optional<std::tuple<In...>>> source_impl<Serializer, In...>::operator()() { |
11fdf7f2 TL |
828 | auto process_one_buffer = [this] { |
829 | foreign_ptr<std::unique_ptr<rcv_buf>> buf = std::move(this->_bufs.front()); | |
830 | this->_bufs.pop_front(); | |
f67539c2 TL |
831 | return std::apply([] (In&&... args) { |
832 | auto ret = std::make_optional(std::make_tuple(std::move(args)...)); | |
833 | return make_ready_future<std::optional<std::tuple<In...>>>(std::move(ret)); | |
11fdf7f2 TL |
834 | }, unmarshall<Serializer, In...>(*this->_con->get(), make_shard_local_buffer_copy(std::move(buf)))); |
835 | }; | |
836 | ||
837 | if (!this->_bufs.empty()) { | |
838 | return process_one_buffer(); | |
839 | } | |
840 | ||
841 | // refill buffers from remote cpu | |
842 | return smp::submit_to(this->_con->get_owner_shard(), [this] () -> future<> { | |
843 | connection* con = this->_con->get(); | |
9f95a23c TL |
844 | if (con->_source_closed) { |
845 | return make_exception_future<>(stream_closed()); | |
846 | } | |
11fdf7f2 TL |
847 | return con->stream_receive(this->_bufs).then_wrapped([this, con] (future<>&& f) { |
848 | if (f.failed()) { | |
849 | return con->close_source().then_wrapped([ex = f.get_exception()] (future<> f){ | |
850 | f.ignore_ready_future(); | |
851 | return make_exception_future<>(ex); | |
852 | }); | |
853 | } | |
854 | if (this->_bufs.empty()) { // nothing to read -> eof | |
855 | return con->close_source().then_wrapped([] (future<> f) { | |
856 | f.ignore_ready_future(); | |
857 | return make_ready_future<>(); | |
858 | }); | |
859 | } | |
860 | return make_ready_future<>(); | |
861 | }); | |
862 | }).then([this, process_one_buffer] () { | |
863 | if (this->_bufs.empty()) { | |
f67539c2 | 864 | return make_ready_future<std::optional<std::tuple<In...>>>(std::nullopt); |
11fdf7f2 TL |
865 | } else { |
866 | return process_one_buffer(); | |
867 | } | |
868 | }); | |
869 | } | |
870 | ||
871 | template<typename... Out> | |
872 | connection_id sink<Out...>::get_id() const { | |
873 | return _impl->_con->get()->get_connection_id(); | |
874 | } | |
875 | ||
876 | template<typename... In> | |
877 | connection_id source<In...>::get_id() const { | |
878 | return _impl->_con->get()->get_connection_id(); | |
879 | } | |
880 | ||
881 | template<typename... In> | |
882 | template<typename Serializer, typename... Out> | |
883 | sink<Out...> source<In...>::make_sink() { | |
884 | return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(_impl->_con)); | |
885 | } | |
886 | ||
887 | } | |
888 | ||
889 | } | |
890 | ||
891 | namespace std { | |
892 | template<> | |
893 | struct hash<seastar::rpc::streaming_domain_type> { | |
894 | size_t operator()(const seastar::rpc::streaming_domain_type& domain) const { | |
895 | size_t h = 0; | |
896 | boost::hash_combine(h, std::hash<uint64_t>{}(domain._id)); | |
897 | return h; | |
898 | } | |
899 | }; | |
900 | } | |
901 | ||
902 |