]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/include/seastar/rpc/rpc.hh
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / include / seastar / rpc / rpc.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22 #pragma once
23
24 #include <unordered_map>
25 #include <unordered_set>
26 #include <list>
27 #include <seastar/core/future.hh>
28 #include <seastar/core/seastar.hh>
29 #include <seastar/net/api.hh>
30 #include <seastar/core/iostream.hh>
31 #include <seastar/core/shared_ptr.hh>
32 #include <seastar/core/condition-variable.hh>
33 #include <seastar/core/gate.hh>
34 #include <seastar/rpc/rpc_types.hh>
35 #include <seastar/core/byteorder.hh>
36 #include <seastar/core/shared_future.hh>
37 #include <seastar/core/queue.hh>
38 #include <seastar/core/weak_ptr.hh>
39 #include <seastar/core/scheduling.hh>
40 #include <seastar/util/backtrace.hh>
41 #include <seastar/util/log.hh>
42
43 namespace seastar {
44
45 namespace rpc {
46
47 /// \defgroup rpc rpc - remote procedure call framework
48 ///
49 /// \brief
50 /// rpc is a framework that can be used to define client-server communication
51 /// protocols.
52 /// For a high-level description of the RPC features see
53 /// [doc/rpc.md](./md_rpc.html),
54 /// [doc/rpc-streaming.md](./md_rpc-streaming.html) and
55 /// [doc/rpc-compression.md](./md_rpc-compression.html)
56 ///
57 /// The entry point for setting up an rpc protocol is
58 /// seastar::rpc::protocol.
59
60 using id_type = int64_t;
61
62 using rpc_semaphore = basic_semaphore<semaphore_default_exception_factory, rpc_clock_type>;
63 using resource_permit = semaphore_units<semaphore_default_exception_factory, rpc_clock_type>;
64
65 static constexpr char rpc_magic[] = "SSTARRPC";
66
67 /// \addtogroup rpc
68 /// @{
69
70 /// Specifies resource isolation for a connection.
71 struct isolation_config {
72 /// Specifies a scheduling group under which the connection (and all its
73 /// verb handlers) will execute.
74 scheduling_group sched_group = current_scheduling_group();
75 };
76
77 /// Default isolation configuration - run everything in the default scheduling group.
78 ///
79 /// In the scheduling_group that the protocol::server was created in.
80 isolation_config default_isolate_connection(sstring isolation_cookie);
81
82 /// \brief Resource limits for an RPC server
83 ///
84 /// A request's memory use will be estimated as
85 ///
86 /// req_mem = basic_request_size * sizeof(serialized_request) * bloat_factor
87 ///
88 /// Concurrent requests will be limited so that
89 ///
90 /// sum(req_mem) <= max_memory
91 ///
92 /// \see server
93 struct resource_limits {
94 size_t basic_request_size = 0; ///< Minimum request footprint in memory
95 unsigned bloat_factor = 1; ///< Serialized size multiplied by this to estimate memory used by request
96 size_t max_memory = rpc_semaphore::max_counter(); ///< Maximum amount of memory that may be consumed by all requests
97 /// Configures isolation for a connection based on its isolation cookie. May throw,
98 /// in which case the connection will be terminated.
99 std::function<isolation_config (sstring isolation_cookie)> isolate_connection = default_isolate_connection;
100 };
101
102 struct client_options {
103 std::optional<net::tcp_keepalive_params> keepalive;
104 bool tcp_nodelay = true;
105 bool reuseaddr = false;
106 compressor::factory* compressor_factory = nullptr;
107 bool send_timeout_data = true;
108 connection_id stream_parent = invalid_connection_id;
109 /// Configures how this connection is isolated from other connection on the same server.
110 ///
111 /// \see resource_limits::isolate_connection
112 sstring isolation_cookie;
113 };
114
115 /// @}
116
117 // RPC call that passes stream connection id as a parameter
118 // may arrive to a different shard from where the stream connection
119 // was opened, so the connection id is not known to a server that handles
120 // the RPC call. The shard that the stream connection belong to is know
121 // since it is a part of connection id, but this is not enough to locate
122 // a server instance the connection belongs to if there are more than one
123 // server on the shard. Stream domain parameter is here to help with that.
124 // Different servers on all shards logically belonging to the same service should
125 // belong to the same streaming domain. Only one server on each shard can belong to
126 // a particulr streaming domain.
127 class streaming_domain_type {
128 uint64_t _id;
129 public:
130 explicit streaming_domain_type(uint64_t id) : _id(id) {}
131 bool operator==(const streaming_domain_type& o) const {
132 return _id == o._id;
133 }
134 friend struct std::hash<streaming_domain_type>;
135 friend std::ostream& operator<<(std::ostream&, const streaming_domain_type&);
136 };
137
138 /// \addtogroup rpc
139 /// @{
140
141 class server;
142
143 struct server_options {
144 compressor::factory* compressor_factory = nullptr;
145 bool tcp_nodelay = true;
146 std::optional<streaming_domain_type> streaming_domain;
147 server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_;
148 // optional filter function. If set, will be called with remote
149 // (connecting) address.
150 // Returning false will refuse the incoming connection.
151 // Returning true will allow the mechanism to proceed.
152 std::function<bool(const socket_address&)> filter_connection = {};
153 };
154
155 /// @}
156
157 inline
158 size_t
159 estimate_request_size(const resource_limits& lim, size_t serialized_size) {
160 return lim.basic_request_size + serialized_size * lim.bloat_factor;
161 }
162
163 struct negotiation_frame {
164 char magic[sizeof(rpc_magic) - 1];
165 uint32_t len; // additional negotiation data length; multiple negotiation_frame_feature_record structs
166 };
167
168 enum class protocol_features : uint32_t {
169 COMPRESS = 0,
170 TIMEOUT = 1,
171 CONNECTION_ID = 2,
172 STREAM_PARENT = 3,
173 ISOLATION = 4,
174 };
175
176 // internal representation of feature data
177 using feature_map = std::map<protocol_features, sstring>;
178
179 // An rpc signature, in the form signature<Ret (In0, In1, In2)>.
180 template <typename Function>
181 struct signature;
182
183 class logger {
184 std::function<void(const sstring&)> _logger;
185 ::seastar::logger* _seastar_logger = nullptr;
186
187 // _seastar_logger will always be used first if it's available
188 void log(const sstring& str) const {
189 if (_seastar_logger) {
190 // default level for log messages is `info`
191 _seastar_logger->info("{}", str);
192 } else if (_logger) {
193 _logger(str);
194 }
195 }
196
197 // _seastar_logger will always be used first if it's available
198 template <typename... Args>
199 void log(log_level level, const char* fmt, Args&&... args) const {
200 if (_seastar_logger) {
201 _seastar_logger->log(level, fmt, std::forward<Args>(args)...);
202 // If the log level is at least `info`, fall back to legacy logging without explicit level.
203 // Ignore less severe levels in order not to spam user's log with messages during transition,
204 // i.e. when the user still only defines a level-less logger.
205 } else if (_logger && level <= log_level::info) {
206 _logger(format(fmt, std::forward<Args>(args)...));
207 }
208 }
209
210 public:
211 void set(std::function<void(const sstring&)> l) {
212 _logger = std::move(l);
213 }
214
215 void set(::seastar::logger* logger) {
216 _seastar_logger = logger;
217 }
218
219 void operator()(const client_info& info, id_type msg_id, const sstring& str) const;
220 void operator()(const client_info& info, id_type msg_id, log_level level, std::string_view str) const;
221
222 void operator()(const client_info& info, const sstring& str) const;
223 void operator()(const client_info& info, log_level level, std::string_view str) const;
224
225 void operator()(const socket_address& addr, const sstring& str) const;
226 void operator()(const socket_address& addr, log_level level, std::string_view str) const;
227 };
228
229 class connection {
230 protected:
231 connected_socket _fd;
232 input_stream<char> _read_buf;
233 output_stream<char> _write_buf;
234 bool _error = false;
235 bool _connected = false;
236 promise<> _stopped;
237 stats _stats;
238 const logger& _logger;
239 // The owner of the pointer below is an instance of rpc::protocol<typename Serializer> class.
240 // The type of the pointer is erased here, but the original type is Serializer
241 void* _serializer;
242 struct outgoing_entry {
243 timer<rpc_clock_type> t;
244 snd_buf buf;
245 std::optional<promise<>> p = promise<>();
246 cancellable* pcancel = nullptr;
247 outgoing_entry(snd_buf b) : buf(std::move(b)) {}
248 outgoing_entry(outgoing_entry&& o) noexcept : t(std::move(o.t)), buf(std::move(o.buf)), p(std::move(o.p)), pcancel(o.pcancel) {
249 o.p = std::nullopt;
250 }
251 ~outgoing_entry() {
252 if (p) {
253 if (pcancel) {
254 pcancel->cancel_send = std::function<void()>();
255 pcancel->send_back_pointer = nullptr;
256 }
257 p->set_value();
258 }
259 }
260 };
261 friend outgoing_entry;
262 std::list<outgoing_entry> _outgoing_queue;
263 condition_variable _outgoing_queue_cond;
264 future<> _send_loop_stopped = make_ready_future<>();
265 std::unique_ptr<compressor> _compressor;
266 bool _timeout_negotiated = false;
267 // stream related fields
268 bool _is_stream = false;
269 connection_id _id = invalid_connection_id;
270
271 std::unordered_map<connection_id, xshard_connection_ptr> _streams;
272 queue<rcv_buf> _stream_queue = queue<rcv_buf>(max_queued_stream_buffers);
273 semaphore _stream_sem = semaphore(max_stream_buffers_memory);
274 bool _sink_closed = true;
275 bool _source_closed = true;
276 // the future holds if sink is already closed
277 // if it is not ready it means the sink is been closed
278 future<bool> _sink_closed_future = make_ready_future<bool>(false);
279
280 bool is_stream() {
281 return _is_stream;
282 }
283
284 snd_buf compress(snd_buf buf);
285 future<> send_buffer(snd_buf buf);
286
287 enum class outgoing_queue_type {
288 request,
289 response,
290 stream = response
291 };
292
293 template<outgoing_queue_type QueueType> void send_loop();
294 future<> stop_send_loop();
295 future<std::optional<rcv_buf>> read_stream_frame_compressed(input_stream<char>& in);
296 bool stream_check_twoway_closed() {
297 return _sink_closed && _source_closed;
298 }
299 future<> stream_close();
300 future<> stream_process_incoming(rcv_buf&&);
301 future<> handle_stream_frame();
302
303 public:
304 connection(connected_socket&& fd, const logger& l, void* s, connection_id id = invalid_connection_id) : connection(l, s, id) {
305 set_socket(std::move(fd));
306 }
307 connection(const logger& l, void* s, connection_id id = invalid_connection_id) : _logger(l), _serializer(s), _id(id) {}
308 virtual ~connection() {}
309 void set_socket(connected_socket&& fd);
310 future<> send_negotiation_frame(feature_map features);
311 // functions below are public because they are used by external heavily templated functions
312 // and I am not smart enough to know how to define them as friends
313 future<> send(snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
314 bool error() { return _error; }
315 void abort();
316 future<> stop() noexcept;
317 future<> stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs);
318 future<> close_sink() {
319 _sink_closed = true;
320 if (stream_check_twoway_closed()) {
321 return stream_close();
322 }
323 return make_ready_future();
324 }
325 bool sink_closed() {
326 return _sink_closed;
327 }
328 future<> close_source() {
329 _source_closed = true;
330 if (stream_check_twoway_closed()) {
331 return stream_close();
332 }
333 return make_ready_future();
334 }
335 connection_id get_connection_id() const {
336 return _id;
337 }
338 xshard_connection_ptr get_stream(connection_id id) const;
339 void register_stream(connection_id id, xshard_connection_ptr c);
340 virtual socket_address peer_address() const = 0;
341
342 const logger& get_logger() const {
343 return _logger;
344 }
345
346 template<typename Serializer>
347 Serializer& serializer() {
348 return *static_cast<Serializer*>(_serializer);
349 }
350
351 template <typename FrameType>
352 typename FrameType::return_type read_frame(socket_address info, input_stream<char>& in);
353
354 template <typename FrameType>
355 typename FrameType::return_type read_frame_compressed(socket_address info, std::unique_ptr<compressor>& compressor, input_stream<char>& in);
356 friend class client;
357 template<typename Serializer, typename... Out>
358 friend class sink_impl;
359 template<typename Serializer, typename... In>
360 friend class source_impl;
361 };
362
363 struct deferred_snd_buf {
364 promise<> pr;
365 snd_buf data;
366 };
367
368 // send data Out...
369 template<typename Serializer, typename... Out>
370 class sink_impl : public sink<Out...>::impl {
371 // Used on the shard *this lives on.
372 alignas (cache_line_size) uint64_t _next_seq_num = 1;
373
374 // Used on the shard the _conn lives on.
375 struct alignas (cache_line_size) {
376 uint64_t last_seq_num = 0;
377 std::map<uint64_t, deferred_snd_buf> out_of_order_bufs;
378 } _remote_state;
379 public:
380 sink_impl(xshard_connection_ptr con) : sink<Out...>::impl(std::move(con)) { this->_con->get()->_sink_closed = false; }
381 future<> operator()(const Out&... args) override;
382 future<> close() override;
383 future<> flush() override;
384 ~sink_impl() override;
385 };
386
387 // receive data In...
388 template<typename Serializer, typename... In>
389 class source_impl : public source<In...>::impl {
390 public:
391 source_impl(xshard_connection_ptr con) : source<In...>::impl(std::move(con)) { this->_con->get()->_source_closed = false; }
392 future<std::optional<std::tuple<In...>>> operator()() override;
393 };
394
395 class client : public rpc::connection, public weakly_referencable<client> {
396 socket _socket;
397 id_type _message_id = 1;
398 struct reply_handler_base {
399 timer<rpc_clock_type> t;
400 cancellable* pcancel = nullptr;
401 virtual void operator()(client&, id_type, rcv_buf data) = 0;
402 virtual void timeout() {}
403 virtual void cancel() {}
404 virtual ~reply_handler_base() {
405 if (pcancel) {
406 pcancel->cancel_wait = std::function<void()>();
407 pcancel->wait_back_pointer = nullptr;
408 }
409 };
410 };
411 public:
412 template<typename Reply, typename Func>
413 struct reply_handler final : reply_handler_base {
414 Func func;
415 Reply reply;
416 reply_handler(Func&& f) : func(std::move(f)) {}
417 virtual void operator()(client& client, id_type msg_id, rcv_buf data) override {
418 return func(reply, client, msg_id, std::move(data));
419 }
420 virtual void timeout() override {
421 reply.done = true;
422 reply.p.set_exception(timeout_error());
423 }
424 virtual void cancel() override {
425 reply.done = true;
426 reply.p.set_exception(canceled_error());
427 }
428 virtual ~reply_handler() {}
429 };
430 private:
431 std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding;
432 socket_address _server_addr, _local_addr;
433 client_options _options;
434 std::optional<shared_promise<>> _client_negotiated = shared_promise<>();
435 weak_ptr<client> _parent; // for stream clients
436
437 private:
438 future<> negotiate_protocol(input_stream<char>& in);
439 void negotiate(feature_map server_features);
440 future<std::tuple<int64_t, std::optional<rcv_buf>>>
441 read_response_frame(input_stream<char>& in);
442 future<std::tuple<int64_t, std::optional<rcv_buf>>>
443 read_response_frame_compressed(input_stream<char>& in);
444 void send_loop() {
445 if (is_stream()) {
446 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::stream>();
447 } else {
448 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::request>();
449 }
450 }
451 public:
452 /**
453 * Create client object which will attempt to connect to the remote address.
454 *
455 * @param l \ref seastar::logger to use for logging error messages
456 * @param s an optional connection serializer
457 * @param addr the remote address identifying this client
458 * @param local the local address of this client
459 */
460 client(const logger& l, void* s, const socket_address& addr, const socket_address& local = {});
461 client(const logger& l, void* s, client_options options, const socket_address& addr, const socket_address& local = {});
462
463 /**
464 * Create client object which will attempt to connect to the remote address using the
465 * specified seastar::socket.
466 *
467 * @param l \ref seastar::logger to use for logging error messages
468 * @param s an optional connection serializer
469 * @param addr the remote address identifying this client
470 * @param local the local address of this client
471 * @param socket the socket object use to connect to the remote address
472 */
473 client(const logger& l, void* s, socket socket, const socket_address& addr, const socket_address& local = {});
474 client(const logger& l, void* s, client_options options, socket socket, const socket_address& addr, const socket_address& local = {});
475
476 stats get_stats() const;
477 stats& get_stats_internal() {
478 return _stats;
479 }
480 auto next_message_id() { return _message_id++; }
481 void wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
482 void wait_timed_out(id_type id);
483 future<> stop() noexcept;
484 void abort_all_streams();
485 void deregister_this_stream();
486 socket_address peer_address() const override {
487 return _server_addr;
488 }
489 future<> await_connection() {
490 if (!_client_negotiated) {
491 return make_ready_future<>();
492 } else {
493 return _client_negotiated->get_shared_future();
494 }
495 }
496 template<typename Serializer, typename... Out>
497 future<sink<Out...>> make_stream_sink(socket socket) {
498 return await_connection().then([this, socket = std::move(socket)] () mutable {
499 if (!this->get_connection_id()) {
500 return make_exception_future<sink<Out...>>(std::runtime_error("Streaming is not supported by the server"));
501 }
502 client_options o = _options;
503 o.stream_parent = this->get_connection_id();
504 o.send_timeout_data = false;
505 auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr, _local_addr);
506 c->_parent = this->weak_from_this();
507 c->_is_stream = true;
508 return c->await_connection().then([c, this] {
509 xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast<rpc::connection>(c)));
510 this->register_stream(c->get_connection_id(), s);
511 return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(std::move(s)));
512 });
513 });
514 }
515 template<typename Serializer, typename... Out>
516 future<sink<Out...>> make_stream_sink() {
517 return make_stream_sink<Serializer, Out...>(make_socket());
518 }
519 };
520
521 class protocol_base;
522
523 class server {
524 private:
525 static thread_local std::unordered_map<streaming_domain_type, server*> _servers;
526
527 public:
528 class connection : public rpc::connection, public enable_shared_from_this<connection> {
529 server& _server;
530 client_info _info;
531 connection_id _parent_id = invalid_connection_id;
532 std::optional<isolation_config> _isolation_config;
533 private:
534 future<> negotiate_protocol(input_stream<char>& in);
535 future<std::tuple<std::optional<uint64_t>, uint64_t, int64_t, std::optional<rcv_buf>>>
536 read_request_frame_compressed(input_stream<char>& in);
537 future<feature_map> negotiate(feature_map requested);
538 void send_loop() {
539 if (is_stream()) {
540 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::stream>();
541 } else {
542 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::response>();
543 }
544 }
545 future<> send_unknown_verb_reply(std::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
546 public:
547 connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* seralizer, connection_id id);
548 future<> process();
549 future<> respond(int64_t msg_id, snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout);
550 client_info& info() { return _info; }
551 const client_info& info() const { return _info; }
552 stats get_stats() const {
553 stats res = _stats;
554 res.pending = _outgoing_queue.size();
555 return res;
556 }
557
558 stats& get_stats_internal() {
559 return _stats;
560 }
561 socket_address peer_address() const override {
562 return _info.addr;
563 }
564 // Resources will be released when this goes out of scope
565 future<resource_permit> wait_for_resources(size_t memory_consumed, std::optional<rpc_clock_type::time_point> timeout) {
566 if (timeout) {
567 return get_units(_server._resources_available, memory_consumed, *timeout);
568 } else {
569 return get_units(_server._resources_available, memory_consumed);
570 }
571 }
572 size_t estimate_request_size(size_t serialized_size) {
573 return rpc::estimate_request_size(_server._limits, serialized_size);
574 }
575 size_t max_request_size() const {
576 return _server._limits.max_memory;
577 }
578 server& get_server() {
579 return _server;
580 }
581 future<> deregister_this_stream();
582 };
583 private:
584 protocol_base* _proto;
585 server_socket _ss;
586 resource_limits _limits;
587 rpc_semaphore _resources_available;
588 std::unordered_map<connection_id, shared_ptr<connection>> _conns;
589 promise<> _ss_stopped;
590 gate _reply_gate;
591 server_options _options;
592 uint64_t _next_client_id = 1;
593
594 public:
595 server(protocol_base* proto, const socket_address& addr, resource_limits memory_limit = resource_limits());
596 server(protocol_base* proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits());
597 server(protocol_base* proto, server_socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{});
598 server(protocol_base* proto, server_options opts, server_socket, resource_limits memory_limit = resource_limits());
599 void accept();
600 future<> stop();
601 template<typename Func>
602 void foreach_connection(Func&& f) {
603 for (auto c : _conns) {
604 f(*c.second);
605 }
606 }
607 gate& reply_gate() {
608 return _reply_gate;
609 }
610 friend connection;
611 friend client;
612 };
613
614 using rpc_handler_func = std::function<future<> (shared_ptr<server::connection>, std::optional<rpc_clock_type::time_point> timeout, int64_t msgid,
615 rcv_buf data)>;
616
617 struct rpc_handler {
618 scheduling_group sg;
619 rpc_handler_func func;
620 gate use_gate;
621 };
622
623 class protocol_base {
624 public:
625 virtual ~protocol_base() {};
626 virtual shared_ptr<server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) = 0;
627 protected:
628 friend class server;
629
630 virtual rpc_handler* get_handler(uint64_t msg_id) = 0;
631 virtual void put_handler(rpc_handler*) = 0;
632 };
633
634 /// \addtogroup rpc
635 /// @{
636
637 /// Defines a protocol for communication between a server and a client.
638 ///
639 /// A protocol is defined by a `Serializer` and a `MsgType`. The `Serializer` is
640 /// responsible for serializing and unserializing all types used as arguments and
641 /// return types used in the protocol. The `Serializer` is expected to define a
642 /// `read()` and `write()` method for each such type `T` as follows:
643 ///
644 /// template <typename Output>
645 /// void write(const serializer&, Output& output, const T& data);
646 ///
647 /// template <typename Input>
648 /// T read(const serializer&, Input& input, type<T> type_tag); // type_tag used to disambiguate
649 ///
650 /// Where `Input` and `Output` have a `void read(char*, size_t)` and
651 /// `write(const char*, size_t)` respectively.
652 /// `MsgType` defines the type to be used as the message id, the id which is
653 /// used to identify different messages used in the protocol. These are also
654 /// often referred to as "verbs". The client will use the message id, to
655 /// specify the remote method (verb) to invoke on the server. The server uses
656 /// the message id to dispatch the incoming call to the right handler.
657 /// `MsgType` should be hashable and serializable. It is preferable to use enum
658 /// for message types, but do not forget to provide hash function for it.
659 ///
660 /// Use register_handler() on the server to define the available verbs and the
661 /// code to be executed when they are invoked by clients. Use make_client() on
662 /// the client to create a matching callable that can be used to invoke the
663 /// verb on the server and wait for its result. Note that register_handler()
664 /// also returns a client, that can be used to invoke the registered verb on
665 /// another node (given that the other node has the same verb). This is useful
666 /// for symmetric protocols, where two or more nodes all have servers as well as
667 /// connect to the other nodes as clients.
668 ///
669 /// Use protocol::server to listen for and accept incoming connections on the
670 /// server and protocol::client to establish connections to the server.
671 /// Note that registering the available verbs can be done before/after
672 /// listening for connections, but best to ensure that by the time incoming
673 /// requests are to be expected, all the verbs are set-up.
674 ///
675 /// ## Configuration
676 ///
677 /// TODO
678 ///
679 /// ## Isolation
680 ///
681 /// RPC supports isolating verb handlers from each other. There are two ways to
682 /// achieve this: per-handler isolation (the old way) and per-connection
683 /// isolation (the new way). If no isolation is configured, all handlers will be
684 /// executed in the context of the scheduling_group in which the
685 /// protocol::server was created.
686 ///
687 /// Per-handler isolation (the old way) can be configured by using the
688 /// register_handler() overload which takes a scheduling_group. When invoked,
689 /// the body of the handler will be executed from the context of the configured
690 /// scheduling_group.
691 ///
692 /// Per-connection isolation (the new way) is a more flexible mechanism that
693 /// requires user application provided logic to determine how connections are
694 /// isolated. This mechanism has two parts, the server and the client part.
695 /// The client configures isolation by setting client_options::isolation_cookie.
696 /// This cookie is an opaque (to the RPC layer) string that is to be interpreted
697 /// on the server using user application provided logic. The application
698 /// provides this logic to the server by setting
699 /// resource_limits::isolate_connection to an appropriate handler function, that
700 /// interprets the opaque cookie and resolves it to an isolation_config. The
701 /// scheduling_group in the former will be used not just to execute all verb
702 /// handlers, but also the connection loop itself, hence providing better
703 /// isolation.
704 ///
705 /// There a few gotchas related to mixing the two isolation mechanisms. This can
706 /// happen when the application is updated and one of the client/server is
707 /// still using the old/new mechanism. In general per-connection isolation
708 /// overrides the per-handler one. If both are set up, the former will determine
709 /// the scheduling_group context for the handlers. If the client is not
710 /// configured to send an isolation cookie, the server's
711 /// resource_limits::isolate_connection will not be invoked and the server will
712 /// fall back to per-handler isolation if configured. If the client is
713 /// configured to send an isolation cookie but the server doesn't have a
714 /// resource_limits::isolate_connection configured, it will use
715 /// default_isolate_connection() to interpret the cookie. Note that this still
716 /// overrides the per-handler isolation if any is configured. If the server is
717 /// so old that it doesn't have the per-connection isolation feature at all, it
718 /// will of course just use the per-handler one, if configured.
719 ///
720 /// ## Compatibility
721 ///
722 /// TODO
723 ///
724 /// \tparam Serializer the serializer for the protocol.
725 /// \tparam MsgType the type to be used as the message id or verb id.
726 template<typename Serializer, typename MsgType = uint32_t>
727 class protocol final : public protocol_base {
728 public:
729 /// Represents the listening port and all accepted connections.
730 class server : public rpc::server {
731 public:
732 server(protocol& proto, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
733 rpc::server(&proto, addr, memory_limit) {}
734 server(protocol& proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
735 rpc::server(&proto, opts, addr, memory_limit) {}
736 server(protocol& proto, server_socket socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{}) :
737 rpc::server(&proto, std::move(socket), memory_limit) {}
738 server(protocol& proto, server_options opts, server_socket socket, resource_limits memory_limit = resource_limits()) :
739 rpc::server(&proto, opts, std::move(socket), memory_limit) {}
740 };
741 /// Represents a client side connection.
742 class client : public rpc::client {
743 public:
744 /*
745 * Create client object which will attempt to connect to the remote address.
746 *
747 * @param addr the remote address identifying this client
748 * @param local the local address of this client
749 */
750 client(protocol& p, const socket_address& addr, const socket_address& local = {}) :
751 rpc::client(p.get_logger(), &p._serializer, addr, local) {}
752 client(protocol& p, client_options options, const socket_address& addr, const socket_address& local = {}) :
753 rpc::client(p.get_logger(), &p._serializer, options, addr, local) {}
754
755 /**
756 * Create client object which will attempt to connect to the remote address using the
757 * specified seastar::socket.
758 *
759 * @param addr the remote address identifying this client
760 * @param local the local address of this client
761 * @param socket the socket object use to connect to the remote address
762 */
763 client(protocol& p, socket socket, const socket_address& addr, const socket_address& local = {}) :
764 rpc::client(p.get_logger(), &p._serializer, std::move(socket), addr, local) {}
765 client(protocol& p, client_options options, socket socket, const socket_address& addr, const socket_address& local = {}) :
766 rpc::client(p.get_logger(), &p._serializer, options, std::move(socket), addr, local) {}
767 };
768
769 friend server;
770 private:
771 std::unordered_map<MsgType, rpc_handler> _handlers;
772 Serializer _serializer;
773 logger _logger;
774
775 public:
776 protocol(Serializer&& serializer) : _serializer(std::forward<Serializer>(serializer)) {}
777
778 /// Creates a callable that can be used to invoke the verb on the remote.
779 ///
780 /// \tparam Func The signature of the verb. Has to be either the same or
781 /// compatible with the one passed to register_handler on the server.
782 /// \param t the verb to invoke on the remote.
783 ///
784 /// \returns a callable whose signature is derived from Func as follows:
785 /// given `Func == Ret(Args...)` the returned callable has the following
786 /// signature: `future<Ret>(protocol::client&, Args...)`.
787 template<typename Func>
788 auto make_client(MsgType t);
789
790 /// Register a handler to be called when this verb is invoked.
791 ///
792 /// \tparam Func the type of the handler for the verb. This determines the
793 /// signature of the verb.
794 /// \param t the verb to register the handler for.
795 /// \param func the callable to be called when the verb is invoked by the
796 /// remote.
797 ///
798 /// \returns a client, a callable that can be used to invoke the verb. See
799 /// make_client(). The client can be discarded, in fact this is what
800 /// most callers will do as real clients will live on a remote node, not
801 /// on the one where handlers are registered.
802 template<typename Func>
803 auto register_handler(MsgType t, Func&& func);
804
805 /// Register a handler to be called when this verb is invoked.
806 ///
807 /// \tparam Func the type of the handler for the verb. This determines the
808 /// signature of the verb.
809 /// \param t the verb to register the handler for.
810 /// \param sg the scheduling group that will be used to invoke the handler
811 /// in. This can be used to execute different verbs in different
812 /// scheduling groups. Note that there is a newer mechanism to determine
813 /// the scheduling groups a handler will run it per invocation, see
814 /// isolation_config.
815 /// \param func the callable to be called when the verb is invoked by the
816 /// remote.
817 ///
818 /// \returns a client, a callable that can be used to invoke the verb. See
819 /// make_client(). The client can be discarded, in fact this is what
820 /// most callers will do as real clients will live on a remote node, not
821 /// on the one where handlers are registered.
822 template <typename Func>
823 auto register_handler(MsgType t, scheduling_group sg, Func&& func);
824
825 /// Unregister the handler for the verb.
826 ///
827 /// Waits for all currently running handlers, then unregisters the handler.
828 /// Future attempts to invoke the verb will fail. This becomes effective
829 /// immediately after calling this function.
830 ///
831 /// \param t the verb to unregister the handler for.
832 ///
833 /// \returns a future that becomes available once all currently running
834 /// handlers finished.
835 future<> unregister_handler(MsgType t);
836
837 /// Set a logger function to be used to log messages.
838 ///
839 /// \deprecated use the logger overload set_logger(::seastar::logger*)
840 /// instead.
841 [[deprecated("Use set_logger(::seastar::logger*) instead")]]
842 void set_logger(std::function<void(const sstring&)> logger) {
843 _logger.set(std::move(logger));
844 }
845
846 /// Set a logger to be used to log messages.
847 void set_logger(::seastar::logger* logger) {
848 _logger.set(logger);
849 }
850
851 const logger& get_logger() const {
852 return _logger;
853 }
854
855 shared_ptr<rpc::server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) override {
856 return make_shared<rpc::server::connection>(server, std::move(fd), std::move(addr), _logger, &_serializer, id);
857 }
858
859 bool has_handler(MsgType msg_id);
860
861 /// Checks if any there are handlers registered.
862 /// Debugging helper, should only be used for debugging and not relied on.
863 ///
864 /// \returns true if there are, false if there are no registered handlers.
865 bool has_handlers() const noexcept {
866 return !_handlers.empty();
867 }
868
869 private:
870 rpc_handler* get_handler(uint64_t msg_id) override;
871 void put_handler(rpc_handler*) override;
872
873 template<typename Ret, typename... In>
874 auto make_client(signature<Ret(In...)> sig, MsgType t);
875
876 void register_receiver(MsgType t, rpc_handler&& handler) {
877 auto r = _handlers.emplace(t, std::move(handler));
878 if (!r.second) {
879 throw_with_backtrace<std::runtime_error>("registered handler already exists");
880 }
881 }
882 };
883
884 /// @}
885
886 }
887
888 }
889
890 #include "rpc_impl.hh"