]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/include/seastar/rpc/rpc.hh
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / include / seastar / rpc / rpc.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22 #pragma once
23
24 #include <unordered_map>
25 #include <unordered_set>
26 #include <list>
27 #include <seastar/core/future.hh>
28 #include <seastar/core/seastar.hh>
29 #include <seastar/net/api.hh>
30 #include <seastar/core/iostream.hh>
31 #include <seastar/core/shared_ptr.hh>
32 #include <seastar/core/condition-variable.hh>
33 #include <seastar/core/gate.hh>
34 #include <seastar/rpc/rpc_types.hh>
35 #include <seastar/core/byteorder.hh>
36 #include <seastar/core/shared_future.hh>
37 #include <seastar/core/queue.hh>
38 #include <seastar/core/weak_ptr.hh>
39 #include <seastar/core/scheduling.hh>
40 #include <seastar/util/backtrace.hh>
41 #include <seastar/util/log.hh>
42
43 namespace seastar {
44
45 namespace rpc {
46
47 /// \defgroup rpc rpc - remote procedure call framework
48 ///
49 /// \brief
50 /// rpc is a framework that can be used to define client-server communication
51 /// protocols.
52 /// For a high-level description of the RPC features see
53 /// [doc/rpc.md](./md_rpc.html),
54 /// [doc/rpc-streaming.md](./md_rpc-streaming.html) and
55 /// [doc/rpc-compression.md](./md_rpc-compression.html)
56 ///
57 /// The entry point for setting up an rpc protocol is
58 /// seastar::rpc::protocol.
59
/// Integer type of RPC message identifiers (see client::next_message_id()).
using id_type = int64_t;

/// Semaphore type, based on rpc_clock_type, that rpc::server uses to account
/// for the resources consumed by requests.
using rpc_semaphore = basic_semaphore<semaphore_default_exception_factory, rpc_clock_type>;
/// Units taken from an rpc_semaphore (see server::connection::wait_for_resources()).
using resource_permit = semaphore_units<semaphore_default_exception_factory, rpc_clock_type>;

/// Magic string of the protocol. negotiation_frame::magic is sized to hold it
/// without the terminating NUL character.
static constexpr char rpc_magic[] = "SSTARRPC";
66
67 /// \addtogroup rpc
68 /// @{
69
70 /// Specifies resource isolation for a connection.
71 struct isolation_config {
72 /// Specifies a scheduling group under which the connection (and all its
73 /// verb handlers) will execute.
74 scheduling_group sched_group = current_scheduling_group();
75 };
76
/// Default isolation configuration - run everything in the default scheduling group.
///
/// In the scheduling_group that the protocol::server was created in.
///
/// This function is the default value of resource_limits::isolate_connection.
/// Only its declaration is visible in this header.
///
/// \param isolation_cookie the isolation cookie of the connection
/// \returns the isolation_config to apply to the connection
isolation_config default_isolate_connection(sstring isolation_cookie);
81
82 /// \brief Resource limits for an RPC server
83 ///
84 /// A request's memory use will be estimated as
85 ///
86 /// req_mem = basic_request_size + sizeof(serialized_request) * bloat_factor
87 ///
88 /// Concurrent requests will be limited so that
89 ///
90 /// sum(req_mem) <= max_memory
91 ///
92 /// \see server
93 struct resource_limits {
94 size_t basic_request_size = 0; ///< Minimum request footprint in memory
95 unsigned bloat_factor = 1; ///< Serialized size multiplied by this to estimate memory used by request
96 size_t max_memory = rpc_semaphore::max_counter(); ///< Maximum amount of memory that may be consumed by all requests
97 /// Configures isolation for a connection based on its isolation cookie. May throw,
98 /// in which case the connection will be terminated.
99 std::function<isolation_config (sstring isolation_cookie)> isolate_connection = default_isolate_connection;
100 };
101
102 struct client_options {
103 std::optional<net::tcp_keepalive_params> keepalive;
104 bool tcp_nodelay = true;
105 bool reuseaddr = false;
106 compressor::factory* compressor_factory = nullptr;
107 bool send_timeout_data = true;
108 connection_id stream_parent = invalid_connection_id;
109 /// Configures how this connection is isolated from other connection on the same server.
110 ///
111 /// \see resource_limits::isolate_connection
112 sstring isolation_cookie;
113 };
114
115 /// @}
116
117 // RPC call that passes stream connection id as a parameter
118 // may arrive to a different shard from where the stream connection
119 // was opened, so the connection id is not known to a server that handles
120 // the RPC call. The shard that the stream connection belongs to is known
121 // since it is a part of connection id, but this is not enough to locate
122 // a server instance the connection belongs to if there are more than one
123 // server on the shard. Stream domain parameter is here to help with that.
124 // Different servers on all shards logically belonging to the same service should
125 // belong to the same streaming domain. Only one server on each shard can belong to
126 // a particular streaming domain.
/// Strongly typed wrapper around the 64-bit identifier of a streaming domain.
///
/// Two values compare equal exactly when their identifiers are equal. The
/// identifier itself is only accessible to std::hash and to the stream
/// output operator, which are declared as friends.
class streaming_domain_type {
    uint64_t _id;
public:
    /// Wraps \c id; explicit, so plain integers do not convert silently.
    explicit streaming_domain_type(uint64_t id) : _id{id} {}

    /// \returns true if both objects wrap the same identifier.
    bool operator==(const streaming_domain_type& o) const {
        return o._id == _id;
    }

    friend struct std::hash<streaming_domain_type>;
    friend std::ostream& operator<<(std::ostream&, const streaming_domain_type&);
};
137
138 /// \addtogroup rpc
139 /// @{
140
141 struct server_options {
142 compressor::factory* compressor_factory = nullptr;
143 bool tcp_nodelay = true;
144 std::optional<streaming_domain_type> streaming_domain;
145 server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_;
146 };
147
148 /// @}
149
150 inline
151 size_t
152 estimate_request_size(const resource_limits& lim, size_t serialized_size) {
153 return lim.basic_request_size + serialized_size * lim.bloat_factor;
154 }
155
// Header of a negotiation message (see connection::send_negotiation_frame()).
struct negotiation_frame {
    // Sized to hold rpc_magic without its terminating NUL character.
    char magic[sizeof(rpc_magic) - 1];
    uint32_t len; // additional negotiation data length; multiple negotiation_frame_feature_record structs
};
160
// Identifiers of the features that can be negotiated for a connection; they
// are the keys of feature_map.
// NOTE(review): the explicit values presumably identify the features on the
// wire, so they should not be renumbered - confirm against the implementation.
enum class protocol_features : uint32_t {
    COMPRESS = 0,
    TIMEOUT = 1,
    CONNECTION_ID = 2,
    STREAM_PARENT = 3,
    ISOLATION = 4,
};
168
// internal representation of feature data: maps each feature to its data,
// held as a string
using feature_map = std::map<protocol_features, sstring>;
171
// An rpc signature, in the form signature<Ret (In0, In1, In2)>.
// Only the primary template is declared here; no definition is visible in
// this part of the file.
template <typename Function>
struct signature;
175
176 class logger {
177 std::function<void(const sstring&)> _logger;
178 ::seastar::logger* _seastar_logger = nullptr;
179
180 // _seastar_logger will always be used first if it's available
181 void log(const sstring& str) const {
182 if (_seastar_logger) {
183 // default level for log messages is `info`
184 _seastar_logger->info("{}", str);
185 } else if (_logger) {
186 _logger(str);
187 }
188 }
189
190 // _seastar_logger will always be used first if it's available
191 template <typename... Args>
192 void log(log_level level, const char* fmt, Args&&... args) const {
193 if (_seastar_logger) {
194 _seastar_logger->log(level, fmt, std::forward<Args>(args)...);
195 // If the log level is at least `info`, fall back to legacy logging without explicit level.
196 // Ignore less severe levels in order not to spam user's log with messages during transition,
197 // i.e. when the user still only defines a level-less logger.
198 } else if (_logger && level <= log_level::info) {
199 _logger(format(fmt, std::forward<Args>(args)...));
200 }
201 }
202
203 public:
204 void set(std::function<void(const sstring&)> l) {
205 _logger = std::move(l);
206 }
207
208 void set(::seastar::logger* logger) {
209 _seastar_logger = logger;
210 }
211
212 void operator()(const client_info& info, id_type msg_id, const sstring& str) const;
213 void operator()(const client_info& info, id_type msg_id, log_level level, std::string_view str) const;
214
215 void operator()(const client_info& info, const sstring& str) const;
216 void operator()(const client_info& info, log_level level, std::string_view str) const;
217
218 void operator()(const socket_address& addr, const sstring& str) const;
219 void operator()(const socket_address& addr, log_level level, std::string_view str) const;
220 };
221
222 class connection {
223 protected:
224 connected_socket _fd;
225 input_stream<char> _read_buf;
226 output_stream<char> _write_buf;
227 bool _error = false;
228 bool _connected = false;
229 promise<> _stopped;
230 stats _stats;
231 const logger& _logger;
232 // The owner of the pointer below is an instance of rpc::protocol<typename Serializer> class.
233 // The type of the pointer is erased here, but the original type is Serializer
234 void* _serializer;
235 struct outgoing_entry {
236 timer<rpc_clock_type> t;
237 snd_buf buf;
238 std::optional<promise<>> p = promise<>();
239 cancellable* pcancel = nullptr;
240 outgoing_entry(snd_buf b) : buf(std::move(b)) {}
241 outgoing_entry(outgoing_entry&& o) noexcept : t(std::move(o.t)), buf(std::move(o.buf)), p(std::move(o.p)), pcancel(o.pcancel) {
242 o.p = std::nullopt;
243 }
244 ~outgoing_entry() {
245 if (p) {
246 if (pcancel) {
247 pcancel->cancel_send = std::function<void()>();
248 pcancel->send_back_pointer = nullptr;
249 }
250 p->set_value();
251 }
252 }
253 };
254 friend outgoing_entry;
255 std::list<outgoing_entry> _outgoing_queue;
256 condition_variable _outgoing_queue_cond;
257 future<> _send_loop_stopped = make_ready_future<>();
258 std::unique_ptr<compressor> _compressor;
259 bool _timeout_negotiated = false;
260 // stream related fields
261 bool _is_stream = false;
262 connection_id _id = invalid_connection_id;
263
264 std::unordered_map<connection_id, xshard_connection_ptr> _streams;
265 queue<rcv_buf> _stream_queue = queue<rcv_buf>(max_queued_stream_buffers);
266 semaphore _stream_sem = semaphore(max_stream_buffers_memory);
267 bool _sink_closed = true;
268 bool _source_closed = true;
269 // the future holds if sink is already closed
270 // if it is not ready it means the sink is been closed
271 future<bool> _sink_closed_future = make_ready_future<bool>(false);
272
273 bool is_stream() {
274 return _is_stream;
275 }
276
277 snd_buf compress(snd_buf buf);
278 future<> send_buffer(snd_buf buf);
279
280 enum class outgoing_queue_type {
281 request,
282 response,
283 stream = response
284 };
285
286 template<outgoing_queue_type QueueType> void send_loop();
287 future<> stop_send_loop();
288 future<std::optional<rcv_buf>> read_stream_frame_compressed(input_stream<char>& in);
289 bool stream_check_twoway_closed() {
290 return _sink_closed && _source_closed;
291 }
292 future<> stream_close();
293 future<> stream_process_incoming(rcv_buf&&);
294 future<> handle_stream_frame();
295
296 public:
297 connection(connected_socket&& fd, const logger& l, void* s, connection_id id = invalid_connection_id) : connection(l, s, id) {
298 set_socket(std::move(fd));
299 }
300 connection(const logger& l, void* s, connection_id id = invalid_connection_id) : _logger(l), _serializer(s), _id(id) {}
301 virtual ~connection() {}
302 void set_socket(connected_socket&& fd);
303 future<> send_negotiation_frame(feature_map features);
304 // functions below are public because they are used by external heavily templated functions
305 // and I am not smart enough to know how to define them as friends
306 future<> send(snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
307 bool error() { return _error; }
308 void abort();
309 future<> stop();
310 future<> stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs);
311 future<> close_sink() {
312 _sink_closed = true;
313 if (stream_check_twoway_closed()) {
314 return stream_close();
315 }
316 return make_ready_future();
317 }
318 bool sink_closed() {
319 return _sink_closed;
320 }
321 future<> close_source() {
322 _source_closed = true;
323 if (stream_check_twoway_closed()) {
324 return stream_close();
325 }
326 return make_ready_future();
327 }
328 connection_id get_connection_id() const {
329 return _id;
330 }
331 xshard_connection_ptr get_stream(connection_id id) const;
332 void register_stream(connection_id id, xshard_connection_ptr c);
333 virtual socket_address peer_address() const = 0;
334
335 const logger& get_logger() const {
336 return _logger;
337 }
338
339 template<typename Serializer>
340 Serializer& serializer() {
341 return *static_cast<Serializer*>(_serializer);
342 }
343
344 template <typename FrameType>
345 typename FrameType::return_type read_frame(socket_address info, input_stream<char>& in);
346
347 template <typename FrameType>
348 typename FrameType::return_type read_frame_compressed(socket_address info, std::unique_ptr<compressor>& compressor, input_stream<char>& in);
349 friend class client;
350 template<typename Serializer, typename... Out>
351 friend class sink_impl;
352 template<typename Serializer, typename... In>
353 friend class source_impl;
354 };
355
356 // send data Out...
357 template<typename Serializer, typename... Out>
358 class sink_impl : public sink<Out...>::impl {
359 public:
360 sink_impl(xshard_connection_ptr con) : sink<Out...>::impl(std::move(con)) { this->_con->get()->_sink_closed = false; }
361 future<> operator()(const Out&... args) override;
362 future<> close() override;
363 future<> flush() override;
364 ~sink_impl() override;
365 };
366
367 // receive data In...
368 template<typename Serializer, typename... In>
369 class source_impl : public source<In...>::impl {
370 public:
371 source_impl(xshard_connection_ptr con) : source<In...>::impl(std::move(con)) { this->_con->get()->_source_closed = false; }
372 future<std::optional<std::tuple<In...>>> operator()() override;
373 };
374
375 class client : public rpc::connection, public weakly_referencable<client> {
376 socket _socket;
377 id_type _message_id = 1;
378 struct reply_handler_base {
379 timer<rpc_clock_type> t;
380 cancellable* pcancel = nullptr;
381 virtual void operator()(client&, id_type, rcv_buf data) = 0;
382 virtual void timeout() {}
383 virtual void cancel() {}
384 virtual ~reply_handler_base() {
385 if (pcancel) {
386 pcancel->cancel_wait = std::function<void()>();
387 pcancel->wait_back_pointer = nullptr;
388 }
389 };
390 };
391 public:
392 template<typename Reply, typename Func>
393 struct reply_handler final : reply_handler_base {
394 Func func;
395 Reply reply;
396 reply_handler(Func&& f) : func(std::move(f)) {}
397 virtual void operator()(client& client, id_type msg_id, rcv_buf data) override {
398 return func(reply, client, msg_id, std::move(data));
399 }
400 virtual void timeout() override {
401 reply.done = true;
402 reply.p.set_exception(timeout_error());
403 }
404 virtual void cancel() override {
405 reply.done = true;
406 reply.p.set_exception(canceled_error());
407 }
408 virtual ~reply_handler() {}
409 };
410 private:
411 std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding;
412 socket_address _server_addr;
413 client_options _options;
414 std::optional<shared_promise<>> _client_negotiated = shared_promise<>();
415 weak_ptr<client> _parent; // for stream clients
416
417 private:
418 future<> negotiate_protocol(input_stream<char>& in);
419 void negotiate(feature_map server_features);
420 future<std::tuple<int64_t, std::optional<rcv_buf>>>
421 read_response_frame(input_stream<char>& in);
422 future<std::tuple<int64_t, std::optional<rcv_buf>>>
423 read_response_frame_compressed(input_stream<char>& in);
424 void send_loop() {
425 if (is_stream()) {
426 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::stream>();
427 } else {
428 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::request>();
429 }
430 }
431 public:
432 /**
433 * Create client object which will attempt to connect to the remote address.
434 *
435 * @param l \ref seastar::logger to use for logging error messages
436 * @param s an optional connection serializer
437 * @param addr the remote address identifying this client
438 * @param local the local address of this client
439 */
440 client(const logger& l, void* s, const socket_address& addr, const socket_address& local = {});
441 client(const logger& l, void* s, client_options options, const socket_address& addr, const socket_address& local = {});
442
443 /**
444 * Create client object which will attempt to connect to the remote address using the
445 * specified seastar::socket.
446 *
447 * @param l \ref seastar::logger to use for logging error messages
448 * @param s an optional connection serializer
449 * @param addr the remote address identifying this client
450 * @param local the local address of this client
451 * @param socket the socket object use to connect to the remote address
452 */
453 client(const logger& l, void* s, socket socket, const socket_address& addr, const socket_address& local = {});
454 client(const logger& l, void* s, client_options options, socket socket, const socket_address& addr, const socket_address& local = {});
455
456 stats get_stats() const;
457 stats& get_stats_internal() {
458 return _stats;
459 }
460 auto next_message_id() { return _message_id++; }
461 void wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
462 void wait_timed_out(id_type id);
463 future<> stop();
464 void abort_all_streams();
465 void deregister_this_stream();
466 socket_address peer_address() const override {
467 return _server_addr;
468 }
469 future<> await_connection() {
470 if (!_client_negotiated) {
471 return make_ready_future<>();
472 } else {
473 return _client_negotiated->get_shared_future();
474 }
475 }
476 template<typename Serializer, typename... Out>
477 future<sink<Out...>> make_stream_sink(socket socket) {
478 return await_connection().then([this, socket = std::move(socket)] () mutable {
479 if (!this->get_connection_id()) {
480 return make_exception_future<sink<Out...>>(std::runtime_error("Streaming is not supported by the server"));
481 }
482 client_options o = _options;
483 o.stream_parent = this->get_connection_id();
484 o.send_timeout_data = false;
485 auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr);
486 c->_parent = this->weak_from_this();
487 c->_is_stream = true;
488 return c->await_connection().then([c, this] {
489 xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast<rpc::connection>(c)));
490 this->register_stream(c->get_connection_id(), s);
491 return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(std::move(s)));
492 });
493 });
494 }
495 template<typename Serializer, typename... Out>
496 future<sink<Out...>> make_stream_sink() {
497 return make_stream_sink<Serializer, Out...>(make_socket());
498 }
499 };
500
// Forward declaration: rpc::server stores a pointer to its protocol_base;
// the class itself is defined further down in this file.
class protocol_base;
502
503 class server {
504 private:
505 static thread_local std::unordered_map<streaming_domain_type, server*> _servers;
506
507 public:
508 class connection : public rpc::connection, public enable_shared_from_this<connection> {
509 server& _server;
510 client_info _info;
511 connection_id _parent_id = invalid_connection_id;
512 std::optional<isolation_config> _isolation_config;
513 private:
514 future<> negotiate_protocol(input_stream<char>& in);
515 future<std::tuple<std::optional<uint64_t>, uint64_t, int64_t, std::optional<rcv_buf>>>
516 read_request_frame_compressed(input_stream<char>& in);
517 future<feature_map> negotiate(feature_map requested);
518 void send_loop() {
519 if (is_stream()) {
520 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::stream>();
521 } else {
522 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::response>();
523 }
524 }
525 future<> send_unknown_verb_reply(std::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
526 public:
527 connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* seralizer, connection_id id);
528 future<> process();
529 future<> respond(int64_t msg_id, snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout);
530 client_info& info() { return _info; }
531 const client_info& info() const { return _info; }
532 stats get_stats() const {
533 stats res = _stats;
534 res.pending = _outgoing_queue.size();
535 return res;
536 }
537
538 stats& get_stats_internal() {
539 return _stats;
540 }
541 socket_address peer_address() const override {
542 return _info.addr;
543 }
544 // Resources will be released when this goes out of scope
545 future<resource_permit> wait_for_resources(size_t memory_consumed, std::optional<rpc_clock_type::time_point> timeout) {
546 if (timeout) {
547 return get_units(_server._resources_available, memory_consumed, *timeout);
548 } else {
549 return get_units(_server._resources_available, memory_consumed);
550 }
551 }
552 size_t estimate_request_size(size_t serialized_size) {
553 return rpc::estimate_request_size(_server._limits, serialized_size);
554 }
555 size_t max_request_size() const {
556 return _server._limits.max_memory;
557 }
558 server& get_server() {
559 return _server;
560 }
561 future<> deregister_this_stream();
562 };
563 private:
564 protocol_base* _proto;
565 server_socket _ss;
566 resource_limits _limits;
567 rpc_semaphore _resources_available;
568 std::unordered_map<connection_id, shared_ptr<connection>> _conns;
569 promise<> _ss_stopped;
570 gate _reply_gate;
571 server_options _options;
572 uint64_t _next_client_id = 1;
573
574 public:
575 server(protocol_base* proto, const socket_address& addr, resource_limits memory_limit = resource_limits());
576 server(protocol_base* proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits());
577 server(protocol_base* proto, server_socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{});
578 server(protocol_base* proto, server_options opts, server_socket, resource_limits memory_limit = resource_limits());
579 void accept();
580 future<> stop();
581 template<typename Func>
582 void foreach_connection(Func&& f) {
583 for (auto c : _conns) {
584 f(*c.second);
585 }
586 }
587 gate& reply_gate() {
588 return _reply_gate;
589 }
590 friend connection;
591 friend client;
592 };
593
// Type-erased verb handler. It is called with the server connection, the
// optional timeout, the message id and the received data, and returns a
// future.
using rpc_handler_func = std::function<future<> (shared_ptr<server::connection>, std::optional<rpc_clock_type::time_point> timeout, int64_t msgid,
                                                 rcv_buf data)>;
596
// A registered verb handler, as stored by protocol in its handler map.
struct rpc_handler {
    // Scheduling group of the handler (see the protocol::register_handler()
    // overload taking a scheduling_group).
    scheduling_group sg;
    // The handler function itself.
    rpc_handler_func func;
    // NOTE(review): presumably held while the handler is in use, so that
    // unregister_handler() can wait for running invocations - confirm in
    // rpc_impl.hh.
    gate use_gate;
};
602
603 class protocol_base {
604 public:
605 virtual ~protocol_base() {};
606 virtual shared_ptr<server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) = 0;
607 protected:
608 friend class server;
609
610 virtual rpc_handler* get_handler(uint64_t msg_id) = 0;
611 virtual void put_handler(rpc_handler*) = 0;
612 };
613
614 /// \addtogroup rpc
615 /// @{
616
617 /// Defines a protocol for communication between a server and a client.
618 ///
619 /// A protocol is defined by a `Serializer` and a `MsgType`. The `Serializer` is
620 /// responsible for serializing and unserializing all types used as arguments and
621 /// return types used in the protocol. The `Serializer` is expected to define a
622 /// `read()` and `write()` method for each such type `T` as follows:
623 ///
624 /// template <typename Output>
625 /// void write(const serializer&, Output& output, const T& data);
626 ///
627 /// template <typename Input>
628 /// T read(const serializer&, Input& input, type<T> type_tag); // type_tag used to disambiguate
629 ///
630 /// Where `Input` and `Output` have a `void read(char*, size_t)` and
631 /// `write(const char*, size_t)` respectively.
632 /// `MsgType` defines the type to be used as the message id, the id which is
633 /// used to identify different messages used in the protocol. These are also
634 /// often referred to as "verbs". The client will use the message id, to
635 /// specify the remote method (verb) to invoke on the server. The server uses
636 /// the message id to dispatch the incoming call to the right handler.
637 /// `MsgType` should be hashable and serializable. It is preferable to use enum
638 /// for message types, but do not forget to provide hash function for it.
639 ///
640 /// Use register_handler() on the server to define the available verbs and the
641 /// code to be executed when they are invoked by clients. Use make_client() on
642 /// the client to create a matching callable that can be used to invoke the
643 /// verb on the server and wait for its result. Note that register_handler()
644 /// also returns a client, that can be used to invoke the registered verb on
645 /// another node (given that the other node has the same verb). This is useful
646 /// for symmetric protocols, where two or more nodes all have servers as well as
647 /// connect to the other nodes as clients.
648 ///
649 /// Use protocol::server to listen for and accept incoming connections on the
650 /// server and protocol::client to establish connections to the server.
651 /// Note that registering the available verbs can be done before/after
652 /// listening for connections, but best to ensure that by the time incoming
653 /// requests are to be expected, all the verbs are set-up.
654 ///
655 /// ## Configuration
656 ///
657 /// TODO
658 ///
659 /// ## Isolation
660 ///
661 /// RPC supports isolating verb handlers from each other. There are two ways to
662 /// achieve this: per-handler isolation (the old way) and per-connection
663 /// isolation (the new way). If no isolation is configured, all handlers will be
664 /// executed in the context of the scheduling_group in which the
665 /// protocol::server was created.
666 ///
667 /// Per-handler isolation (the old way) can be configured by using the
668 /// register_handler() overload which takes a scheduling_group. When invoked,
669 /// the body of the handler will be executed from the context of the configured
670 /// scheduling_group.
671 ///
672 /// Per-connection isolation (the new way) is a more flexible mechanism that
673 /// requires user application provided logic to determine how connections are
674 /// isolated. This mechanism has two parts, the server and the client part.
675 /// The client configures isolation by setting client_options::isolation_cookie.
676 /// This cookie is an opaque (to the RPC layer) string that is to be interpreted
677 /// on the server using user application provided logic. The application
678 /// provides this logic to the server by setting
679 /// resource_limits::isolate_connection to an appropriate handler function, that
680 /// interprets the opaque cookie and resolves it to an isolation_config. The
681 /// scheduling_group in that isolation_config will be used not just to execute all verb
682 /// handlers, but also the connection loop itself, hence providing better
683 /// isolation.
684 ///
685 /// There are a few gotchas related to mixing the two isolation mechanisms. This can
686 /// happen when the application is updated and one of the client/server is
687 /// still using the old/new mechanism. In general per-connection isolation
688 /// overrides the per-handler one. If both are set up, the former will determine
689 /// the scheduling_group context for the handlers. If the client is not
690 /// configured to send an isolation cookie, the server's
691 /// resource_limits::isolate_connection will not be invoked and the server will
692 /// fall back to per-handler isolation if configured. If the client is
693 /// configured to send an isolation cookie but the server doesn't have a
694 /// resource_limits::isolate_connection configured, it will use
695 /// default_isolate_connection() to interpret the cookie. Note that this still
696 /// overrides the per-handler isolation if any is configured. If the server is
697 /// so old that it doesn't have the per-connection isolation feature at all, it
698 /// will of course just use the per-handler one, if configured.
699 ///
700 /// ## Compatibility
701 ///
702 /// TODO
703 ///
704 /// \tparam Serializer the serializer for the protocol.
705 /// \tparam MsgType the type to be used as the message id or verb id.
706 template<typename Serializer, typename MsgType = uint32_t>
707 class protocol : public protocol_base {
708 public:
709 /// Represents the listening port and all accepted connections.
710 class server : public rpc::server {
711 public:
712 server(protocol& proto, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
713 rpc::server(&proto, addr, memory_limit) {}
714 server(protocol& proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
715 rpc::server(&proto, opts, addr, memory_limit) {}
716 server(protocol& proto, server_socket socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{}) :
717 rpc::server(&proto, std::move(socket), memory_limit) {}
718 server(protocol& proto, server_options opts, server_socket socket, resource_limits memory_limit = resource_limits()) :
719 rpc::server(&proto, opts, std::move(socket), memory_limit) {}
720 };
721 /// Represents a client side connection.
722 class client : public rpc::client {
723 public:
724 /*
725 * Create client object which will attempt to connect to the remote address.
726 *
727 * @param addr the remote address identifying this client
728 * @param local the local address of this client
729 */
730 client(protocol& p, const socket_address& addr, const socket_address& local = {}) :
731 rpc::client(p.get_logger(), &p._serializer, addr, local) {}
732 client(protocol& p, client_options options, const socket_address& addr, const socket_address& local = {}) :
733 rpc::client(p.get_logger(), &p._serializer, options, addr, local) {}
734
735 /**
736 * Create client object which will attempt to connect to the remote address using the
737 * specified seastar::socket.
738 *
739 * @param addr the remote address identifying this client
740 * @param local the local address of this client
741 * @param socket the socket object use to connect to the remote address
742 */
743 client(protocol& p, socket socket, const socket_address& addr, const socket_address& local = {}) :
744 rpc::client(p.get_logger(), &p._serializer, std::move(socket), addr, local) {}
745 client(protocol& p, client_options options, socket socket, const socket_address& addr, const socket_address& local = {}) :
746 rpc::client(p.get_logger(), &p._serializer, options, std::move(socket), addr, local) {}
747 };
748
749 friend server;
750 private:
751 std::unordered_map<MsgType, rpc_handler> _handlers;
752 Serializer _serializer;
753 logger _logger;
754
755 public:
756 protocol(Serializer&& serializer) : _serializer(std::forward<Serializer>(serializer)) {}
757
758 /// Creates a callable that can be used to invoke the verb on the remote.
759 ///
760 /// \tparam Func The signature of the verb. Has to be either the same or
761 /// compatible with the one passed to register_handler on the server.
762 /// \param t the verb to invoke on the remote.
763 ///
764 /// \returns a callable whose signature is derived from Func as follows:
765 /// given `Func == Ret(Args...)` the returned callable has the following
766 /// signature: `future<Ret>(protocol::client&, Args...)`.
767 template<typename Func>
768 auto make_client(MsgType t);
769
770 /// Register a handler to be called when this verb is invoked.
771 ///
772 /// \tparam Func the type of the handler for the verb. This determines the
773 /// signature of the verb.
774 /// \param t the verb to register the handler for.
775 /// \param func the callable to be called when the verb is invoked by the
776 /// remote.
777 ///
778 /// \returns a client, a callable that can be used to invoke the verb. See
779 /// make_client(). The client can be discarded, in fact this is what
780 /// most callers will do as real clients will live on a remote node, not
781 /// on the one where handlers are registered.
782 template<typename Func>
783 auto register_handler(MsgType t, Func&& func);
784
785 /// Register a handler to be called when this verb is invoked.
786 ///
787 /// \tparam Func the type of the handler for the verb. This determines the
788 /// signature of the verb.
789 /// \param t the verb to register the handler for.
790 /// \param sg the scheduling group that will be used to invoke the handler
791 /// in. This can be used to execute different verbs in different
792 /// scheduling groups. Note that there is a newer mechanism to determine
793 /// the scheduling groups a handler will run it per invocation, see
794 /// isolation_config.
795 /// \param func the callable to be called when the verb is invoked by the
796 /// remote.
797 ///
798 /// \returns a client, a callable that can be used to invoke the verb. See
799 /// make_client(). The client can be discarded, in fact this is what
800 /// most callers will do as real clients will live on a remote node, not
801 /// on the one where handlers are registered.
802 template <typename Func>
803 auto register_handler(MsgType t, scheduling_group sg, Func&& func);
804
805 /// Unregister the handler for the verb.
806 ///
807 /// Waits for all currently running handlers, then unregisters the handler.
808 /// Future attempts to invoke the verb will fail. This becomes effective
809 /// immediately after calling this function.
810 ///
811 /// \param t the verb to unregister the handler for.
812 ///
813 /// \returns a future that becomes available once all currently running
814 /// handlers finished.
815 future<> unregister_handler(MsgType t);
816
817 /// Set a logger function to be used to log messages.
818 ///
819 /// \deprecated use the logger overload set_logger(::seastar::logger*)
820 /// instead.
821 [[deprecated("Use set_logger(::seastar::logger*) instead")]]
822 void set_logger(std::function<void(const sstring&)> logger) {
823 _logger.set(std::move(logger));
824 }
825
826 /// Set a logger to be used to log messages.
827 void set_logger(::seastar::logger* logger) {
828 _logger.set(logger);
829 }
830
831 const logger& get_logger() const {
832 return _logger;
833 }
834
835 shared_ptr<rpc::server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) override {
836 return make_shared<rpc::server::connection>(server, std::move(fd), std::move(addr), _logger, &_serializer, id);
837 }
838
839 bool has_handler(MsgType msg_id);
840
841 /// Checks if any there are handlers registered.
842 /// Debugging helper, should only be used for debugging and not relied on.
843 ///
844 /// \returns true if there are, false if there are no registered handlers.
845 bool has_handlers() const noexcept {
846 return !_handlers.empty();
847 }
848
849 private:
850 rpc_handler* get_handler(uint64_t msg_id) override;
851 void put_handler(rpc_handler*) override;
852
853 template<typename Ret, typename... In>
854 auto make_client(signature<Ret(In...)> sig, MsgType t);
855
856 void register_receiver(MsgType t, rpc_handler&& handler) {
857 auto r = _handlers.emplace(t, std::move(handler));
858 if (!r.second) {
859 throw_with_backtrace<std::runtime_error>("registered handler already exists");
860 }
861 }
862 };
863
864 /// @}
865
866 }
867
868 }
869
870 #include "rpc_impl.hh"