1 #include <seastar/rpc/rpc.hh>
2 #include <boost/range/adaptor/map.hpp>
// Out-of-line definition of the static constexpr member snd_buf::chunk_size,
// required for ODR-use prior to C++17 inline variables.
9 constexpr size_t snd_buf::chunk_size
;
// snd_buf constructor: payloads no larger than chunk_size are held in a single
// temporary_buffer; larger payloads are split into chunk_size-sized fragments
// stored in a vector. NOTE(review): this extract is missing interior source
// lines (the fragment loop body/closing braces), so braces do not balance here.
11 snd_buf::snd_buf(size_t size_
) : size(size_
) {
12 if (size
<= chunk_size
) {
// Small message: one contiguous buffer.
13 bufs
= temporary_buffer
<char>(size
);
15 std::vector
<temporary_buffer
<char>> v
;
// Reserve the exact fragment count: ceil(size / chunk_size).
16 v
.reserve(align_up(size_t(size
), chunk_size
) / chunk_size
);
18 v
.push_back(temporary_buffer
<char>(std::min(chunk_size
, size_
)));
19 size_
-= v
.back().size();
// Returns a reference to the first fragment of the send buffer. Uses
// compat::get_if to detect the single-buffer case, otherwise fetches the
// vector's front. NOTE(review): the return statement for the single-buffer
// branch is missing from this extract.
25 temporary_buffer
<char>& snd_buf::front() {
26 auto* one
= compat::get_if
<temporary_buffer
<char>>(&bufs
);
30 return compat::get
<std::vector
<temporary_buffer
<char>>>(bufs
).front();
34 // Make a copy of a remote buffer. No data is actually copied, only pointers and
35 // a deleter of a new buffer takes care of deleting the original buffer
36 template<typename T
> // T is either snd_buf or rcv_buf
37 T
make_shard_local_buffer_copy(foreign_ptr
<std::unique_ptr
<T
>> org
) {
// Fast path: the buffer already lives on this shard; just move it out.
38 if (org
.get_owner_shard() == engine().cpu_id()) {
39 return std::move(*org
);
42 auto* one
= compat::get_if
<temporary_buffer
<char>>(&org
->bufs
);
// Single-fragment case: alias the original memory; the deleter keeps the
// foreign_ptr alive until the copy is destroyed.
45 buf
.bufs
= temporary_buffer
<char>(one
->get_write(), one
->size(), make_object_deleter(std::move(org
)));
// Multi-fragment case: alias each fragment and share one deleter that owns
// the original foreign_ptr.
47 auto& orgbufs
= compat::get
<std::vector
<temporary_buffer
<char>>>(org
->bufs
);
48 std::vector
<temporary_buffer
<char>> newbufs
;
49 newbufs
.reserve(orgbufs
.size());
50 deleter d
= make_object_deleter(std::move(org
));
51 for (auto&& b
: orgbufs
) {
52 newbufs
.push_back(temporary_buffer
<char>(b
.get_write(), b
.size(), d
.share()));
54 buf
.bufs
= std::move(newbufs
);
// Explicit instantiations for the only two buffer types used by the RPC layer.
60 template snd_buf
make_shard_local_buffer_copy(foreign_ptr
<std::unique_ptr
<snd_buf
>>);
61 template rcv_buf
make_shard_local_buffer_copy(foreign_ptr
<std::unique_ptr
<rcv_buf
>>);
// Compresses an outgoing buffer if a compressor was negotiated, reserving a
// 4-byte little-endian length prefix in front of the compressed payload.
// NOTE(review): the "if (_compressor)" guard line is missing from this extract;
// the second return is the uncompressed fall-through.
63 snd_buf
connection::compress(snd_buf buf
) {
65 buf
= _compressor
->compress(4, std::move(buf
));
66 static_assert(snd_buf::chunk_size
>= 4, "send buffer chunk size is too small");
// Write the compressed-payload size (total minus the 4-byte header).
67 write_le
<uint32_t>(buf
.front().get_write(), buf
.size
- 4);
68 return std::move(buf
);
70 return std::move(buf
);
// Writes a snd_buf to the connection's output stream: single fragment is
// written directly, multi-fragment buffers are written one by one via
// do_for_each, keeping the vector alive with do_with.
73 future
<> connection::send_buffer(snd_buf buf
) {
74 auto* b
= compat::get_if
<temporary_buffer
<char>>(&buf
.bufs
);
76 return _write_buf
.write(std::move(*b
));
78 return do_with(std::move(compat::get
<std::vector
<temporary_buffer
<char>>>(buf
.bufs
)),
79 [this] (std::vector
<temporary_buffer
<char>>& ar
) {
80 return do_for_each(ar
.begin(), ar
.end(), [this] (auto& b
) {
81 return _write_buf
.write(std::move(b
));
// The outgoing-queue pump: loops until _error, waiting on the queue condition
// variable, popping one entry at a time, patching in the remaining-timeout
// header for request queues, compressing, sending and flushing.
// NOTE(review): several interior lines (branch closers, the "left" declaration,
// the exception handler body) are missing from this extract.
87 template<connection::outgoing_queue_type QueueType
>
88 void connection::send_loop() {
89 _send_loop_stopped
= do_until([this] { return _error
; }, [this] {
90 return _outgoing_queue_cond
.wait([this] { return !_outgoing_queue
.empty(); }).then([this] {
91 // despite using wait with predicated above _outgoing_queue can still be empty here if
92 // there is only one entry on the list and its expire timer runs after wait() returned ready future,
93 // but before this continuation runs.
94 if (_outgoing_queue
.empty()) {
95 return make_ready_future();
97 auto d
= std::move(_outgoing_queue
.front());
98 _outgoing_queue
.pop_front();
99 d
.t
.cancel(); // cancel timeout timer
101 d
.pcancel
->cancel_send
= std::function
<void()>(); // request is no longer cancellable
// Request frames carry an 8-byte timeout field that must be filled in (or
// trimmed off when timeouts were not negotiated).
103 if (QueueType
== outgoing_queue_type::request
) {
104 static_assert(snd_buf::chunk_size
>= 8, "send buffer chunk size is too small");
105 if (_timeout_negotiated
) {
106 auto expire
= d
.t
.get_timeout();
108 if (expire
!= typename timer
<rpc_clock_type
>::time_point()) {
// Convert the absolute expiry into milliseconds-from-now for the wire.
109 left
= std::chrono::duration_cast
<std::chrono::milliseconds
>(expire
- timer
<rpc_clock_type
>::clock::now()).count();
111 write_le
<uint64_t>(d
.buf
.front().get_write(), left
);
113 d
.buf
.front().trim_front(8);
117 d
.buf
= compress(std::move(d
.buf
));
118 auto f
= send_buffer(std::move(d
.buf
)).then([this] {
119 _stats
.sent_messages
++;
120 return _write_buf
.flush();
// Keep the queue entry alive until the write completes.
122 return f
.finally([d
= std::move(d
)] {});
124 }).handle_exception([this] (std::exception_ptr eptr
) {
// Stops the send loop: breaks the queue condition variable, shuts down the
// socket's output side, then waits for both the loop and any sink close to
// finish before clearing the queue and closing the write buffer (unless the
// sink already closed it).
129 future
<> connection::stop_send_loop() {
132 _outgoing_queue_cond
.broken();
133 _fd
.shutdown_output();
135 return when_all(std::move(_send_loop_stopped
), std::move(_sink_closed_future
)).then([this] (std::tuple
<future
<>, future
<bool>> res
){
136 _outgoing_queue
.clear();
137 // both _send_loop_stopped and _sink_closed_future are never exceptional
138 bool sink_closed
= std::get
<1>(res
).get0();
// Close _write_buf only once: skip if never connected or the sink closed it.
139 return _connected
&& !sink_closed
? _write_buf
.close() : make_ready_future();
// Installs the connected socket and creates the input/output streams.
// Throws if a socket was already installed. NOTE(review): the _connected
// guard and the assignment of fd into _fd are missing from this extract.
143 void connection::set_socket(connected_socket
&& fd
) {
145 throw std::runtime_error("already connected");
148 _read_buf
=_fd
.input();
149 _write_buf
= _fd
.output();
// Serializes and sends the negotiation frame: magic (8 bytes), total extra
// length (LE32), then for each feature a record of id (LE32), value length
// (LE32) and the value bytes. NOTE(review): the boost::accumulate initial
// value argument and some loop/brace lines are missing from this extract.
153 future
<> connection::send_negotiation_frame(feature_map features
) {
154 auto negotiation_frame_feature_record_size
= [] (const feature_map::value_type
& e
) {
// 8 bytes of per-record header (id + length) plus the value payload.
155 return 8 + e
.second
.size();
157 auto extra_len
= boost::accumulate(
158 features
| boost::adaptors::transformed(negotiation_frame_feature_record_size
),
160 temporary_buffer
<char> reply(sizeof(negotiation_frame
) + extra_len
);
161 auto p
= reply
.get_write();
162 p
= std::copy_n(rpc_magic
, 8, p
);
163 write_le
<uint32_t>(p
, extra_len
);
165 for (auto&& e
: features
) {
166 write_le
<uint32_t>(p
, static_cast<uint32_t>(e
.first
));
168 write_le
<uint32_t>(p
, e
.second
.size());
170 p
= std::copy_n(e
.second
.begin(), e
.second
.size(), p
);
172 return _write_buf
.write(std::move(reply
)).then([this] {
173 _stats
.sent_messages
++;
174 return _write_buf
.flush();
// Queues an outgoing buffer. Already-expired timeouts resolve immediately;
// otherwise the entry is appended to _outgoing_queue, an expiry timer is
// armed that erases the entry, cancellation hooks are wired up, the send
// loop is signalled, and the caller gets the entry's promise future.
// NOTE(review): the error-state guard and several closing braces are missing
// from this extract; the final line is the closed-connection path.
178 future
<> connection::send(snd_buf buf
, compat::optional
<rpc_clock_type::time_point
> timeout
, cancellable
* cancel
) {
180 if (timeout
&& *timeout
<= rpc_clock_type::now()) {
181 return make_ready_future
<>();
183 _outgoing_queue
.emplace_back(std::move(buf
));
// Erases this entry from the queue; used by both the expiry timer and the
// caller-supplied cancellable.
184 auto deleter
= [this, it
= std::prev(_outgoing_queue
.cend())] {
185 _outgoing_queue
.erase(it
);
188 auto& t
= _outgoing_queue
.back().t
;
189 t
.set_callback(deleter
);
190 t
.arm(timeout
.value());
193 cancel
->cancel_send
= std::move(deleter
);
194 cancel
->send_back_pointer
= &_outgoing_queue
.back().pcancel
;
195 _outgoing_queue
.back().pcancel
= cancel
;
197 _outgoing_queue_cond
.signal();
198 return _outgoing_queue
.back().p
->get_future();
200 return make_exception_future
<>(closed_error());
// Aborts the connection by shutting down the socket's input side.
// NOTE(review): guard/error-flag lines are missing from this extract.
204 void connection::abort() {
207 _fd
.shutdown_input();
// Returns a future that resolves when the connection is fully stopped.
// NOTE(review): the line invoking abort() is missing from this extract.
211 future
<> connection::stop() {
213 return _stopped
.get_future();
// Checks that a frame read produced exactly the expected byte count. A
// non-zero short read is logged via the connection's logger (EOF mid-frame).
// NOTE(review): the return statements/closing braces are missing from this
// extract.
216 template<typename Connection
>
217 static bool verify_frame(Connection
& c
, temporary_buffer
<char>& buf
, size_t expected
, const char* log
) {
218 if (buf
.size() != expected
) {
219 if (buf
.size() != 0) {
220 c
.get_logger()(c
.peer_address(), log
);
// Reads and validates a negotiation frame: fixed header (magic + LE32 extra
// length), then parses the variable-length feature records into a feature_map.
// All validation failures resolve the future with closed_error().
// NOTE(review): the return type line, the feature-record loop header, the
// per-record pointer advance, and several braces are missing from this extract.
227 template<typename Connection
>
230 receive_negotiation_frame(Connection
& c
, input_stream
<char>& in
) {
231 return in
.read_exactly(sizeof(negotiation_frame
)).then([&c
, &in
] (temporary_buffer
<char> neg
) {
232 if (!verify_frame(c
, neg
, sizeof(negotiation_frame
), "unexpected eof during negotiation frame")) {
233 return make_exception_future
<feature_map
>(closed_error());
235 negotiation_frame frame
;
236 std::copy_n(neg
.get_write(), sizeof(frame
.magic
), frame
.magic
);
237 frame
.len
= read_le
<uint32_t>(neg
.get_write() + 8);
238 if (std::memcmp(frame
.magic
, rpc_magic
, sizeof(frame
.magic
)) != 0) {
239 c
.get_logger()(c
.peer_address(), "wrong protocol magic");
240 return make_exception_future
<feature_map
>(closed_error());
242 auto len
= frame
.len
;
// Read the variable-length feature records that follow the fixed header.
243 return in
.read_exactly(len
).then([&c
, len
] (temporary_buffer
<char> extra
) {
244 if (extra
.size() != len
) {
245 c
.get_logger()(c
.peer_address(), "unexpected eof during negotiation frame");
246 return make_exception_future
<feature_map
>(closed_error());
249 auto p
= extra
.get();
250 auto end
= p
+ extra
.size();
253 c
.get_logger()(c
.peer_address(), "bad feature data format in negotiation frame");
254 return make_exception_future
<feature_map
>(closed_error());
// Each record: feature id (LE32) followed by value length (LE32) and bytes.
256 auto feature
= static_cast<protocol_features
>(read_le
<uint32_t>(p
));
257 auto f_len
= read_le
<uint32_t>(p
+ 4);
259 if (f_len
> end
- p
) {
260 c
.get_logger()(c
.peer_address(), "buffer underflow in feature data in negotiation frame");
261 return make_exception_future
<feature_map
>(closed_error());
263 auto data
= sstring(p
, f_len
);
265 map
.emplace(feature
, std::move(data
));
267 return make_ready_future
<feature_map
>(std::move(map
));
// Reads exactly `size` bytes from the input stream into an rcv_buf. A single
// read_up_to satisfying the whole size yields a one-fragment buffer; otherwise
// fragments are accumulated in a vector via repeat() until `left` reaches 0.
// NOTE(review): the rcv_buf declaration, the `left` decrement, and the eof
// handling inside the repeat loop are missing from this extract.
272 inline future
<rcv_buf
>
273 read_rcv_buf(input_stream
<char>& in
, uint32_t size
) {
274 return in
.read_up_to(size
).then([&, size
] (temporary_buffer
<char> data
) mutable {
276 if (data
.size() == 0) {
277 return make_ready_future
<rcv_buf
>(rcv_buf());
278 } else if (data
.size() == size
) {
// Whole payload arrived in one read: store as a single fragment.
279 rb
.bufs
= std::move(data
);
280 return make_ready_future
<rcv_buf
>(std::move(rb
));
// Partial read: switch to the vector representation and keep reading.
283 std::vector
<temporary_buffer
<char>> v
;
284 v
.push_back(std::move(data
));
285 rb
.bufs
= std::move(v
);
286 return do_with(std::move(rb
), std::move(size
), [&in
] (rcv_buf
& rb
, uint32_t& left
) {
287 return repeat([&] () {
288 return in
.read_up_to(left
).then([&] (temporary_buffer
<char> data
) {
291 return stop_iteration::yes
;
294 compat::get
<std::vector
<temporary_buffer
<char>>>(rb
.bufs
).push_back(std::move(data
));
295 return left
? stop_iteration::no
: stop_iteration::yes
;
299 return std::move(rb
);
// Generic frame reader: reads FrameType::header_size() bytes, decodes the
// header, then reads the payload of FrameType::get_size(h) bytes and builds
// the frame value. Short reads are logged and yield FrameType::empty_value().
// NOTE(review): the size==0 guard around the early make_value and some braces
// are missing from this extract.
306 template<typename FrameType
, typename Info
>
307 typename
FrameType::return_type
308 connection::read_frame(const Info
& info
, input_stream
<char>& in
) {
309 auto header_size
= FrameType::header_size();
310 return in
.read_exactly(header_size
).then([this, header_size
, &info
, &in
] (temporary_buffer
<char> header
) {
311 if (header
.size() != header_size
) {
312 if (header
.size() != 0) {
313 _logger(info
, format("unexpected eof on a {} while reading header: expected {:d} got {:d}", FrameType::role(), header_size
, header
.size()));
315 return FrameType::empty_value();
317 auto h
= FrameType::decode_header(header
.get());
318 auto size
= FrameType::get_size(h
);
320 return FrameType::make_value(h
, rcv_buf());
322 return read_rcv_buf(in
, size
).then([this, &info
, h
= std::move(h
), size
] (rcv_buf rb
) {
323 if (rb
.size
!= size
) {
324 _logger(info
, format("unexpected eof on a {} while reading data: expected {:d} got {:d}", FrameType::role(), size
, rb
.size
));
325 return FrameType::empty_value();
327 return FrameType::make_value(h
, std::move(rb
));
// Compressed-frame reader: when a compressor is present, reads a 4-byte LE
// compressed-size header, reads that many bytes, decompresses them, stitches
// the resulting fragments into a net::packet, and re-runs read_frame over an
// in-memory input stream. Without a compressor it falls through to the plain
// read_frame at the end. NOTE(review): the compressor guard, the net::packet
// declaration, and the single/multi fragment branch scaffolding are missing
// from this extract.
334 template<typename FrameType
, typename Info
>
335 typename
FrameType::return_type
336 connection::read_frame_compressed(const Info
& info
, std::unique_ptr
<compressor
>& compressor
, input_stream
<char>& in
) {
338 return in
.read_exactly(4).then([&] (temporary_buffer
<char> compress_header
) {
339 if (compress_header
.size() != 4) {
340 if (compress_header
.size() != 0) {
341 _logger(info
, format("unexpected eof on a {} while reading compression header: expected 4 got {:d}", FrameType::role(), compress_header
.size()));
343 return FrameType::empty_value();
345 auto ptr
= compress_header
.get();
346 auto size
= read_le
<uint32_t>(ptr
);
347 return read_rcv_buf(in
, size
).then([this, size
, &compressor
, &info
] (rcv_buf compressed_data
) {
348 if (compressed_data
.size
!= size
) {
349 _logger(info
, format("unexpected eof on a {} while reading compressed data: expected {:d} got {:d}", FrameType::role(), size
, compressed_data
.size
));
350 return FrameType::empty_value();
352 auto eb
= compressor
->decompress(std::move(compressed_data
));
354 auto* one
= compat::get_if
<temporary_buffer
<char>>(&eb
.bufs
);
356 p
= net::packet(std::move(p
), std::move(*one
));
358 for (auto&& b
: compat::get
<std::vector
<temporary_buffer
<char>>>(eb
.bufs
)) {
359 p
= net::packet(std::move(p
), std::move(b
));
// Re-parse the decompressed bytes as a regular frame.
362 return do_with(as_input_stream(std::move(p
)), [this, &info
] (input_stream
<char>& in
) {
363 return read_frame
<FrameType
>(info
, in
);
368 return read_frame
<FrameType
>(info
, in
);
// Frame-type policy for stream frames, consumed by read_frame /
// read_frame_compressed. NOTE(review): the header_type definition, the bodies
// of header_size()/role()/get_size(), and the eof-marker handling in
// decode_header/make_value are missing from this extract.
372 struct stream_frame
{
373 using opt_buf_type
= compat::optional
<rcv_buf
>;
374 using return_type
= future
<opt_buf_type
>;
379 static size_t header_size() {
382 static const char* role() {
385 static future
<opt_buf_type
> empty_value() {
386 return make_ready_future
<opt_buf_type
>(compat::nullopt
);
388 static header_type
decode_header(const char* ptr
) {
// Header is a single LE32 length; the bool flags the eof marker.
389 header_type h
{read_le
<uint32_t>(ptr
), false};
396 static uint32_t get_size(const header_type
& t
) {
399 static future
<opt_buf_type
> make_value(const header_type
& t
, rcv_buf data
) {
403 return make_ready_future
<opt_buf_type
>(std::move(data
));
// Reads one (possibly compressed) stream frame from the given input stream.
407 future
<compat::optional
<rcv_buf
>>
408 connection::read_stream_frame_compressed(input_stream
<char>& in
) {
409 return read_frame_compressed
<stream_frame
>(peer_address(), _compressor
, in
);
// Closes the write side of a stream connection, publishing the close through
// _sink_closed_future so stop_send_loop() does not close _write_buf a second
// time, then stops the connection. NOTE(review): the promise declaration and
// the guard around the close path are missing from this extract.
412 future
<> connection::stream_close() {
413 auto f
= make_ready_future
<>();
416 _sink_closed_future
= p
.get_future();
417 // stop_send_loop(), which also calls _write_buf.close(), and this code can run in parallel.
418 // Use _sink_closed_future to serialize them and skip second call to close()
419 f
= _write_buf
.close().finally([p
= std::move(p
)] () mutable { p
.set_value(true);});
421 return f
.finally([this] () mutable { return stop(); });
// Admits an incoming stream buffer into _stream_queue under the memory
// semaphore; the acquired units travel with the buffer so memory is released
// when the buffer is consumed.
424 future
<> connection::stream_process_incoming(rcv_buf
&& buf
) {
425 // we do not want to dead lock on huge packets, so let them in
426 // but only one at a time
427 auto size
= std::min(size_t(buf
.size
), max_stream_buffers_memory
);
428 return get_units(_stream_sem
, size
).then([this, buf
= std::move(buf
)] (semaphore_units
<>&& su
) mutable {
429 buf
.su
= std::move(su
);
430 return _stream_queue
.push_eventually(std::move(buf
));
// Reads one stream frame and forwards it into the stream queue.
// NOTE(review): the no-data (eof) branch scaffolding is missing from this
// extract.
434 future
<> connection::handle_stream_frame() {
435 return read_stream_frame_compressed(_read_buf
).then([this] (compat::optional
<rcv_buf
> data
) {
438 return make_ready_future
<>();
440 return stream_process_incoming(std::move(*data
));
// Drains available stream buffers from _stream_queue into `bufs` as
// foreign_ptr-wrapped unique_ptrs. A buffer with size == -1U is the
// end-of-stream marker; when eof is seen together with pending data the
// marker is pushed back so the next call observes it. NOTE(review): the
// consume-callback returns, the _source_closed update and several braces are
// missing from this extract.
444 future
<> connection::stream_receive(circular_buffer
<foreign_ptr
<std::unique_ptr
<rcv_buf
>>>& bufs
) {
445 if (_source_closed
) {
446 return make_exception_future
<>(stream_closed());
449 return _stream_queue
.not_empty().then([this, &bufs
] {
450 bool eof
= !_stream_queue
.consume([this, &bufs
] (rcv_buf
&& b
) {
451 if (b
.size
== -1U) { // max fragment length marks an end of a stream
454 bufs
.push_back(make_foreign(std::make_unique
<rcv_buf
>(std::move(b
))));
458 if (eof
&& !bufs
.empty()) {
459 assert(_stream_queue
.empty());
460 _stream_queue
.push(rcv_buf(-1U)); // push eof marker back for next read to notice it
// Associates a stream connection with this (parent) connection under `id`.
465 void connection::register_stream(connection_id id
, xshard_connection_ptr c
) {
466 _streams
.emplace(id
, std::move(c
));
// Looks up a registered stream connection by id; throws std::logic_error if
// the id is unknown. NOTE(review): the successful-lookup return line is
// missing from this extract.
469 xshard_connection_ptr
connection::get_stream(connection_id id
) const {
470 auto it
= _streams
.find(id
);
471 if (it
== _streams
.end()) {
472 throw std::logic_error(format("rpc stream id {:d} not found", id
).c_str());
// Logs an exception_ptr through the connection's logger with a prefix: the
// exception is rethrown to extract what() text, unknown exception types fall
// back to a fixed string. NOTE(review): the try line, the what() assignment
// and the catch(...) scaffolding are missing from this extract.
477 static void log_exception(connection
& c
, const char* log
, std::exception_ptr eptr
) {
480 std::rethrow_exception(eptr
);
481 } catch (std::exception
& ex
) {
484 s
= "unknown exception";
486 c
.get_logger()(c
.peer_address(), format("{}: {}", log
, s
));
// Applies the feature map returned by the server: enables compression via the
// configured factory, records timeout support, and adopts the server-assigned
// connection id. NOTE(review): the switch statement header, break statements
// and closing braces of the case bodies are missing from this extract.
491 client::negotiate(feature_map provided
) {
492 // record features returned here
493 for (auto&& e
: provided
) {
496 // supported features go here
497 case protocol_features::COMPRESS
:
498 if (_options
.compressor_factory
) {
499 _compressor
= _options
.compressor_factory
->negotiate(e
.second
, false);
502 case protocol_features::TIMEOUT
:
503 _timeout_negotiated
= true;
505 case protocol_features::CONNECTION_ID
: {
506 _id
= deserialize_connection_id(e
.second
);
// Receives the server's negotiation frame and applies the returned features.
517 client::negotiate_protocol(input_stream
<char>& in
) {
518 return receive_negotiation_frame(*this, in
).then([this] (feature_map features
) {
519 return negotiate(features
);
// Frame-type policy for client-side response frames: header is message id
// (LE64) followed by payload size (LE32). NOTE(review): the bodies of
// header_size()/role() are missing from this extract.
523 struct response_frame
{
524 using opt_buf_type
= compat::optional
<rcv_buf
>;
525 using return_type
= future
<int64_t, opt_buf_type
>;
526 using header_type
= std::tuple
<int64_t, uint32_t>;
527 static size_t header_size() {
530 static const char* role() {
533 static auto empty_value() {
534 return make_ready_future
<int64_t, opt_buf_type
>(0, compat::nullopt
);
536 static header_type
decode_header(const char* ptr
) {
537 auto msgid
= read_le
<int64_t>(ptr
);
538 auto size
= read_le
<uint32_t>(ptr
+ 8);
539 return std::make_tuple(msgid
, size
);
541 static uint32_t get_size(const header_type
& t
) {
542 return std::get
<1>(t
);
544 static auto make_value(const header_type
& t
, rcv_buf data
) {
545 return make_ready_future
<int64_t, opt_buf_type
>(std::get
<0>(t
), std::move(data
));
// Reads one uncompressed response frame from the stream.
550 future
<int64_t, compat::optional
<rcv_buf
>>
551 client::read_response_frame(input_stream
<char>& in
) {
552 return read_frame
<response_frame
>(_server_addr
, in
);
// Reads one response frame, decompressing first if a compressor is set.
555 future
<int64_t, compat::optional
<rcv_buf
>>
556 client::read_response_frame_compressed(input_stream
<char>& in
) {
557 return read_frame_compressed
<response_frame
>(_server_addr
, _compressor
, in
);
// Returns a stats snapshot augmented with current queue depths.
// NOTE(review): the `res` initialization from _stats and the return line are
// missing from this extract.
560 stats
client::get_stats() const {
562 res
.wait_reply
= _outstanding
.size();
563 res
.pending
= _outgoing_queue
.size();
// Registers a reply handler for message `id`: arms the handler's timeout
// timer (when a timeout is given), wires cancellation hooks to drop the
// handler, and stores the handler in _outstanding. NOTE(review): the timeout
// and cancel guards plus closing braces are missing from this extract.
567 void client::wait_for_reply(id_type id
, std::unique_ptr
<reply_handler_base
>&& h
, compat::optional
<rpc_clock_type::time_point
> timeout
, cancellable
* cancel
) {
569 h
->t
.set_callback(std::bind(std::mem_fn(&client::wait_timed_out
), this, id
));
570 h
->t
.arm(timeout
.value());
573 cancel
->cancel_wait
= [this, id
] {
574 _outstanding
[id
]->cancel();
575 _outstanding
.erase(id
);
578 cancel
->wait_back_pointer
= &h
->pcancel
;
580 _outstanding
.emplace(id
, std::move(h
));
// Timeout-timer callback: notifies and removes the outstanding handler for
// `id`. NOTE(review): an interior line (likely a stats update) is missing
// from this extract.
582 void client::wait_timed_out(id_type id
) {
584 _outstanding
[id
]->timeout();
585 _outstanding
.erase(id
);
// Returns a future resolving when the client is fully stopped.
// NOTE(review): the shutdown sequence preceding the return is missing from
// this extract.
588 future
<> client::stop() {
593 return _stopped
.get_future();
// Aborts every registered stream connection; streams must belong to the
// current shard. NOTE(review): the erase of the aborted entry and loop
// closing braces are missing from this extract.
596 void client::abort_all_streams() {
597 while (!_streams
.empty()) {
598 auto&& s
= _streams
.begin();
599 assert(s
->second
->get_owner_shard() == engine().cpu_id()); // abort can be called only locally
600 s
->second
->get()->abort();
// Removes this stream connection from its parent's stream map.
// NOTE(review): a guard line before the erase is missing from this extract.
605 void client::deregister_this_stream() {
607 _parent
->_streams
.erase(_id
);
// Main client constructor: connects the socket, applies TCP options, sends
// the negotiation frame advertising configured features, starts the send
// loop, then runs the response-dispatch loop until eof/error, finally tearing
// everything down and fulfilling _stopped. NOTE(review): many interior lines
// (keepalive guard, send_loop start, stream/ordinary branch scaffolding,
// exception-path braces) are missing from this extract.
611 client::client(const logger
& l
, void* s
, client_options ops
, socket socket
, ipv4_addr addr
, ipv4_addr local
)
612 : rpc::connection(l
, s
), _socket(std::move(socket
)), _server_addr(addr
), _options(ops
) {
613 _socket
.connect(addr
, local
).then([this, ops
= std::move(ops
)] (connected_socket fd
) {
614 fd
.set_nodelay(ops
.tcp_nodelay
);
616 fd
.set_keepalive(true);
617 fd
.set_keepalive_parameters(ops
.keepalive
.value());
619 set_socket(std::move(fd
));
// Build the feature map advertised to the server.
621 feature_map features
;
622 if (_options
.compressor_factory
) {
623 features
[protocol_features::COMPRESS
] = _options
.compressor_factory
->supported();
625 if (_options
.send_timeout_data
) {
626 features
[protocol_features::TIMEOUT
] = "";
628 if (_options
.stream_parent
) {
629 features
[protocol_features::STREAM_PARENT
] = serialize_connection_id(_options
.stream_parent
);
631 if (!_options
.isolation_cookie
.empty()) {
632 features
[protocol_features::ISOLATION
] = _options
.isolation_cookie
;
635 send_negotiation_frame(std::move(features
));
637 return negotiate_protocol(_read_buf
).then([this] () {
638 _client_negotiated
->set_value();
639 _client_negotiated
= compat::nullopt
;
// Main receive loop: dispatch responses to registered handlers.
641 return do_until([this] { return _read_buf
.eof() || _error
; }, [this] () mutable {
643 return handle_stream_frame();
645 return read_response_frame_compressed(_read_buf
).then([this] (int64_t msg_id
, compat::optional
<rcv_buf
> data
) {
646 auto it
= _outstanding
.find(std::abs(msg_id
));
649 } else if (it
!= _outstanding
.end()) {
650 auto handler
= std::move(it
->second
);
651 _outstanding
.erase(it
);
652 (*handler
)(*this, msg_id
, std::move(data
.value()));
// Negative message id signals an error response from the server.
653 } else if (msg_id
< 0) {
655 std::rethrow_exception(unmarshal_exception(data
.value()));
656 } catch(const unknown_verb_error
& ex
) {
657 // if this is unknown verb exception with unknown id ignore it
658 // can happen if unknown verb was used by no_wait client
659 get_logger()(peer_address(), format("unknown verb exception {:d} ignored", ex
.type
));
661 // We've got error response but handler is no longer waiting, could be timed out.
662 log_exception(*this, "ignoring error response", std::current_exception());
665 // we get a reply for a message id not in _outstanding
666 // this can happened if the message id is timed out already
667 // FIXME: log it but with low level, currently log levels are not supported
// Teardown: log the failure, stop the send loop, and signal completion.
672 }).then_wrapped([this] (future
<> f
) {
673 std::exception_ptr ep
;
675 ep
= f
.get_exception();
677 log_exception(*this, _connected
? "client stream connection dropped" : "stream fail to connect", ep
);
679 log_exception(*this, _connected
? "client connection dropped" : "fail to connect", ep
);
683 _stream_queue
.abort(std::make_exception_ptr(stream_closed()));
684 return stop_send_loop().then_wrapped([this] (future
<> f
) {
685 f
.ignore_ready_future();
686 _outstanding
.clear();
688 deregister_this_stream();
692 }).finally([this, ep
]{
693 if (_client_negotiated
&& ep
) {
694 _client_negotiated
->set_exception(ep
);
696 _stopped
.set_value();
// Convenience constructor: default client_options and a fresh reactor socket.
701 client::client(const logger
& l
, void* s
, ipv4_addr addr
, ipv4_addr local
)
702 : client(l
, s
, client_options
{}, engine().net().socket(), addr
, local
)
// Convenience constructor: caller-supplied options, fresh reactor socket.
705 client::client(const logger
& l
, void* s
, client_options options
, ipv4_addr addr
, ipv4_addr local
)
706 : client(l
, s
, options
, engine().net().socket(), addr
, local
)
// Convenience constructor: default options with a caller-supplied socket.
709 client::client(const logger
& l
, void* s
, socket socket
, ipv4_addr addr
, ipv4_addr local
)
710 : client(l
, s
, client_options
{}, std::move(socket
), addr
, local
)
// Server-side feature negotiation: walks the client's requested features,
// enables compression/timeouts, re-registers stream connections with their
// parent (possibly on another shard via smp::submit_to), applies isolation,
// and replies with the accepted feature map (adding CONNECTION_ID when a
// streaming domain is configured). NOTE(review): the `ret` declaration, the
// switch header, break statements and case-closing braces are missing from
// this extract.
715 server::connection::negotiate(feature_map requested
) {
717 future
<> f
= make_ready_future
<>();
718 for (auto&& e
: requested
) {
721 // supported features go here
722 case protocol_features::COMPRESS
: {
723 if (_server
._options
.compressor_factory
) {
724 _compressor
= _server
._options
.compressor_factory
->negotiate(e
.second
, true);
725 ret
[protocol_features::COMPRESS
] = _server
._options
.compressor_factory
->supported();
729 case protocol_features::TIMEOUT
:
730 _timeout_negotiated
= true;
731 ret
[protocol_features::TIMEOUT
] = "";
733 case protocol_features::STREAM_PARENT
: {
734 if (!_server
._options
.streaming_domain
) {
735 f
= make_exception_future
<>(std::runtime_error("streaming is not configured for the server"));
737 _parent_id
= deserialize_connection_id(e
.second
);
739 // remove stream connection from rpc connection list
740 _server
._conns
.erase(get_connection_id());
// Register this stream with its parent connection, which may live on a
// different shard.
741 f
= smp::submit_to(_parent_id
.shard(), [this, c
= make_foreign(static_pointer_cast
<rpc::connection
>(shared_from_this()))] () mutable {
742 auto sit
= _servers
.find(*_server
._options
.streaming_domain
);
743 if (sit
== _servers
.end()) {
744 throw std::logic_error(format("Shard {:d} does not have server with streaming domain {:x}", engine().cpu_id(), *_server
._options
.streaming_domain
).c_str());
746 auto s
= sit
->second
;
747 auto it
= s
->_conns
.find(_parent_id
);
748 if (it
== s
->_conns
.end()) {
749 throw std::logic_error(format("Unknown parent connection {:d} on shard {:d}", _parent_id
, engine().cpu_id()).c_str());
751 auto id
= c
->get_connection_id();
752 it
->second
->register_stream(id
, make_lw_shared(std::move(c
)));
757 case protocol_features::ISOLATION
: {
758 auto&& isolation_cookie
= e
.second
;
759 _isolation_config
= _server
._limits
.isolate_connection(isolation_cookie
);
768 if (_server
._options
.streaming_domain
) {
769 ret
[protocol_features::CONNECTION_ID
] = serialize_connection_id(_id
);
771 return f
.then([ret
= std::move(ret
)] {
// Receives the client's requested features, negotiates, and sends back the
// accepted feature map.
777 server::connection::negotiate_protocol(input_stream
<char>& in
) {
778 return receive_negotiation_frame(*this, in
).then([this] (feature_map requested_features
) {
779 return negotiate(std::move(requested_features
)).then([this] (feature_map returned_features
) {
780 return send_negotiation_frame(std::move(returned_features
));
// Frame-type policy for server-side request frames without a timeout field:
// header is verb type (LE64), message id (LE64), payload size (LE32); the
// optional<uint64_t> slot carries the expiry when the timeout variant is used.
// NOTE(review): the bodies of header_size()/role() are missing from this
// extract.
785 struct request_frame
{
786 using opt_buf_type
= compat::optional
<rcv_buf
>;
787 using return_type
= future
<compat::optional
<uint64_t>, uint64_t, int64_t, opt_buf_type
>;
788 using header_type
= std::tuple
<compat::optional
<uint64_t>, uint64_t, int64_t, uint32_t>;
789 static size_t header_size() {
792 static const char* role() {
795 static auto empty_value() {
796 return make_ready_future
<compat::optional
<uint64_t>, uint64_t, int64_t, opt_buf_type
>(compat::nullopt
, uint64_t(0), 0, compat::nullopt
);
798 static header_type
decode_header(const char* ptr
) {
799 auto type
= read_le
<uint64_t>(ptr
);
800 auto msgid
= read_le
<int64_t>(ptr
+ 8);
801 auto size
= read_le
<uint32_t>(ptr
+ 16);
802 return std::make_tuple(compat::nullopt
, type
, msgid
, size
);
804 static uint32_t get_size(const header_type
& t
) {
805 return std::get
<3>(t
);
807 static auto make_value(const header_type
& t
, rcv_buf data
) {
808 return make_ready_future
<compat::optional
<uint64_t>, uint64_t, int64_t, opt_buf_type
>(std::get
<0>(t
), std::get
<1>(t
), std::get
<2>(t
), std::move(data
));
// Request-frame variant with an extra leading 8-byte timeout field: the base
// header is decoded at offset 8 and slot 0 is filled with the LE64 timeout.
// NOTE(review): the header_size() body and the return of `h` are missing from
// this extract.
812 struct request_frame_with_timeout
: request_frame
{
813 using super
= request_frame
;
814 static size_t header_size() {
817 static typename
super::header_type
decode_header(const char* ptr
) {
818 auto h
= super::decode_header(ptr
+ 8);
819 std::get
<0>(h
) = read_le
<uint64_t>(ptr
);
// Reads one request frame, choosing the timeout-carrying layout when the
// TIMEOUT feature was negotiated.
824 future
<compat::optional
<uint64_t>, uint64_t, int64_t, compat::optional
<rcv_buf
>>
825 server::connection::read_request_frame_compressed(input_stream
<char>& in
) {
826 if (_timeout_negotiated
) {
827 return read_frame_compressed
<request_frame_with_timeout
>(_info
, _compressor
, in
);
829 return read_frame_compressed
<request_frame
>(_info
, _compressor
, in
);
// Fills the 12-byte response header in place — message id (LE64) at offset 0
// and payload size (LE32, total minus header) at offset 8 — then queues the
// buffer for sending. NOTE(review): the declared return type line is missing
// from this extract.
834 server::connection::respond(int64_t msg_id
, snd_buf
&& data
, compat::optional
<rpc_clock_type::time_point
> timeout
) {
835 static_assert(snd_buf::chunk_size
>= 12, "send buffer chunk size is too small");
836 auto p
= data
.front().get_write();
837 write_le
<int64_t>(p
, msg_id
);
838 write_le
<uint32_t>(p
+ 8, data
.size
- 12);
839 return send(std::move(data
), timeout
);
// Main server-connection loop: negotiates, then under the chosen scheduling
// group reads request frames until eof/error. Known verbs are dispatched to
// their handler (honoring per-connection isolation scheduling groups);
// unknown verbs produce an UNKNOWN_VERB error response (with -msg_id) gated
// on the server reply gate. Teardown mirrors the client side: log, shut
// input, abort stream queue, stop send loop, deregister, signal _stopped.
// NOTE(review): numerous interior lines (eof branch, handler-found guard,
// snd_buf construction for the error reply, brace closers) are missing from
// this extract.
842 future
<> server::connection::process() {
843 return negotiate_protocol(_read_buf
).then([this] () mutable {
844 auto sg
= _isolation_config
? _isolation_config
->sched_group
: current_scheduling_group();
845 return with_scheduling_group(sg
, [this] {
847 return do_until([this] { return _read_buf
.eof() || _error
; }, [this] () mutable {
849 return handle_stream_frame();
851 return read_request_frame_compressed(_read_buf
).then([this] (compat::optional
<uint64_t> expire
, uint64_t type
, int64_t msg_id
, compat::optional
<rcv_buf
> data
) {
854 return make_ready_future
<>();
// Convert the relative millisecond expiry from the wire into an absolute
// time point.
856 compat::optional
<rpc_clock_type::time_point
> timeout
;
857 if (expire
&& *expire
) {
858 timeout
= rpc_clock_type::now() + std::chrono::milliseconds(*expire
);
860 auto h
= _server
._proto
->get_handler(type
);
862 // If the new method of per-connection scheduling group was used, honor it.
863 // Otherwise, use the old per-handler scheduling group.
864 auto sg
= _isolation_config
? _isolation_config
->sched_group
: h
->sg
;
865 return with_scheduling_group(sg
, std::ref(h
->func
), shared_from_this(), timeout
, msg_id
, std::move(data
.value()));
867 return wait_for_resources(28, timeout
).then([this, timeout
, msg_id
, type
] (auto permit
) {
868 // send unknown_verb exception back
870 static_assert(snd_buf::chunk_size
>= 28, "send buffer chunk size is too small");
871 auto p
= data
.front().get_write() + 12;
872 write_le
<uint32_t>(p
, uint32_t(exception_type::UNKNOWN_VERB
));
873 write_le
<uint32_t>(p
+ 4, uint32_t(8));
874 write_le
<uint64_t>(p
+ 8, type
);
876 with_gate(_server
._reply_gate
, [this, timeout
, msg_id
, data
= std::move(data
), permit
= std::move(permit
)] () mutable {
877 // workaround for https://gcc.gnu.org/bugzilla/show_bug.cgi?id=83268
878 auto c
= shared_from_this();
// Negative msg_id marks the response as an exception.
879 return respond(-msg_id
, std::move(data
), timeout
).then([c
= std::move(c
), permit
= std::move(permit
)] {});
881 } catch(gate_closed_exception
&) {/* ignore */}
888 }).then_wrapped([this] (future
<> f
) {
890 log_exception(*this, format("server{} connection dropped", is_stream() ? " stream" : "").c_str(), f
.get_exception());
892 _fd
.shutdown_input();
894 _stream_queue
.abort(std::make_exception_ptr(stream_closed()));
895 return stop_send_loop().then_wrapped([this] (future
<> f
) {
896 f
.ignore_ready_future();
897 _server
._conns
.erase(get_connection_id());
899 return deregister_this_stream();
901 return make_ready_future
<>();
904 _stopped
.set_value();
906 }).finally([conn_ptr
= shared_from_this()] {
907 // hold onto connection pointer until do_until() exists
// Server-side connection constructor: forwards socket/logger/serializer/id
// to the base rpc::connection and records the peer address in _info.
911 server::connection::connection(server
& s
, connected_socket
&& fd
, socket_address
&& addr
, const logger
& l
, void* serializer
, connection_id id
)
912 : rpc::connection(std::move(fd
), l
, serializer
, id
), _server(s
) {
913 _info
.addr
= std::move(addr
);
// Removes this stream from its parent connection's _streams map, hopping to
// the parent's shard. No-op when streaming is not configured. All lookups are
// tolerant: a missing server or parent simply skips the erase.
916 future
<> server::connection::deregister_this_stream() {
917 if (!_server
._options
.streaming_domain
) {
918 return make_ready_future
<>();
920 return smp::submit_to(_parent_id
.shard(), [this] () mutable {
921 auto sit
= server::_servers
.find(*_server
._options
.streaming_domain
);
922 if (sit
!= server::_servers
.end()) {
923 auto s
= sit
->second
;
924 auto it
= s
->_conns
.find(_parent_id
);
925 if (it
!= s
->_conns
.end()) {
926 it
->second
->_streams
.erase(get_connection_id());
// Per-shard registry mapping streaming domains to their server instances.
932 thread_local
std::unordered_map
<streaming_domain_type
, server
*> server::_servers
;
// Convenience constructor: listen on addr with default server_options.
934 server::server(protocol_base
* proto
, ipv4_addr addr
, resource_limits limits
)
935 : server(proto
, engine().listen(addr
, listen_options
{true}), limits
, server_options
{})
// Convenience constructor: listen on addr honoring the options' load
// balancing algorithm.
938 server::server(protocol_base
* proto
, server_options opts
, ipv4_addr addr
, resource_limits limits
)
939 : server(proto
, engine().listen(addr
, listen_options
{true, opts
.load_balancing_algorithm
}), limits
, opts
)
// Primary server constructor: stores the protocol, listening socket, limits
// (seeding the memory semaphore) and options; registers this instance in the
// per-shard streaming-domain registry when configured. NOTE(review): the
// constructor body's opening brace and accept() kick-off lines are missing
// from this extract.
942 server::server(protocol_base
* proto
, server_socket ss
, resource_limits limits
, server_options opts
)
943 : _proto(proto
), _ss(std::move(ss
)), _limits(limits
), _resources_available(limits
.max_memory
), _options(opts
)
945 if (_options
.streaming_domain
) {
946 _servers
[*_options
.streaming_domain
] = this;
// Convenience constructor: reorder of (opts, ss) onto the primary constructor.
951 server::server(protocol_base
* proto
, server_options opts
, server_socket ss
, resource_limits limits
)
952 : server(proto
, std::move(ss
), limits
, opts
)
// Accept loop: for each incoming socket, applies TCP_NODELAY, allocates a
// connection id (shard-tagged counter when streaming is enabled), creates the
// protocol's server connection and registers it; the loop ends by fulfilling
// _ss_stopped. NOTE(review): the conn->process() start and the then_wrapped
// body are missing from this extract.
955 void server::accept() {
956 keep_doing([this] () mutable {
957 return _ss
.accept().then([this] (connected_socket fd
, socket_address addr
) mutable {
958 fd
.set_nodelay(_options
.tcp_nodelay
);
959 connection_id id
= invalid_connection_id
;
960 if (_options
.streaming_domain
) {
// Tag the counter with the owning shard in the low 16 bits.
961 id
= {_next_client_id
++ << 16 | uint16_t(engine().cpu_id())};
963 auto conn
= _proto
->make_server_connection(*this, std::move(fd
), std::move(addr
), id
);
964 _conns
.emplace(id
, conn
);
967 }).then_wrapped([this] (future
<>&& f
){
972 _ss_stopped
.set_value();
// Stops the server: breaks the resource semaphore, unregisters from the
// streaming-domain map, then waits for the accept loop and all connection
// stops. NOTE(review): the listening-socket abort and the when_all
// continuation are missing from this extract.
977 future
<> server::stop() {
979 _resources_available
.broken();
980 if (_options
.streaming_domain
) {
981 _servers
.erase(*_options
.streaming_domain
);
983 return when_all(_ss_stopped
.get_future(),
984 parallel_for_each(_conns
| boost::adaptors::map_values
, [] (shared_ptr
<connection
> conn
) {
// Prints a connection id as lowercase hex.
991 std::ostream
& operator<<(std::ostream
& os
, const connection_id
& id
) {
992 return fmt_print(os
, "{:x}", id
.id
);
// Prints a streaming domain as a decimal integer.
995 std::ostream
& operator<<(std::ostream
& os
, const streaming_domain_type
& domain
) {
996 return fmt_print(os
, "{:d}", domain
._id
);
// Default isolation policy: ignores the cookie and returns a default
// (no-isolation) configuration.
999 isolation_config
default_isolate_connection(sstring isolation_cookie
) {
1000 return isolation_config
{};