]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright (C) 2015 Cloudius Systems, Ltd. | |
20 | */ | |
21 | ||
22 | #pragma once | |
23 | ||
24 | #include <unordered_map> | |
25 | #include <unordered_set> | |
26 | #include <list> | |
27 | #include <seastar/core/future.hh> | |
f67539c2 | 28 | #include <seastar/core/seastar.hh> |
11fdf7f2 | 29 | #include <seastar/net/api.hh> |
11fdf7f2 TL |
30 | #include <seastar/core/iostream.hh> |
31 | #include <seastar/core/shared_ptr.hh> | |
32 | #include <seastar/core/condition-variable.hh> | |
33 | #include <seastar/core/gate.hh> | |
34 | #include <seastar/rpc/rpc_types.hh> | |
35 | #include <seastar/core/byteorder.hh> | |
36 | #include <seastar/core/shared_future.hh> | |
37 | #include <seastar/core/queue.hh> | |
38 | #include <seastar/core/weak_ptr.hh> | |
39 | #include <seastar/core/scheduling.hh> | |
9f95a23c | 40 | #include <seastar/util/backtrace.hh> |
f67539c2 | 41 | #include <seastar/util/log.hh> |
11fdf7f2 TL |
42 | |
43 | namespace seastar { | |
44 | ||
45 | namespace rpc { | |
46 | ||
f67539c2 TL |
47 | /// \defgroup rpc rpc - remote procedure call framework |
48 | /// | |
49 | /// \brief | |
50 | /// rpc is a framework that can be used to define client-server communication | |
51 | /// protocols. | |
52 | /// For a high-level description of the RPC features see | |
53 | /// [doc/rpc.md](./md_rpc.html), | |
54 | /// [doc/rpc-streaming.md](./md_rpc-streaming.html) and | |
55 | /// [doc/rpc-compression.md](./md_rpc-compression.html) | |
56 | /// | |
57 | /// The entry point for setting up an rpc protocol is | |
58 | /// seastar::rpc::protocol. | |
59 | ||
11fdf7f2 TL |
60 | using id_type = int64_t; |
61 | ||
62 | using rpc_semaphore = basic_semaphore<semaphore_default_exception_factory, rpc_clock_type>; | |
63 | using resource_permit = semaphore_units<semaphore_default_exception_factory, rpc_clock_type>; | |
64 | ||
11fdf7f2 TL |
65 | static constexpr char rpc_magic[] = "SSTARRPC"; |
66 | ||
f67539c2 TL |
67 | /// \addtogroup rpc |
68 | /// @{ | |
69 | ||
11fdf7f2 TL |
70 | /// Specifies resource isolation for a connection. |
71 | struct isolation_config { | |
72 | /// Specifies a scheduling group under which the connection (and all its | |
73 | /// verb handlers) will execute. | |
74 | scheduling_group sched_group = current_scheduling_group(); | |
75 | }; | |
76 | ||
77 | /// Default isolation configuration - run everything in the default scheduling group. | |
f67539c2 TL |
78 | /// |
79 | /// In the scheduling_group that the protocol::server was created in. | |
11fdf7f2 TL |
80 | isolation_config default_isolate_connection(sstring isolation_cookie); |
81 | ||
82 | /// \brief Resource limits for an RPC server | |
83 | /// | |
84 | /// A request's memory use will be estimated as | |
85 | /// | |
86 | /// req_mem = basic_request_size + sizeof(serialized_request) * bloat_factor | |
87 | /// | |
88 | /// Concurrent requests will be limited so that | |
89 | /// | |
90 | /// sum(req_mem) <= max_memory | |
91 | /// | |
92 | /// \see server | |
93 | struct resource_limits { | |
94 | size_t basic_request_size = 0; ///< Minimum request footprint in memory | |
95 | unsigned bloat_factor = 1; ///< Serialized size multiplied by this to estimate memory used by request | |
96 | size_t max_memory = rpc_semaphore::max_counter(); ///< Maximum amount of memory that may be consumed by all requests | |
97 | /// Configures isolation for a connection based on its isolation cookie. May throw, | |
98 | /// in which case the connection will be terminated. | |
99 | std::function<isolation_config (sstring isolation_cookie)> isolate_connection = default_isolate_connection; | |
100 | }; | |
101 | ||
102 | struct client_options { | |
f67539c2 | 103 | std::optional<net::tcp_keepalive_params> keepalive; |
11fdf7f2 | 104 | bool tcp_nodelay = true; |
9f95a23c | 105 | bool reuseaddr = false; |
11fdf7f2 TL |
106 | compressor::factory* compressor_factory = nullptr; |
107 | bool send_timeout_data = true; | |
108 | connection_id stream_parent = invalid_connection_id; | |
109 | /// Configures how this connection is isolated from other connections on the same server. | |
110 | /// | |
111 | /// \see resource_limits::isolate_connection | |
112 | sstring isolation_cookie; | |
113 | }; | |
114 | ||
f67539c2 TL |
115 | /// @} |
116 | ||
11fdf7f2 TL |
117 | // An RPC call that passes a stream connection id as a parameter |
118 | // may arrive at a different shard from where the stream connection | |
119 | // was opened, so the connection id is not known to a server that handles | |
120 | // the RPC call. The shard that the stream connection belongs to is known | |
121 | // since it is a part of the connection id, but this is not enough to locate | |
122 | // the server instance the connection belongs to if there is more than one | |
123 | // server on the shard. The streaming domain parameter is here to help with that. | |
124 | // Different servers on all shards logically belonging to the same service should | |
125 | // belong to the same streaming domain. Only one server on each shard can belong to | |
126 | // a particular streaming domain. | |
127 | class streaming_domain_type { | |
128 | uint64_t _id; | |
129 | public: | |
130 | explicit streaming_domain_type(uint64_t id) : _id(id) {} | |
131 | bool operator==(const streaming_domain_type& o) const { | |
132 | return _id == o._id; | |
133 | } | |
134 | friend struct std::hash<streaming_domain_type>; | |
135 | friend std::ostream& operator<<(std::ostream&, const streaming_domain_type&); | |
136 | }; | |
137 | ||
f67539c2 TL |
138 | /// \addtogroup rpc |
139 | /// @{ | |
140 | ||
20effc67 TL |
141 | class server; |
142 | ||
11fdf7f2 TL |
143 | struct server_options { |
144 | compressor::factory* compressor_factory = nullptr; | |
145 | bool tcp_nodelay = true; | |
f67539c2 | 146 | std::optional<streaming_domain_type> streaming_domain; |
11fdf7f2 | 147 | server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_; |
20effc67 TL |
148 | // optional filter function. If set, will be called with remote |
149 | // (connecting) address. | |
150 | // Returning false will refuse the incoming connection. | |
151 | // Returning true will allow the mechanism to proceed. | |
152 | std::function<bool(const socket_address&)> filter_connection = {}; | |
11fdf7f2 TL |
153 | }; |
154 | ||
f67539c2 TL |
155 | /// @} |
156 | ||
11fdf7f2 TL |
157 | inline |
158 | size_t | |
159 | estimate_request_size(const resource_limits& lim, size_t serialized_size) { | |
160 | return lim.basic_request_size + serialized_size * lim.bloat_factor; | |
161 | } | |
162 | ||
163 | struct negotiation_frame { | |
164 | char magic[sizeof(rpc_magic) - 1]; | |
165 | uint32_t len; // additional negotiation data length; multiple negotiation_frame_feature_record structs | |
166 | }; | |
167 | ||
168 | enum class protocol_features : uint32_t { | |
169 | COMPRESS = 0, | |
170 | TIMEOUT = 1, | |
171 | CONNECTION_ID = 2, | |
172 | STREAM_PARENT = 3, | |
173 | ISOLATION = 4, | |
174 | }; | |
175 | ||
176 | // internal representation of feature data | |
177 | using feature_map = std::map<protocol_features, sstring>; | |
178 | ||
179 | // An rpc signature, in the form signature<Ret (In0, In1, In2)>. | |
180 | template <typename Function> | |
181 | struct signature; | |
182 | ||
183 | class logger { | |
184 | std::function<void(const sstring&)> _logger; | |
f67539c2 | 185 | ::seastar::logger* _seastar_logger = nullptr; |
11fdf7f2 | 186 | |
f67539c2 | 187 | // _seastar_logger will always be used first if it's available |
11fdf7f2 | 188 | void log(const sstring& str) const { |
f67539c2 TL |
189 | if (_seastar_logger) { |
190 | // default level for log messages is `info` | |
191 | _seastar_logger->info("{}", str); | |
192 | } else if (_logger) { | |
11fdf7f2 TL |
193 | _logger(str); |
194 | } | |
195 | } | |
196 | ||
f67539c2 TL |
197 | // _seastar_logger will always be used first if it's available |
198 | template <typename... Args> | |
199 | void log(log_level level, const char* fmt, Args&&... args) const { | |
200 | if (_seastar_logger) { | |
201 | _seastar_logger->log(level, fmt, std::forward<Args>(args)...); | |
202 | // If the log level is at least `info`, fall back to legacy logging without explicit level. | |
203 | // Ignore less severe levels in order not to spam user's log with messages during transition, | |
204 | // i.e. when the user still only defines a level-less logger. | |
205 | } else if (_logger && level <= log_level::info) { | |
206 | _logger(format(fmt, std::forward<Args>(args)...)); | |
207 | } | |
208 | } | |
209 | ||
11fdf7f2 TL |
210 | public: |
211 | void set(std::function<void(const sstring&)> l) { | |
212 | _logger = std::move(l); | |
213 | } | |
214 | ||
f67539c2 TL |
215 | void set(::seastar::logger* logger) { |
216 | _seastar_logger = logger; | |
217 | } | |
218 | ||
9f95a23c | 219 | void operator()(const client_info& info, id_type msg_id, const sstring& str) const; |
f67539c2 | 220 | void operator()(const client_info& info, id_type msg_id, log_level level, std::string_view str) const; |
11fdf7f2 | 221 | |
9f95a23c | 222 | void operator()(const client_info& info, const sstring& str) const; |
f67539c2 | 223 | void operator()(const client_info& info, log_level level, std::string_view str) const; |
11fdf7f2 | 224 | |
9f95a23c | 225 | void operator()(const socket_address& addr, const sstring& str) const; |
f67539c2 | 226 | void operator()(const socket_address& addr, log_level level, std::string_view str) const; |
11fdf7f2 TL |
227 | }; |
228 | ||
229 | class connection { | |
230 | protected: | |
231 | connected_socket _fd; | |
232 | input_stream<char> _read_buf; | |
233 | output_stream<char> _write_buf; | |
234 | bool _error = false; | |
235 | bool _connected = false; | |
236 | promise<> _stopped; | |
237 | stats _stats; | |
238 | const logger& _logger; | |
239 | // The owner of the pointer below is an instance of rpc::protocol<typename Serializer> class. | |
240 | // The type of the pointer is erased here, but the original type is Serializer | |
241 | void* _serializer; | |
242 | struct outgoing_entry { | |
243 | timer<rpc_clock_type> t; | |
244 | snd_buf buf; | |
f67539c2 | 245 | std::optional<promise<>> p = promise<>(); |
11fdf7f2 TL |
246 | cancellable* pcancel = nullptr; |
247 | outgoing_entry(snd_buf b) : buf(std::move(b)) {} | |
f67539c2 TL |
248 | outgoing_entry(outgoing_entry&& o) noexcept : t(std::move(o.t)), buf(std::move(o.buf)), p(std::move(o.p)), pcancel(o.pcancel) { |
249 | o.p = std::nullopt; | |
11fdf7f2 TL |
250 | } |
251 | ~outgoing_entry() { | |
252 | if (p) { | |
253 | if (pcancel) { | |
254 | pcancel->cancel_send = std::function<void()>(); | |
255 | pcancel->send_back_pointer = nullptr; | |
256 | } | |
257 | p->set_value(); | |
258 | } | |
259 | } | |
260 | }; | |
261 | friend outgoing_entry; | |
262 | std::list<outgoing_entry> _outgoing_queue; | |
263 | condition_variable _outgoing_queue_cond; | |
264 | future<> _send_loop_stopped = make_ready_future<>(); | |
265 | std::unique_ptr<compressor> _compressor; | |
266 | bool _timeout_negotiated = false; | |
267 | // stream related fields | |
268 | bool _is_stream = false; | |
269 | connection_id _id = invalid_connection_id; | |
270 | ||
271 | std::unordered_map<connection_id, xshard_connection_ptr> _streams; | |
272 | queue<rcv_buf> _stream_queue = queue<rcv_buf>(max_queued_stream_buffers); | |
273 | semaphore _stream_sem = semaphore(max_stream_buffers_memory); | |
9f95a23c TL |
274 | bool _sink_closed = true; |
275 | bool _source_closed = true; | |
11fdf7f2 TL |
276 | // the future holds whether the sink is already closed |
277 | // if it is not ready it means the sink is being closed | |
278 | future<bool> _sink_closed_future = make_ready_future<bool>(false); | |
279 | ||
280 | bool is_stream() { | |
281 | return _is_stream; | |
282 | } | |
283 | ||
284 | snd_buf compress(snd_buf buf); | |
285 | future<> send_buffer(snd_buf buf); | |
286 | ||
287 | enum class outgoing_queue_type { | |
288 | request, | |
289 | response, | |
290 | stream = response | |
291 | }; | |
292 | ||
293 | template<outgoing_queue_type QueueType> void send_loop(); | |
294 | future<> stop_send_loop(); | |
f67539c2 | 295 | future<std::optional<rcv_buf>> read_stream_frame_compressed(input_stream<char>& in); |
11fdf7f2 TL |
296 | bool stream_check_twoway_closed() { |
297 | return _sink_closed && _source_closed; | |
298 | } | |
299 | future<> stream_close(); | |
300 | future<> stream_process_incoming(rcv_buf&&); | |
301 | future<> handle_stream_frame(); | |
302 | ||
303 | public: | |
304 | connection(connected_socket&& fd, const logger& l, void* s, connection_id id = invalid_connection_id) : connection(l, s, id) { | |
305 | set_socket(std::move(fd)); | |
306 | } | |
307 | connection(const logger& l, void* s, connection_id id = invalid_connection_id) : _logger(l), _serializer(s), _id(id) {} | |
308 | virtual ~connection() {} | |
309 | void set_socket(connected_socket&& fd); | |
310 | future<> send_negotiation_frame(feature_map features); | |
311 | // functions below are public because they are used by external heavily templated functions | |
312 | // and I am not smart enough to know how to define them as friends | |
f67539c2 | 313 | future<> send(snd_buf buf, std::optional<rpc_clock_type::time_point> timeout = {}, cancellable* cancel = nullptr); |
11fdf7f2 TL |
314 | bool error() { return _error; } |
315 | void abort(); | |
20effc67 | 316 | future<> stop() noexcept; |
11fdf7f2 TL |
317 | future<> stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs); |
318 | future<> close_sink() { | |
319 | _sink_closed = true; | |
320 | if (stream_check_twoway_closed()) { | |
321 | return stream_close(); | |
322 | } | |
323 | return make_ready_future(); | |
324 | } | |
325 | bool sink_closed() { | |
326 | return _sink_closed; | |
327 | } | |
328 | future<> close_source() { | |
329 | _source_closed = true; | |
330 | if (stream_check_twoway_closed()) { | |
331 | return stream_close(); | |
332 | } | |
333 | return make_ready_future(); | |
334 | } | |
335 | connection_id get_connection_id() const { | |
336 | return _id; | |
337 | } | |
338 | xshard_connection_ptr get_stream(connection_id id) const; | |
339 | void register_stream(connection_id id, xshard_connection_ptr c); | |
9f95a23c | 340 | virtual socket_address peer_address() const = 0; |
11fdf7f2 TL |
341 | |
342 | const logger& get_logger() const { | |
343 | return _logger; | |
344 | } | |
345 | ||
346 | template<typename Serializer> | |
347 | Serializer& serializer() { | |
348 | return *static_cast<Serializer*>(_serializer); | |
349 | } | |
350 | ||
9f95a23c TL |
351 | template <typename FrameType> |
352 | typename FrameType::return_type read_frame(socket_address info, input_stream<char>& in); | |
11fdf7f2 | 353 | |
9f95a23c TL |
354 | template <typename FrameType> |
355 | typename FrameType::return_type read_frame_compressed(socket_address info, std::unique_ptr<compressor>& compressor, input_stream<char>& in); | |
11fdf7f2 | 356 | friend class client; |
9f95a23c TL |
357 | template<typename Serializer, typename... Out> |
358 | friend class sink_impl; | |
359 | template<typename Serializer, typename... In> | |
360 | friend class source_impl; | |
11fdf7f2 TL |
361 | }; |
362 | ||
20effc67 TL |
363 | struct deferred_snd_buf { |
364 | promise<> pr; | |
365 | snd_buf data; | |
366 | }; | |
367 | ||
11fdf7f2 TL |
368 | // send data Out... |
369 | template<typename Serializer, typename... Out> | |
370 | class sink_impl : public sink<Out...>::impl { | |
20effc67 TL |
371 | // Used on the shard *this lives on. |
372 | alignas (cache_line_size) uint64_t _next_seq_num = 1; | |
373 | ||
374 | // Used on the shard the _conn lives on. | |
375 | struct alignas (cache_line_size) { | |
376 | uint64_t last_seq_num = 0; | |
377 | std::map<uint64_t, deferred_snd_buf> out_of_order_bufs; | |
378 | } _remote_state; | |
11fdf7f2 | 379 | public: |
9f95a23c | 380 | sink_impl(xshard_connection_ptr con) : sink<Out...>::impl(std::move(con)) { this->_con->get()->_sink_closed = false; } |
11fdf7f2 TL |
381 | future<> operator()(const Out&... args) override; |
382 | future<> close() override; | |
9f95a23c | 383 | future<> flush() override; |
f67539c2 | 384 | ~sink_impl() override; |
11fdf7f2 TL |
385 | }; |
386 | ||
387 | // receive data In... | |
388 | template<typename Serializer, typename... In> | |
389 | class source_impl : public source<In...>::impl { | |
390 | public: | |
9f95a23c | 391 | source_impl(xshard_connection_ptr con) : source<In...>::impl(std::move(con)) { this->_con->get()->_source_closed = false; } |
f67539c2 | 392 | future<std::optional<std::tuple<In...>>> operator()() override; |
11fdf7f2 TL |
393 | }; |
394 | ||
395 | class client : public rpc::connection, public weakly_referencable<client> { | |
396 | socket _socket; | |
397 | id_type _message_id = 1; | |
398 | struct reply_handler_base { | |
399 | timer<rpc_clock_type> t; | |
400 | cancellable* pcancel = nullptr; | |
401 | virtual void operator()(client&, id_type, rcv_buf data) = 0; | |
402 | virtual void timeout() {} | |
403 | virtual void cancel() {} | |
404 | virtual ~reply_handler_base() { | |
405 | if (pcancel) { | |
406 | pcancel->cancel_wait = std::function<void()>(); | |
407 | pcancel->wait_back_pointer = nullptr; | |
408 | } | |
409 | }; | |
410 | }; | |
411 | public: | |
412 | template<typename Reply, typename Func> | |
413 | struct reply_handler final : reply_handler_base { | |
414 | Func func; | |
415 | Reply reply; | |
416 | reply_handler(Func&& f) : func(std::move(f)) {} | |
417 | virtual void operator()(client& client, id_type msg_id, rcv_buf data) override { | |
418 | return func(reply, client, msg_id, std::move(data)); | |
419 | } | |
420 | virtual void timeout() override { | |
421 | reply.done = true; | |
422 | reply.p.set_exception(timeout_error()); | |
423 | } | |
424 | virtual void cancel() override { | |
425 | reply.done = true; | |
426 | reply.p.set_exception(canceled_error()); | |
427 | } | |
428 | virtual ~reply_handler() {} | |
429 | }; | |
430 | private: | |
431 | std::unordered_map<id_type, std::unique_ptr<reply_handler_base>> _outstanding; | |
20effc67 | 432 | socket_address _server_addr, _local_addr; |
11fdf7f2 | 433 | client_options _options; |
f67539c2 | 434 | std::optional<shared_promise<>> _client_negotiated = shared_promise<>(); |
11fdf7f2 TL |
435 | weak_ptr<client> _parent; // for stream clients |
436 | ||
437 | private: | |
438 | future<> negotiate_protocol(input_stream<char>& in); | |
439 | void negotiate(feature_map server_features); | |
f67539c2 | 440 | future<std::tuple<int64_t, std::optional<rcv_buf>>> |
11fdf7f2 | 441 | read_response_frame(input_stream<char>& in); |
f67539c2 | 442 | future<std::tuple<int64_t, std::optional<rcv_buf>>> |
11fdf7f2 TL |
443 | read_response_frame_compressed(input_stream<char>& in); |
444 | void send_loop() { | |
445 | if (is_stream()) { | |
446 | rpc::connection::send_loop<rpc::connection::outgoing_queue_type::stream>(); | |
447 | } else { | |
448 | rpc::connection::send_loop<rpc::connection::outgoing_queue_type::request>(); | |
449 | } | |
450 | } | |
451 | public: | |
452 | /** | |
453 | * Create client object which will attempt to connect to the remote address. | |
454 | * | |
f67539c2 TL |
455 | * @param l \ref seastar::logger to use for logging error messages |
456 | * @param s an optional connection serializer | |
11fdf7f2 TL |
457 | * @param addr the remote address identifying this client |
458 | * @param local the local address of this client | |
459 | */ | |
9f95a23c TL |
460 | client(const logger& l, void* s, const socket_address& addr, const socket_address& local = {}); |
461 | client(const logger& l, void* s, client_options options, const socket_address& addr, const socket_address& local = {}); | |
11fdf7f2 TL |
462 | |
463 | /** | |
464 | * Create client object which will attempt to connect to the remote address using the | |
465 | * specified seastar::socket. | |
466 | * | |
f67539c2 TL |
467 | * @param l \ref seastar::logger to use for logging error messages |
468 | * @param s an optional connection serializer | |
11fdf7f2 TL |
469 | * @param addr the remote address identifying this client |
470 | * @param local the local address of this client | |
471 | * @param socket the socket object used to connect to the remote address | |
472 | */ | |
9f95a23c TL |
473 | client(const logger& l, void* s, socket socket, const socket_address& addr, const socket_address& local = {}); |
474 | client(const logger& l, void* s, client_options options, socket socket, const socket_address& addr, const socket_address& local = {}); | |
11fdf7f2 TL |
475 | |
476 | stats get_stats() const; | |
477 | stats& get_stats_internal() { | |
478 | return _stats; | |
479 | } | |
480 | auto next_message_id() { return _message_id++; } | |
f67539c2 | 481 | void wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel); |
11fdf7f2 | 482 | void wait_timed_out(id_type id); |
20effc67 | 483 | future<> stop() noexcept; |
11fdf7f2 TL |
484 | void abort_all_streams(); |
485 | void deregister_this_stream(); | |
9f95a23c | 486 | socket_address peer_address() const override { |
11fdf7f2 TL |
487 | return _server_addr; |
488 | } | |
489 | future<> await_connection() { | |
490 | if (!_client_negotiated) { | |
491 | return make_ready_future<>(); | |
492 | } else { | |
493 | return _client_negotiated->get_shared_future(); | |
494 | } | |
495 | } | |
496 | template<typename Serializer, typename... Out> | |
497 | future<sink<Out...>> make_stream_sink(socket socket) { | |
498 | return await_connection().then([this, socket = std::move(socket)] () mutable { | |
499 | if (!this->get_connection_id()) { | |
500 | return make_exception_future<sink<Out...>>(std::runtime_error("Streaming is not supported by the server")); | |
501 | } | |
502 | client_options o = _options; | |
503 | o.stream_parent = this->get_connection_id(); | |
504 | o.send_timeout_data = false; | |
20effc67 | 505 | auto c = make_shared<client>(_logger, _serializer, o, std::move(socket), _server_addr, _local_addr); |
11fdf7f2 TL |
506 | c->_parent = this->weak_from_this(); |
507 | c->_is_stream = true; | |
508 | return c->await_connection().then([c, this] { | |
509 | xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast<rpc::connection>(c))); | |
510 | this->register_stream(c->get_connection_id(), s); | |
511 | return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(std::move(s))); | |
512 | }); | |
513 | }); | |
514 | } | |
515 | template<typename Serializer, typename... Out> | |
516 | future<sink<Out...>> make_stream_sink() { | |
f67539c2 | 517 | return make_stream_sink<Serializer, Out...>(make_socket()); |
11fdf7f2 TL |
518 | } |
519 | }; | |
520 | ||
521 | class protocol_base; | |
522 | ||
523 | class server { | |
524 | private: | |
525 | static thread_local std::unordered_map<streaming_domain_type, server*> _servers; | |
526 | ||
527 | public: | |
528 | class connection : public rpc::connection, public enable_shared_from_this<connection> { | |
529 | server& _server; | |
530 | client_info _info; | |
531 | connection_id _parent_id = invalid_connection_id; | |
f67539c2 | 532 | std::optional<isolation_config> _isolation_config; |
11fdf7f2 TL |
533 | private: |
534 | future<> negotiate_protocol(input_stream<char>& in); | |
f67539c2 | 535 | future<std::tuple<std::optional<uint64_t>, uint64_t, int64_t, std::optional<rcv_buf>>> |
11fdf7f2 TL |
536 | read_request_frame_compressed(input_stream<char>& in); |
537 | future<feature_map> negotiate(feature_map requested); | |
538 | void send_loop() { | |
539 | if (is_stream()) { | |
540 | rpc::connection::send_loop<rpc::connection::outgoing_queue_type::stream>(); | |
541 | } else { | |
542 | rpc::connection::send_loop<rpc::connection::outgoing_queue_type::response>(); | |
543 | } | |
544 | } | |
f67539c2 | 545 | future<> send_unknown_verb_reply(std::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type); |
11fdf7f2 TL |
546 | public: |
547 | connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* seralizer, connection_id id); | |
548 | future<> process(); | |
f67539c2 | 549 | future<> respond(int64_t msg_id, snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout); |
11fdf7f2 TL |
550 | client_info& info() { return _info; } |
551 | const client_info& info() const { return _info; } | |
552 | stats get_stats() const { | |
553 | stats res = _stats; | |
554 | res.pending = _outgoing_queue.size(); | |
555 | return res; | |
556 | } | |
557 | ||
558 | stats& get_stats_internal() { | |
559 | return _stats; | |
560 | } | |
9f95a23c TL |
561 | socket_address peer_address() const override { |
562 | return _info.addr; | |
11fdf7f2 TL |
563 | } |
564 | // Resources will be released when this goes out of scope | |
f67539c2 | 565 | future<resource_permit> wait_for_resources(size_t memory_consumed, std::optional<rpc_clock_type::time_point> timeout) { |
11fdf7f2 TL |
566 | if (timeout) { |
567 | return get_units(_server._resources_available, memory_consumed, *timeout); | |
568 | } else { | |
569 | return get_units(_server._resources_available, memory_consumed); | |
570 | } | |
571 | } | |
572 | size_t estimate_request_size(size_t serialized_size) { | |
573 | return rpc::estimate_request_size(_server._limits, serialized_size); | |
574 | } | |
575 | size_t max_request_size() const { | |
576 | return _server._limits.max_memory; | |
577 | } | |
578 | server& get_server() { | |
579 | return _server; | |
580 | } | |
581 | future<> deregister_this_stream(); | |
582 | }; | |
583 | private: | |
584 | protocol_base* _proto; | |
f67539c2 | 585 | server_socket _ss; |
11fdf7f2 TL |
586 | resource_limits _limits; |
587 | rpc_semaphore _resources_available; | |
588 | std::unordered_map<connection_id, shared_ptr<connection>> _conns; | |
589 | promise<> _ss_stopped; | |
590 | gate _reply_gate; | |
591 | server_options _options; | |
592 | uint64_t _next_client_id = 1; | |
593 | ||
594 | public: | |
9f95a23c TL |
595 | server(protocol_base* proto, const socket_address& addr, resource_limits memory_limit = resource_limits()); |
596 | server(protocol_base* proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits()); | |
11fdf7f2 TL |
597 | server(protocol_base* proto, server_socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{}); |
598 | server(protocol_base* proto, server_options opts, server_socket, resource_limits memory_limit = resource_limits()); | |
599 | void accept(); | |
600 | future<> stop(); | |
601 | template<typename Func> | |
602 | void foreach_connection(Func&& f) { | |
603 | for (auto c : _conns) { | |
604 | f(*c.second); | |
605 | } | |
606 | } | |
607 | gate& reply_gate() { | |
608 | return _reply_gate; | |
609 | } | |
610 | friend connection; | |
611 | friend client; | |
612 | }; | |
613 | ||
f67539c2 | 614 | using rpc_handler_func = std::function<future<> (shared_ptr<server::connection>, std::optional<rpc_clock_type::time_point> timeout, int64_t msgid, |
11fdf7f2 TL |
615 | rcv_buf data)>; |
616 | ||
617 | struct rpc_handler { | |
618 | scheduling_group sg; | |
619 | rpc_handler_func func; | |
9f95a23c | 620 | gate use_gate; |
11fdf7f2 TL |
621 | }; |
622 | ||
623 | class protocol_base { | |
624 | public: | |
625 | virtual ~protocol_base() {}; | |
626 | virtual shared_ptr<server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) = 0; | |
9f95a23c TL |
627 | protected: |
628 | friend class server; | |
629 | ||
11fdf7f2 | 630 | virtual rpc_handler* get_handler(uint64_t msg_id) = 0; |
9f95a23c | 631 | virtual void put_handler(rpc_handler*) = 0; |
11fdf7f2 TL |
632 | }; |
633 | ||
f67539c2 TL |
634 | /// \addtogroup rpc |
635 | /// @{ | |
636 | ||
637 | /// Defines a protocol for communication between a server and a client. | |
638 | /// | |
639 | /// A protocol is defined by a `Serializer` and a `MsgType`. The `Serializer` is | |
640 | /// responsible for serializing and unserializing all types used as arguments and | |
641 | /// return types used in the protocol. The `Serializer` is expected to define a | |
642 | /// `read()` and `write()` method for each such type `T` as follows: | |
643 | /// | |
644 | /// template <typename Output> | |
645 | /// void write(const serializer&, Output& output, const T& data); | |
646 | /// | |
647 | /// template <typename Input> | |
648 | /// T read(const serializer&, Input& input, type<T> type_tag); // type_tag used to disambiguate | |
649 | /// | |
650 | /// Where `Input` and `Output` have a `void read(char*, size_t)` and | |
651 | /// `write(const char*, size_t)` respectively. | |
652 | /// `MsgType` defines the type to be used as the message id, the id which is | |
653 | /// used to identify different messages used in the protocol. These are also | |
654 | /// often referred to as "verbs". The client will use the message id, to | |
655 | /// specify the remote method (verb) to invoke on the server. The server uses | |
656 | /// the message id to dispatch the incoming call to the right handler. | |
657 | /// `MsgType` should be hashable and serializable. It is preferable to use an enum | |
658 | /// for message types, but do not forget to provide a hash function for it. | |
659 | /// | |
660 | /// Use register_handler() on the server to define the available verbs and the | |
661 | /// code to be executed when they are invoked by clients. Use make_client() on | |
662 | /// the client to create a matching callable that can be used to invoke the | |
663 | /// verb on the server and wait for its result. Note that register_handler() | |
664 | /// also returns a client, that can be used to invoke the registered verb on | |
665 | /// another node (given that the other node has the same verb). This is useful | |
666 | /// for symmetric protocols, where two or more nodes all have servers as well as | |
667 | /// connect to the other nodes as clients. | |
668 | /// | |
669 | /// Use protocol::server to listen for and accept incoming connections on the | |
670 | /// server and protocol::client to establish connections to the server. | |
671 | /// Note that registering the available verbs can be done before/after | |
672 | /// listening for connections, but it is best to ensure that by the time incoming | |
673 | /// requests are to be expected, all the verbs are set up. | |
674 | /// | |
675 | /// ## Configuration | |
676 | /// | |
677 | /// TODO | |
678 | /// | |
679 | /// ## Isolation | |
680 | /// | |
681 | /// RPC supports isolating verb handlers from each other. There are two ways to | |
682 | /// achieve this: per-handler isolation (the old way) and per-connection | |
683 | /// isolation (the new way). If no isolation is configured, all handlers will be | |
684 | /// executed in the context of the scheduling_group in which the | |
685 | /// protocol::server was created. | |
686 | /// | |
687 | /// Per-handler isolation (the old way) can be configured by using the | |
688 | /// register_handler() overload which takes a scheduling_group. When invoked, | |
689 | /// the body of the handler will be executed from the context of the configured | |
690 | /// scheduling_group. | |
691 | /// | |
692 | /// Per-connection isolation (the new way) is a more flexible mechanism that | |
693 | /// requires user application provided logic to determine how connections are | |
694 | /// isolated. This mechanism has two parts, the server and the client part. | |
695 | /// The client configures isolation by setting client_options::isolation_cookie. | |
696 | /// This cookie is an opaque (to the RPC layer) string that is to be interpreted | |
697 | /// on the server using user application provided logic. The application | |
698 | /// provides this logic to the server by setting | |
699 | /// resource_limits::isolate_connection to an appropriate handler function, that | |
700 | /// interprets the opaque cookie and resolves it to an isolation_config. The | |
701 | /// scheduling_group in the latter will be used not just to execute all verb | |
702 | /// handlers, but also the connection loop itself, hence providing better | |
703 | /// isolation. | |
704 | /// | |
705 | /// There are a few gotchas related to mixing the two isolation mechanisms. This can | |
706 | /// happen when the application is updated and one of the client/server is | |
707 | /// still using the old/new mechanism. In general per-connection isolation | |
708 | /// overrides the per-handler one. If both are set up, the former will determine | |
709 | /// the scheduling_group context for the handlers. If the client is not | |
710 | /// configured to send an isolation cookie, the server's | |
711 | /// resource_limits::isolate_connection will not be invoked and the server will | |
712 | /// fall back to per-handler isolation if configured. If the client is | |
713 | /// configured to send an isolation cookie but the server doesn't have a | |
714 | /// resource_limits::isolate_connection configured, it will use | |
715 | /// default_isolate_connection() to interpret the cookie. Note that this still | |
716 | /// overrides the per-handler isolation if any is configured. If the server is | |
717 | /// so old that it doesn't have the per-connection isolation feature at all, it | |
718 | /// will of course just use the per-handler one, if configured. | |
719 | /// | |
720 | /// ## Compatibility | |
721 | /// | |
722 | /// TODO | |
723 | /// | |
724 | /// \tparam Serializer the serializer for the protocol. | |
725 | /// \tparam MsgType the type to be used as the message id or verb id. | |
11fdf7f2 | 726 | template<typename Serializer, typename MsgType = uint32_t> |
20effc67 | 727 | class protocol final : public protocol_base { |
11fdf7f2 | 728 | public: |
f67539c2 | 729 | /// Represents the listening port and all accepted connections. |
11fdf7f2 TL |
730 | class server : public rpc::server { | |
731 | public: | |
9f95a23c | 732 | server(protocol& proto, const socket_address& addr, resource_limits memory_limit = resource_limits()) : |
11fdf7f2 | 733 | rpc::server(&proto, addr, memory_limit) {} |
9f95a23c | 734 | server(protocol& proto, server_options opts, const socket_address& addr, resource_limits memory_limit = resource_limits()) : |
11fdf7f2 TL |
735 | rpc::server(&proto, opts, addr, memory_limit) {} | |
736 | server(protocol& proto, server_socket socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{}) : | |
737 | rpc::server(&proto, opts, std::move(socket), memory_limit) {} | |
738 | server(protocol& proto, server_options opts, server_socket socket, resource_limits memory_limit = resource_limits()) : | |
739 | rpc::server(&proto, opts, std::move(socket), memory_limit) {} | |
740 | }; | |
f67539c2 | 741 | /// Represents a client side connection. |
11fdf7f2 TL |
742 | class client : public rpc::client { | |
743 | public: | |
744 | /** | |
745 | * Create client object which will attempt to connect to the remote address. | |
746 | * | |
747 | * @param addr the remote address identifying this client | |
748 | * @param local the local address of this client | |
749 | */ | |
9f95a23c | 750 | client(protocol& p, const socket_address& addr, const socket_address& local = {}) : |
11fdf7f2 | 751 | rpc::client(p.get_logger(), &p._serializer, addr, local) {} |
9f95a23c | 752 | client(protocol& p, client_options options, const socket_address& addr, const socket_address& local = {}) : |
11fdf7f2 TL |
753 | rpc::client(p.get_logger(), &p._serializer, options, addr, local) {} | |
754 | ||
755 | /** | |
756 | * Create client object which will attempt to connect to the remote address using the | |
757 | * specified seastar::socket. | |
758 | * | |
759 | * @param addr the remote address identifying this client | |
760 | * @param local the local address of this client | |
761 | * @param socket the socket object used to connect to the remote address | |
762 | */ | |
9f95a23c | 763 | client(protocol& p, socket socket, const socket_address& addr, const socket_address& local = {}) : |
11fdf7f2 | 764 | rpc::client(p.get_logger(), &p._serializer, std::move(socket), addr, local) {} |
9f95a23c | 765 | client(protocol& p, client_options options, socket socket, const socket_address& addr, const socket_address& local = {}) : |
11fdf7f2 TL |
766 | rpc::client(p.get_logger(), &p._serializer, options, std::move(socket), addr, local) {} | |
767 | }; | |
768 | ||
769 | friend server; | |
770 | private: | |
771 | std::unordered_map<MsgType, rpc_handler> _handlers; | |
772 | Serializer _serializer; | |
773 | logger _logger; | |
774 | ||
775 | public: | |
776 | protocol(Serializer&& serializer) : _serializer(std::forward<Serializer>(serializer)) {} | |
f67539c2 TL |
777 | |
778 | /// Creates a callable that can be used to invoke the verb on the remote. | |
779 | /// | |
780 | /// \tparam Func The signature of the verb. Has to be either the same or | |
781 | /// compatible with the one passed to register_handler on the server. | |
782 | /// \param t the verb to invoke on the remote. | |
783 | /// | |
784 | /// \returns a callable whose signature is derived from Func as follows: | |
785 | /// given `Func == Ret(Args...)` the returned callable has the following | |
786 | /// signature: `future<Ret>(protocol::client&, Args...)`. | |
11fdf7f2 TL |
787 | template<typename Func> | |
788 | auto make_client(MsgType t); | |
789 | ||
f67539c2 TL |
790 | /// Register a handler to be called when this verb is invoked. | |
791 | /// | |
792 | /// \tparam Func the type of the handler for the verb. This determines the | |
793 | /// signature of the verb. | |
794 | /// \param t the verb to register the handler for. | |
795 | /// \param func the callable to be called when the verb is invoked by the | |
796 | /// remote. | |
797 | /// | |
798 | /// \returns a client, a callable that can be used to invoke the verb. See | |
799 | /// make_client(). The client can be discarded, in fact this is what | |
800 | /// most callers will do as real clients will live on a remote node, not | |
801 | /// on the one where handlers are registered. | |
11fdf7f2 TL |
802 | template<typename Func> | |
803 | auto register_handler(MsgType t, Func&& func); | |
804 | ||
f67539c2 TL |
805 | /// Register a handler to be called when this verb is invoked. | |
806 | /// | |
807 | /// \tparam Func the type of the handler for the verb. This determines the | |
808 | /// signature of the verb. | |
809 | /// \param t the verb to register the handler for. | |
810 | /// \param sg the scheduling group that will be used to invoke the handler | |
811 | /// in. This can be used to execute different verbs in different | |
812 | /// scheduling groups. Note that there is a newer mechanism to determine | |
813 | /// the scheduling group a handler will run in per invocation, see | |
814 | /// isolation_config. | |
815 | /// \param func the callable to be called when the verb is invoked by the | |
816 | /// remote. | |
817 | /// | |
818 | /// \returns a client, a callable that can be used to invoke the verb. See | |
819 | /// make_client(). The client can be discarded, in fact this is what | |
820 | /// most callers will do as real clients will live on a remote node, not | |
821 | /// on the one where handlers are registered. | |
11fdf7f2 TL |
822 | template <typename Func> | |
823 | auto register_handler(MsgType t, scheduling_group sg, Func&& func); | |
824 | ||
f67539c2 TL |
825 | /// Unregister the handler for the verb. | |
826 | /// | |
827 | /// Waits for all currently running handlers, then unregisters the handler. | |
828 | /// Future attempts to invoke the verb will fail. This becomes effective | |
829 | /// immediately after calling this function. | |
830 | /// | |
831 | /// \param t the verb to unregister the handler for. | |
832 | /// | |
833 | /// \returns a future that becomes available once all currently running | |
834 | /// handlers finished. | |
9f95a23c | 835 | future<> unregister_handler(MsgType t); |
11fdf7f2 | 836 | |
f67539c2 TL |
837 | /// Set a logger function to be used to log messages. | |
838 | /// | |
839 | /// \deprecated use the logger overload set_logger(::seastar::logger*) | |
840 | /// instead. | |
841 | [[deprecated("Use set_logger(::seastar::logger*) instead")]] | |
11fdf7f2 TL |
842 | void set_logger(std::function<void(const sstring&)> logger) { | |
843 | _logger.set(std::move(logger)); | |
844 | } | |
845 | ||
f67539c2 TL |
846 | /// Set a logger to be used to log messages. | |
847 | void set_logger(::seastar::logger* logger) { | |
848 | _logger.set(logger); | |
849 | } | |
850 | ||
11fdf7f2 TL |
851 | const logger& get_logger() const { | |
852 | return _logger; | |
853 | } | |
854 | ||
855 | shared_ptr<rpc::server::connection> make_server_connection(rpc::server& server, connected_socket fd, socket_address addr, connection_id id) override { | |
856 | return make_shared<rpc::server::connection>(server, std::move(fd), std::move(addr), _logger, &_serializer, id); | |
857 | } | |
858 | ||
f67539c2 TL |
859 | bool has_handler(MsgType msg_id); | |
860 | ||
861 | /// Checks if there are any handlers registered. | |
862 | /// Debugging helper, should only be used for debugging and not relied on. | |
863 | /// | |
864 | /// \returns true if there are, false if there are no registered handlers. | |
865 | bool has_handlers() const noexcept { | |
866 | return !_handlers.empty(); | |
867 | } | |
11fdf7f2 TL |
868 | |
869 | private: | |
9f95a23c TL |
870 | rpc_handler* get_handler(uint64_t msg_id) override; | |
871 | void put_handler(rpc_handler*) override; | |
872 | ||
11fdf7f2 TL |
873 | template<typename Ret, typename... In> | |
874 | auto make_client(signature<Ret(In...)> sig, MsgType t); | |
875 | ||
876 | void register_receiver(MsgType t, rpc_handler&& handler) { | |
9f95a23c TL |
877 | auto r = _handlers.emplace(t, std::move(handler)); | |
878 | if (!r.second) { | |
879 | throw_with_backtrace<std::runtime_error>("registered handler already exists"); | |
880 | } | |
11fdf7f2 TL |
881 | } | |
882 | }; | |
f67539c2 TL |
883 | |
884 | /// @} | |
885 | ||
11fdf7f2 TL |
886 | } |
887 | ||
888 | } | |
889 | ||
890 | #include "rpc_impl.hh" |