/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright (C) 2015 Cloudius Systems, Ltd.
 */

#pragma once

#include <unordered_map>
#include <unordered_set>
#include <list>
#include <variant>
#include <boost/intrusive/list.hpp>
#include <seastar/core/future.hh>
#include <seastar/core/seastar.hh>
#include <seastar/net/api.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/gate.hh>
#include <seastar/rpc/rpc_types.hh>
#include <seastar/core/byteorder.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/core/queue.hh>
#include <seastar/core/weak_ptr.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/util/backtrace.hh>
#include <seastar/util/log.hh>

namespace bi = boost::intrusive;

namespace seastar {

namespace rpc {

/// \defgroup rpc rpc - remote procedure call framework
///
/// \brief
/// rpc is a framework that can be used to define client-server communication
/// protocols.
/// For a high-level description of the RPC features see
/// [doc/rpc.md](./md_rpc.html),
/// [doc/rpc-streaming.md](./md_rpc-streaming.html) and
/// [doc/rpc-compression.md](./md_rpc-compression.html)
///
/// The entry point for setting up an rpc protocol is
/// seastar::rpc::protocol.

using id_type = int64_t;

using rpc_semaphore = basic_semaphore<semaphore_default_exception_factory, rpc_clock_type>;
using resource_permit = semaphore_units<semaphore_default_exception_factory, rpc_clock_type>;

static constexpr char rpc_magic[] = "SSTARRPC";

/// \addtogroup rpc
/// @{

/// Specifies resource isolation for a connection.
struct isolation_config {
    /// Specifies a scheduling group under which the connection (and all its
    /// verb handlers) will execute.
    scheduling_group sched_group = current_scheduling_group();
};

/// Default isolation configuration - run everything in the default scheduling group,
/// i.e. the scheduling_group that the protocol::server was created in.
isolation_config default_isolate_connection(sstring isolation_cookie);

/// \brief Resource limits for an RPC server
///
/// A request's memory use will be estimated as
///
///     req_mem = basic_request_size + sizeof(serialized_request) * bloat_factor
///
/// Concurrent requests will be limited so that
///
///     sum(req_mem) <= max_memory
///
/// \see server
struct resource_limits {
    size_t basic_request_size = 0; ///< Minimum request footprint in memory
    unsigned bloat_factor = 1; ///< Serialized size is multiplied by this to estimate memory used by a request
    size_t max_memory = rpc_semaphore::max_counter(); ///< Maximum amount of memory that may be consumed by all requests
    /// Configures isolation for a connection based on its isolation cookie. May throw,
    /// in which case the connection will be terminated.
    using syncronous_isolation_function = std::function<isolation_config (sstring isolation_cookie)>;
    using asyncronous_isolation_function = std::function<future<isolation_config> (sstring isolation_cookie)>;
    using isolation_function_alternatives = std::variant<syncronous_isolation_function, asyncronous_isolation_function>;
    isolation_function_alternatives isolate_connection = default_isolate_connection;
};
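
// A minimal usage sketch (not part of this header's API): how an application
// might fill in resource_limits, including a custom per-connection isolation
// function. The numbers, the cookie value and the pre-created scheduling group
// `bg_group` are hypothetical.
//
//     rpc::resource_limits limits;
//     limits.basic_request_size = 1000;           // assume ~1KB fixed overhead per request
//     limits.bloat_factor = 3;                    // assume deserialization triples the size
//     limits.max_memory = 10'000'000;             // cap total request memory at ~10MB
//     limits.isolate_connection = [bg_group] (sstring cookie) {
//         rpc::isolation_config cfg;
//         if (cookie == "background") {
//             cfg.sched_group = bg_group;         // run such connections in a background group
//         }
//         return cfg;                             // otherwise keep the current scheduling group
//     };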

struct client_options {
    std::optional<net::tcp_keepalive_params> keepalive;
    bool tcp_nodelay = true;
    bool reuseaddr = false;
    compressor::factory* compressor_factory = nullptr;
    bool send_timeout_data = true;
    connection_id stream_parent = invalid_connection_id;
    /// Configures how this connection is isolated from other connections on the same server.
    ///
    /// \see resource_limits::isolate_connection
    sstring isolation_cookie;
};
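
// A minimal sketch (assumed, not from this header) of configuring client
// options before creating an rpc client; the isolation cookie value is a
// hypothetical example that a matching server-side isolation function would
// interpret.
//
//     rpc::client_options co;
//     co.tcp_nodelay = true;
//     co.keepalive = net::tcp_keepalive_params{std::chrono::seconds(60),
//                                              std::chrono::seconds(10), 5};
//     co.isolation_cookie = "background";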

/// @}

// An RPC call that passes a stream connection id as a parameter may arrive
// on a different shard from the one where the stream connection was opened,
// so the connection id is not known to the server that handles the RPC call.
// The shard that the stream connection belongs to is known, since it is part
// of the connection id, but this is not enough to locate the server instance
// the connection belongs to if there is more than one server on the shard.
// The streaming domain parameter is here to help with that: different servers
// on all shards logically belonging to the same service should belong to the
// same streaming domain. Only one server on each shard can belong to a
// particular streaming domain.
class streaming_domain_type {
    uint64_t _id;
public:
    explicit streaming_domain_type(uint64_t id) : _id(id) {}
    bool operator==(const streaming_domain_type& o) const {
        return _id == o._id;
    }
    friend struct std::hash<streaming_domain_type>;
    friend std::ostream& operator<<(std::ostream&, const streaming_domain_type&);
};

/// \addtogroup rpc
/// @{

class server;

struct server_options {
    compressor::factory* compressor_factory = nullptr;
    bool tcp_nodelay = true;
    std::optional<streaming_domain_type> streaming_domain;
    server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_;
    // Optional filter function. If set, it will be called with the remote
    // (connecting) address.
    // Returning false will refuse the incoming connection.
    // Returning true will allow the mechanism to proceed.
    std::function<bool(const socket_address&)> filter_connection = {};
};
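
// A minimal sketch (assumed, not from this header) of configuring server
// options; the streaming domain id and the accept-everything filter are
// hypothetical examples.
//
//     rpc::server_options so;
//     so.tcp_nodelay = true;
//     so.streaming_domain = rpc::streaming_domain_type(1);
//     so.filter_connection = [] (const socket_address& addr) {
//         return true;    // accept everyone; return false to refuse a connection
//     };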

/// @}

inline
size_t
estimate_request_size(const resource_limits& lim, size_t serialized_size) {
    return lim.basic_request_size + serialized_size * lim.bloat_factor;
}
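
// For example, with the hypothetical limits sketched above (basic_request_size =
// 1000, bloat_factor = 3), a request whose serialized size is 200 bytes is
// accounted as 1000 + 200 * 3 = 1600 bytes against resource_limits::max_memory.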

struct negotiation_frame {
    char magic[sizeof(rpc_magic) - 1];
    uint32_t len; // additional negotiation data length; multiple negotiation_frame_feature_record structs
};

enum class protocol_features : uint32_t {
    COMPRESS = 0,
    TIMEOUT = 1,
    CONNECTION_ID = 2,
    STREAM_PARENT = 3,
    ISOLATION = 4,
};

// internal representation of feature data
using feature_map = std::map<protocol_features, sstring>;

// An rpc signature, in the form signature<Ret (In0, In1, In2)>.
template <typename Function>
struct signature;

class logger {
    std::function<void(const sstring&)> _logger;
    ::seastar::logger* _seastar_logger = nullptr;

    // _seastar_logger will always be used first if it's available
    void log(const sstring& str) const {
        if (_seastar_logger) {
            // default level for log messages is `info`
            _seastar_logger->info("{}", str);
        } else if (_logger) {
            _logger(str);
        }
    }

    // _seastar_logger will always be used first if it's available
    template <typename... Args>
    void log(log_level level, const char* fmt, Args&&... args) const {
        if (_seastar_logger) {
            _seastar_logger->log(level, fmt, std::forward<Args>(args)...);
            // If the log level is at least `info`, fall back to legacy logging without explicit level.
            // Ignore less severe levels in order not to spam user's log with messages during transition,
            // i.e. when the user still only defines a level-less logger.
        } else if (_logger && level <= log_level::info) {
            _logger(format(fmt, std::forward<Args>(args)...));
        }
    }

public:
    void set(std::function<void(const sstring&)> l) {
        _logger = std::move(l);
    }

    void set(::seastar::logger* logger) {
        _seastar_logger = logger;
    }

    void operator()(const client_info& info, id_type msg_id, const sstring& str) const;
    void operator()(const client_info& info, id_type msg_id, log_level level, std::string_view str) const;

    void operator()(const client_info& info, const sstring& str) const;
    void operator()(const client_info& info, log_level level, std::string_view str) const;

    void operator()(const socket_address& addr, const sstring& str) const;
    void operator()(const socket_address& addr, log_level level, std::string_view str) const;
};
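
// A small usage sketch (assumed): applications normally point the rpc logger
// at a regular seastar logger, typically via protocol::set_logger() further
// below. The logger name is hypothetical.
//
//     static seastar::logger rpc_log("my-rpc");
//     my_proto.set_logger(&rpc_log);    // messages are logged at `info` by default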

class connection {
protected:
    connected_socket _fd;
    input_stream<char> _read_buf;
    output_stream<char> _write_buf;
    bool _error = false;
    bool _connected = false;
    std::optional<shared_promise<>> _negotiated = shared_promise<>();
    promise<> _stopped;
    stats _stats;
    const logger& _logger;
    // The owner of the pointer below is an instance of the rpc::protocol<Serializer> class.
    // The type of the pointer is erased here, but the original type is Serializer.
    void* _serializer;
    struct outgoing_entry : public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
        timer<rpc_clock_type> t;
        snd_buf buf;
        promise<> done;
        cancellable* pcancel = nullptr;
        outgoing_entry(snd_buf b) : buf(std::move(b)) {}

        outgoing_entry(outgoing_entry&&) = delete;
        outgoing_entry(const outgoing_entry&) = delete;

        void uncancellable() {
            t.cancel();
            if (pcancel) {
                pcancel->cancel_send = std::function<void()>();
            }
        }

        ~outgoing_entry() {
            if (pcancel) {
                pcancel->cancel_send = std::function<void()>();
                pcancel->send_back_pointer = nullptr;
            }
        }

        using container_t = bi::list<outgoing_entry, bi::constant_time_size<false>>;
    };
    void withdraw(outgoing_entry::container_t::iterator it, std::exception_ptr ex = nullptr);
    future<> _outgoing_queue_ready = _negotiated->get_shared_future();
    outgoing_entry::container_t _outgoing_queue;
    size_t _outgoing_queue_size = 0;
    std::unique_ptr<compressor> _compressor;
    bool _propagate_timeout = false;
    bool _timeout_negotiated = false;
    // stream related fields
    bool _is_stream = false;
    connection_id _id = invalid_connection_id;

    std::unordered_map<connection_id, xshard_connection_ptr> _streams;
    queue<rcv_buf> _stream_queue = queue<rcv_buf>(max_queued_stream_buffers);
    semaphore _stream_sem = semaphore(max_stream_buffers_memory);
    bool _sink_closed = true;
    bool _source_closed = true;
    // the future holds whether the sink is already closed;
    // if it is not ready, the sink is in the process of being closed
    future<bool> _sink_closed_future = make_ready_future<bool>(false);

    size_t outgoing_queue_length() const noexcept {
        return _outgoing_queue_size;
    }

    void set_negotiated() noexcept;

    bool is_stream() const noexcept {
        return _is_stream;
    }

    snd_buf compress(snd_buf buf);
    future<> send_buffer(snd_buf buf);

    future<> send_entry(outgoing_entry& d);
    future<> stop_send_loop(std::exception_ptr ex);
    future<std::optional<rcv_buf>> read_stream_frame_compressed(input_stream<char>& in);
    bool stream_check_twoway_closed() const noexcept {
        return _sink_closed && _source_closed;
    }
    future<> stream_close();
    future<> stream_process_incoming(rcv_buf&&);
    future<> handle_stream_frame();

public:
    connection(connected_socket&& fd, const logger& l, void* s, connection_id id = invalid_connection_id) : connection(l, s, id) {
        set_socket(std::move(fd));
    }
    connection(const logger& l, void* s, connection_id id = invalid_connection_id) : _logger(l), _serializer(s), _id(id) {}
    virtual ~connection() {}
    void set_socket(connected_socket&& fd);
    future<> send_negotiation_frame(feature_map features);
    // functions below are public because they are used by external heavily templated functions
    // and I am not smart enough to know how to define them as friends
    future<> send(snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
    bool error() const noexcept { return _error; }
    void abort();
    future<> stop() noexcept;
    future<> stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs);
    future<> close_sink() {
        _sink_closed = true;
        if (stream_check_twoway_closed()) {
            return stream_close();
        }
        return make_ready_future();
    }
    bool sink_closed() const noexcept {
        return _sink_closed;
    }
    future<> close_source() {
        _source_closed = true;
        if (stream_check_twoway_closed()) {
            return stream_close();
        }
        return make_ready_future();
    }
    connection_id get_connection_id() const noexcept {
        return _id;
    }
    xshard_connection_ptr get_stream(connection_id id) const;
    void register_stream(connection_id id, xshard_connection_ptr c);
    virtual socket_address peer_address() const = 0;

    const logger& get_logger() const noexcept {
        return _logger;
    }

    template<typename Serializer>
    Serializer& serializer() {
        return *static_cast<Serializer*>(_serializer);
    }

    template <typename FrameType>
    typename FrameType::return_type read_frame(socket_address info, input_stream<char>& in);

    template <typename FrameType>
    typename FrameType::return_type read_frame_compressed(socket_address info, std::unique_ptr<compressor>& compressor, input_stream<char>& in);
    friend class client;
    template<typename Serializer, typename... Out>
    friend class sink_impl;
    template<typename Serializer, typename... In>
    friend class source_impl;

    void suspend_for_testing(promise<>& p) {
        _outgoing_queue_ready.get();
        auto dummy = std::make_unique<outgoing_entry>(snd_buf());
        _outgoing_queue.push_back(*dummy);
        _outgoing_queue_ready = dummy->done.get_future();
        (void)p.get_future().then([dummy = std::move(dummy)] { dummy->done.set_value(); });
    }
};

struct deferred_snd_buf {
    promise<> pr;
    snd_buf data;
};

// send data Out...
template<typename Serializer, typename... Out>
class sink_impl : public sink<Out...>::impl {
    // Used on the shard *this lives on.
    alignas (cache_line_size) uint64_t _next_seq_num = 1;

    // Used on the shard the _conn lives on.
    struct alignas (cache_line_size) {
        uint64_t last_seq_num = 0;
        std::map<uint64_t, deferred_snd_buf> out_of_order_bufs;
    } _remote_state;
public:
    sink_impl(xshard_connection_ptr con) : sink<Out...>::impl(std::move(con)) { this->_con->get()->_sink_closed = false; }
    future<> operator()(const Out&... args) override;
    future<> close() override;
    future<> flush() override;
    ~sink_impl() override;
};

// receive data In...
template<typename Serializer, typename... In>
class source_impl : public source<In...>::impl {
public:
    source_impl(xshard_connection_ptr con) : source<In...>::impl(std::move(con)) { this->_con->get()->_source_closed = false; }
    future<std::optional<std::tuple<In...>>> operator()() override;
};

class client : public rpc::connection, public weakly_referencable<client> {
    socket _socket;
    id_type _message_id = 1;
    struct reply_handler_base {
        timer<rpc_clock_type> t;
        cancellable* pcancel = nullptr;
        virtual void operator()(client&, id_type, rcv_buf data) = 0;
        virtual void timeout() {}
        virtual void cancel() {}
        virtual ~reply_handler_base() {
            if (pcancel) {
                pcancel->cancel_wait = std::function<void()>();
                pcancel->wait_back_pointer = nullptr;
            }
        }
    };
public:
    template<typename Reply, typename Func>
    struct reply_handler final : reply_handler_base {
        Func func;
        Reply reply;
        reply_handler(Func&& f) : func(std::move(f)) {}
        virtual void operator()(client& client, id_type msg_id, rcv_buf data) override {
            return func(reply, client, msg_id, std::move(data));
        }
        virtual void timeout() override {
            reply.done = true;
            reply.p.set_exception(timeout_error());
        }
        virtual void cancel() override {
            reply.done = true;
            reply.p.set_exception(canceled_error());
        }
        virtual ~reply_handler() {}
    };
private:
    std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding;
    socket_address _server_addr, _local_addr;
    client_options _options;
    weak_ptr<client> _parent; // for stream clients

private:
    future<> negotiate_protocol(input_stream<char>& in);
    void negotiate(feature_map server_features);
    future<std::tuple<int64_t, std::optional<rcv_buf>>>
    read_response_frame(input_stream<char>& in);
    future<std::tuple<int64_t, std::optional<rcv_buf>>>
    read_response_frame_compressed(input_stream<char>& in);
public:
    /**
     * Create client object which will attempt to connect to the remote address.
     *
     * @param l \ref seastar::logger to use for logging error messages
     * @param s an optional connection serializer
     * @param addr the remote address identifying this client
     * @param local the local address of this client
     */
    client(const logger& l, void* s, const socket_address& addr, const socket_address& local = {});
    client(const logger& l, void* s, client_options options, const socket_address& addr, const socket_address& local = {});

    /**
     * Create client object which will attempt to connect to the remote address using the
     * specified seastar::socket.
     *
     * @param l \ref seastar::logger to use for logging error messages
     * @param s an optional connection serializer
     * @param addr the remote address identifying this client
     * @param local the local address of this client
     * @param socket the socket object used to connect to the remote address
     */
    client(const logger& l, void* s, socket socket, const socket_address& addr, const socket_address& local = {});
    client(const logger& l, void* s, client_options options, socket socket, const socket_address& addr, const socket_address& local = {});

    stats get_stats() const;
    stats& get_stats_internal() {
        return _stats;
    }
    auto next_message_id() { return _message_id++; }
    void wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
    void wait_timed_out(id_type id);
    future<> stop() noexcept;
    void abort_all_streams();
    void deregister_this_stream();
    socket_address peer_address() const override {
        return _server_addr;
    }
    future<> await_connection() {
        if (!_negotiated) {
            return make_ready_future<>();
        } else {
            return _negotiated->get_shared_future();
        }
    }
    template<typename Serializer, typename... Out>
    future<sink<Out...>> make_stream_sink(socket socket) {
        return await_connection().then([this, socket = std::move(socket)] () mutable {
            if (!this->get_connection_id()) {
                return make_exception_future<sink<Out...>>(std::runtime_error("Streaming is not supported by the server"));
            }
            client_options o = _options;
            o.stream_parent = this->get_connection_id();
            o.send_timeout_data = false;
            auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr, _local_addr);
            c->_parent = this->weak_from_this();
            c->_is_stream = true;
            return c->await_connection().then([c, this] {
                xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast<rpc::connection>(c)));
                this->register_stream(c->get_connection_id(), s);
                return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(std::move(s)));
            }).handle_exception([c] (std::exception_ptr eptr) {
                // If await_connection fails we need to stop the client
                // before destroying it.
                return c->stop().then([eptr, c] {
                    return make_exception_future<sink<Out...>>(eptr);
                });
            });
        });
    }
    template<typename Serializer, typename... Out>
    future<sink<Out...>> make_stream_sink() {
        return make_stream_sink<Serializer, Out...>(make_socket());
    }
};
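
// A rough usage sketch (assumed; see doc/rpc-streaming.md for the full picture):
// a connected client can open a stream sink towards the server, send a few
// values and close it. The element types and the `serializer` type are
// hypothetical examples.
//
//     rpc::client& cl = ...;    // an already connected client
//     cl.make_stream_sink<serializer, int, sstring>().then([] (rpc::sink<int, sstring> sink) {
//         return sink(1, "hello").then([sink] () mutable {
//             return sink.close();
//         });
//     });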

class protocol_base;

class server {
private:
    static thread_local std::unordered_map<streaming_domain_type, server*> _servers;

public:
    class connection : public rpc::connection, public enable_shared_from_this<connection> {
        server& _server;
        client_info _info;
        connection_id _parent_id = invalid_connection_id;
        std::optional<isolation_config> _isolation_config;
    private:
        future<> negotiate_protocol(input_stream<char>& in);
        future<std::tuple<std::optional<uint64_t>, uint64_t, int64_t, std::optional<rcv_buf>>>
        read_request_frame_compressed(input_stream<char>& in);
        future<feature_map> negotiate(feature_map requested);
        future<> send_unknown_verb_reply(std::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
    public:
        connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* serializer, connection_id id);
        future<> process();
        future<> respond(int64_t msg_id, snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout);
        client_info& info() { return _info; }
        const client_info& info() const { return _info; }
        stats get_stats() const {
            stats res = _stats;
            res.pending = outgoing_queue_length();
            return res;
        }

        stats& get_stats_internal() {
            return _stats;
        }
        socket_address peer_address() const override {
            return _info.addr;
        }
        // Resources will be released when the returned permit goes out of scope
        future<resource_permit> wait_for_resources(size_t memory_consumed, std::optional<rpc_clock_type::time_point> timeout) {
            if (timeout) {
                return get_units(_server._resources_available, memory_consumed, *timeout);
            } else {
                return get_units(_server._resources_available, memory_consumed);
            }
        }
        size_t estimate_request_size(size_t serialized_size) {
            return rpc::estimate_request_size(_server._limits, serialized_size);
        }
        size_t max_request_size() const {
            return _server._limits.max_memory;
        }
        server& get_server() {
            return _server;
        }
        future<> deregister_this_stream();
    };
private:
    protocol_base* _proto;
    server_socket _ss;
    resource_limits _limits;
    rpc_semaphore _resources_available;
    std::unordered_map<connection_id, shared_ptr<connection>> _conns;
    promise<> _ss_stopped;
    gate _reply_gate;
    server_options _options;
    uint64_t _next_client_id = 1;

public:
    server(protocol_base* proto, const socket_address& addr, resource_limits memory_limit = resource_limits());
    server(protocol_base* proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits());
    server(protocol_base* proto, server_socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{});
    server(protocol_base* proto, server_options opts, server_socket, resource_limits memory_limit = resource_limits());
    void accept();
    future<> stop();
    template<typename Func>
    void foreach_connection(Func&& f) {
        for (auto c : _conns) {
            f(*c.second);
        }
    }
    gate& reply_gate() {
        return _reply_gate;
    }
    friend connection;
    friend client;
};

using rpc_handler_func = std::function<future<> (shared_ptr<server::connection>, std::optional<rpc_clock_type::time_point> timeout, int64_t msgid,
                                                 rcv_buf data)>;

struct rpc_handler {
    scheduling_group sg;
    rpc_handler_func func;
    gate use_gate;
};

class protocol_base {
public:
    virtual ~protocol_base() {}
    virtual shared_ptr<server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) = 0;
protected:
    friend class server;

    virtual rpc_handler* get_handler(uint64_t msg_id) = 0;
    virtual void put_handler(rpc_handler*) = 0;
};

/// \addtogroup rpc
/// @{

/// Defines a protocol for communication between a server and a client.
///
/// A protocol is defined by a `Serializer` and a `MsgType`. The `Serializer` is
/// responsible for serializing and unserializing all types used as arguments and
/// return types used in the protocol. The `Serializer` is expected to define a
/// `read()` and `write()` method for each such type `T` as follows:
///
///     template <typename Output>
///     void write(const serializer&, Output& output, const T& data);
///
///     template <typename Input>
///     T read(const serializer&, Input& input, type<T> type_tag); // type_tag used to disambiguate
///
/// Where `Input` and `Output` provide a `void read(char*, size_t)` and a
/// `void write(const char*, size_t)` method, respectively.
/// `MsgType` defines the type to be used as the message id, the id which is
/// used to identify different messages used in the protocol. These are also
/// often referred to as "verbs". The client will use the message id to
/// specify the remote method (verb) to invoke on the server. The server uses
/// the message id to dispatch the incoming call to the right handler.
/// `MsgType` should be hashable and serializable. It is preferable to use an
/// enum for message types, but do not forget to provide a hash function for it.
///
/// Use register_handler() on the server to define the available verbs and the
/// code to be executed when they are invoked by clients. Use make_client() on
/// the client to create a matching callable that can be used to invoke the
/// verb on the server and wait for its result. Note that register_handler()
/// also returns a client that can be used to invoke the registered verb on
/// another node (given that the other node has the same verb). This is useful
/// for symmetric protocols, where two or more nodes all run servers as well as
/// connect to the other nodes as clients.
///
/// Use protocol::server to listen for and accept incoming connections on the
/// server and protocol::client to establish connections to the server.
/// Note that registering the available verbs can be done before or after
/// listening for connections, but it is best to ensure that all the verbs are
/// set up by the time incoming requests are to be expected.
///
/// ## Configuration
///
/// TODO
///
/// ## Isolation
///
/// RPC supports isolating verb handlers from each other. There are two ways to
/// achieve this: per-handler isolation (the old way) and per-connection
/// isolation (the new way). If no isolation is configured, all handlers will be
/// executed in the context of the scheduling_group in which the
/// protocol::server was created.
///
/// Per-handler isolation (the old way) can be configured by using the
/// register_handler() overload which takes a scheduling_group. When invoked,
/// the body of the handler will be executed from the context of the configured
/// scheduling_group.
///
/// Per-connection isolation (the new way) is a more flexible mechanism that
/// requires user application provided logic to determine how connections are
/// isolated. This mechanism has two parts, the server and the client part.
/// The client configures isolation by setting client_options::isolation_cookie.
/// This cookie is an opaque (to the RPC layer) string that is to be interpreted
/// on the server using user application provided logic. The application
/// provides this logic to the server by setting
/// resource_limits::isolate_connection to an appropriate handler function that
/// interprets the opaque cookie and resolves it to an isolation_config. The
/// scheduling_group in the resulting isolation_config will be used not just to
/// execute all verb handlers, but also the connection loop itself, hence
/// providing better isolation.
///
/// There are a few gotchas related to mixing the two isolation mechanisms. This
/// can happen when the application is updated and one of the client/server is
/// still using the old/new mechanism. In general per-connection isolation
/// overrides the per-handler one. If both are set up, the former will determine
/// the scheduling_group context for the handlers. If the client is not
/// configured to send an isolation cookie, the server's
/// resource_limits::isolate_connection will not be invoked and the server will
/// fall back to per-handler isolation if configured. If the client is
/// configured to send an isolation cookie but the server doesn't have a
/// resource_limits::isolate_connection configured, it will use
/// default_isolate_connection() to interpret the cookie. Note that this still
/// overrides the per-handler isolation if any is configured. If the server is
/// so old that it doesn't have the per-connection isolation feature at all, it
/// will of course just use the per-handler one, if configured.
///
/// ## Compatibility
///
/// TODO
///
/// \tparam Serializer the serializer for the protocol.
/// \tparam MsgType the type to be used as the message id or verb id.
template<typename Serializer, typename MsgType = uint32_t>
class protocol final : public protocol_base {
public:
    /// Represents the listening port and all accepted connections.
    class server : public rpc::server {
    public:
        server(protocol& proto, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
            rpc::server(&proto, addr, memory_limit) {}
        server(protocol& proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
            rpc::server(&proto, opts, addr, memory_limit) {}
        server(protocol& proto, server_socket socket, resource_limits memory_limit = resource_limits(), server_options = server_options{}) :
            rpc::server(&proto, std::move(socket), memory_limit) {}
        server(protocol& proto, server_options opts, server_socket socket, resource_limits memory_limit = resource_limits()) :
            rpc::server(&proto, opts, std::move(socket), memory_limit) {}
    };
    /// Represents a client side connection.
    class client : public rpc::client {
    public:
        /*
         * Create client object which will attempt to connect to the remote address.
         *
         * @param addr the remote address identifying this client
         * @param local the local address of this client
         */
        client(protocol& p, const socket_address& addr, const socket_address& local = {}) :
            rpc::client(p.get_logger(), &p._serializer, addr, local) {}
        client(protocol& p, client_options options, const socket_address& addr, const socket_address& local = {}) :
            rpc::client(p.get_logger(), &p._serializer, options, addr, local) {}

        /**
         * Create client object which will attempt to connect to the remote address using the
         * specified seastar::socket.
         *
         * @param addr the remote address identifying this client
         * @param local the local address of this client
         * @param socket the socket object used to connect to the remote address
         */
        client(protocol& p, socket socket, const socket_address& addr, const socket_address& local = {}) :
            rpc::client(p.get_logger(), &p._serializer, std::move(socket), addr, local) {}
        client(protocol& p, client_options options, socket socket, const socket_address& addr, const socket_address& local = {}) :
            rpc::client(p.get_logger(), &p._serializer, options, std::move(socket), addr, local) {}
    };

    friend server;
private:
    std::unordered_map<MsgType, rpc_handler> _handlers;
    Serializer _serializer;
    logger _logger;

public:
    protocol(Serializer&& serializer) : _serializer(std::forward<Serializer>(serializer)) {}

    /// Creates a callable that can be used to invoke the verb on the remote.
    ///
    /// \tparam Func The signature of the verb. Has to be either the same or
    ///     compatible with the one passed to register_handler on the server.
    /// \param t the verb to invoke on the remote.
    ///
    /// \returns a callable whose signature is derived from Func as follows:
    ///     given `Func == Ret(Args...)` the returned callable has the following
    ///     signature: `future<Ret>(protocol::client&, Args...)`.
    template<typename Func>
    auto make_client(MsgType t);

    /// Register a handler to be called when this verb is invoked.
    ///
    /// \tparam Func the type of the handler for the verb. This determines the
    ///     signature of the verb.
    /// \param t the verb to register the handler for.
    /// \param func the callable to be called when the verb is invoked by the
    ///     remote.
    ///
    /// \returns a client, a callable that can be used to invoke the verb. See
    ///     make_client(). The client can be discarded; in fact this is what
    ///     most callers will do as real clients will live on a remote node, not
    ///     on the one where handlers are registered.
    template<typename Func>
    auto register_handler(MsgType t, Func&& func);

    /// Register a handler to be called when this verb is invoked.
    ///
    /// \tparam Func the type of the handler for the verb. This determines the
    ///     signature of the verb.
    /// \param t the verb to register the handler for.
    /// \param sg the scheduling group that will be used to invoke the handler
    ///     in. This can be used to execute different verbs in different
    ///     scheduling groups. Note that there is a newer mechanism to determine
    ///     the scheduling group a handler will run in per invocation, see
    ///     isolation_config.
    /// \param func the callable to be called when the verb is invoked by the
    ///     remote.
    ///
    /// \returns a client, a callable that can be used to invoke the verb. See
    ///     make_client(). The client can be discarded; in fact this is what
    ///     most callers will do as real clients will live on a remote node, not
    ///     on the one where handlers are registered.
    template <typename Func>
    auto register_handler(MsgType t, scheduling_group sg, Func&& func);

    /// Unregister the handler for the verb.
    ///
    /// Waits for all currently running handlers, then unregisters the handler.
    /// Future attempts to invoke the verb will fail. This becomes effective
    /// immediately after calling this function.
    ///
    /// \param t the verb to unregister the handler for.
    ///
    /// \returns a future that becomes available once all currently running
    ///     handlers finished.
    future<> unregister_handler(MsgType t);

    /// Set a logger function to be used to log messages.
    ///
    /// \deprecated use the logger overload set_logger(::seastar::logger*)
    /// instead.
    [[deprecated("Use set_logger(::seastar::logger*) instead")]]
    void set_logger(std::function<void(const sstring&)> logger) {
        _logger.set(std::move(logger));
    }

    /// Set a logger to be used to log messages.
    void set_logger(::seastar::logger* logger) {
        _logger.set(logger);
    }

    const logger& get_logger() const {
        return _logger;
    }

    shared_ptr<rpc::server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) override {
        return make_shared<rpc::server::connection>(server, std::move(fd), std::move(addr), _logger, &_serializer, id);
    }

    bool has_handler(MsgType msg_id);

    /// Checks if there are any handlers registered.
    /// Debugging helper, should only be used for debugging and not relied on.
    ///
    /// \returns true if there are, false if there are no registered handlers.
    bool has_handlers() const noexcept {
        return !_handlers.empty();
    }

private:
    rpc_handler* get_handler(uint64_t msg_id) override;
    void put_handler(rpc_handler*) override;

    template<typename Ret, typename... In>
    auto make_client(signature<Ret(In...)> sig, MsgType t);

    void register_receiver(MsgType t, rpc_handler&& handler) {
        auto r = _handlers.emplace(t, std::move(handler));
        if (!r.second) {
            throw_with_backtrace<std::runtime_error>("registered handler already exists");
        }
    }
};
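
// A rough end-to-end sketch (assumed; the serializer, verb ids and addresses
// below are hypothetical, see doc/rpc.md for a complete example): define a
// serializer with the read()/write() shape documented above, instantiate a
// protocol, register a verb on the server side and invoke it from a client.
//
//     struct serializer {};    // plus read()/write() overloads for each used type
//
//     enum class verb : uint32_t { echo = 1 };
//     // a hash function for `verb` is also needed, as noted above
//
//     rpc::protocol<serializer, verb> proto(serializer{});
//
//     // Server side: register the handler and listen.
//     proto.register_handler(verb::echo, [] (sstring s) {
//         return make_ready_future<sstring>(std::move(s));
//     });
//     rpc::protocol<serializer, verb>::server srv(proto, ipv4_addr{"127.0.0.1", 10000});
//
//     // Client side: build a callable for the verb and invoke it.
//     auto echo = proto.make_client<sstring (sstring)>(verb::echo);
//     rpc::protocol<serializer, verb>::client cl(proto, ipv4_addr{"127.0.0.1", 10000});
//     echo(cl, sstring("ping")).then([] (sstring reply) { /* reply == "ping" */ });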

/// @}

} // namespace rpc

} // namespace seastar

#include "rpc_impl.hh"