1 #include <seastar/rpc/rpc.hh>
2 #include <seastar/core/align.hh>
3 #include <seastar/core/seastar.hh>
4 #include <seastar/core/print.hh>
5 #include <seastar/core/future-util.hh>
6 #include <boost/range/adaptor/map.hpp>
8 #if FMT_VERSION >= 90000
9 template <> struct fmt::formatter
<seastar::rpc::streaming_domain_type
> : fmt::ostream_formatter
{};
16 void logger::operator()(const client_info
& info
, id_type msg_id
, const sstring
& str
) const {
17 log(format("client {} msg_id {}: {}", info
.addr
, msg_id
, str
));
20 void logger::operator()(const client_info
& info
, id_type msg_id
, log_level level
, std::string_view str
) const {
21 log(level
, "client {} msg_id {}: {}", info
.addr
, msg_id
, str
);
24 void logger::operator()(const client_info
& info
, const sstring
& str
) const {
25 (*this)(info
.addr
, str
);
28 void logger::operator()(const client_info
& info
, log_level level
, std::string_view str
) const {
29 (*this)(info
.addr
, level
, str
);
32 void logger::operator()(const socket_address
& addr
, const sstring
& str
) const {
33 log(format("client {}: {}", addr
, str
));
36 void logger::operator()(const socket_address
& addr
, log_level level
, std::string_view str
) const {
37 log(level
, "client {}: {}", addr
, str
);
// Out-of-line definition of the static constexpr data member. Required so the
// member can be ODR-used (address taken, bound to a const reference) when
// building as pre-C++17, where static constexpr data members are not
// implicitly inline.
42 constexpr size_t snd_buf::chunk_size
;
44 snd_buf::snd_buf(size_t size_
) : size(size_
) {
45 if (size
<= chunk_size
) {
46 bufs
= temporary_buffer
<char>(size
);
48 std::vector
<temporary_buffer
<char>> v
;
49 v
.reserve(align_up(size_t(size
), chunk_size
) / chunk_size
);
51 v
.push_back(temporary_buffer
<char>(std::min(chunk_size
, size_
)));
52 size_
-= v
.back().size();
// Defaulted out-of-line so the definition lives in this translation unit.
// noexcept matters here: containers and continuations holding snd_buf rely on
// the move not throwing.
58 snd_buf::snd_buf(snd_buf
&&) noexcept
= default;
// Defaulted move assignment, out-of-line to match the move constructor above;
// noexcept for the same reason (no-throw moves are assumed by callers).
59 snd_buf
& snd_buf::operator=(snd_buf
&&) noexcept
= default;
61 temporary_buffer
<char>& snd_buf::front() {
62 auto* one
= std::get_if
<temporary_buffer
<char>>(&bufs
);
66 return std::get
<std::vector
<temporary_buffer
<char>>>(bufs
).front();
70 // Make a copy of a remote buffer. No data is actually copied, only pointers;
71 // the new buffer's deleter takes care of deleting the original buffer.
72 template<typename T
> // T is either snd_buf or rcv_buf
73 T
make_shard_local_buffer_copy(foreign_ptr
<std::unique_ptr
<T
>> org
) {
74 if (org
.get_owner_shard() == this_shard_id()) {
75 return std::move(*org
);
78 auto* one
= std::get_if
<temporary_buffer
<char>>(&org
->bufs
);
81 buf
.bufs
= temporary_buffer
<char>(one
->get_write(), one
->size(), make_object_deleter(std::move(org
)));
83 auto& orgbufs
= std::get
<std::vector
<temporary_buffer
<char>>>(org
->bufs
);
84 std::vector
<temporary_buffer
<char>> newbufs
;
85 newbufs
.reserve(orgbufs
.size());
86 deleter d
= make_object_deleter(std::move(org
));
87 for (auto&& b
: orgbufs
) {
88 newbufs
.push_back(temporary_buffer
<char>(b
.get_write(), b
.size(), d
.share()));
90 buf
.bufs
= std::move(newbufs
);
// Explicit instantiations for the only two buffer types the RPC layer uses
// (send and receive buffers), so the template definition above does not need
// to be visible to other translation units.
96 template snd_buf
make_shard_local_buffer_copy(foreign_ptr
<std::unique_ptr
<snd_buf
>>);
97 template rcv_buf
make_shard_local_buffer_copy(foreign_ptr
<std::unique_ptr
<rcv_buf
>>);
99 static void log_exception(connection
& c
, log_level level
, const char* log
, std::exception_ptr eptr
) {
102 std::rethrow_exception(eptr
);
103 } catch (std::exception
& ex
) {
106 s
= "unknown exception";
108 auto formatted
= format("{}: {}", log
, s
);
109 c
.get_logger()(c
.peer_address(), level
, std::string_view(formatted
.data(), formatted
.size()));
112 snd_buf
connection::compress(snd_buf buf
) {
114 buf
= _compressor
->compress(4, std::move(buf
));
115 static_assert(snd_buf::chunk_size
>= 4, "send buffer chunk size is too small");
116 write_le
<uint32_t>(buf
.front().get_write(), buf
.size
- 4);
122 future
<> connection::send_buffer(snd_buf buf
) {
123 auto* b
= std::get_if
<temporary_buffer
<char>>(&buf
.bufs
);
125 return _write_buf
.write(std::move(*b
));
127 return do_with(std::move(std::get
<std::vector
<temporary_buffer
<char>>>(buf
.bufs
)),
128 [this] (std::vector
<temporary_buffer
<char>>& ar
) {
129 return do_for_each(ar
.begin(), ar
.end(), [this] (auto& b
) {
130 return _write_buf
.write(std::move(b
));
136 future
<> connection::send_entry(outgoing_entry
& d
) {
137 if (_propagate_timeout
) {
138 static_assert(snd_buf::chunk_size
>= 8, "send buffer chunk size is too small");
139 if (_timeout_negotiated
) {
140 auto expire
= d
.t
.get_timeout();
142 if (expire
!= typename timer
<rpc_clock_type
>::time_point()) {
143 left
= std::chrono::duration_cast
<std::chrono::milliseconds
>(expire
- timer
<rpc_clock_type
>::clock::now()).count();
145 write_le
<uint64_t>(d
.buf
.front().get_write(), left
);
147 d
.buf
.front().trim_front(8);
151 auto buf
= compress(std::move(d
.buf
));
152 return send_buffer(std::move(buf
)).then([this] {
153 _stats
.sent_messages
++;
154 return _write_buf
.flush();
158 void connection::set_negotiated() noexcept
{
159 _negotiated
->set_value();
160 _negotiated
= std::nullopt
;
163 future
<> connection::stop_send_loop(std::exception_ptr ex
) {
166 _fd
.shutdown_output();
169 ex
= std::make_exception_ptr(closed_error());
171 while (!_outgoing_queue
.empty()) {
172 auto it
= std::prev(_outgoing_queue
.end());
173 // Cancel all but front entry normally. The front entry is sitting in the
174 // send_entry() and cannot be withdrawn, except when _negotiated is still
175 // engaged. In the latter case, when it is aborted below, the entry's
176 // continuation will not be called and its done promise will not resolve
177 // _outgoing_queue_ready, so do it here.
178 if (it
!= _outgoing_queue
.begin()) {
182 it
->done
.set_exception(ex
);
188 _negotiated
->set_exception(ex
);
190 return when_all(std::move(_outgoing_queue_ready
), std::move(_sink_closed_future
)).then([this] (std::tuple
<future
<>, future
<bool>> res
){
191 // _outgoing_queue_ready might be exceptional if queue drain or
192 // _negotiated abortion set it such
193 std::get
<0>(res
).ignore_ready_future();
194 // _sink_closed_future is never exceptional
195 bool sink_closed
= std::get
<1>(res
).get0();
196 return _connected
&& !sink_closed
? _write_buf
.close() : make_ready_future();
200 void connection::set_socket(connected_socket
&& fd
) {
202 throw std::runtime_error("already connected");
205 _read_buf
=_fd
.input();
206 _write_buf
= _fd
.output();
210 future
<> connection::send_negotiation_frame(feature_map features
) {
211 auto negotiation_frame_feature_record_size
= [] (const feature_map::value_type
& e
) {
212 return 8 + e
.second
.size();
214 auto extra_len
= boost::accumulate(
215 features
| boost::adaptors::transformed(negotiation_frame_feature_record_size
),
217 temporary_buffer
<char> reply(sizeof(negotiation_frame
) + extra_len
);
218 auto p
= reply
.get_write();
219 p
= std::copy_n(rpc_magic
, 8, p
);
220 write_le
<uint32_t>(p
, extra_len
);
222 for (auto&& e
: features
) {
223 write_le
<uint32_t>(p
, static_cast<uint32_t>(e
.first
));
225 write_le
<uint32_t>(p
, e
.second
.size());
227 p
= std::copy_n(e
.second
.begin(), e
.second
.size(), p
);
229 return _write_buf
.write(std::move(reply
)).then([this] {
230 _stats
.sent_messages
++;
231 return _write_buf
.flush();
235 void connection::withdraw(outgoing_entry::container_t::iterator it
, std::exception_ptr ex
) {
236 assert(it
!= _outgoing_queue
.end());
238 auto pit
= std::prev(it
);
239 // Previous entry's (pit's) done future will schedule current entry (it)
240 // continuation. Similarly, it.done will schedule next entry continuation
241 // or will resolve _outgoing_queue_ready future.
243 // To withdraw "it" we need to do two things:
244 // - make pit.done resolve it->next (some time later)
245 // - resolve "it"'s continuation right now
247 // The latter is achieved by resolving pit.done immediately, the former
248 // by moving it.done into pit.done. For simplicity (verging on obscurity?)
249 // both done's are just swapped and "it" resolves its new promise
251 std::swap(it
->done
, pit
->done
);
255 it
->done
.set_value();
257 it
->done
.set_exception(ex
);
261 future
<> connection::send(snd_buf buf
, std::optional
<rpc_clock_type::time_point
> timeout
, cancellable
* cancel
) {
263 if (timeout
&& *timeout
<= rpc_clock_type::now()) {
264 return make_ready_future
<>();
267 auto p
= std::make_unique
<outgoing_entry
>(std::move(buf
));
269 _outgoing_queue
.push_back(d
);
270 _outgoing_queue_size
++;
271 auto deleter
= [this, it
= _outgoing_queue
.iterator_to(d
)] {
272 // Front entry is most likely (unless _negotiated is unresolved) sitting
273 // inside send_entry() continuations and thus it cannot be cancelled.
274 if (it
!= _outgoing_queue
.begin()) {
281 t
.set_callback(deleter
);
282 t
.arm(timeout
.value());
285 cancel
->cancel_send
= std::move(deleter
);
286 cancel
->send_back_pointer
= &d
.pcancel
;
290 // New entry should continue (do its .then() lambda) after _outgoing_queue_ready
291 // resolves. Next entry will need to do the same after this entry's done resolves.
292 // Thus -- replace _outgoing_queue_ready with d's future and chain its continuation
293 // on ..._ready's old value.
294 return std::exchange(_outgoing_queue_ready
, d
.done
.get_future()).then([this, p
= std::move(p
)] () mutable {
295 _outgoing_queue_size
--;
296 if (__builtin_expect(!p
->is_linked(), false)) {
297 // If withdrawn the entry is unlinked and this lambda is fired right at once
298 return make_ready_future
<>();
302 return send_entry(*p
).then_wrapped([this, p
= std::move(p
)] (auto f
) mutable {
303 _error
|= f
.failed();
304 f
.ignore_ready_future();
309 return make_exception_future
<>(closed_error());
313 void connection::abort() {
316 _fd
.shutdown_input();
320 future
<> connection::stop() noexcept
{
324 log_exception(*this, log_level::error
, "fail to shutdown connection while stopping", std::current_exception());
326 return _stopped
.get_future();
329 template<typename Connection
>
330 static bool verify_frame(Connection
& c
, temporary_buffer
<char>& buf
, size_t expected
, const char* log
) {
331 if (buf
.size() != expected
) {
332 if (buf
.size() != 0) {
333 c
.get_logger()(c
.peer_address(), log
);
340 template<typename Connection
>
343 receive_negotiation_frame(Connection
& c
, input_stream
<char>& in
) {
344 return in
.read_exactly(sizeof(negotiation_frame
)).then([&c
, &in
] (temporary_buffer
<char> neg
) {
345 if (!verify_frame(c
, neg
, sizeof(negotiation_frame
), "unexpected eof during negotiation frame")) {
346 return make_exception_future
<feature_map
>(closed_error());
348 negotiation_frame frame
;
349 std::copy_n(neg
.get_write(), sizeof(frame
.magic
), frame
.magic
);
350 frame
.len
= read_le
<uint32_t>(neg
.get_write() + 8);
351 if (std::memcmp(frame
.magic
, rpc_magic
, sizeof(frame
.magic
)) != 0) {
352 c
.get_logger()(c
.peer_address(), format("wrong protocol magic: {:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}",
353 frame
.magic
[0], frame
.magic
[1], frame
.magic
[2], frame
.magic
[3], frame
.magic
[4], frame
.magic
[5], frame
.magic
[6], frame
.magic
[7]));
354 return make_exception_future
<feature_map
>(closed_error());
356 auto len
= frame
.len
;
357 return in
.read_exactly(len
).then([&c
, len
] (temporary_buffer
<char> extra
) {
358 if (extra
.size() != len
) {
359 c
.get_logger()(c
.peer_address(), "unexpected eof during negotiation frame");
360 return make_exception_future
<feature_map
>(closed_error());
363 auto p
= extra
.get();
364 auto end
= p
+ extra
.size();
367 c
.get_logger()(c
.peer_address(), "bad feature data format in negotiation frame");
368 return make_exception_future
<feature_map
>(closed_error());
370 auto feature
= static_cast<protocol_features
>(read_le
<uint32_t>(p
));
371 auto f_len
= read_le
<uint32_t>(p
+ 4);
373 if (f_len
> end
- p
) {
374 c
.get_logger()(c
.peer_address(), "buffer underflow in feature data in negotiation frame");
375 return make_exception_future
<feature_map
>(closed_error());
377 auto data
= sstring(p
, f_len
);
379 map
.emplace(feature
, std::move(data
));
381 return make_ready_future
<feature_map
>(std::move(map
));
386 inline future
<rcv_buf
>
387 read_rcv_buf(input_stream
<char>& in
, uint32_t size
) {
388 return in
.read_up_to(size
).then([&, size
] (temporary_buffer
<char> data
) mutable {
390 if (data
.size() == 0) {
391 return make_ready_future
<rcv_buf
>(rcv_buf());
392 } else if (data
.size() == size
) {
393 rb
.bufs
= std::move(data
);
394 return make_ready_future
<rcv_buf
>(std::move(rb
));
397 std::vector
<temporary_buffer
<char>> v
;
398 v
.push_back(std::move(data
));
399 rb
.bufs
= std::move(v
);
400 return do_with(std::move(rb
), std::move(size
), [&in
] (rcv_buf
& rb
, uint32_t& left
) {
401 return repeat([&] () {
402 return in
.read_up_to(left
).then([&] (temporary_buffer
<char> data
) {
405 return stop_iteration::yes
;
408 std::get
<std::vector
<temporary_buffer
<char>>>(rb
.bufs
).push_back(std::move(data
));
409 return left
? stop_iteration::no
: stop_iteration::yes
;
413 return std::move(rb
);
420 template<typename FrameType
>
421 typename
FrameType::return_type
422 connection::read_frame(socket_address info
, input_stream
<char>& in
) {
423 auto header_size
= FrameType::header_size();
424 return in
.read_exactly(header_size
).then([this, header_size
, info
, &in
] (temporary_buffer
<char> header
) {
425 if (header
.size() != header_size
) {
426 if (header
.size() != 0) {
427 _logger(info
, format("unexpected eof on a {} while reading header: expected {:d} got {:d}", FrameType::role(), header_size
, header
.size()));
429 return FrameType::empty_value();
431 auto h
= FrameType::decode_header(header
.get());
432 auto size
= FrameType::get_size(h
);
434 return FrameType::make_value(h
, rcv_buf());
436 return read_rcv_buf(in
, size
).then([this, info
, h
= std::move(h
), size
] (rcv_buf rb
) {
437 if (rb
.size
!= size
) {
438 _logger(info
, format("unexpected eof on a {} while reading data: expected {:d} got {:d}", FrameType::role(), size
, rb
.size
));
439 return FrameType::empty_value();
441 return FrameType::make_value(h
, std::move(rb
));
448 template<typename FrameType
>
449 typename
FrameType::return_type
450 connection::read_frame_compressed(socket_address info
, std::unique_ptr
<compressor
>& compressor
, input_stream
<char>& in
) {
452 return in
.read_exactly(4).then([this, info
, &in
, &compressor
] (temporary_buffer
<char> compress_header
) {
453 if (compress_header
.size() != 4) {
454 if (compress_header
.size() != 0) {
455 _logger(info
, format("unexpected eof on a {} while reading compression header: expected 4 got {:d}", FrameType::role(), compress_header
.size()));
457 return FrameType::empty_value();
459 auto ptr
= compress_header
.get();
460 auto size
= read_le
<uint32_t>(ptr
);
461 return read_rcv_buf(in
, size
).then([this, size
, &compressor
, info
] (rcv_buf compressed_data
) {
462 if (compressed_data
.size
!= size
) {
463 _logger(info
, format("unexpected eof on a {} while reading compressed data: expected {:d} got {:d}", FrameType::role(), size
, compressed_data
.size
));
464 return FrameType::empty_value();
466 auto eb
= compressor
->decompress(std::move(compressed_data
));
468 auto* one
= std::get_if
<temporary_buffer
<char>>(&eb
.bufs
);
470 p
= net::packet(std::move(p
), std::move(*one
));
472 auto&& bufs
= std::get
<std::vector
<temporary_buffer
<char>>>(eb
.bufs
);
473 p
.reserve(bufs
.size());
474 for (auto&& b
: bufs
) {
475 p
= net::packet(std::move(p
), std::move(b
));
478 return do_with(as_input_stream(std::move(p
)), [this, info
] (input_stream
<char>& in
) {
479 return read_frame
<FrameType
>(info
, in
);
484 return read_frame
<FrameType
>(info
, in
);
488 struct stream_frame
{
489 using opt_buf_type
= std::optional
<rcv_buf
>;
490 using return_type
= future
<opt_buf_type
>;
495 static size_t header_size() {
498 static const char* role() {
501 static future
<opt_buf_type
> empty_value() {
502 return make_ready_future
<opt_buf_type
>(std::nullopt
);
504 static header_type
decode_header(const char* ptr
) {
505 header_type h
{read_le
<uint32_t>(ptr
), false};
512 static uint32_t get_size(const header_type
& t
) {
515 static future
<opt_buf_type
> make_value(const header_type
& t
, rcv_buf data
) {
519 return make_ready_future
<opt_buf_type
>(std::move(data
));
523 future
<std::optional
<rcv_buf
>>
524 connection::read_stream_frame_compressed(input_stream
<char>& in
) {
525 return read_frame_compressed
<stream_frame
>(peer_address(), _compressor
, in
);
528 future
<> connection::stream_close() {
529 auto f
= make_ready_future
<>();
532 _sink_closed_future
= p
.get_future();
533 // stop_send_loop(), which also calls _write_buf.close(), can run in parallel with this code.
534 // Use _sink_closed_future to serialize them and skip the second call to close().
535 f
= _write_buf
.close().finally([p
= std::move(p
)] () mutable { p
.set_value(true);});
537 return f
.finally([this] () mutable { return stop(); });
540 future
<> connection::stream_process_incoming(rcv_buf
&& buf
) {
541 // we do not want to dead lock on huge packets, so let them in
542 // but only one at a time
543 auto size
= std::min(size_t(buf
.size
), max_stream_buffers_memory
);
544 return get_units(_stream_sem
, size
).then([this, buf
= std::move(buf
)] (semaphore_units
<>&& su
) mutable {
545 buf
.su
= std::move(su
);
546 return _stream_queue
.push_eventually(std::move(buf
));
550 future
<> connection::handle_stream_frame() {
551 return read_stream_frame_compressed(_read_buf
).then([this] (std::optional
<rcv_buf
> data
) {
554 return make_ready_future
<>();
556 return stream_process_incoming(std::move(*data
));
560 future
<> connection::stream_receive(circular_buffer
<foreign_ptr
<std::unique_ptr
<rcv_buf
>>>& bufs
) {
561 return _stream_queue
.not_empty().then([this, &bufs
] {
562 bool eof
= !_stream_queue
.consume([&bufs
] (rcv_buf
&& b
) {
563 if (b
.size
== -1U) { // max fragment length marks an end of a stream
566 bufs
.push_back(make_foreign(std::make_unique
<rcv_buf
>(std::move(b
))));
570 if (eof
&& !bufs
.empty()) {
571 assert(_stream_queue
.empty());
572 _stream_queue
.push(rcv_buf(-1U)); // push eof marker back for next read to notice it
577 void connection::register_stream(connection_id id
, xshard_connection_ptr c
) {
578 _streams
.emplace(id
, std::move(c
));
581 xshard_connection_ptr
connection::get_stream(connection_id id
) const {
582 auto it
= _streams
.find(id
);
583 if (it
== _streams
.end()) {
584 throw std::logic_error(format("rpc stream id {} not found", id
).c_str());
590 client::negotiate(feature_map provided
) {
591 // record features returned here
592 for (auto&& e
: provided
) {
595 // supported features go here
596 case protocol_features::COMPRESS
:
597 if (_options
.compressor_factory
) {
598 _compressor
= _options
.compressor_factory
->negotiate(e
.second
, false);
601 throw std::runtime_error(format("RPC server responded with compression {} - unsupported", e
.second
));
604 case protocol_features::TIMEOUT
:
605 _timeout_negotiated
= true;
607 case protocol_features::CONNECTION_ID
: {
608 _id
= deserialize_connection_id(e
.second
);
619 client::negotiate_protocol(input_stream
<char>& in
) {
620 return receive_negotiation_frame(*this, in
).then([this] (feature_map features
) {
621 return negotiate(features
);
625 struct response_frame
{
626 using opt_buf_type
= std::optional
<rcv_buf
>;
627 using header_and_buffer_type
= std::tuple
<int64_t, opt_buf_type
>;
628 using return_type
= future
<header_and_buffer_type
>;
629 using header_type
= std::tuple
<int64_t, uint32_t>;
630 static size_t header_size() {
633 static const char* role() {
636 static auto empty_value() {
637 return make_ready_future
<header_and_buffer_type
>(header_and_buffer_type(0, std::nullopt
));
639 static header_type
decode_header(const char* ptr
) {
640 auto msgid
= read_le
<int64_t>(ptr
);
641 auto size
= read_le
<uint32_t>(ptr
+ 8);
642 return std::make_tuple(msgid
, size
);
644 static uint32_t get_size(const header_type
& t
) {
645 return std::get
<1>(t
);
647 static auto make_value(const header_type
& t
, rcv_buf data
) {
648 return make_ready_future
<header_and_buffer_type
>(header_and_buffer_type(std::get
<0>(t
), std::move(data
)));
653 future
<response_frame::header_and_buffer_type
>
654 client::read_response_frame(input_stream
<char>& in
) {
655 return read_frame
<response_frame
>(_server_addr
, in
);
658 future
<response_frame::header_and_buffer_type
>
659 client::read_response_frame_compressed(input_stream
<char>& in
) {
660 return read_frame_compressed
<response_frame
>(_server_addr
, _compressor
, in
);
663 stats
client::get_stats() const {
665 res
.wait_reply
= _outstanding
.size();
666 res
.pending
= outgoing_queue_length();
670 void client::wait_for_reply(id_type id
, std::unique_ptr
<reply_handler_base
>&& h
, std::optional
<rpc_clock_type::time_point
> timeout
, cancellable
* cancel
) {
672 h
->t
.set_callback(std::bind(std::mem_fn(&client::wait_timed_out
), this, id
));
673 h
->t
.arm(timeout
.value());
676 cancel
->cancel_wait
= [this, id
] {
677 _outstanding
[id
]->cancel();
678 _outstanding
.erase(id
);
681 cancel
->wait_back_pointer
= &h
->pcancel
;
683 _outstanding
.emplace(id
, std::move(h
));
685 void client::wait_timed_out(id_type id
) {
687 _outstanding
[id
]->timeout();
688 _outstanding
.erase(id
);
691 future
<> client::stop() noexcept
{
696 log_exception(*this, log_level::error
, "fail to shutdown connection while stopping", std::current_exception());
698 return _stopped
.get_future();
701 void client::abort_all_streams() {
702 while (!_streams
.empty()) {
703 auto&& s
= _streams
.begin();
704 assert(s
->second
->get_owner_shard() == this_shard_id()); // abort can be called only locally
705 s
->second
->get()->abort();
710 void client::deregister_this_stream() {
712 _parent
->_streams
.erase(_id
);
716 client::client(const logger
& l
, void* s
, client_options ops
, socket socket
, const socket_address
& addr
, const socket_address
& local
)
717 : rpc::connection(l
, s
), _socket(std::move(socket
)), _server_addr(addr
), _local_addr(local
), _options(ops
) {
718 _socket
.set_reuseaddr(ops
.reuseaddr
);
719 // Run client in the background.
720 // Communicate result via _stopped.
721 // The caller has to call client::stop() to synchronize.
722 (void)_socket
.connect(addr
, local
).then([this, ops
= std::move(ops
)] (connected_socket fd
) {
723 fd
.set_nodelay(ops
.tcp_nodelay
);
725 fd
.set_keepalive(true);
726 fd
.set_keepalive_parameters(ops
.keepalive
.value());
728 set_socket(std::move(fd
));
730 feature_map features
;
731 if (_options
.compressor_factory
) {
732 features
[protocol_features::COMPRESS
] = _options
.compressor_factory
->supported();
734 if (_options
.send_timeout_data
) {
735 features
[protocol_features::TIMEOUT
] = "";
737 if (_options
.stream_parent
) {
738 features
[protocol_features::STREAM_PARENT
] = serialize_connection_id(_options
.stream_parent
);
740 if (!_options
.isolation_cookie
.empty()) {
741 features
[protocol_features::ISOLATION
] = _options
.isolation_cookie
;
744 return send_negotiation_frame(std::move(features
)).then([this] {
745 return negotiate_protocol(_read_buf
);
747 _propagate_timeout
= !is_stream();
749 return do_until([this] { return _read_buf
.eof() || _error
; }, [this] () mutable {
751 return handle_stream_frame();
753 return read_response_frame_compressed(_read_buf
).then([this] (std::tuple
<int64_t, std::optional
<rcv_buf
>> msg_id_and_data
) {
754 auto& msg_id
= std::get
<0>(msg_id_and_data
);
755 auto& data
= std::get
<1>(msg_id_and_data
);
756 auto it
= _outstanding
.find(std::abs(msg_id
));
759 } else if (it
!= _outstanding
.end()) {
760 auto handler
= std::move(it
->second
);
761 _outstanding
.erase(it
);
762 (*handler
)(*this, msg_id
, std::move(data
.value()));
763 } else if (msg_id
< 0) {
765 std::rethrow_exception(unmarshal_exception(data
.value()));
766 } catch(const unknown_verb_error
& ex
) {
767 // if this is unknown verb exception with unknown id ignore it
768 // can happen if unknown verb was used by no_wait client
769 get_logger()(peer_address(), format("unknown verb exception {:d} ignored", ex
.type
));
771 // We've got error response but handler is no longer waiting, could be timed out.
772 log_exception(*this, log_level::info
, "ignoring error response", std::current_exception());
775 // we get a reply for a message id not in _outstanding
776 // this can happen if the message id has already timed out
777 get_logger()(peer_address(), log_level::debug
, "got a reply for an expired message id");
782 }).then_wrapped([this] (future
<> f
) {
783 std::exception_ptr ep
;
785 ep
= f
.get_exception();
788 log_exception(*this, log_level::error
, "client stream connection dropped", ep
);
790 log_exception(*this, log_level::error
, "client connection dropped", ep
);
794 log_exception(*this, log_level::debug
, "stream fail to connect", ep
);
796 log_exception(*this, log_level::debug
, "fail to connect", ep
);
801 _stream_queue
.abort(std::make_exception_ptr(stream_closed()));
802 return stop_send_loop(ep
).then_wrapped([this] (future
<> f
) {
803 f
.ignore_ready_future();
804 _outstanding
.clear();
806 deregister_this_stream();
811 _stopped
.set_value();
816 client::client(const logger
& l
, void* s
, const socket_address
& addr
, const socket_address
& local
)
817 : client(l
, s
, client_options
{}, make_socket(), addr
, local
)
820 client::client(const logger
& l
, void* s
, client_options options
, const socket_address
& addr
, const socket_address
& local
)
821 : client(l
, s
, options
, make_socket(), addr
, local
)
824 client::client(const logger
& l
, void* s
, socket socket
, const socket_address
& addr
, const socket_address
& local
)
825 : client(l
, s
, client_options
{}, std::move(socket
), addr
, local
)
830 server::connection::negotiate(feature_map requested
) {
832 future
<> f
= make_ready_future
<>();
833 for (auto&& e
: requested
) {
836 // supported features go here
837 case protocol_features::COMPRESS
: {
838 if (_server
._options
.compressor_factory
) {
839 _compressor
= _server
._options
.compressor_factory
->negotiate(e
.second
, true);
841 ret
[protocol_features::COMPRESS
] = _compressor
->name();
846 case protocol_features::TIMEOUT
:
847 _timeout_negotiated
= true;
848 ret
[protocol_features::TIMEOUT
] = "";
850 case protocol_features::STREAM_PARENT
: {
851 if (!_server
._options
.streaming_domain
) {
853 return make_exception_future
<>(std::runtime_error("streaming is not configured for the server"));
856 _parent_id
= deserialize_connection_id(e
.second
);
858 // remove stream connection from rpc connection list
859 _server
._conns
.erase(get_connection_id());
860 f
= f
.then([this, c
= shared_from_this()] () mutable {
861 return smp::submit_to(_parent_id
.shard(), [this, c
= make_foreign(static_pointer_cast
<rpc::connection
>(c
))] () mutable {
862 auto sit
= _servers
.find(*_server
._options
.streaming_domain
);
863 if (sit
== _servers
.end()) {
864 throw std::logic_error(format("Shard {:d} does not have server with streaming domain {}", this_shard_id(), *_server
._options
.streaming_domain
).c_str());
866 auto s
= sit
->second
;
867 auto it
= s
->_conns
.find(_parent_id
);
868 if (it
== s
->_conns
.end()) {
869 throw std::logic_error(format("Unknown parent connection {} on shard {:d}", _parent_id
, this_shard_id()).c_str());
871 auto id
= c
->get_connection_id();
872 it
->second
->register_stream(id
, make_lw_shared(std::move(c
)));
878 case protocol_features::ISOLATION
: {
879 auto&& isolation_cookie
= e
.second
;
880 struct isolation_function_visitor
{
881 isolation_function_visitor(const sstring
& isolation_cookie
) :
882 _isolation_cookie(isolation_cookie
) { }
883 future
<isolation_config
> operator() (resource_limits::syncronous_isolation_function f
) const {
884 return futurize_invoke(f
, _isolation_cookie
);
886 future
<isolation_config
> operator() (resource_limits::asyncronous_isolation_function f
) const {
887 return f(_isolation_cookie
);
890 sstring _isolation_cookie
;
893 auto visitor
= isolation_function_visitor(isolation_cookie
);
894 f
= f
.then([visitor
= std::move(visitor
), this] () mutable {
895 return std::visit(visitor
, _server
._limits
.isolate_connection
).then([this] (isolation_config conf
) {
896 _isolation_config
= conf
;
907 if (_server
._options
.streaming_domain
) {
908 ret
[protocol_features::CONNECTION_ID
] = serialize_connection_id(_id
);
910 return f
.then([ret
= std::move(ret
)] {
916 server::connection::negotiate_protocol(input_stream
<char>& in
) {
917 return receive_negotiation_frame(*this, in
).then([this] (feature_map requested_features
) {
918 return negotiate(std::move(requested_features
)).then([this] (feature_map returned_features
) {
919 return send_negotiation_frame(std::move(returned_features
));
924 struct request_frame
{
925 using opt_buf_type
= std::optional
<rcv_buf
>;
926 using header_and_buffer_type
= std::tuple
<std::optional
<uint64_t>, uint64_t, int64_t, opt_buf_type
>;
927 using return_type
= future
<header_and_buffer_type
>;
928 using header_type
= std::tuple
<std::optional
<uint64_t>, uint64_t, int64_t, uint32_t>;
929 static size_t header_size() {
932 static const char* role() {
935 static auto empty_value() {
936 return make_ready_future
<header_and_buffer_type
>(header_and_buffer_type(std::nullopt
, uint64_t(0), 0, std::nullopt
));
938 static header_type
decode_header(const char* ptr
) {
939 auto type
= read_le
<uint64_t>(ptr
);
940 auto msgid
= read_le
<int64_t>(ptr
+ 8);
941 auto size
= read_le
<uint32_t>(ptr
+ 16);
942 return std::make_tuple(std::nullopt
, type
, msgid
, size
);
944 static uint32_t get_size(const header_type
& t
) {
945 return std::get
<3>(t
);
947 static auto make_value(const header_type
& t
, rcv_buf data
) {
948 return make_ready_future
<header_and_buffer_type
>(header_and_buffer_type(std::get
<0>(t
), std::get
<1>(t
), std::get
<2>(t
), std::move(data
)));
952 struct request_frame_with_timeout
: request_frame
{
953 using super
= request_frame
;
954 static size_t header_size() {
957 static typename
super::header_type
decode_header(const char* ptr
) {
958 auto h
= super::decode_header(ptr
+ 8);
959 std::get
<0>(h
) = read_le
<uint64_t>(ptr
);
964 future
<request_frame::header_and_buffer_type
>
965 server::connection::read_request_frame_compressed(input_stream
<char>& in
) {
966 if (_timeout_negotiated
) {
967 return read_frame_compressed
<request_frame_with_timeout
>(_info
.addr
, _compressor
, in
);
969 return read_frame_compressed
<request_frame
>(_info
.addr
, _compressor
, in
);
974 server::connection::respond(int64_t msg_id
, snd_buf
&& data
, std::optional
<rpc_clock_type::time_point
> timeout
) {
975 static_assert(snd_buf::chunk_size
>= 12, "send buffer chunk size is too small");
976 auto p
= data
.front().get_write();
977 write_le
<int64_t>(p
, msg_id
);
978 write_le
<uint32_t>(p
+ 8, data
.size
- 12);
979 return send(std::move(data
), timeout
);
982 future
<> server::connection::send_unknown_verb_reply(std::optional
<rpc_clock_type::time_point
> timeout
, int64_t msg_id
, uint64_t type
) {
983 return wait_for_resources(28, timeout
).then([this, timeout
, msg_id
, type
] (auto permit
) {
984 // send unknown_verb exception back
986 static_assert(snd_buf::chunk_size
>= 28, "send buffer chunk size is too small");
987 auto p
= data
.front().get_write() + 12;
988 write_le
<uint32_t>(p
, uint32_t(exception_type::UNKNOWN_VERB
));
989 write_le
<uint32_t>(p
+ 4, uint32_t(8));
990 write_le
<uint64_t>(p
+ 8, type
);
992 // Send asynchronously.
993 // This is safe since connection::stop() will wait for background work.
994 (void)with_gate(_server
._reply_gate
, [this, timeout
, msg_id
, data
= std::move(data
), permit
= std::move(permit
)] () mutable {
995 // workaround for https://gcc.gnu.org/bugzilla/show_bug.cgi?id=83268
996 auto c
= shared_from_this();
997 return respond(-msg_id
, std::move(data
), timeout
).then([c
= std::move(c
), permit
= std::move(permit
)] {});
999 } catch(gate_closed_exception
&) {/* ignore */}
// Main server-side connection loop: negotiates the protocol, then
// keeps reading frames until EOF or error. Stream connections are
// diverted to handle_stream_frame(); regular connections decode each
// request frame, look up the verb handler and run it under the
// appropriate scheduling group. On exit the connection is torn down,
// removed from the server's connection map and _stopped is resolved.
1003 future
<> server::connection::process() {
1004 return negotiate_protocol(_read_buf
).then([this] () mutable {
// Run the whole loop under the isolation-cookie scheduling group when
// one was negotiated, otherwise under the current group.
1005 auto sg
= _isolation_config
? _isolation_config
->sched_group
: current_scheduling_group();
1006 return with_scheduling_group(sg
, [this] {
// Loop until the input stream is exhausted or an error was recorded.
1008 return do_until([this] { return _read_buf
.eof() || _error
; }, [this] () mutable {
// Stream connections carry stream frames, not RPC requests.
1010 return handle_stream_frame();
1012 return read_request_frame_compressed(_read_buf
).then([this] (request_frame::header_and_buffer_type header_and_buffer
) {
// Unpack the decoded frame: optional expiration (ms), verb id,
// message id and the (optional) payload buffer.
1013 auto& expire
= std::get
<0>(header_and_buffer
);
1014 auto& type
= std::get
<1>(header_and_buffer
);
1015 auto& msg_id
= std::get
<2>(header_and_buffer
);
1016 auto& data
= std::get
<3>(header_and_buffer
);
1019 return make_ready_future
<>();
// Convert the relative per-request expiration (if any, and non-zero)
// into an absolute deadline.
1021 std::optional
<rpc_clock_type::time_point
> timeout
;
1022 if (expire
&& *expire
) {
1023 timeout
= relative_timeout_to_absolute(std::chrono::milliseconds(*expire
));
// Look up the verb handler; must be balanced by put_handler() below.
1025 auto h
= _server
._proto
->get_handler(type
);
// No handler registered for this verb: report it to the client.
1027 return send_unknown_verb_reply(timeout
, msg_id
, type
);
1030 // If the new method of per-connection scheduling group was used, honor it.
1031 // Otherwise, use the old per-handler scheduling group.
1032 auto sg
= _isolation_config
? _isolation_config
->sched_group
: h
->sg
;
1033 return with_scheduling_group(sg
, [this, timeout
, msg_id
, h
, data
= std::move(data
.value())] () mutable {
1034 return h
->func(shared_from_this(), timeout
, msg_id
, std::move(data
)).finally([this, h
] {
1035 // If anything between get_handler() and here throws, we leak put_handler
1036 _server
._proto
->put_handler(h
);
// Teardown path: log an abnormal exit, then shut the connection down.
1043 }).then_wrapped([this] (future
<> f
) {
1044 std::exception_ptr ep
;
1046 ep
= f
.get_exception();
1047 log_exception(*this, log_level::error
,
1048 format("server{} connection dropped", is_stream() ? " stream" : "").c_str(), ep
);
1050 _fd
.shutdown_input();
// Wake up anyone blocked on incoming stream buffers.
1052 _stream_queue
.abort(std::make_exception_ptr(stream_closed()));
1053 return stop_send_loop(ep
).then_wrapped([this] (future
<> f
) {
1054 f
.ignore_ready_future();
// Drop this connection from the server's registry.
1055 _server
._conns
.erase(get_connection_id());
1057 return deregister_this_stream();
1059 return make_ready_future
<>();
// Signal connection::stop() waiters that teardown is complete.
1062 _stopped
.set_value();
1064 }).finally([conn_ptr
= shared_from_this()] {
1065 // hold onto connection pointer until do_until() exits
// Server-side connection constructor: hands the socket, logger,
// serializer and connection id to the rpc::connection base, records
// the owning server, and stashes the peer address in _info for
// logging and handler use.
1069 server::connection::connection(server
& s
, connected_socket
&& fd
, socket_address
&& addr
, const logger
& l
, void* serializer
, connection_id id
)
1070 : rpc::connection(std::move(fd
), l
, serializer
, id
), _server(s
) {
1071 _info
.addr
= std::move(addr
);
// Removes this stream connection from its parent connection's _streams
// map. The parent may live on a different shard (encoded in
// _parent_id), so the erase runs there via submit_to. Each lookup is
// guarded: the domain's server and the parent connection may already
// be gone during shutdown, in which case there is nothing to remove.
1074 future
<> server::connection::deregister_this_stream() {
// Without a streaming domain there is no cross-shard registry to
// clean up.
1075 if (!_server
._options
.streaming_domain
) {
1076 return make_ready_future
<>();
// Hop to the parent connection's shard before touching its maps.
1078 return smp::submit_to(_parent_id
.shard(), [this] () mutable {
1079 auto sit
= server::_servers
.find(*_server
._options
.streaming_domain
);
1080 if (sit
!= server::_servers
.end()) {
1081 auto s
= sit
->second
;
1082 auto it
= s
->_conns
.find(_parent_id
);
1083 if (it
!= s
->_conns
.end()) {
1084 it
->second
->_streams
.erase(get_connection_id());
// Per-shard registry mapping a streaming domain to the server that
// owns it; populated by the server constructor and consulted by
// stream deregistration (deregister_this_stream) across shards.
1090 thread_local
std::unordered_map
<streaming_domain_type
, server
*> server::_servers
;
// Convenience constructor: listens on `addr` with reuse-address
// enabled and default server options, delegating to the
// (proto, server_socket, limits, options) constructor.
1092 server::server(protocol_base
* proto
, const socket_address
& addr
, resource_limits limits
)
1093 : server(proto
, seastar::listen(addr
, listen_options
{true}), limits
, server_options
{})
// Convenience constructor with explicit options: listens on `addr`
// with reuse-address enabled, honoring the configured shard
// load-balancing algorithm, then delegates to the
// (proto, server_socket, limits, options) constructor.
1096 server::server(protocol_base
* proto
, server_options opts
, const socket_address
& addr
, resource_limits limits
)
1097 : server(proto
, seastar::listen(addr
, listen_options
{true, opts
.load_balancing_algorithm
}), limits
, opts
)
// Primary constructor: adopts an already-listening socket, sets up the
// memory-based resource limiter from limits.max_memory, and — when a
// streaming domain is configured — registers this server in the
// per-shard _servers registry, rejecting a duplicate domain.
1100 server::server(protocol_base
* proto
, server_socket ss
, resource_limits limits
, server_options opts
)
1101 : _proto(proto
), _ss(std::move(ss
)), _limits(limits
), _resources_available(limits
.max_memory
), _options(opts
)
1103 if (_options
.streaming_domain
) {
// A streaming domain may be claimed by only one server per shard.
1104 if (_servers
.find(*_options
.streaming_domain
) != _servers
.end()) {
// NOTE(review): message grammar — "is already exist" should read
// "already exists" (runtime string left unchanged here).
1105 throw std::runtime_error(format("An RPC server with the streaming domain {} is already exist", *_options
.streaming_domain
));
1107 _servers
[*_options
.streaming_domain
] = this;
// Convenience constructor: adopts an existing listening socket with
// explicit options, delegating to the primary
// (proto, server_socket, limits, options) constructor.
1112 server::server(protocol_base
* proto
, server_options opts
, server_socket ss
, resource_limits limits
)
1113 : server(proto
, std::move(ss
), limits
, opts
)
// Background accept loop: repeatedly accepts connections, applies the
// optional filter_connection predicate, configures the socket, assigns
// a connection id (shard-qualified when a streaming domain is in use),
// registers the connection in _conns and kicks off its process() loop.
// Completion of the loop is signalled through _ss_stopped.
1116 void server::accept() {
1117 // Run asynchronously in background.
1118 // Communicate result via _ss_stopped.
1119 // The caller has to call server::stop() to synchronize.
1120 (void)keep_doing([this] () mutable {
1121 return _ss
.accept().then([this] (accept_result ar
) mutable {
// Optional admission filter: reject peers the user predicate refuses.
1122 if (_options
.filter_connection
&& !_options
.filter_connection(ar
.remote_address
)) {
1125 auto fd
= std::move(ar
.connection
);
1126 auto addr
= std::move(ar
.remote_address
);
1127 fd
.set_nodelay(_options
.tcp_nodelay
);
// With a streaming domain the id encodes the owning shard so stream
// children can find their parent; otherwise an invalid (shard-less)
// id suffices.
1128 connection_id id
= _options
.streaming_domain
?
1129 connection_id::make_id(_next_client_id
++, uint16_t(this_shard_id())) :
1130 connection_id::make_invalid_id(_next_client_id
++);
1131 auto conn
= _proto
->make_server_connection(*this, std::move(fd
), std::move(addr
), id
);
1132 auto r
= _conns
.emplace(id
, conn
);
1134 // Process asynchronously in background.
1135 (void)conn
->process();
// keep_doing finished (socket aborted or failed): report completion
// to stop().
1137 }).then_wrapped([this] (future
<>&& f
){
1142 _ss_stopped
.set_value();
// Shuts the server down: breaks the resource semaphore so waiters
// fail, unregisters the streaming domain (if any), then waits for the
// accept loop to finish and for every registered connection to stop.
1147 future
<> server::stop() {
// Fail anyone waiting for send-buffer memory.
1149 _resources_available
.broken();
1150 if (_options
.streaming_domain
) {
1151 _servers
.erase(*_options
.streaming_domain
);
// Wait for the accept loop (signalled via _ss_stopped) and all
// per-connection stop() futures in parallel.
1153 return when_all(_ss_stopped
.get_future(),
1154 parallel_for_each(_conns
| boost::adaptors::map_values
, [] (shared_ptr
<connection
> conn
) {
1155 return conn
->stop();
// Formats a connection_id as lowercase hex for logs and diagnostics.
1161 std::ostream
& operator<<(std::ostream
& os
, const connection_id
& id
) {
1162 fmt::print(os
, "{:x}", id
.id
);
// Formats a streaming domain as its decimal numeric id.
1166 std::ostream
& operator<<(std::ostream
& os
, const streaming_domain_type
& domain
) {
1167 fmt::print(os
, "{:d}", domain
._id
);
1171 isolation_config
default_isolate_connection(sstring isolation_cookie
) {
1172 return isolation_config
{};