]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/include/seastar/rpc/rpc.hh
import 15.2.0 Octopus source
[ceph.git] / ceph / src / seastar / include / seastar / rpc / rpc.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22 #pragma once
23
24 #include <unordered_map>
25 #include <unordered_set>
26 #include <list>
27 #include <seastar/core/future.hh>
28 #include <seastar/net/api.hh>
29 #include <seastar/core/reactor.hh>
30 #include <seastar/core/iostream.hh>
31 #include <seastar/core/shared_ptr.hh>
32 #include <seastar/core/condition-variable.hh>
33 #include <seastar/core/gate.hh>
34 #include <seastar/rpc/rpc_types.hh>
35 #include <seastar/core/byteorder.hh>
36 #include <seastar/core/shared_future.hh>
37 #include <seastar/core/queue.hh>
38 #include <seastar/core/weak_ptr.hh>
39 #include <seastar/core/scheduling.hh>
40 #include <seastar/util/backtrace.hh>
41
42 namespace seastar {
43
44 namespace rpc {
45
46 using id_type = int64_t;
47
48 using rpc_semaphore = basic_semaphore<semaphore_default_exception_factory, rpc_clock_type>;
49 using resource_permit = semaphore_units<semaphore_default_exception_factory, rpc_clock_type>;
50
51 struct SerializerConcept {
52 // For each serializable type T, implement
53 class T;
54 template <typename Output>
55 friend void write(const SerializerConcept&, Output& output, const T& data);
56 template <typename Input>
57 friend T read(const SerializerConcept&, Input& input, type<T> type_tag); // type_tag used to disambiguate
58 // Input and Output expose void read(char*, size_t) and write(const char*, size_t).
59 };
60
61 static constexpr char rpc_magic[] = "SSTARRPC";
62
63 /// Specifies resource isolation for a connection.
64 struct isolation_config {
65 /// Specifies a scheduling group under which the connection (and all its
66 /// verb handlers) will execute.
67 scheduling_group sched_group = current_scheduling_group();
68 };
69
70 /// Default isolation configuration - run everything in the default scheduling group.
71 isolation_config default_isolate_connection(sstring isolation_cookie);
72
73 /// \brief Resource limits for an RPC server
74 ///
75 /// A request's memory use will be estimated as
76 ///
77 /// req_mem = basic_request_size * sizeof(serialized_request) * bloat_factor
78 ///
79 /// Concurrent requests will be limited so that
80 ///
81 /// sum(req_mem) <= max_memory
82 ///
83 /// \see server
84 struct resource_limits {
85 size_t basic_request_size = 0; ///< Minimum request footprint in memory
86 unsigned bloat_factor = 1; ///< Serialized size multiplied by this to estimate memory used by request
87 size_t max_memory = rpc_semaphore::max_counter(); ///< Maximum amount of memory that may be consumed by all requests
88 /// Configures isolation for a connection based on its isolation cookie. May throw,
89 /// in which case the connection will be terminated.
90 std::function<isolation_config (sstring isolation_cookie)> isolate_connection = default_isolate_connection;
91 };
92
93 struct client_options {
94 compat::optional<net::tcp_keepalive_params> keepalive;
95 bool tcp_nodelay = true;
96 bool reuseaddr = false;
97 compressor::factory* compressor_factory = nullptr;
98 bool send_timeout_data = true;
99 connection_id stream_parent = invalid_connection_id;
100 /// Configures how this connection is isolated from other connection on the same server.
101 ///
102 /// \see resource_limits::isolate_connection
103 sstring isolation_cookie;
104 };
105
106 // RPC call that passes stream connection id as a parameter
107 // may arrive to a different shard from where the stream connection
108 // was opened, so the connection id is not known to a server that handles
109 // the RPC call. The shard that the stream connection belong to is know
110 // since it is a part of connection id, but this is not enough to locate
111 // a server instance the connection belongs to if there are more than one
112 // server on the shard. Stream domain parameter is here to help with that.
113 // Different servers on all shards logically belonging to the same service should
114 // belong to the same streaming domain. Only one server on each shard can belong to
115 // a particulr streaming domain.
116 class streaming_domain_type {
117 uint64_t _id;
118 public:
119 explicit streaming_domain_type(uint64_t id) : _id(id) {}
120 bool operator==(const streaming_domain_type& o) const {
121 return _id == o._id;
122 }
123 friend struct std::hash<streaming_domain_type>;
124 friend std::ostream& operator<<(std::ostream&, const streaming_domain_type&);
125 };
126
127 struct server_options {
128 compressor::factory* compressor_factory = nullptr;
129 bool tcp_nodelay = true;
130 compat::optional<streaming_domain_type> streaming_domain;
131 server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_;
132 };
133
134 inline
135 size_t
136 estimate_request_size(const resource_limits& lim, size_t serialized_size) {
137 return lim.basic_request_size + serialized_size * lim.bloat_factor;
138 }
139
140 struct negotiation_frame {
141 char magic[sizeof(rpc_magic) - 1];
142 uint32_t len; // additional negotiation data length; multiple negotiation_frame_feature_record structs
143 };
144
145 enum class protocol_features : uint32_t {
146 COMPRESS = 0,
147 TIMEOUT = 1,
148 CONNECTION_ID = 2,
149 STREAM_PARENT = 3,
150 ISOLATION = 4,
151 };
152
153 // internal representation of feature data
154 using feature_map = std::map<protocol_features, sstring>;
155
156 // An rpc signature, in the form signature<Ret (In0, In1, In2)>.
157 template <typename Function>
158 struct signature;
159
160 class logger {
161 std::function<void(const sstring&)> _logger;
162
163 void log(const sstring& str) const {
164 if (_logger) {
165 _logger(str);
166 }
167 }
168
169 public:
170 void set(std::function<void(const sstring&)> l) {
171 _logger = std::move(l);
172 }
173
174 void operator()(const client_info& info, id_type msg_id, const sstring& str) const;
175
176 void operator()(const client_info& info, const sstring& str) const;
177
178 void operator()(const socket_address& addr, const sstring& str) const;
179 };
180
181 class connection {
182 protected:
183 connected_socket _fd;
184 input_stream<char> _read_buf;
185 output_stream<char> _write_buf;
186 bool _error = false;
187 bool _connected = false;
188 promise<> _stopped;
189 stats _stats;
190 const logger& _logger;
191 // The owner of the pointer below is an instance of rpc::protocol<typename Serializer> class.
192 // The type of the pointer is erased here, but the original type is Serializer
193 void* _serializer;
194 struct outgoing_entry {
195 timer<rpc_clock_type> t;
196 snd_buf buf;
197 compat::optional<promise<>> p = promise<>();
198 cancellable* pcancel = nullptr;
199 outgoing_entry(snd_buf b) : buf(std::move(b)) {}
200 outgoing_entry(outgoing_entry&& o) : t(std::move(o.t)), buf(std::move(o.buf)), p(std::move(o.p)), pcancel(o.pcancel) {
201 o.p = compat::nullopt;
202 }
203 ~outgoing_entry() {
204 if (p) {
205 if (pcancel) {
206 pcancel->cancel_send = std::function<void()>();
207 pcancel->send_back_pointer = nullptr;
208 }
209 p->set_value();
210 }
211 }
212 };
213 friend outgoing_entry;
214 std::list<outgoing_entry> _outgoing_queue;
215 condition_variable _outgoing_queue_cond;
216 future<> _send_loop_stopped = make_ready_future<>();
217 std::unique_ptr<compressor> _compressor;
218 bool _timeout_negotiated = false;
219 // stream related fields
220 bool _is_stream = false;
221 connection_id _id = invalid_connection_id;
222
223 std::unordered_map<connection_id, xshard_connection_ptr> _streams;
224 queue<rcv_buf> _stream_queue = queue<rcv_buf>(max_queued_stream_buffers);
225 semaphore _stream_sem = semaphore(max_stream_buffers_memory);
226 bool _sink_closed = true;
227 bool _source_closed = true;
228 // the future holds if sink is already closed
229 // if it is not ready it means the sink is been closed
230 future<bool> _sink_closed_future = make_ready_future<bool>(false);
231
232 bool is_stream() {
233 return _is_stream;
234 }
235
236 snd_buf compress(snd_buf buf);
237 future<> send_buffer(snd_buf buf);
238
239 enum class outgoing_queue_type {
240 request,
241 response,
242 stream = response
243 };
244
245 template<outgoing_queue_type QueueType> void send_loop();
246 future<> stop_send_loop();
247 future<compat::optional<rcv_buf>> read_stream_frame_compressed(input_stream<char>& in);
248 bool stream_check_twoway_closed() {
249 return _sink_closed && _source_closed;
250 }
251 future<> stream_close();
252 future<> stream_process_incoming(rcv_buf&&);
253 future<> handle_stream_frame();
254
255 public:
256 connection(connected_socket&& fd, const logger& l, void* s, connection_id id = invalid_connection_id) : connection(l, s, id) {
257 set_socket(std::move(fd));
258 }
259 connection(const logger& l, void* s, connection_id id = invalid_connection_id) : _logger(l), _serializer(s), _id(id) {}
260 virtual ~connection() {}
261 void set_socket(connected_socket&& fd);
262 future<> send_negotiation_frame(feature_map features);
263 // functions below are public because they are used by external heavily templated functions
264 // and I am not smart enough to know how to define them as friends
265 future<> send(snd_buf buf, compat::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
266 bool error() { return _error; }
267 void abort();
268 future<> stop();
269 future<> stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs);
270 future<> close_sink() {
271 _sink_closed = true;
272 if (stream_check_twoway_closed()) {
273 return stream_close();
274 }
275 return make_ready_future();
276 }
277 bool sink_closed() {
278 return _sink_closed;
279 }
280 future<> close_source() {
281 _source_closed = true;
282 if (stream_check_twoway_closed()) {
283 return stream_close();
284 }
285 return make_ready_future();
286 }
287 connection_id get_connection_id() const {
288 return _id;
289 }
290 xshard_connection_ptr get_stream(connection_id id) const;
291 void register_stream(connection_id id, xshard_connection_ptr c);
292 virtual socket_address peer_address() const = 0;
293
294 const logger& get_logger() const {
295 return _logger;
296 }
297
298 template<typename Serializer>
299 Serializer& serializer() {
300 return *static_cast<Serializer*>(_serializer);
301 }
302
303 template <typename FrameType>
304 typename FrameType::return_type read_frame(socket_address info, input_stream<char>& in);
305
306 template <typename FrameType>
307 typename FrameType::return_type read_frame_compressed(socket_address info, std::unique_ptr<compressor>& compressor, input_stream<char>& in);
308 friend class client;
309 template<typename Serializer, typename... Out>
310 friend class sink_impl;
311 template<typename Serializer, typename... In>
312 friend class source_impl;
313 };
314
315 // send data Out...
316 template<typename Serializer, typename... Out>
317 class sink_impl : public sink<Out...>::impl {
318 public:
319 sink_impl(xshard_connection_ptr con) : sink<Out...>::impl(std::move(con)) { this->_con->get()->_sink_closed = false; }
320 future<> operator()(const Out&... args) override;
321 future<> close() override;
322 future<> flush() override;
323 };
324
325 // receive data In...
326 template<typename Serializer, typename... In>
327 class source_impl : public source<In...>::impl {
328 public:
329 source_impl(xshard_connection_ptr con) : source<In...>::impl(std::move(con)) { this->_con->get()->_source_closed = false; }
330 future<compat::optional<std::tuple<In...>>> operator()() override;
331 };
332
333 class client : public rpc::connection, public weakly_referencable<client> {
334 socket _socket;
335 id_type _message_id = 1;
336 struct reply_handler_base {
337 timer<rpc_clock_type> t;
338 cancellable* pcancel = nullptr;
339 virtual void operator()(client&, id_type, rcv_buf data) = 0;
340 virtual void timeout() {}
341 virtual void cancel() {}
342 virtual ~reply_handler_base() {
343 if (pcancel) {
344 pcancel->cancel_wait = std::function<void()>();
345 pcancel->wait_back_pointer = nullptr;
346 }
347 };
348 };
349 public:
350 template<typename Reply, typename Func>
351 struct reply_handler final : reply_handler_base {
352 Func func;
353 Reply reply;
354 reply_handler(Func&& f) : func(std::move(f)) {}
355 virtual void operator()(client& client, id_type msg_id, rcv_buf data) override {
356 return func(reply, client, msg_id, std::move(data));
357 }
358 virtual void timeout() override {
359 reply.done = true;
360 reply.p.set_exception(timeout_error());
361 }
362 virtual void cancel() override {
363 reply.done = true;
364 reply.p.set_exception(canceled_error());
365 }
366 virtual ~reply_handler() {}
367 };
368 private:
369 std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding;
370 socket_address _server_addr;
371 client_options _options;
372 compat::optional<shared_promise<>> _client_negotiated = shared_promise<>();
373 weak_ptr<client> _parent; // for stream clients
374
375 private:
376 future<> negotiate_protocol(input_stream<char>& in);
377 void negotiate(feature_map server_features);
378 future<std::tuple<int64_t, compat::optional<rcv_buf>>>
379 read_response_frame(input_stream<char>& in);
380 future<std::tuple<int64_t, compat::optional<rcv_buf>>>
381 read_response_frame_compressed(input_stream<char>& in);
382 void send_loop() {
383 if (is_stream()) {
384 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::stream>();
385 } else {
386 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::request>();
387 }
388 }
389 public:
390 /**
391 * Create client object which will attempt to connect to the remote address.
392 *
393 * @param addr the remote address identifying this client
394 * @param local the local address of this client
395 */
396 client(const logger& l, void* s, const socket_address& addr, const socket_address& local = {});
397 client(const logger& l, void* s, client_options options, const socket_address& addr, const socket_address& local = {});
398
399 /**
400 * Create client object which will attempt to connect to the remote address using the
401 * specified seastar::socket.
402 *
403 * @param addr the remote address identifying this client
404 * @param local the local address of this client
405 * @param socket the socket object use to connect to the remote address
406 */
407 client(const logger& l, void* s, socket socket, const socket_address& addr, const socket_address& local = {});
408 client(const logger& l, void* s, client_options options, socket socket, const socket_address& addr, const socket_address& local = {});
409
410 stats get_stats() const;
411 stats& get_stats_internal() {
412 return _stats;
413 }
414 auto next_message_id() { return _message_id++; }
415 void wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, compat::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
416 void wait_timed_out(id_type id);
417 future<> stop();
418 void abort_all_streams();
419 void deregister_this_stream();
420 socket_address peer_address() const override {
421 return _server_addr;
422 }
423 future<> await_connection() {
424 if (!_client_negotiated) {
425 return make_ready_future<>();
426 } else {
427 return _client_negotiated->get_shared_future();
428 }
429 }
430 template<typename Serializer, typename... Out>
431 future<sink<Out...>> make_stream_sink(socket socket) {
432 return await_connection().then([this, socket = std::move(socket)] () mutable {
433 if (!this->get_connection_id()) {
434 return make_exception_future<sink<Out...>>(std::runtime_error("Streaming is not supported by the server"));
435 }
436 client_options o = _options;
437 o.stream_parent = this->get_connection_id();
438 o.send_timeout_data = false;
439 auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr);
440 c->_parent = this->weak_from_this();
441 c->_is_stream = true;
442 return c->await_connection().then([c, this] {
443 xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast<rpc::connection>(c)));
444 this->register_stream(c->get_connection_id(), s);
445 return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(std::move(s)));
446 });
447 });
448 }
449 template<typename Serializer, typename... Out>
450 future<sink<Out...>> make_stream_sink() {
451 return make_stream_sink<Serializer, Out...>(engine().net().socket());
452 }
453 };
454
455 class protocol_base;
456
457 class server {
458 private:
459 static thread_local std::unordered_map<streaming_domain_type, server*> _servers;
460
461 public:
462 class connection : public rpc::connection, public enable_shared_from_this<connection> {
463 server& _server;
464 client_info _info;
465 connection_id _parent_id = invalid_connection_id;
466 compat::optional<isolation_config> _isolation_config;
467 private:
468 future<> negotiate_protocol(input_stream<char>& in);
469 future<std::tuple<compat::optional<uint64_t>, uint64_t, int64_t, compat::optional<rcv_buf>>>
470 read_request_frame_compressed(input_stream<char>& in);
471 future<feature_map> negotiate(feature_map requested);
472 void send_loop() {
473 if (is_stream()) {
474 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::stream>();
475 } else {
476 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::response>();
477 }
478 }
479 future<> send_unknown_verb_reply(compat::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
480 public:
481 connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* seralizer, connection_id id);
482 future<> process();
483 future<> respond(int64_t msg_id, snd_buf&& data, compat::optional<rpc_clock_type::time_point> timeout);
484 client_info& info() { return _info; }
485 const client_info& info() const { return _info; }
486 stats get_stats() const {
487 stats res = _stats;
488 res.pending = _outgoing_queue.size();
489 return res;
490 }
491
492 stats& get_stats_internal() {
493 return _stats;
494 }
495 socket_address peer_address() const override {
496 return _info.addr;
497 }
498 // Resources will be released when this goes out of scope
499 future<resource_permit> wait_for_resources(size_t memory_consumed, compat::optional<rpc_clock_type::time_point> timeout) {
500 if (timeout) {
501 return get_units(_server._resources_available, memory_consumed, *timeout);
502 } else {
503 return get_units(_server._resources_available, memory_consumed);
504 }
505 }
506 size_t estimate_request_size(size_t serialized_size) {
507 return rpc::estimate_request_size(_server._limits, serialized_size);
508 }
509 size_t max_request_size() const {
510 return _server._limits.max_memory;
511 }
512 server& get_server() {
513 return _server;
514 }
515 future<> deregister_this_stream();
516 };
517 private:
518 protocol_base* _proto;
519 api_v2::server_socket _ss;
520 resource_limits _limits;
521 rpc_semaphore _resources_available;
522 std::unordered_map<connection_id, shared_ptr<connection>> _conns;
523 promise<> _ss_stopped;
524 gate _reply_gate;
525 server_options _options;
526 uint64_t _next_client_id = 1;
527
528 public:
529 server(protocol_base* proto, const socket_address& addr, resource_limits memory_limit = resource_limits());
530 server(protocol_base* proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits());
531 server(protocol_base* proto, server_socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{});
532 server(protocol_base* proto, server_options opts, server_socket, resource_limits memory_limit = resource_limits());
533 void accept();
534 future<> stop();
535 template<typename Func>
536 void foreach_connection(Func&& f) {
537 for (auto c : _conns) {
538 f(*c.second);
539 }
540 }
541 gate& reply_gate() {
542 return _reply_gate;
543 }
544 friend connection;
545 friend client;
546 };
547
548 using rpc_handler_func = std::function<future<> (shared_ptr<server::connection>, compat::optional<rpc_clock_type::time_point> timeout, int64_t msgid,
549 rcv_buf data)>;
550
551 struct rpc_handler {
552 scheduling_group sg;
553 rpc_handler_func func;
554 gate use_gate;
555 };
556
557 class protocol_base {
558 public:
559 virtual ~protocol_base() {};
560 virtual shared_ptr<server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) = 0;
561 protected:
562 friend class server;
563
564 virtual rpc_handler* get_handler(uint64_t msg_id) = 0;
565 virtual void put_handler(rpc_handler*) = 0;
566 };
567
568 // MsgType is a type that holds type of a message. The type should be hashable
569 // and serializable. It is preferable to use enum for message types, but
570 // do not forget to provide hash function for it
571 template<typename Serializer, typename MsgType = uint32_t>
572 class protocol : public protocol_base {
573 public:
574 class server : public rpc::server {
575 public:
576 server(protocol& proto, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
577 rpc::server(&proto, addr, memory_limit) {}
578 server(protocol& proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
579 rpc::server(&proto, opts, addr, memory_limit) {}
580 server(protocol& proto, server_socket socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{}) :
581 rpc::server(&proto, std::move(socket), memory_limit) {}
582 server(protocol& proto, server_options opts, server_socket socket, resource_limits memory_limit = resource_limits()) :
583 rpc::server(&proto, opts, std::move(socket), memory_limit) {}
584 };
585 class client : public rpc::client {
586 public:
587 /*
588 * Create client object which will attempt to connect to the remote address.
589 *
590 * @param addr the remote address identifying this client
591 * @param local the local address of this client
592 */
593 client(protocol& p, const socket_address& addr, const socket_address& local = {}) :
594 rpc::client(p.get_logger(), &p._serializer, addr, local) {}
595 client(protocol& p, client_options options, const socket_address& addr, const socket_address& local = {}) :
596 rpc::client(p.get_logger(), &p._serializer, options, addr, local) {}
597
598 /**
599 * Create client object which will attempt to connect to the remote address using the
600 * specified seastar::socket.
601 *
602 * @param addr the remote address identifying this client
603 * @param local the local address of this client
604 * @param socket the socket object use to connect to the remote address
605 */
606 client(protocol& p, socket socket, const socket_address& addr, const socket_address& local = {}) :
607 rpc::client(p.get_logger(), &p._serializer, std::move(socket), addr, local) {}
608 client(protocol& p, client_options options, socket socket, const socket_address& addr, const socket_address& local = {}) :
609 rpc::client(p.get_logger(), &p._serializer, options, std::move(socket), addr, local) {}
610 };
611
612 friend server;
613 private:
614 std::unordered_map<MsgType, rpc_handler> _handlers;
615 Serializer _serializer;
616 logger _logger;
617
618 public:
619 protocol(Serializer&& serializer) : _serializer(std::forward<Serializer>(serializer)) {}
620 template<typename Func>
621 auto make_client(MsgType t);
622
623 // returns a function which type depends on Func
624 // if Func == Ret(Args...) then return function is
625 // future<Ret>(protocol::client&, Args...)
626 template<typename Func>
627 auto register_handler(MsgType t, Func&& func);
628
629 // returns a function which type depends on Func
630 // if Func == Ret(Args...) then return function is
631 // future<Ret>(protocol::client&, Args...)
632 template <typename Func>
633 auto register_handler(MsgType t, scheduling_group sg, Func&& func);
634
635 future<> unregister_handler(MsgType t);
636
637 void set_logger(std::function<void(const sstring&)> logger) {
638 _logger.set(std::move(logger));
639 }
640
641 const logger& get_logger() const {
642 return _logger;
643 }
644
645 shared_ptr<rpc::server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) override {
646 return make_shared<rpc::server::connection>(server, std::move(fd), std::move(addr), _logger, &_serializer, id);
647 }
648
649 bool has_handler(uint64_t msg_id);
650
651 private:
652 rpc_handler* get_handler(uint64_t msg_id) override;
653 void put_handler(rpc_handler*) override;
654
655 template<typename Ret, typename... In>
656 auto make_client(signature<Ret(In...)> sig, MsgType t);
657
658 void register_receiver(MsgType t, rpc_handler&& handler) {
659 auto r = _handlers.emplace(t, std::move(handler));
660 if (!r.second) {
661 throw_with_backtrace<std::runtime_error>("registered handler already exists");
662 }
663 }
664 };
665 }
666
667 }
668
669 #include "rpc_impl.hh"