]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/rpc/rpc.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / src / rpc / rpc.cc
1 #include <seastar/rpc/rpc.hh>
2 #include <seastar/core/align.hh>
3 #include <seastar/core/seastar.hh>
4 #include <seastar/core/print.hh>
5 #include <seastar/core/future-util.hh>
6 #include <boost/range/adaptor/map.hpp>
7
// fmt 9 removed implicit formatting via operator<<; explicitly opt
// streaming_domain_type back in through ostream_formatter.
#if FMT_VERSION >= 90000
template <> struct fmt::formatter<seastar::rpc::streaming_domain_type> : fmt::ostream_formatter {};
#endif
11
12 namespace seastar {
13
14 namespace rpc {
15
16 void logger::operator()(const client_info& info, id_type msg_id, const sstring& str) const {
17 log(format("client {} msg_id {}: {}", info.addr, msg_id, str));
18 }
19
20 void logger::operator()(const client_info& info, id_type msg_id, log_level level, std::string_view str) const {
21 log(level, "client {} msg_id {}: {}", info.addr, msg_id, str);
22 }
23
24 void logger::operator()(const client_info& info, const sstring& str) const {
25 (*this)(info.addr, str);
26 }
27
28 void logger::operator()(const client_info& info, log_level level, std::string_view str) const {
29 (*this)(info.addr, level, str);
30 }
31
32 void logger::operator()(const socket_address& addr, const sstring& str) const {
33 log(format("client {}: {}", addr, str));
34 }
35
36 void logger::operator()(const socket_address& addr, log_level level, std::string_view str) const {
37 log(level, "client {}: {}", addr, str);
38 }
39
// Global sentinel value returned by handlers that want fire-and-forget
// semantics (no reply frame is sent back to the caller).
no_wait_type no_wait;
41
// Out-of-line definition for the in-class constexpr constant (needed
// for ODR-use under pre-C++17 rules).
constexpr size_t snd_buf::chunk_size;
43
44 snd_buf::snd_buf(size_t size_) : size(size_) {
45 if (size <= chunk_size) {
46 bufs = temporary_buffer<char>(size);
47 } else {
48 std::vector<temporary_buffer<char>> v;
49 v.reserve(align_up(size_t(size), chunk_size) / chunk_size);
50 while (size_) {
51 v.push_back(temporary_buffer<char>(std::min(chunk_size, size_)));
52 size_ -= v.back().size();
53 }
54 bufs = std::move(v);
55 }
56 }
57
// Moves are defaulted: the variant member correctly moves both the
// single-buffer and the fragmented representation.
snd_buf::snd_buf(snd_buf&&) noexcept = default;
snd_buf& snd_buf::operator=(snd_buf&&) noexcept = default;
60
61 temporary_buffer<char>& snd_buf::front() {
62 auto* one = std::get_if<temporary_buffer<char>>(&bufs);
63 if (one) {
64 return *one;
65 } else {
66 return std::get<std::vector<temporary_buffer<char>>>(bufs).front();
67 }
68 }
69
// Make a copy of a remote buffer. No data is actually copied, only pointers and
// a deleter of a new buffer takes care of deleting the original buffer
template<typename T> // T is either snd_buf or rcv_buf
T make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<T>> org) {
    // Fast path: buffer already lives on this shard, just take it over.
    if (org.get_owner_shard() == this_shard_id()) {
        return std::move(*org);
    }
    T buf(org->size);
    auto* one = std::get_if<temporary_buffer<char>>(&org->bufs);

    if (one) {
        // Single fragment: alias its bytes; the deleter owns the foreign
        // pointer, so the original is destroyed (on its home shard) when
        // the alias is released.
        buf.bufs = temporary_buffer<char>(one->get_write(), one->size(), make_object_deleter(std::move(org)));
    } else {
        // Multiple fragments: every alias shares one deleter that keeps
        // the whole original buffer alive until the last alias goes away.
        // Note: taking the orgbufs reference before moving `org` into the
        // deleter is fine - moving the foreign_ptr does not move the pointee.
        auto& orgbufs = std::get<std::vector<temporary_buffer<char>>>(org->bufs);
        std::vector<temporary_buffer<char>> newbufs;
        newbufs.reserve(orgbufs.size());
        deleter d = make_object_deleter(std::move(org));
        for (auto&& b : orgbufs) {
            newbufs.push_back(temporary_buffer<char>(b.get_write(), b.size(), d.share()));
        }
        buf.bufs = std::move(newbufs);
    }

    return buf;
}

template snd_buf make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<snd_buf>>);
template rcv_buf make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<rcv_buf>>);
98
99 static void log_exception(connection& c, log_level level, const char* log, std::exception_ptr eptr) {
100 const char* s;
101 try {
102 std::rethrow_exception(eptr);
103 } catch (std::exception& ex) {
104 s = ex.what();
105 } catch (...) {
106 s = "unknown exception";
107 }
108 auto formatted = format("{}: {}", log, s);
109 c.get_logger()(c.peer_address(), level, std::string_view(formatted.data(), formatted.size()));
110 }
111
112 snd_buf connection::compress(snd_buf buf) {
113 if (_compressor) {
114 buf = _compressor->compress(4, std::move(buf));
115 static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small");
116 write_le<uint32_t>(buf.front().get_write(), buf.size - 4);
117 return buf;
118 }
119 return buf;
120 }
121
122 future<> connection::send_buffer(snd_buf buf) {
123 auto* b = std::get_if<temporary_buffer<char>>(&buf.bufs);
124 if (b) {
125 return _write_buf.write(std::move(*b));
126 } else {
127 return do_with(std::move(std::get<std::vector<temporary_buffer<char>>>(buf.bufs)),
128 [this] (std::vector<temporary_buffer<char>>& ar) {
129 return do_for_each(ar.begin(), ar.end(), [this] (auto& b) {
130 return _write_buf.write(std::move(b));
131 });
132 });
133 }
134 }
135
// Finalize the timeout field of an outgoing frame, compress it, write
// it out and flush. The serialized frame carries an 8-byte placeholder
// in front for the timeout value when _propagate_timeout is set.
future<> connection::send_entry(outgoing_entry& d) {
    if (_propagate_timeout) {
        static_assert(snd_buf::chunk_size >= 8, "send buffer chunk size is too small");
        if (_timeout_negotiated) {
            // Peer understands timeouts: fill the placeholder with the
            // milliseconds left until expiry (0 means "no timeout").
            auto expire = d.t.get_timeout();
            uint64_t left = 0;
            if (expire != typename timer<rpc_clock_type>::time_point()) {
                // NOTE(review): if the deadline already passed, this
                // difference is negative and wraps to a huge unsigned
                // value - presumably the armed timer withdraws the entry
                // before that can happen; verify.
                left = std::chrono::duration_cast<std::chrono::milliseconds>(expire - timer<rpc_clock_type>::clock::now()).count();
            }
            write_le<uint64_t>(d.buf.front().get_write(), left);
        } else {
            // Peer does not understand timeouts: strip the placeholder.
            d.buf.front().trim_front(8);
            d.buf.size -= 8;
        }
    }
    auto buf = compress(std::move(d.buf));
    return send_buffer(std::move(buf)).then([this] {
        _stats.sent_messages++;
        return _write_buf.flush();
    });
}
157
158 void connection::set_negotiated() noexcept {
159 _negotiated->set_value();
160 _negotiated = std::nullopt;
161 }
162
// Fail and drain the outgoing queue, then close the write side once the
// in-flight send (if any) has finished.
future<> connection::stop_send_loop(std::exception_ptr ex) {
    _error = true;
    if (_connected) {
        // Fail any writes still in flight and signal the peer.
        _fd.shutdown_output();
    }
    if (ex == nullptr) {
        ex = std::make_exception_ptr(closed_error());
    }
    // Drain from the back towards the front.
    while (!_outgoing_queue.empty()) {
        auto it = std::prev(_outgoing_queue.end());
        // Cancel all but front entry normally. The front entry is sitting in the
        // send_entry() and cannot be withdrawn, except when _negotiated is still
        // engaged. In the latter case when it will be aborted below the entry's
        // continuation will not be called and its done promise will not resolve
        // the _outgoing_queue_ready, so do it here
        if (it != _outgoing_queue.begin()) {
            withdraw(it, ex);
        } else {
            if (_negotiated) {
                it->done.set_exception(ex);
            }
            break;
        }
    }
    if (_negotiated) {
        // Connection was still negotiating: abort that phase too.
        _negotiated->set_exception(ex);
    }
    return when_all(std::move(_outgoing_queue_ready), std::move(_sink_closed_future)).then([this] (std::tuple<future<>, future<bool>> res){
        // _outgoing_queue_ready might be exceptional if queue drain or
        // _negotiated abortion set it such
        std::get<0>(res).ignore_ready_future();
        // _sink_closed_future is never exceptional
        bool sink_closed = std::get<1>(res).get0();
        // Skip close() if stream_close() already closed the sink.
        return _connected && !sink_closed ? _write_buf.close() : make_ready_future();
    });
}
199
200 void connection::set_socket(connected_socket&& fd) {
201 if (_connected) {
202 throw std::runtime_error("already connected");
203 }
204 _fd = std::move(fd);
205 _read_buf =_fd.input();
206 _write_buf = _fd.output();
207 _connected = true;
208 }
209
210 future<> connection::send_negotiation_frame(feature_map features) {
211 auto negotiation_frame_feature_record_size = [] (const feature_map::value_type& e) {
212 return 8 + e.second.size();
213 };
214 auto extra_len = boost::accumulate(
215 features | boost::adaptors::transformed(negotiation_frame_feature_record_size),
216 uint32_t(0));
217 temporary_buffer<char> reply(sizeof(negotiation_frame) + extra_len);
218 auto p = reply.get_write();
219 p = std::copy_n(rpc_magic, 8, p);
220 write_le<uint32_t>(p, extra_len);
221 p += 4;
222 for (auto&& e : features) {
223 write_le<uint32_t>(p, static_cast<uint32_t>(e.first));
224 p += 4;
225 write_le<uint32_t>(p, e.second.size());
226 p += 4;
227 p = std::copy_n(e.second.begin(), e.second.size(), p);
228 }
229 return _write_buf.write(std::move(reply)).then([this] {
230 _stats.sent_messages++;
231 return _write_buf.flush();
232 });
233 }
234
// Remove entry "it" from the middle of the outgoing queue, resolving
// (or failing, when `ex` is set) its completion promise immediately.
void connection::withdraw(outgoing_entry::container_t::iterator it, std::exception_ptr ex) {
    assert(it != _outgoing_queue.end());

    auto pit = std::prev(it);
    // Previous entry's (pit's) done future will schedule current entry (it)
    // continuation. Similarly, it.done will schedule next entry continuation
    // or will resolve _outgoing_queue_ready future.
    //
    // To withdraw "it" we need to do two things:
    // - make pit.done resolve it->next (some time later)
    // - resolve "it"'s continuation right now
    //
    // The latter is achieved by resolving pit.done immediatelly, the former
    // by moving it.done into pit.done. For simplicity (verging on obscurity?)
    // both done's are just swapped and "it" resolves its new promise

    std::swap(it->done, pit->done);
    it->uncancellable();
    it->unlink();
    if (ex == nullptr) {
        it->done.set_value();
    } else {
        it->done.set_exception(ex);
    }
}
260
// Queue `buf` for sending. Entries are chained through their `done`
// promises so sends happen strictly in order; a timeout or cancellation
// withdraws a not-yet-started entry from the chain.
future<> connection::send(snd_buf buf, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel) {
    if (!_error) {
        // Already expired: drop silently, exactly as a fired timer would.
        if (timeout && *timeout <= rpc_clock_type::now()) {
            return make_ready_future<>();
        }

        auto p = std::make_unique<outgoing_entry>(std::move(buf));
        auto& d = *p;
        _outgoing_queue.push_back(d);
        _outgoing_queue_size++;
        auto deleter = [this, it = _outgoing_queue.iterator_to(d)] {
            // Front entry is most likely (unless _negotiated is unresolved) sitting
            // inside send_entry() continuations and thus it cannot be cancelled.
            if (it != _outgoing_queue.begin()) {
                withdraw(it);
            }
        };

        if (timeout) {
            auto& t = d.t;
            t.set_callback(deleter);
            t.arm(timeout.value());
        }
        if (cancel) {
            // Wire up the back-pointers so either side can break the link.
            cancel->cancel_send = std::move(deleter);
            cancel->send_back_pointer = &d.pcancel;
            d.pcancel = cancel;
        }

        // New entry should continue (do its .then() lambda) after _outgoing_queue_ready
        // resolves. Next entry will need to do the same after this entry's done resolves.
        // Thus -- replace _outgoing_queue_ready with d's future and chain its continuation
        // on ..._ready's old value.
        return std::exchange(_outgoing_queue_ready, d.done.get_future()).then([this, p = std::move(p)] () mutable {
            _outgoing_queue_size--;
            if (__builtin_expect(!p->is_linked(), false)) {
                // If withdrawn the entry is unlinked and this lambda is fired right at once
                return make_ready_future<>();
            }

            // Committed to the wire: no cancellation/timeout past this point.
            p->uncancellable();
            return send_entry(*p).then_wrapped([this, p = std::move(p)] (auto f) mutable {
                _error |= f.failed();
                f.ignore_ready_future();
                p->done.set_value();
            });
        });
    } else {
        return make_exception_future<>(closed_error());
    }
}
312
313 void connection::abort() {
314 if (!_error) {
315 _error = true;
316 _fd.shutdown_input();
317 }
318 }
319
320 future<> connection::stop() noexcept {
321 try {
322 abort();
323 } catch (...) {
324 log_exception(*this, log_level::error, "fail to shutdown connection while stopping", std::current_exception());
325 }
326 return _stopped.get_future();
327 }
328
329 template<typename Connection>
330 static bool verify_frame(Connection& c, temporary_buffer<char>& buf, size_t expected, const char* log) {
331 if (buf.size() != expected) {
332 if (buf.size() != 0) {
333 c.get_logger()(c.peer_address(), log);
334 }
335 return false;
336 }
337 return true;
338 }
339
// Read and parse a negotiation frame: 8 magic bytes + u32 record length
// header, then repeated feature records (u32 id, u32 payload length,
// payload bytes). Any malformed input fails the future with closed_error.
template<typename Connection>
static
future<feature_map>
receive_negotiation_frame(Connection& c, input_stream<char>& in) {
    return in.read_exactly(sizeof(negotiation_frame)).then([&c, &in] (temporary_buffer<char> neg) {
        if (!verify_frame(c, neg, sizeof(negotiation_frame), "unexpected eof during negotiation frame")) {
            return make_exception_future<feature_map>(closed_error());
        }
        negotiation_frame frame;
        std::copy_n(neg.get_write(), sizeof(frame.magic), frame.magic);
        frame.len = read_le<uint32_t>(neg.get_write() + 8);
        if (std::memcmp(frame.magic, rpc_magic, sizeof(frame.magic)) != 0) {
            c.get_logger()(c.peer_address(), format("wrong protocol magic: {:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}",
                           frame.magic[0], frame.magic[1], frame.magic[2], frame.magic[3], frame.magic[4], frame.magic[5], frame.magic[6], frame.magic[7]));
            return make_exception_future<feature_map>(closed_error());
        }
        auto len = frame.len;
        // Now the variable-length feature records.
        return in.read_exactly(len).then([&c, len] (temporary_buffer<char> extra) {
            if (extra.size() != len) {
                c.get_logger()(c.peer_address(), "unexpected eof during negotiation frame");
                return make_exception_future<feature_map>(closed_error());
            }
            feature_map map;
            auto p = extra.get();
            auto end = p + extra.size();
            while (p != end) {
                // Each record needs at least its 8-byte fixed part.
                if (end - p < 8) {
                    c.get_logger()(c.peer_address(), "bad feature data format in negotiation frame");
                    return make_exception_future<feature_map>(closed_error());
                }
                auto feature = static_cast<protocol_features>(read_le<uint32_t>(p));
                auto f_len = read_le<uint32_t>(p + 4);
                p += 8;
                if (f_len > end - p) {
                    c.get_logger()(c.peer_address(), "buffer underflow in feature data in negotiation frame");
                    return make_exception_future<feature_map>(closed_error());
                }
                auto data = sstring(p, f_len);
                p += f_len;
                map.emplace(feature, std::move(data));
            }
            return make_ready_future<feature_map>(std::move(map));
        });
    });
}
385
// Read up to `size` bytes from `in` into an rcv_buf. The returned
// buffer's `size` reflects what was actually read: an empty buffer on
// immediate EOF, a short buffer when EOF hits mid-payload.
inline future<rcv_buf>
read_rcv_buf(input_stream<char>& in, uint32_t size) {
    return in.read_up_to(size).then([&, size] (temporary_buffer<char> data) mutable {
        rcv_buf rb(size);
        if (data.size() == 0) {
            // EOF before any payload arrived.
            return make_ready_future<rcv_buf>(rcv_buf());
        } else if (data.size() == size) {
            // Whole payload arrived in one read: contiguous form.
            rb.bufs = std::move(data);
            return make_ready_future<rcv_buf>(std::move(rb));
        } else {
            // Partial read: collect fragments until `size` bytes arrived
            // or the stream reaches EOF.
            size -= data.size();
            std::vector<temporary_buffer<char>> v;
            v.push_back(std::move(data));
            rb.bufs = std::move(v);
            return do_with(std::move(rb), std::move(size), [&in] (rcv_buf& rb, uint32_t& left) {
                return repeat([&] () {
                    return in.read_up_to(left).then([&] (temporary_buffer<char> data) {
                        if (!data.size()) {
                            // EOF: shrink the advertised size to what we got.
                            rb.size -= left;
                            return stop_iteration::yes;
                        } else {
                            left -= data.size();
                            std::get<std::vector<temporary_buffer<char>>>(rb.bufs).push_back(std::move(data));
                            return left ? stop_iteration::no : stop_iteration::yes;
                        }
                    });
                }).then([&rb] {
                    return std::move(rb);
                });
            });
        }
    });
}
419
// Read one frame described by FrameType: a fixed-size header followed
// by an optional payload whose length comes from the header. Short
// reads yield FrameType::empty_value() (logged unless a clean EOF).
template<typename FrameType>
typename FrameType::return_type
connection::read_frame(socket_address info, input_stream<char>& in) {
    auto header_size = FrameType::header_size();
    return in.read_exactly(header_size).then([this, header_size, info, &in] (temporary_buffer<char> header) {
        if (header.size() != header_size) {
            if (header.size() != 0) {
                _logger(info, format("unexpected eof on a {} while reading header: expected {:d} got {:d}", FrameType::role(), header_size, header.size()));
            }
            return FrameType::empty_value();
        }
        auto h = FrameType::decode_header(header.get());
        auto size = FrameType::get_size(h);
        if (!size) {
            // Zero-length payload: nothing more to read.
            return FrameType::make_value(h, rcv_buf());
        } else {
            return read_rcv_buf(in, size).then([this, info, h = std::move(h), size] (rcv_buf rb) {
                if (rb.size != size) {
                    _logger(info, format("unexpected eof on a {} while reading data: expected {:d} got {:d}", FrameType::role(), size, rb.size));
                    return FrameType::empty_value();
                } else {
                    return FrameType::make_value(h, std::move(rb));
                }
            });
        }
    });
}
447
// Like read_frame(), but when a compressor is active the wire carries a
// u32 compressed-length prefix plus compressed bytes; the decompressed
// data is then re-parsed as a regular frame via an in-memory stream.
template<typename FrameType>
typename FrameType::return_type
connection::read_frame_compressed(socket_address info, std::unique_ptr<compressor>& compressor, input_stream<char>& in) {
    if (compressor) {
        return in.read_exactly(4).then([this, info, &in, &compressor] (temporary_buffer<char> compress_header) {
            if (compress_header.size() != 4) {
                if (compress_header.size() != 0) {
                    _logger(info, format("unexpected eof on a {} while reading compression header: expected 4 got {:d}", FrameType::role(), compress_header.size()));
                }
                return FrameType::empty_value();
            }
            auto ptr = compress_header.get();
            auto size = read_le<uint32_t>(ptr);
            return read_rcv_buf(in, size).then([this, size, &compressor, info] (rcv_buf compressed_data) {
                if (compressed_data.size != size) {
                    _logger(info, format("unexpected eof on a {} while reading compressed data: expected {:d} got {:d}", FrameType::role(), size, compressed_data.size));
                    return FrameType::empty_value();
                }
                auto eb = compressor->decompress(std::move(compressed_data));
                // Repackage the decompressed fragments as a packet so they
                // can be consumed through a regular input_stream.
                net::packet p;
                auto* one = std::get_if<temporary_buffer<char>>(&eb.bufs);
                if (one) {
                    p = net::packet(std::move(p), std::move(*one));
                } else {
                    auto&& bufs = std::get<std::vector<temporary_buffer<char>>>(eb.bufs);
                    p.reserve(bufs.size());
                    for (auto&& b : bufs) {
                        p = net::packet(std::move(p), std::move(b));
                    }
                }
                return do_with(as_input_stream(std::move(p)), [this, info] (input_stream<char>& in) {
                    return read_frame<FrameType>(info, in);
                });
            });
        });
    } else {
        return read_frame<FrameType>(info, in);
    }
}
487
488 struct stream_frame {
489 using opt_buf_type = std::optional<rcv_buf>;
490 using return_type = future<opt_buf_type>;
491 struct header_type {
492 uint32_t size;
493 bool eos;
494 };
495 static size_t header_size() {
496 return 4;
497 }
498 static const char* role() {
499 return "stream";
500 }
501 static future<opt_buf_type> empty_value() {
502 return make_ready_future<opt_buf_type>(std::nullopt);
503 }
504 static header_type decode_header(const char* ptr) {
505 header_type h{read_le<uint32_t>(ptr), false};
506 if (h.size == -1U) {
507 h.size = 0;
508 h.eos = true;
509 }
510 return h;
511 }
512 static uint32_t get_size(const header_type& t) {
513 return t.size;
514 }
515 static future<opt_buf_type> make_value(const header_type& t, rcv_buf data) {
516 if (t.eos) {
517 data.size = -1U;
518 }
519 return make_ready_future<opt_buf_type>(std::move(data));
520 }
521 };
522
// Read one stream frame, transparently decompressing it when a
// compressor was negotiated for this connection.
future<std::optional<rcv_buf>>
connection::read_stream_frame_compressed(input_stream<char>& in) {
    return read_frame_compressed<stream_frame>(peer_address(), _compressor, in);
}
527
// Close the stream's sink side, then stop the whole connection.
future<> connection::stream_close() {
    auto f = make_ready_future<>();
    if (!error()) {
        promise<bool> p;
        _sink_closed_future = p.get_future();
        // stop_send_loop(), which also calls _write_buf.close(), and this code can run in parallel.
        // Use _sink_closed_future to serialize them and skip second call to close()
        f = _write_buf.close().finally([p = std::move(p)] () mutable { p.set_value(true);});
    }
    return f.finally([this] () mutable { return stop(); });
}
539
540 future<> connection::stream_process_incoming(rcv_buf&& buf) {
541 // we do not want to dead lock on huge packets, so let them in
542 // but only one at a time
543 auto size = std::min(size_t(buf.size), max_stream_buffers_memory);
544 return get_units(_stream_sem, size).then([this, buf = std::move(buf)] (semaphore_units<>&& su) mutable {
545 buf.su = std::move(su);
546 return _stream_queue.push_eventually(std::move(buf));
547 });
548 }
549
550 future<> connection::handle_stream_frame() {
551 return read_stream_frame_compressed(_read_buf).then([this] (std::optional<rcv_buf> data) {
552 if (!data) {
553 _error = true;
554 return make_ready_future<>();
555 }
556 return stream_process_incoming(std::move(*data));
557 });
558 }
559
// Move everything currently queued on the stream into `bufs`, stopping
// at the end-of-stream marker. The marker is pushed back so a later
// call (with an empty batch) can observe the EOF.
future<> connection::stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs) {
    return _stream_queue.not_empty().then([this, &bufs] {
        bool eof = !_stream_queue.consume([&bufs] (rcv_buf&& b) {
            if (b.size == -1U) { // max fragment length marks an end of a stream
                return false;
            } else {
                bufs.push_back(make_foreign(std::make_unique<rcv_buf>(std::move(b))));
                return true;
            }
        });
        if (eof && !bufs.empty()) {
            assert(_stream_queue.empty());
            _stream_queue.push(rcv_buf(-1U)); // push eof marker back for next read to notice it
        }
    });
}
576
// Remember a child stream connection so it can be looked up by id and
// torn down together with this (parent) connection.
void connection::register_stream(connection_id id, xshard_connection_ptr c) {
    _streams.emplace(id, std::move(c));
}
580
581 xshard_connection_ptr connection::get_stream(connection_id id) const {
582 auto it = _streams.find(id);
583 if (it == _streams.end()) {
584 throw std::logic_error(format("rpc stream id {} not found", id).c_str());
585 }
586 return it->second;
587 }
588
// Apply the feature set the server granted in its negotiation reply.
void
client::negotiate(feature_map provided) {
    // record features returned here
    for (auto&& e : provided) {
        auto id = e.first;
        switch (id) {
        // supported features go here
        case protocol_features::COMPRESS:
            if (_options.compressor_factory) {
                _compressor = _options.compressor_factory->negotiate(e.second, false);
            }
            // The server granting an algorithm we cannot instantiate is a
            // hard protocol error.
            if (!_compressor) {
                throw std::runtime_error(format("RPC server responded with compression {} - unsupported", e.second));
            }
            break;
        case protocol_features::TIMEOUT:
            _timeout_negotiated = true;
            break;
        case protocol_features::CONNECTION_ID: {
            _id = deserialize_connection_id(e.second);
            break;
        }
        default:
            // nothing to do
            ;
        }
    }
}
617
618 future<>
619 client::negotiate_protocol(input_stream<char>& in) {
620 return receive_negotiation_frame(*this, in).then([this] (feature_map features) {
621 return negotiate(features);
622 });
623 }
624
625 struct response_frame {
626 using opt_buf_type = std::optional<rcv_buf>;
627 using header_and_buffer_type = std::tuple<int64_t, opt_buf_type>;
628 using return_type = future<header_and_buffer_type>;
629 using header_type = std::tuple<int64_t, uint32_t>;
630 static size_t header_size() {
631 return 12;
632 }
633 static const char* role() {
634 return "client";
635 }
636 static auto empty_value() {
637 return make_ready_future<header_and_buffer_type>(header_and_buffer_type(0, std::nullopt));
638 }
639 static header_type decode_header(const char* ptr) {
640 auto msgid = read_le<int64_t>(ptr);
641 auto size = read_le<uint32_t>(ptr + 8);
642 return std::make_tuple(msgid, size);
643 }
644 static uint32_t get_size(const header_type& t) {
645 return std::get<1>(t);
646 }
647 static auto make_value(const header_type& t, rcv_buf data) {
648 return make_ready_future<header_and_buffer_type>(header_and_buffer_type(std::get<0>(t), std::move(data)));
649 }
650 };
651
652
// Read one uncompressed response frame from `in`.
future<response_frame::header_and_buffer_type>
client::read_response_frame(input_stream<char>& in) {
    return read_frame<response_frame>(_server_addr, in);
}
657
// Read one response frame, decompressing when a compressor was
// negotiated for this connection.
future<response_frame::header_and_buffer_type>
client::read_response_frame_compressed(input_stream<char>& in) {
    return read_frame_compressed<response_frame>(_server_addr, _compressor, in);
}
662
663 stats client::get_stats() const {
664 stats res = _stats;
665 res.wait_reply = _outstanding.size();
666 res.pending = outgoing_queue_length();
667 return res;
668 }
669
// Register handler `h` to be invoked when the reply for `id` arrives,
// optionally arming its timeout timer and wiring up cancellation.
void client::wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel) {
    if (timeout) {
        h->t.set_callback(std::bind(std::mem_fn(&client::wait_timed_out), this, id));
        h->t.arm(timeout.value());
    }
    if (cancel) {
        // NOTE(review): cancel_wait assumes `id` is still present in
        // _outstanding when invoked (operator[] would otherwise insert a
        // null handler and crash) - presumably the hook is unlinked when
        // the reply/timeout is handled; verify.
        cancel->cancel_wait = [this, id] {
            _outstanding[id]->cancel();
            _outstanding.erase(id);
        };
        h->pcancel = cancel;
        cancel->wait_back_pointer = &h->pcancel;
    }
    _outstanding.emplace(id, std::move(h));
}
685 void client::wait_timed_out(id_type id) {
686 _stats.timeout++;
687 _outstanding[id]->timeout();
688 _outstanding.erase(id);
689 }
690
691 future<> client::stop() noexcept {
692 _error = true;
693 try {
694 _socket.shutdown();
695 } catch(...) {
696 log_exception(*this, log_level::error, "fail to shutdown connection while stopping", std::current_exception());
697 }
698 return _stopped.get_future();
699 }
700
701 void client::abort_all_streams() {
702 while (!_streams.empty()) {
703 auto&& s = _streams.begin();
704 assert(s->second->get_owner_shard() == this_shard_id()); // abort can be called only locally
705 s->second->get()->abort();
706 _streams.erase(s);
707 }
708 }
709
// Remove this stream from its parent's stream table. Only stream
// connections have a parent; plain RPC connections are a no-op.
void client::deregister_this_stream() {
    if (_parent) {
        _parent->_streams.erase(_id);
    }
}
715
// Main client constructor: connects in the background, negotiates the
// protocol, then runs the receive loop until EOF or error. Completion
// is signalled through _stopped (see client::stop()).
client::client(const logger& l, void* s, client_options ops, socket socket, const socket_address& addr, const socket_address& local)
: rpc::connection(l, s), _socket(std::move(socket)), _server_addr(addr), _local_addr(local), _options(ops) {
    _socket.set_reuseaddr(ops.reuseaddr);
    // Run client in the background.
    // Communicate result via _stopped.
    // The caller has to call client::stop() to synchronize.
    (void)_socket.connect(addr, local).then([this, ops = std::move(ops)] (connected_socket fd) {
        fd.set_nodelay(ops.tcp_nodelay);
        if (ops.keepalive) {
            fd.set_keepalive(true);
            fd.set_keepalive_parameters(ops.keepalive.value());
        }
        set_socket(std::move(fd));

        // Advertise every feature this client wants to use.
        feature_map features;
        if (_options.compressor_factory) {
            features[protocol_features::COMPRESS] = _options.compressor_factory->supported();
        }
        if (_options.send_timeout_data) {
            features[protocol_features::TIMEOUT] = "";
        }
        if (_options.stream_parent) {
            features[protocol_features::STREAM_PARENT] = serialize_connection_id(_options.stream_parent);
        }
        if (!_options.isolation_cookie.empty()) {
            features[protocol_features::ISOLATION] = _options.isolation_cookie;
        }

        return send_negotiation_frame(std::move(features)).then([this] {
            return negotiate_protocol(_read_buf);
        }).then([this] () {
            // Stream connections carry timeouts via their parent.
            _propagate_timeout = !is_stream();
            set_negotiated();
            // Receive loop: runs until EOF or a protocol error.
            return do_until([this] { return _read_buf.eof() || _error; }, [this] () mutable {
                if (is_stream()) {
                    return handle_stream_frame();
                }
                return read_response_frame_compressed(_read_buf).then([this] (std::tuple<int64_t, std::optional<rcv_buf>> msg_id_and_data) {
                    auto& msg_id = std::get<0>(msg_id_and_data);
                    auto& data = std::get<1>(msg_id_and_data);
                    // A negative msg id signals an exception reply for |msg_id|.
                    auto it = _outstanding.find(std::abs(msg_id));
                    if (!data) {
                        _error = true;
                    } else if (it != _outstanding.end()) {
                        auto handler = std::move(it->second);
                        _outstanding.erase(it);
                        (*handler)(*this, msg_id, std::move(data.value()));
                    } else if (msg_id < 0) {
                        try {
                            std::rethrow_exception(unmarshal_exception(data.value()));
                        } catch(const unknown_verb_error& ex) {
                            // if this is unknown verb exception with unknown id ignore it
                            // can happen if unknown verb was used by no_wait client
                            get_logger()(peer_address(), format("unknown verb exception {:d} ignored", ex.type));
                        } catch(...) {
                            // We've got error response but handler is no longer waiting, could be timed out.
                            log_exception(*this, log_level::info, "ignoring error response", std::current_exception());
                        }
                    } else {
                        // we get a reply for a message id not in _outstanding
                        // this can happened if the message id is timed out already
                        get_logger()(peer_address(), log_level::debug, "got a reply for an expired message id");
                    }
                });
            });
        });
    }).then_wrapped([this] (future<> f) {
        std::exception_ptr ep;
        if (f.failed()) {
            ep = f.get_exception();
            // Pre-connect failures are expected-ish, so log them quieter.
            if (_connected) {
                if (is_stream()) {
                    log_exception(*this, log_level::error, "client stream connection dropped", ep);
                } else {
                    log_exception(*this, log_level::error, "client connection dropped", ep);
                }
            } else {
                if (is_stream()) {
                    log_exception(*this, log_level::debug, "stream fail to connect", ep);
                } else {
                    log_exception(*this, log_level::debug, "fail to connect", ep);
                }
            }
        }
        // Teardown: fail the send loop, then drop waiters and streams.
        _error = true;
        _stream_queue.abort(std::make_exception_ptr(stream_closed()));
        return stop_send_loop(ep).then_wrapped([this] (future<> f) {
            f.ignore_ready_future();
            _outstanding.clear();
            if (is_stream()) {
                deregister_this_stream();
            } else {
                abort_all_streams();
            }
        }).finally([this]{
            _stopped.set_value();
        });
    });
}
815
// Convenience: default options, default TCP socket.
client::client(const logger& l, void* s, const socket_address& addr, const socket_address& local)
: client(l, s, client_options{}, make_socket(), addr, local)
{}
819
// Convenience: explicit options, default TCP socket.
client::client(const logger& l, void* s, client_options options, const socket_address& addr, const socket_address& local)
: client(l, s, options, make_socket(), addr, local)
{}
823
// Convenience: default options, caller-supplied socket.
client::client(const logger& l, void* s, socket socket, const socket_address& addr, const socket_address& local)
: client(l, s, client_options{}, std::move(socket), addr, local)
{}
827
828
// Inspect the features requested by a client and build the feature map
// we grant in response. Some features (stream parent registration,
// isolation) need asynchronous work, which is chained onto `f`.
future<feature_map>
server::connection::negotiate(feature_map requested) {
    feature_map ret;
    future<> f = make_ready_future<>();
    for (auto&& e : requested) {
        auto id = e.first;
        switch (id) {
        // supported features go here
        case protocol_features::COMPRESS: {
            if (_server._options.compressor_factory) {
                _compressor = _server._options.compressor_factory->negotiate(e.second, true);
                // Only advertise compression back if we agreed on an algorithm.
                if (_compressor) {
                    ret[protocol_features::COMPRESS] = _compressor->name();
                }
            }
        }
        break;
        case protocol_features::TIMEOUT:
            _timeout_negotiated = true;
            ret[protocol_features::TIMEOUT] = "";
            break;
        case protocol_features::STREAM_PARENT: {
            if (!_server._options.streaming_domain) {
                f = f.then([] {
                    return make_exception_future<>(std::runtime_error("streaming is not configured for the server"));
                });
            } else {
                _parent_id = deserialize_connection_id(e.second);
                _is_stream = true;
                // remove stream connection from rpc connection list
                _server._conns.erase(get_connection_id());
                // Register this stream with its parent connection, which
                // may live on a different shard.
                f = f.then([this, c = shared_from_this()] () mutable {
                    return smp::submit_to(_parent_id.shard(), [this, c = make_foreign(static_pointer_cast<rpc::connection>(c))] () mutable {
                        auto sit = _servers.find(*_server._options.streaming_domain);
                        if (sit == _servers.end()) {
                            throw std::logic_error(format("Shard {:d} does not have server with streaming domain {}", this_shard_id(), *_server._options.streaming_domain).c_str());
                        }
                        auto s = sit->second;
                        auto it = s->_conns.find(_parent_id);
                        if (it == s->_conns.end()) {
                            throw std::logic_error(format("Unknown parent connection {} on shard {:d}", _parent_id, this_shard_id()).c_str());
                        }
                        auto id = c->get_connection_id();
                        it->second->register_stream(id, make_lw_shared(std::move(c)));
                    });
                });
            }
            break;
        }
        case protocol_features::ISOLATION: {
            auto&& isolation_cookie = e.second;
            // The configured isolation function may be synchronous or
            // asynchronous; dispatch through a visitor over the variant.
            struct isolation_function_visitor {
                isolation_function_visitor(const sstring& isolation_cookie) :
                        _isolation_cookie(isolation_cookie) { }
                future<isolation_config> operator() (resource_limits::syncronous_isolation_function f) const {
                    return futurize_invoke(f, _isolation_cookie);
                }
                future<isolation_config> operator() (resource_limits::asyncronous_isolation_function f) const {
                    return f(_isolation_cookie);
                }
            private:
                sstring _isolation_cookie;
            };

            auto visitor = isolation_function_visitor(isolation_cookie);
            f = f.then([visitor = std::move(visitor), this] () mutable {
                return std::visit(visitor, _server._limits.isolate_connection).then([this] (isolation_config conf) {
                    _isolation_config = conf;
                });
            });
            // Echo the cookie back so the client knows isolation applied.
            ret.emplace(e);
            break;
        }
        default:
            // nothing to do
            ;
        }
    }
    if (_server._options.streaming_domain) {
        // Tell the client its connection id so it can open streams later.
        ret[protocol_features::CONNECTION_ID] = serialize_connection_id(_id);
    }
    return f.then([ret = std::move(ret)] {
        return ret;
    });
}
914
915 future<>
916 server::connection::negotiate_protocol(input_stream<char>& in) {
917 return receive_negotiation_frame(*this, in).then([this] (feature_map requested_features) {
918 return negotiate(std::move(requested_features)).then([this] (feature_map returned_features) {
919 return send_negotiation_frame(std::move(returned_features));
920 });
921 });
922 }
923
924 struct request_frame {
925 using opt_buf_type = std::optional<rcv_buf>;
926 using header_and_buffer_type = std::tuple<std::optional<uint64_t>, uint64_t, int64_t, opt_buf_type>;
927 using return_type = future<header_and_buffer_type>;
928 using header_type = std::tuple<std::optional<uint64_t>, uint64_t, int64_t, uint32_t>;
929 static size_t header_size() {
930 return 20;
931 }
932 static const char* role() {
933 return "server";
934 }
935 static auto empty_value() {
936 return make_ready_future<header_and_buffer_type>(header_and_buffer_type(std::nullopt, uint64_t(0), 0, std::nullopt));
937 }
938 static header_type decode_header(const char* ptr) {
939 auto type = read_le<uint64_t>(ptr);
940 auto msgid = read_le<int64_t>(ptr + 8);
941 auto size = read_le<uint32_t>(ptr + 16);
942 return std::make_tuple(std::nullopt, type, msgid, size);
943 }
944 static uint32_t get_size(const header_type& t) {
945 return std::get<3>(t);
946 }
947 static auto make_value(const header_type& t, rcv_buf data) {
948 return make_ready_future<header_and_buffer_type>(header_and_buffer_type(std::get<0>(t), std::get<1>(t), std::get<2>(t), std::move(data)));
949 }
950 };
951
952 struct request_frame_with_timeout : request_frame {
953 using super = request_frame;
954 static size_t header_size() {
955 return 28;
956 }
957 static typename super::header_type decode_header(const char* ptr) {
958 auto h = super::decode_header(ptr + 8);
959 std::get<0>(h) = read_le<uint64_t>(ptr);
960 return h;
961 }
962 };
963
964 future<request_frame::header_and_buffer_type>
965 server::connection::read_request_frame_compressed(input_stream<char>& in) {
966 if (_timeout_negotiated) {
967 return read_frame_compressed<request_frame_with_timeout>(_info.addr, _compressor, in);
968 } else {
969 return read_frame_compressed<request_frame>(_info.addr, _compressor, in);
970 }
971 }
972
973 future<>
974 server::connection::respond(int64_t msg_id, snd_buf&& data, std::optional<rpc_clock_type::time_point> timeout) {
975 static_assert(snd_buf::chunk_size >= 12, "send buffer chunk size is too small");
976 auto p = data.front().get_write();
977 write_le<int64_t>(p, msg_id);
978 write_le<uint32_t>(p + 8, data.size - 12);
979 return send(std::move(data), timeout);
980 }
981
// Sends an UNKNOWN_VERB exception frame back to the client for a request
// whose verb `type` has no registered handler.
//
// Frame layout (28 bytes total): 12-byte response header (filled in later
// by respond()) followed by the exception payload:
//   [12..15] exception type (UNKNOWN_VERB)
//   [16..19] exception data length (8)
//   [20..27] the unrecognized verb type, echoed back
future<> server::connection::send_unknown_verb_reply(std::optional<rpc_clock_type::time_point> timeout, int64_t msg_id, uint64_t type) {
    return wait_for_resources(28, timeout).then([this, timeout, msg_id, type] (auto permit) {
        // send unknown_verb exception back
        snd_buf data(28);
        static_assert(snd_buf::chunk_size >= 28, "send buffer chunk size is too small");
        auto p = data.front().get_write() + 12;
        write_le<uint32_t>(p, uint32_t(exception_type::UNKNOWN_VERB));
        write_le<uint32_t>(p + 4, uint32_t(8));
        write_le<uint64_t>(p + 8, type);
        try {
            // Send asynchronously.
            // This is safe since connection::stop() will wait for background work.
            (void)with_gate(_server._reply_gate, [this, timeout, msg_id, data = std::move(data), permit = std::move(permit)] () mutable {
                // workaround for https://gcc.gnu.org/bugzilla/show_bug.cgi?id=83268
                auto c = shared_from_this();
                // The msg_id is negated here; presumably the negative id marks
                // the frame as an exception response on the wire — confirm
                // against the client-side decoder.
                return respond(-msg_id, std::move(data), timeout).then([c = std::move(c), permit = std::move(permit)] {});
            });
        } catch(gate_closed_exception&) {/* ignore */}
    });
}
1002
// Main per-connection loop. Negotiates the protocol, then repeatedly reads
// request frames and dispatches them to their handlers until EOF or error,
// finally tearing the connection down and resolving _stopped.
future<> server::connection::process() {
    return negotiate_protocol(_read_buf).then([this] () mutable {
        // Run the connection under its isolated scheduling group if one was
        // negotiated via the ISOLATION feature; otherwise stay in the current one.
        auto sg = _isolation_config ? _isolation_config->sched_group : current_scheduling_group();
        return with_scheduling_group(sg, [this] {
            set_negotiated();
            return do_until([this] { return _read_buf.eof() || _error; }, [this] () mutable {
                // Stream connections bypass the request/response protocol entirely.
                if (is_stream()) {
                    return handle_stream_frame();
                }
                return read_request_frame_compressed(_read_buf).then([this] (request_frame::header_and_buffer_type header_and_buffer) {
                    auto& expire = std::get<0>(header_and_buffer);
                    auto& type = std::get<1>(header_and_buffer);
                    auto& msg_id = std::get<2>(header_and_buffer);
                    auto& data = std::get<3>(header_and_buffer);
                    // A missing payload means the frame could not be read; bail out.
                    if (!data) {
                        _error = true;
                        return make_ready_future<>();
                    } else {
                        std::optional<rpc_clock_type::time_point> timeout;
                        // An expiry of 0 means "no timeout", so only honor non-zero values.
                        if (expire && *expire) {
                            timeout = relative_timeout_to_absolute(std::chrono::milliseconds(*expire));
                        }
                        auto h = _server._proto->get_handler(type);
                        if (!h) {
                            return send_unknown_verb_reply(timeout, msg_id, type);
                        }

                        // If the new method of per-connection scheduling group was used, honor it.
                        // Otherwise, use the old per-handler scheduling group.
                        auto sg = _isolation_config ? _isolation_config->sched_group : h->sg;
                        return with_scheduling_group(sg, [this, timeout, msg_id, h, data = std::move(data.value())] () mutable {
                            return h->func(shared_from_this(), timeout, msg_id, std::move(data)).finally([this, h] {
                                // If anything between get_handler() and here throws, we leak put_handler
                                _server._proto->put_handler(h);
                            });
                        });
                    }
                });
            });
        });
    }).then_wrapped([this] (future<> f) {
        // Teardown path: runs whether the loop above ended cleanly or failed.
        std::exception_ptr ep;
        if (f.failed()) {
            ep = f.get_exception();
            log_exception(*this, log_level::error,
                    format("server{} connection dropped", is_stream() ? " stream" : "").c_str(), ep);
        }
        _fd.shutdown_input();
        _error = true;
        _stream_queue.abort(std::make_exception_ptr(stream_closed()));
        return stop_send_loop(ep).then_wrapped([this] (future<> f) {
            f.ignore_ready_future();
            _server._conns.erase(get_connection_id());
            if (is_stream()) {
                return deregister_this_stream();
            } else {
                return make_ready_future<>();
            }
        }).finally([this] {
            // Signal waiters (e.g. connection::stop()) that teardown is complete.
            _stopped.set_value();
        });
    }).finally([conn_ptr = shared_from_this()] {
        // hold onto connection pointer until do_until() exits
    });
}
1068
// Constructs a server-side RPC connection for an accepted socket and records
// the peer address in the per-connection client_info.
server::connection::connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* serializer, connection_id id)
    : rpc::connection(std::move(fd), l, serializer, id), _server(s) {
    _info.addr = std::move(addr);
}
1073
1074 future<> server::connection::deregister_this_stream() {
1075 if (!_server._options.streaming_domain) {
1076 return make_ready_future<>();
1077 }
1078 return smp::submit_to(_parent_id.shard(), [this] () mutable {
1079 auto sit = server::_servers.find(*_server._options.streaming_domain);
1080 if (sit != server::_servers.end()) {
1081 auto s = sit->second;
1082 auto it = s->_conns.find(_parent_id);
1083 if (it != s->_conns.end()) {
1084 it->second->_streams.erase(get_connection_id());
1085 }
1086 }
1087 });
1088 }
1089
// Per-shard registry mapping each streaming domain to its owning server,
// used to route stream connections back to their parent RPC connection
// (see deregister_this_stream() above).
thread_local std::unordered_map<streaming_domain_type, server*> server::_servers;
1091
// Convenience constructor: listen on `addr` with address reuse enabled and
// default server options, delegating to the main constructor.
server::server(protocol_base* proto, const socket_address& addr, resource_limits limits)
    : server(proto, seastar::listen(addr, listen_options{true}), limits, server_options{})
{}
1095
// Convenience constructor: listen on `addr` with address reuse enabled and
// the load-balancing algorithm taken from `opts`.
server::server(protocol_base* proto, server_options opts, const socket_address& addr, resource_limits limits)
    : server(proto, seastar::listen(addr, listen_options{true, opts.load_balancing_algorithm}), limits, opts)
{}
1099
1100 server::server(protocol_base* proto, server_socket ss, resource_limits limits, server_options opts)
1101 : _proto(proto), _ss(std::move(ss)), _limits(limits), _resources_available(limits.max_memory), _options(opts)
1102 {
1103 if (_options.streaming_domain) {
1104 if (_servers.find(*_options.streaming_domain) != _servers.end()) {
1105 throw std::runtime_error(format("An RPC server with the streaming domain {} is already exist", *_options.streaming_domain));
1106 }
1107 _servers[*_options.streaming_domain] = this;
1108 }
1109 accept();
1110 }
1111
// Convenience constructor: options-first argument order, delegating to the
// main constructor.
server::server(protocol_base* proto, server_options opts, server_socket ss, resource_limits limits)
    : server(proto, std::move(ss), limits, opts)
{}
1115
// Accept loop: admits incoming connections, registers them in _conns, and
// kicks off their processing, until abort_accept() makes _ss.accept() fail.
void server::accept() {
    // Run asynchronously in background.
    // Communicate result via _ss_stopped.
    // The caller has to call server::stop() to synchronize.
    (void)keep_doing([this] () mutable {
        return _ss.accept().then([this] (accept_result ar) mutable {
            // Drop the connection early if an installed filter rejects the peer.
            if (_options.filter_connection && !_options.filter_connection(ar.remote_address)) {
                return;
            }
            auto fd = std::move(ar.connection);
            auto addr = std::move(ar.remote_address);
            fd.set_nodelay(_options.tcp_nodelay);
            // Streaming servers need ids that encode the owning shard so
            // streams can find their parent; otherwise an invalid (per-shard
            // only) id suffices.
            connection_id id = _options.streaming_domain ?
                    connection_id::make_id(_next_client_id++, uint16_t(this_shard_id())) :
                    connection_id::make_invalid_id(_next_client_id++);
            auto conn = _proto->make_server_connection(*this, std::move(fd), std::move(addr), id);
            auto r = _conns.emplace(id, conn);
            assert(r.second);
            // Process asynchronously in background.
            (void)conn->process();
        });
    }).then_wrapped([this] (future<>&& f){
        try {
            f.get();
            // keep_doing() only resolves when accept() fails (e.g. after
            // abort_accept()), so a clean resolution here is a bug.
            assert(false);
        } catch (...) {
            _ss_stopped.set_value();
        }
    });
}
1146
// Shuts the server down: aborts the accept loop, releases memory waiters,
// unregisters the streaming domain, and stops every live connection.
// Resolves once the accept loop, all connections, and all background replies
// guarded by _reply_gate have finished.
future<> server::stop() {
    _ss.abort_accept();
    // Fail anyone blocked waiting for send-buffer memory.
    _resources_available.broken();
    if (_options.streaming_domain) {
        _servers.erase(*_options.streaming_domain);
    }
    return when_all(_ss_stopped.get_future(),
        parallel_for_each(_conns | boost::adaptors::map_values, [] (shared_ptr<connection> conn) {
            return conn->stop();
        }),
        _reply_gate.close()
    ).discard_result();
}
1160
1161 std::ostream& operator<<(std::ostream& os, const connection_id& id) {
1162 fmt::print(os, "{:x}", id.id);
1163 return os;
1164 }
1165
1166 std::ostream& operator<<(std::ostream& os, const streaming_domain_type& domain) {
1167 fmt::print(os, "{:d}", domain._id);
1168 return os;
1169 }
1170
1171 isolation_config default_isolate_connection(sstring isolation_cookie) {
1172 return isolation_config{};
1173 }
1174
1175 }
1176
1177 }