2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
24 #include <unordered_map>
25 #include <unordered_set>
27 #include <seastar/core/future.hh>
28 #include <seastar/net/api.hh>
29 #include <seastar/core/reactor.hh>
30 #include <seastar/core/iostream.hh>
31 #include <seastar/core/shared_ptr.hh>
32 #include <seastar/core/condition-variable.hh>
33 #include <seastar/core/gate.hh>
34 #include <seastar/rpc/rpc_types.hh>
35 #include <seastar/core/byteorder.hh>
36 #include <seastar/core/shared_future.hh>
37 #include <seastar/core/queue.hh>
38 #include <seastar/core/weak_ptr.hh>
39 #include <seastar/core/scheduling.hh>
40 #include <seastar/util/backtrace.hh>
46 using id_type = int64_t;
48 using rpc_semaphore = basic_semaphore<semaphore_default_exception_factory, rpc_clock_type>;
49 using resource_permit = semaphore_units<semaphore_default_exception_factory, rpc_clock_type>;
51 struct SerializerConcept {
52 // For each serializable type T, implement
54 template <typename Output>
55 friend void write(const SerializerConcept&, Output& output, const T& data);
56 template <typename Input>
57 friend T read(const SerializerConcept&, Input& input, type<T> type_tag); // type_tag used to disambiguate
58 // Input and Output expose void read(char*, size_t) and write(const char*, size_t).
61 static constexpr char rpc_magic[] = "SSTARRPC";
63 /// Specifies resource isolation for a connection.
64 struct isolation_config {
65 /// Specifies a scheduling group under which the connection (and all its
66 /// verb handlers) will execute.
67 scheduling_group sched_group = current_scheduling_group();
70 /// Default isolation configuration - run everything in the default scheduling group.
71 isolation_config default_isolate_connection(sstring isolation_cookie);
73 /// \brief Resource limits for an RPC server
75 /// A request's memory use will be estimated as
77 /// req_mem = basic_request_size + sizeof(serialized_request) * bloat_factor
79 /// Concurrent requests will be limited so that
81 /// sum(req_mem) <= max_memory
84 struct resource_limits {
85 size_t basic_request_size = 0; ///< Minimum request footprint in memory
86 unsigned bloat_factor = 1; ///< Serialized size multiplied by this to estimate memory used by request
87 size_t max_memory = rpc_semaphore::max_counter(); ///< Maximum amount of memory that may be consumed by all requests
88 /// Configures isolation for a connection based on its isolation cookie. May throw,
89 /// in which case the connection will be terminated.
90 std::function<isolation_config (sstring isolation_cookie)> isolate_connection = default_isolate_connection;
93 struct client_options {
94 compat::optional<net::tcp_keepalive_params> keepalive;
95 bool tcp_nodelay = true;
96 bool reuseaddr = false;
97 compressor::factory* compressor_factory = nullptr;
98 bool send_timeout_data = true;
99 connection_id stream_parent = invalid_connection_id;
100 /// Configures how this connection is isolated from other connections on the same server.
102 /// \see resource_limits::isolate_connection
103 sstring isolation_cookie;
106 // An RPC call that passes a stream connection id as a parameter
107 // may arrive at a different shard from the one where the stream connection
108 // was opened, so the connection id is not known to the server that handles
109 // the RPC call. The shard that the stream connection belongs to is known,
110 // since it is a part of the connection id, but this is not enough to locate
111 // the server instance the connection belongs to if there is more than one
112 // server on the shard. The streaming domain parameter is here to help with that.
113 // Different servers on all shards logically belonging to the same service should
114 // belong to the same streaming domain. Only one server on each shard can belong to
115 // a particular streaming domain.
116 class streaming_domain_type {
119 explicit streaming_domain_type(uint64_t id) : _id(id) {}
120 bool operator==(const streaming_domain_type& o) const {
123 friend struct std::hash<streaming_domain_type>;
124 friend std::ostream& operator<<(std::ostream&, const streaming_domain_type&);
127 struct server_options {
128 compressor::factory* compressor_factory = nullptr;
129 bool tcp_nodelay = true;
130 compat::optional<streaming_domain_type> streaming_domain;
131 server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_;
136 estimate_request_size(const resource_limits& lim, size_t serialized_size) {
137 return lim.basic_request_size + serialized_size * lim.bloat_factor;
140 struct negotiation_frame {
141 char magic[sizeof(rpc_magic) - 1];
142 uint32_t len; // additional negotiation data length; multiple negotiation_frame_feature_record structs
145 enum class protocol_features : uint32_t {
153 // internal representation of feature data
154 using feature_map = std::map<protocol_features, sstring>;
156 // An rpc signature, in the form signature<Ret (In0, In1, In2)>.
157 template <typename Function>
161 std::function<void(const sstring&)> _logger;
163 void log(const sstring& str) const {
170 void set(std::function<void(const sstring&)> l) {
171 _logger = std::move(l);
174 void operator()(const client_info& info, id_type msg_id, const sstring& str) const;
176 void operator()(const client_info& info, const sstring& str) const;
178 void operator()(const socket_address& addr, const sstring& str) const;
183 connected_socket _fd;
184 input_stream<char> _read_buf;
185 output_stream<char> _write_buf;
187 bool _connected = false;
190 const logger& _logger;
191 // The owner of the pointer below is an instance of rpc::protocol<typename Serializer> class.
192 // The type of the pointer is erased here, but the original type is Serializer
194 struct outgoing_entry {
195 timer<rpc_clock_type> t;
197 compat::optional<promise<>> p = promise<>();
198 cancellable* pcancel = nullptr;
199 outgoing_entry(snd_buf b) : buf(std::move(b)) {}
200 outgoing_entry(outgoing_entry&& o) : t(std::move(o.t)), buf(std::move(o.buf)), p(std::move(o.p)), pcancel(o.pcancel) {
201 o.p = compat::nullopt;
206 pcancel->cancel_send = std::function<void()>();
207 pcancel->send_back_pointer = nullptr;
213 friend outgoing_entry;
214 std::list<outgoing_entry> _outgoing_queue;
215 condition_variable _outgoing_queue_cond;
216 future<> _send_loop_stopped = make_ready_future<>();
217 std::unique_ptr<compressor> _compressor;
218 bool _timeout_negotiated = false;
219 // stream related fields
220 bool _is_stream = false;
221 connection_id _id = invalid_connection_id;
223 std::unordered_map<connection_id, xshard_connection_ptr> _streams;
224 queue<rcv_buf> _stream_queue = queue<rcv_buf>(max_queued_stream_buffers);
225 semaphore _stream_sem = semaphore(max_stream_buffers_memory);
226 bool _sink_closed = true;
227 bool _source_closed = true;
228 // the future holds whether the sink is already closed;
229 // if it is not ready, it means the sink is being closed
230 future<bool> _sink_closed_future = make_ready_future<bool>(false);
236 snd_buf compress(snd_buf buf);
237 future<> send_buffer(snd_buf buf);
239 enum class outgoing_queue_type {
245 template<outgoing_queue_type QueueType> void send_loop();
246 future<> stop_send_loop();
247 future<compat::optional<rcv_buf>> read_stream_frame_compressed(input_stream<char>& in);
248 bool stream_check_twoway_closed() {
249 return _sink_closed && _source_closed;
251 future<> stream_close();
252 future<> stream_process_incoming(rcv_buf&&);
253 future<> handle_stream_frame();
256 connection(connected_socket&& fd, const logger& l, void* s, connection_id id = invalid_connection_id) : connection(l, s, id) {
257 set_socket(std::move(fd));
// Constructs a connection without a socket: only the logger reference, the
// type-erased serializer pointer and the connection id (invalid_connection_id
// by default) are stored. The socket-taking constructor delegates here and
// then calls set_socket().
259 connection(const logger& l, void* s, connection_id id = invalid_connection_id) : _logger(l), _serializer(s), _id(id) {}
260 virtual ~connection() {}
261 void set_socket(connected_socket&& fd);
262 future<> send_negotiation_frame(feature_map features);
263 // functions below are public because they are used by external heavily templated functions
264 // and I am not smart enough to know how to define them as friends
265 future<> send(snd_buf buf, compat::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr);
266 bool error() { return _error; }
269 future<> stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs);
270 future<> close_sink() {
272 if (stream_check_twoway_closed()) {
273 return stream_close();
275 return make_ready_future();
280 future<> close_source() {
281 _source_closed = true;
282 if (stream_check_twoway_closed()) {
283 return stream_close();
285 return make_ready_future();
287 connection_id get_connection_id() const {
290 xshard_connection_ptr get_stream(connection_id id) const;
291 void register_stream(connection_id id, xshard_connection_ptr c);
292 virtual socket_address peer_address() const = 0;
294 const logger& get_logger() const {
298 template<typename Serializer>
299 Serializer& serializer() {
300 return *static_cast<Serializer*>(_serializer);
303 template <typename FrameType>
304 typename FrameType::return_type read_frame(socket_address info, input_stream<char>& in);
306 template <typename FrameType>
307 typename FrameType::return_type read_frame_compressed(socket_address info, std::unique_ptr<compressor>& compressor, input_stream<char>& in);
309 template<typename Serializer, typename... Out>
310 friend class sink_impl;
311 template<typename Serializer, typename... In>
312 friend class source_impl;
316 template<typename Serializer, typename... Out>
317 class sink_impl : public sink<Out...>::impl {
// Hands the connection pointer to the sink<Out...>::impl base and clears the
// connection's _sink_closed flag (which is initialised to true).
319 sink_impl(xshard_connection_ptr con) : sink<Out...>::impl(std::move(con)) { this->_con->get()->_sink_closed = false; }
320 future<> operator()(const Out&... args) override;
321 future<> close() override;
322 future<> flush() override;
325 // receive data In...
326 template<typename Serializer, typename... In>
327 class source_impl : public source<In...>::impl {
// Hands the connection pointer to the source<In...>::impl base and clears the
// connection's _source_closed flag (which is initialised to true).
329 source_impl(xshard_connection_ptr con) : source<In...>::impl(std::move(con)) { this->_con->get()->_source_closed = false; }
330 future<compat::optional<std::tuple<In...>>> operator()() override;
333 class client : public rpc::connection, public weakly_referencable<client> {
335 id_type _message_id = 1;
336 struct reply_handler_base {
337 timer<rpc_clock_type> t;
338 cancellable* pcancel = nullptr;
339 virtual void operator()(client&, id_type, rcv_buf data) = 0;
340 virtual void timeout() {}
341 virtual void cancel() {}
342 virtual ~reply_handler_base() {
344 pcancel->cancel_wait = std::function<void()>();
345 pcancel->wait_back_pointer = nullptr;
350 template<typename Reply, typename Func>
351 struct reply_handler final : reply_handler_base {
354 reply_handler(Func&& f) : func(std::move(f)) {}
355 virtual void operator()(client& client, id_type msg_id, rcv_buf data) override {
356 return func(reply, client, msg_id, std::move(data));
358 virtual void timeout() override {
360 reply.p.set_exception(timeout_error());
362 virtual void cancel() override {
364 reply.p.set_exception(canceled_error());
366 virtual ~reply_handler() {}
369 std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding;
370 socket_address _server_addr;
371 client_options _options;
372 compat::optional<shared_promise<>> _client_negotiated = shared_promise<>();
373 weak_ptr<client> _parent; // for stream clients
376 future<> negotiate_protocol(input_stream<char>& in);
377 void negotiate(feature_map server_features);
378 future<std::tuple<int64_t, compat::optional<rcv_buf>>>
379 read_response_frame(input_stream<char>& in);
380 future<std::tuple<int64_t, compat::optional<rcv_buf>>>
381 read_response_frame_compressed(input_stream<char>& in);
384 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::stream>();
386 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::request>();
391 * Create client object which will attempt to connect to the remote address.
393 * @param addr the remote address identifying this client
394 * @param local the local address of this client
396 client(const logger& l, void* s, const socket_address& addr, const socket_address& local = {});
397 client(const logger& l, void* s, client_options options, const socket_address& addr, const socket_address& local = {});
400 * Create client object which will attempt to connect to the remote address using the
401 * specified seastar::socket.
403 * @param addr the remote address identifying this client
404 * @param local the local address of this client
405 * @param socket the socket object used to connect to the remote address
407 client(const logger& l, void* s, socket socket, const socket_address& addr, const socket_address& local = {});
408 client(const logger& l, void* s, client_options options, socket socket, const socket_address& addr, const socket_address& local = {});
410 stats get_stats() const;
411 stats& get_stats_internal() {
// Returns the current value of _message_id and then increments it
// (post-increment), so each call returns a different id.
414 auto next_message_id() { return _message_id++; }
415 void wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, compat::optional<rpc_clock_type::time_point> timeout, cancellable* cancel);
416 void wait_timed_out(id_type id);
418 void abort_all_streams();
419 void deregister_this_stream();
420 socket_address peer_address() const override {
423 future<> await_connection() {
424 if (!_client_negotiated) {
425 return make_ready_future<>();
427 return _client_negotiated->get_shared_future();
430 template<typename Serializer, typename... Out>
431 future<sink<Out...>> make_stream_sink(socket socket) {
432 return await_connection().then([this, socket = std::move(socket)] () mutable {
433 if (!this->get_connection_id()) {
434 return make_exception_future<sink<Out...>>(std::runtime_error("Streaming is not supported by the server"));
436 client_options o = _options;
437 o.stream_parent = this->get_connection_id();
438 o.send_timeout_data = false;
439 auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr);
440 c->_parent = this->weak_from_this();
441 c->_is_stream = true;
442 return c->await_connection().then([c, this] {
443 xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast<rpc::connection>(c)));
444 this->register_stream(c->get_connection_id(), s);
445 return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(std::move(s)));
449 template<typename Serializer, typename... Out>
450 future<sink<Out...>> make_stream_sink() {
451 return make_stream_sink<Serializer, Out...>(engine().net().socket());
459 static thread_local std::unordered_map<streaming_domain_type, server*> _servers;
462 class connection : public rpc::connection, public enable_shared_from_this<connection> {
465 connection_id _parent_id = invalid_connection_id;
466 compat::optional<isolation_config> _isolation_config;
468 future<> negotiate_protocol(input_stream<char>& in);
469 future<std::tuple<compat::optional<uint64_t>, uint64_t, int64_t, compat::optional<rcv_buf>>>
470 read_request_frame_compressed(input_stream<char>& in);
471 future<feature_map> negotiate(feature_map requested);
474 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::stream>();
476 rpc::connection::send_loop<rpc::connection::outgoing_queue_type::response>();
479 future<> send_unknown_verb_reply(compat::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type);
481 connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* seralizer, connection_id id);
483 future<> respond(int64_t msg_id, snd_buf&& data, compat::optional<rpc_clock_type::time_point> timeout);
484 client_info& info() { return _info; }
485 const client_info& info() const { return _info; }
486 stats get_stats() const {
488 res.pending = _outgoing_queue.size();
492 stats& get_stats_internal() {
495 socket_address peer_address() const override {
498 // Resources will be released when this goes out of scope
499 future<resource_permit> wait_for_resources(size_t memory_consumed, compat::optional<rpc_clock_type::time_point> timeout) {
501 return get_units(_server._resources_available, memory_consumed, *timeout);
503 return get_units(_server._resources_available, memory_consumed);
506 size_t estimate_request_size(size_t serialized_size) {
507 return rpc::estimate_request_size(_server._limits, serialized_size);
509 size_t max_request_size() const {
510 return _server._limits.max_memory;
512 server& get_server() {
515 future<> deregister_this_stream();
518 protocol_base* _proto;
519 api_v2::server_socket _ss;
520 resource_limits _limits;
521 rpc_semaphore _resources_available;
522 std::unordered_map<connection_id, shared_ptr<connection>> _conns;
523 promise<> _ss_stopped;
525 server_options _options;
526 uint64_t _next_client_id = 1;
529 server(protocol_base* proto, const socket_address& addr, resource_limits memory_limit = resource_limits());
530 server(protocol_base* proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits());
531 server(protocol_base* proto, server_socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{});
532 server(protocol_base* proto, server_options opts, server_socket, resource_limits memory_limit = resource_limits());
535 template<typename Func>
536 void foreach_connection(Func&& f) {
537 for (auto c : _conns) {
548 using rpc_handler_func = std::function<future<> (shared_ptr<server::connection>, compat::optional<rpc_clock_type::time_point> timeout, int64_t msgid,
553 rpc_handler_func func;
557 class protocol_base {
559 virtual ~protocol_base() {};
560 virtual shared_ptr<server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) = 0;
564 virtual rpc_handler* get_handler(uint64_t msg_id) = 0;
565 virtual void put_handler(rpc_handler*) = 0;
568 // MsgType is a type that holds type of a message. The type should be hashable
569 // and serializable. It is preferable to use enum for message types, but
570 // do not forget to provide hash function for it
571 template<typename Serializer, typename MsgType = uint32_t>
572 class protocol : public protocol_base {
574 class server : public rpc::server {
// Forwards &proto, addr and memory_limit to the rpc::server base constructor.
// memory_limit defaults to a default-constructed resource_limits.
576 server(protocol& proto, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
577 rpc::server(&proto, addr, memory_limit) {}
// Same as the address-only constructor, but also forwards explicit
// server_options to the rpc::server base constructor.
578 server(protocol& proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits()) :
579 rpc::server(&proto, opts, addr, memory_limit) {}
// Builds the server on top of a caller-provided server_socket.
// proto        - protocol instance; passed to the base as protocol_base*.
// socket       - listening socket, moved into the base.
// memory_limit - resource limits (default-constructed if omitted).
// opts         - server options (server_options{} if omitted).
// opts must be forwarded explicitly: the matching rpc::server constructor
// defaults its own opts argument, so omitting it here would silently discard
// the options supplied by the caller.
580 server(protocol& proto, server_socket socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{}) :
581 rpc::server(&proto, std::move(socket), memory_limit, opts) {}
// Builds the server on top of a caller-provided server_socket with explicit
// options; opts, the moved socket and memory_limit are all forwarded to the
// rpc::server base constructor.
582 server(protocol& proto, server_options opts, server_socket socket, resource_limits memory_limit = resource_limits()) :
583 rpc::server(&proto, opts, std::move(socket), memory_limit) {}
585 class client : public rpc::client {
588 * Create client object which will attempt to connect to the remote address.
590 * @param addr the remote address identifying this client
591 * @param local the local address of this client
// Passes the protocol's logger and a pointer to its serializer (taken by the
// rpc::client base as a type-erased void*) together with the two addresses.
593 client(protocol& p, const socket_address& addr, const socket_address& local = {}) :
594 rpc::client(p.get_logger(), &p._serializer, addr, local) {}
// Same as the (addr, local) constructor, but additionally forwards explicit
// client_options to the rpc::client base.
595 client(protocol& p, client_options options, const socket_address& addr, const socket_address& local = {}) :
596 rpc::client(p.get_logger(), &p._serializer, options, addr, local) {}
599 * Create client object which will attempt to connect to the remote address using the
600 * specified seastar::socket.
602 * @param addr the remote address identifying this client
603 * @param local the local address of this client
604 * @param socket the socket object used to connect to the remote address
// Passes the protocol's logger, a pointer to its serializer and the
// caller-provided socket (moved) to the rpc::client base.
606 client(protocol& p, socket socket, const socket_address& addr, const socket_address& local = {}) :
607 rpc::client(p.get_logger(), &p._serializer, std::move(socket), addr, local) {}
// Same as the (socket, addr, local) constructor, but additionally forwards
// explicit client_options to the rpc::client base.
608 client(protocol& p, client_options options, socket socket, const socket_address& addr, const socket_address& local = {}) :
609 rpc::client(p.get_logger(), &p._serializer, options, std::move(socket), addr, local) {}
614 std::unordered_map<MsgType, rpc_handler> _handlers;
615 Serializer _serializer;
// Takes ownership of the serializer by storing it in _serializer.
// NOTE(review): Serializer is the class template parameter, so Serializer&&
// is not a forwarding reference here; for a non-reference Serializer the
// std::forward below behaves like std::move and lvalues cannot bind.
619 protocol(Serializer&& serializer) : _serializer(std::forward<Serializer>(serializer)) {}
620 template<typename Func>
621 auto make_client(MsgType t);
623 // returns a function whose type depends on Func:
624 // if Func == Ret(Args...) then the returned function is
625 // future<Ret>(protocol::client&, Args...)
626 template<typename Func>
627 auto register_handler(MsgType t, Func&& func);
629 // returns a function whose type depends on Func:
630 // if Func == Ret(Args...) then the returned function is
631 // future<Ret>(protocol::client&, Args...)
632 template <typename Func>
633 auto register_handler(MsgType t, scheduling_group sg, Func&& func);
635 future<> unregister_handler(MsgType t);
637 void set_logger(std::function<void(const sstring&)> logger) {
638 _logger.set(std::move(logger));
641 const logger& get_logger() const {
645 shared_ptr<rpc::server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) override {
646 return make_shared<rpc::server::connection>(server, std::move(fd), std::move(addr), _logger, &_serializer, id);
649 bool has_handler(uint64_t msg_id);
652 rpc_handler* get_handler(uint64_t msg_id) override;
653 void put_handler(rpc_handler*) override;
655 template<typename Ret, typename... In>
656 auto make_client(signature<Ret(In...)> sig, MsgType t);
658 void register_receiver(MsgType t, rpc_handler&& handler) {
659 auto r = _handlers.emplace(t, std::move(handler));
661 throw_with_backtrace<std::runtime_error>("registered handler already exists");
669 #include "rpc_impl.hh"