]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/rpc/rpc.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / seastar / src / rpc / rpc.cc
1 #include <seastar/rpc/rpc.hh>
2 #include <boost/range/adaptor/map.hpp>
3
namespace seastar {

namespace rpc {

// Single global instance of the tag type used to mark fire-and-forget RPC calls.
no_wait_type no_wait;

// Out-of-line definition needed for ODR-use of the static constexpr member (pre-C++17 rules).
constexpr size_t snd_buf::chunk_size;
10
11 snd_buf::snd_buf(size_t size_) : size(size_) {
12 if (size <= chunk_size) {
13 bufs = temporary_buffer<char>(size);
14 } else {
15 std::vector<temporary_buffer<char>> v;
16 v.reserve(align_up(size_t(size), chunk_size) / chunk_size);
17 while (size_) {
18 v.push_back(temporary_buffer<char>(std::min(chunk_size, size_)));
19 size_ -= v.back().size();
20 }
21 bufs = std::move(v);
22 }
23 }
24
25 temporary_buffer<char>& snd_buf::front() {
26 auto* one = compat::get_if<temporary_buffer<char>>(&bufs);
27 if (one) {
28 return *one;
29 } else {
30 return compat::get<std::vector<temporary_buffer<char>>>(bufs).front();
31 }
32 }
33
// Make a copy of a remote buffer. No data is actually copied, only pointers and
// a deleter of a new buffer takes care of deleting the original buffer
template<typename T> // T is either snd_buf or rcv_buf
T make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<T>> org) {
    if (org.get_owner_shard() == engine().cpu_id()) {
        // Buffer already belongs to this shard: hand it over directly.
        return std::move(*org);
    }
    T buf(org->size);
    auto* one = compat::get_if<temporary_buffer<char>>(&org->bufs);

    if (one) {
        // Single fragment: the new buffer aliases the original's memory; the
        // deleter owns the foreign_ptr, keeping the original alive until the
        // alias is destroyed (on the original's shard).
        buf.bufs = temporary_buffer<char>(one->get_write(), one->size(), make_object_deleter(std::move(org)));
    } else {
        // Multiple fragments: all aliases share one deleter that owns the
        // foreign_ptr, so the original is released once every alias is gone.
        // Note: `orgbufs` stays valid after std::move(org) — the move only
        // transfers the pointer wrapper, not the pointee.
        auto& orgbufs = compat::get<std::vector<temporary_buffer<char>>>(org->bufs);
        std::vector<temporary_buffer<char>> newbufs;
        newbufs.reserve(orgbufs.size());
        deleter d = make_object_deleter(std::move(org));
        for (auto&& b : orgbufs) {
            newbufs.push_back(temporary_buffer<char>(b.get_write(), b.size(), d.share()));
        }
        buf.bufs = std::move(newbufs);
    }

    return buf;
}

template snd_buf make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<snd_buf>>);
template rcv_buf make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<rcv_buf>>);
62
63 snd_buf connection::compress(snd_buf buf) {
64 if (_compressor) {
65 buf = _compressor->compress(4, std::move(buf));
66 static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small");
67 write_le<uint32_t>(buf.front().get_write(), buf.size - 4);
68 return std::move(buf);
69 }
70 return std::move(buf);
71 }
72
73 future<> connection::send_buffer(snd_buf buf) {
74 auto* b = compat::get_if<temporary_buffer<char>>(&buf.bufs);
75 if (b) {
76 return _write_buf.write(std::move(*b));
77 } else {
78 return do_with(std::move(compat::get<std::vector<temporary_buffer<char>>>(buf.bufs)),
79 [this] (std::vector<temporary_buffer<char>>& ar) {
80 return do_for_each(ar.begin(), ar.end(), [this] (auto& b) {
81 return _write_buf.write(std::move(b));
82 });
83 });
84 }
85 }
86
// Drains _outgoing_queue until the connection enters an error state.
// QueueType selects request-specific header fixups (the 8-byte timeout slot).
template<connection::outgoing_queue_type QueueType>
void connection::send_loop() {
    _send_loop_stopped = do_until([this] { return _error; }, [this] {
        return _outgoing_queue_cond.wait([this] { return !_outgoing_queue.empty(); }).then([this] {
            // despite using wait with predicated above _outgoing_queue can still be empty here if
            // there is only one entry on the list and its expire timer runs after wait() returned ready future,
            // but before this continuation runs.
            if (_outgoing_queue.empty()) {
                return make_ready_future();
            }
            auto d = std::move(_outgoing_queue.front());
            _outgoing_queue.pop_front();
            d.t.cancel(); // cancel timeout timer
            if (d.pcancel) {
                d.pcancel->cancel_send = std::function<void()>(); // request is no longer cancellable
            }
            if (QueueType == outgoing_queue_type::request) {
                static_assert(snd_buf::chunk_size >= 8, "send buffer chunk size is too small");
                if (_timeout_negotiated) {
                    // Peer understands timeouts: fill the reserved 8-byte slot
                    // with the remaining time-to-expiry in ms (0 = no timeout).
                    auto expire = d.t.get_timeout();
                    uint64_t left = 0;
                    if (expire != typename timer<rpc_clock_type>::time_point()) {
                        left = std::chrono::duration_cast<std::chrono::milliseconds>(expire - timer<rpc_clock_type>::clock::now()).count();
                    }
                    write_le<uint64_t>(d.buf.front().get_write(), left);
                } else {
                    // Peer does not understand timeouts: strip the reserved slot.
                    d.buf.front().trim_front(8);
                    d.buf.size -= 8;
                }
            }
            d.buf = compress(std::move(d.buf));
            auto f = send_buffer(std::move(d.buf)).then([this] {
                _stats.sent_messages++;
                return _write_buf.flush();
            });
            // keep the queue entry (and its promise) alive until the write completes
            return f.finally([d = std::move(d)] {});
        });
    }).handle_exception([this] (std::exception_ptr eptr) {
        // any send failure terminates the loop; readers notice via _error
        _error = true;
    });
}
128
// Stops the send loop and closes the write side of the connection, after the
// loop and any concurrent sink close have both finished.
future<> connection::stop_send_loop() {
    _error = true;
    if (_connected) {
        // wake the send loop (it will observe _error) and unblock pending writes
        _outgoing_queue_cond.broken();
        _fd.shutdown_output();
    }
    return when_all(std::move(_send_loop_stopped), std::move(_sink_closed_future)).then([this] (std::tuple<future<>, future<bool>> res){
        _outgoing_queue.clear();
        // both _send_loop_stopped and _sink_closed_future are never exceptional
        bool sink_closed = std::get<1>(res).get0();
        // skip close() when stream_close() already closed the write buffer
        return _connected && !sink_closed ? _write_buf.close() : make_ready_future();
    });
}
142
// Adopts a freshly connected socket and sets up buffered input/output streams.
// Throws std::runtime_error if a socket was already installed.
void connection::set_socket(connected_socket&& fd) {
    if (_connected) {
        throw std::runtime_error("already connected");
    }
    _fd = std::move(fd);
    _read_buf = _fd.input();
    _write_buf = _fd.output();
    _connected = true;
}
152
153 future<> connection::send_negotiation_frame(feature_map features) {
154 auto negotiation_frame_feature_record_size = [] (const feature_map::value_type& e) {
155 return 8 + e.second.size();
156 };
157 auto extra_len = boost::accumulate(
158 features | boost::adaptors::transformed(negotiation_frame_feature_record_size),
159 uint32_t(0));
160 temporary_buffer<char> reply(sizeof(negotiation_frame) + extra_len);
161 auto p = reply.get_write();
162 p = std::copy_n(rpc_magic, 8, p);
163 write_le<uint32_t>(p, extra_len);
164 p += 4;
165 for (auto&& e : features) {
166 write_le<uint32_t>(p, static_cast<uint32_t>(e.first));
167 p += 4;
168 write_le<uint32_t>(p, e.second.size());
169 p += 4;
170 p = std::copy_n(e.second.begin(), e.second.size(), p);
171 }
172 return _write_buf.write(std::move(reply)).then([this] {
173 _stats.sent_messages++;
174 return _write_buf.flush();
175 });
176 }
177
// Queues an outgoing message. The returned future resolves when the message
// has been written out (or the connection failed). An already-expired timeout
// drops the message silently; `cancel`, when given, lets the caller revoke it.
future<> connection::send(snd_buf buf, compat::optional<rpc_clock_type::time_point> timeout, cancellable* cancel) {
    if (!_error) {
        if (timeout && *timeout <= rpc_clock_type::now()) {
            // already expired: behave as if it was sent and timed out
            return make_ready_future<>();
        }
        _outgoing_queue.emplace_back(std::move(buf));
        // list iterators remain valid across insert/erase, so the timer and
        // the cancel hook can erase exactly this entry later
        auto deleter = [this, it = std::prev(_outgoing_queue.cend())] {
            _outgoing_queue.erase(it);
        };
        if (timeout) {
            auto& t = _outgoing_queue.back().t;
            t.set_callback(deleter);
            t.arm(timeout.value());
        }
        if (cancel) {
            cancel->cancel_send = std::move(deleter);
            cancel->send_back_pointer = &_outgoing_queue.back().pcancel;
            _outgoing_queue.back().pcancel = cancel;
        }
        _outgoing_queue_cond.signal();
        return _outgoing_queue.back().p->get_future();
    } else {
        return make_exception_future<>(closed_error());
    }
}
203
204 void connection::abort() {
205 if (!_error) {
206 _error = true;
207 _fd.shutdown_input();
208 }
209 }
210
// Aborts the connection and resolves once the processing loop has fully stopped.
future<> connection::stop() {
    abort();
    return _stopped.get_future();
}
215
216 template<typename Connection>
217 static bool verify_frame(Connection& c, temporary_buffer<char>& buf, size_t expected, const char* log) {
218 if (buf.size() != expected) {
219 if (buf.size() != 0) {
220 c.get_logger()(c.peer_address(), log);
221 }
222 return false;
223 }
224 return true;
225 }
226
// Reads and validates the peer's negotiation frame (magic + feature records).
// Returns the parsed feature map, or a closed_error future on eof or malformed
// data; protocol violations are logged through the connection's logger.
template<typename Connection>
static
future<feature_map>
receive_negotiation_frame(Connection& c, input_stream<char>& in) {
    return in.read_exactly(sizeof(negotiation_frame)).then([&c, &in] (temporary_buffer<char> neg) {
        if (!verify_frame(c, neg, sizeof(negotiation_frame), "unexpected eof during negotiation frame")) {
            return make_exception_future<feature_map>(closed_error());
        }
        negotiation_frame frame;
        std::copy_n(neg.get_write(), sizeof(frame.magic), frame.magic);
        frame.len = read_le<uint32_t>(neg.get_write() + 8);
        if (std::memcmp(frame.magic, rpc_magic, sizeof(frame.magic)) != 0) {
            c.get_logger()(c.peer_address(), "wrong protocol magic");
            return make_exception_future<feature_map>(closed_error());
        }
        auto len = frame.len;
        return in.read_exactly(len).then([&c, len] (temporary_buffer<char> extra) {
            if (extra.size() != len) {
                c.get_logger()(c.peer_address(), "unexpected eof during negotiation frame");
                return make_exception_future<feature_map>(closed_error());
            }
            feature_map map;
            auto p = extra.get();
            auto end = p + extra.size();
            // each record: 4-byte feature id, 4-byte data length, then the data
            while (p != end) {
                if (end - p < 8) {
                    c.get_logger()(c.peer_address(), "bad feature data format in negotiation frame");
                    return make_exception_future<feature_map>(closed_error());
                }
                auto feature = static_cast<protocol_features>(read_le<uint32_t>(p));
                auto f_len = read_le<uint32_t>(p + 4);
                p += 8;
                if (f_len > end - p) {
                    c.get_logger()(c.peer_address(), "buffer underflow in feature data in negotiation frame");
                    return make_exception_future<feature_map>(closed_error());
                }
                auto data = sstring(p, f_len);
                p += f_len;
                map.emplace(feature, std::move(data));
            }
            return make_ready_future<feature_map>(std::move(map));
        });
    });
}
271
// Reads up to `size` bytes from `in` into an rcv_buf, fragment by fragment.
// On premature eof the returned buffer's `size` field records the bytes that
// actually arrived; an immediate eof yields a default rcv_buf of size 0.
inline future<rcv_buf>
read_rcv_buf(input_stream<char>& in, uint32_t size) {
    return in.read_up_to(size).then([&, size] (temporary_buffer<char> data) mutable {
        rcv_buf rb(size);
        if (data.size() == 0) {
            return make_ready_future<rcv_buf>(rcv_buf());
        } else if (data.size() == size) {
            // fast path: the whole payload arrived in one contiguous buffer
            rb.bufs = std::move(data);
            return make_ready_future<rcv_buf>(std::move(rb));
        } else {
            // slow path: accumulate fragments until `size` bytes or eof
            size -= data.size();
            std::vector<temporary_buffer<char>> v;
            v.push_back(std::move(data));
            rb.bufs = std::move(v);
            return do_with(std::move(rb), std::move(size), [&in] (rcv_buf& rb, uint32_t& left) {
                return repeat([&] () {
                    return in.read_up_to(left).then([&] (temporary_buffer<char> data) {
                        if (!data.size()) {
                            // eof mid-frame: shrink size to what was really read
                            rb.size -= left;
                            return stop_iteration::yes;
                        } else {
                            left -= data.size();
                            compat::get<std::vector<temporary_buffer<char>>>(rb.bufs).push_back(std::move(data));
                            return left ? stop_iteration::no : stop_iteration::yes;
                        }
                    });
                }).then([&rb] {
                    return std::move(rb);
                });
            });
        }
    });
}
305
// Reads one frame described by the FrameType policy: a fixed-size header
// followed by a payload whose length comes from the header. Eof (clean or
// mid-frame) yields FrameType::empty_value(); partial reads are logged.
template<typename FrameType, typename Info>
typename FrameType::return_type
connection::read_frame(const Info& info, input_stream<char>& in) {
    auto header_size = FrameType::header_size();
    return in.read_exactly(header_size).then([this, header_size, &info, &in] (temporary_buffer<char> header) {
        if (header.size() != header_size) {
            if (header.size() != 0) {
                // partial header: the peer went away mid-frame
                _logger(info, format("unexpected eof on a {} while reading header: expected {:d} got {:d}", FrameType::role(), header_size, header.size()));
            }
            return FrameType::empty_value();
        }
        auto h = FrameType::decode_header(header.get());
        auto size = FrameType::get_size(h);
        if (!size) {
            return FrameType::make_value(h, rcv_buf());
        } else {
            return read_rcv_buf(in, size).then([this, &info, h = std::move(h), size] (rcv_buf rb) {
                if (rb.size != size) {
                    _logger(info, format("unexpected eof on a {} while reading data: expected {:d} got {:d}", FrameType::role(), size, rb.size));
                    return FrameType::empty_value();
                } else {
                    return FrameType::make_value(h, std::move(rb));
                }
            });
        }
    });
}
333
// Like read_frame(), but when a compressor was negotiated each frame arrives
// wrapped as: 4-byte compressed length, then compressed bytes. The wrapper is
// read and decompressed first, and the inner frame is parsed from an input
// stream built over the decompressed fragments.
template<typename FrameType, typename Info>
typename FrameType::return_type
connection::read_frame_compressed(const Info& info, std::unique_ptr<compressor>& compressor, input_stream<char>& in) {
    if (compressor) {
        return in.read_exactly(4).then([&] (temporary_buffer<char> compress_header) {
            if (compress_header.size() != 4) {
                if (compress_header.size() != 0) {
                    _logger(info, format("unexpected eof on a {} while reading compression header: expected 4 got {:d}", FrameType::role(), compress_header.size()));
                }
                return FrameType::empty_value();
            }
            auto ptr = compress_header.get();
            auto size = read_le<uint32_t>(ptr);
            return read_rcv_buf(in, size).then([this, size, &compressor, &info] (rcv_buf compressed_data) {
                if (compressed_data.size != size) {
                    _logger(info, format("unexpected eof on a {} while reading compressed data: expected {:d} got {:d}", FrameType::role(), size, compressed_data.size));
                    return FrameType::empty_value();
                }
                auto eb = compressor->decompress(std::move(compressed_data));
                // repackage the decompressed fragments as a packet-backed
                // input stream and parse the inner frame from it
                net::packet p;
                auto* one = compat::get_if<temporary_buffer<char>>(&eb.bufs);
                if (one) {
                    p = net::packet(std::move(p), std::move(*one));
                } else {
                    for (auto&& b : compat::get<std::vector<temporary_buffer<char>>>(eb.bufs)) {
                        p = net::packet(std::move(p), std::move(b));
                    }
                }
                return do_with(as_input_stream(std::move(p)), [this, &info] (input_stream<char>& in) {
                    return read_frame<FrameType>(info, in);
                });
            });
        });
    } else {
        return read_frame<FrameType>(info, in);
    }
}
371
// Frame-descriptor policy for stream frames, used with read_frame*().
// Wire format: a 4-byte little-endian size; the value 0xffffffff (-1U) is not
// a length but the end-of-stream marker.
struct stream_frame {
    using opt_buf_type = compat::optional<rcv_buf>;
    using return_type = future<opt_buf_type>;
    struct header_type {
        uint32_t size;
        bool eos; // true when the end-of-stream marker was seen
    };
    static size_t header_size() {
        return 4;
    }
    static const char* role() {
        return "stream";
    }
    static future<opt_buf_type> empty_value() {
        return make_ready_future<opt_buf_type>(compat::nullopt);
    }
    static header_type decode_header(const char* ptr) {
        header_type h{read_le<uint32_t>(ptr), false};
        if (h.size == -1U) {
            // -1U is the eos marker, not a real payload length
            h.size = 0;
            h.eos = true;
        }
        return h;
    }
    static uint32_t get_size(const header_type& t) {
        return t.size;
    }
    static future<opt_buf_type> make_value(const header_type& t, rcv_buf data) {
        if (t.eos) {
            // propagate eos to the consumer through the buffer's size field
            data.size = -1U;
        }
        return make_ready_future<opt_buf_type>(std::move(data));
    }
};
406
// Reads one (possibly compressed) stream frame; nullopt signals eof/error.
future<compat::optional<rcv_buf>>
connection::read_stream_frame_compressed(input_stream<char>& in) {
    return read_frame_compressed<stream_frame>(peer_address(), _compressor, in);
}
411
// Closes the sink side of a stream connection, then stops the connection.
future<> connection::stream_close() {
    auto f = make_ready_future<>();
    if (!error()) {
        promise<bool> p;
        _sink_closed_future = p.get_future();
        // stop_send_loop(), which also calls _write_buf.close(), and this code can run in parallel.
        // Use _sink_closed_future to serialize them and skip second call to close()
        f = _write_buf.close().finally([p = std::move(p)] () mutable { p.set_value(true);});
    }
    return f.finally([this] () mutable { return stop(); });
}
423
// Queues an incoming stream buffer behind the memory semaphore so a fast
// producer cannot exhaust memory.
future<> connection::stream_process_incoming(rcv_buf&& buf) {
    // we do not want to dead lock on huge packets, so let them in
    // but only one at a time
    auto size = std::min(size_t(buf.size), max_stream_buffers_memory);
    return get_units(_stream_sem, size).then([this, buf = std::move(buf)] (semaphore_units<>&& su) mutable {
        // the units travel with the buffer and are released when it is consumed
        buf.su = std::move(su);
        return _stream_queue.push_eventually(std::move(buf));
    });
}
433
434 future<> connection::handle_stream_frame() {
435 return read_stream_frame_compressed(_read_buf).then([this] (compat::optional<rcv_buf> data) {
436 if (!data) {
437 _error = true;
438 return make_ready_future<>();
439 }
440 return stream_process_incoming(std::move(*data));
441 });
442 }
443
// Drains available stream buffers into `bufs`. A buffer with size == -1U is
// the end-of-stream marker; if real data was consumed first, the marker is
// pushed back so the next call still observes eof.
future<> connection::stream_receive(circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>>& bufs) {
    if (_source_closed) {
        return make_exception_future<>(stream_closed());
    }

    return _stream_queue.not_empty().then([this, &bufs] {
        bool eof = !_stream_queue.consume([this, &bufs] (rcv_buf&& b) {
            if (b.size == -1U) { // max fragment length marks an end of a stream
                return false;
            } else {
                bufs.push_back(make_foreign(std::make_unique<rcv_buf>(std::move(b))));
                return true;
            }
        });
        if (eof && !bufs.empty()) {
            assert(_stream_queue.empty());
            _stream_queue.push(rcv_buf(-1U)); // push eof marker back for next read to notice it
        }
    });
}
464
// Associates a stream connection with this (parent) connection.
void connection::register_stream(connection_id id, xshard_connection_ptr c) {
    _streams.emplace(id, std::move(c));
}
468
469 xshard_connection_ptr connection::get_stream(connection_id id) const {
470 auto it = _streams.find(id);
471 if (it == _streams.end()) {
472 throw std::logic_error(format("rpc stream id {:d} not found", id).c_str());
473 }
474 return it->second;
475 }
476
477 static void log_exception(connection& c, const char* log, std::exception_ptr eptr) {
478 const char* s;
479 try {
480 std::rethrow_exception(eptr);
481 } catch (std::exception& ex) {
482 s = ex.what();
483 } catch (...) {
484 s = "unknown exception";
485 }
486 c.get_logger()(c.peer_address(), format("{}: {}", log, s));
487 }
488
489
// Applies the feature map returned by the server to this client's state:
// installs the negotiated compressor, enables timeout propagation, and
// records the server-assigned connection id.
void
client::negotiate(feature_map provided) {
    // record features returned here
    for (auto&& e : provided) {
        auto id = e.first;
        switch (id) {
        // supported features go here
        case protocol_features::COMPRESS:
            if (_options.compressor_factory) {
                _compressor = _options.compressor_factory->negotiate(e.second, false);
            }
            break;
        case protocol_features::TIMEOUT:
            _timeout_negotiated = true;
            break;
        case protocol_features::CONNECTION_ID: {
            _id = deserialize_connection_id(e.second);
            break;
        }
        default:
            // nothing to do
            ;
        }
    }
}
515
516 future<>
517 client::negotiate_protocol(input_stream<char>& in) {
518 return receive_negotiation_frame(*this, in).then([this] (feature_map features) {
519 return negotiate(features);
520 });
521 }
522
// Frame-descriptor policy for client-side response frames.
// Wire format: 8-byte signed message id (negative means the payload carries a
// marshalled exception), 4-byte payload size, then the payload.
struct response_frame {
    using opt_buf_type = compat::optional<rcv_buf>;
    using return_type = future<int64_t, opt_buf_type>;
    using header_type = std::tuple<int64_t, uint32_t>;
    static size_t header_size() {
        return 12;
    }
    static const char* role() {
        return "client";
    }
    static auto empty_value() {
        return make_ready_future<int64_t, opt_buf_type>(0, compat::nullopt);
    }
    static header_type decode_header(const char* ptr) {
        auto msgid = read_le<int64_t>(ptr);
        auto size = read_le<uint32_t>(ptr + 8);
        return std::make_tuple(msgid, size);
    }
    static uint32_t get_size(const header_type& t) {
        return std::get<1>(t);
    }
    static auto make_value(const header_type& t, rcv_buf data) {
        return make_ready_future<int64_t, opt_buf_type>(std::get<0>(t), std::move(data));
    }
};
548
549
// Reads one response frame (uncompressed path).
future<int64_t, compat::optional<rcv_buf>>
client::read_response_frame(input_stream<char>& in) {
    return read_frame<response_frame>(_server_addr, in);
}

// Reads one response frame, decompressing first if compression was negotiated.
future<int64_t, compat::optional<rcv_buf>>
client::read_response_frame_compressed(input_stream<char>& in) {
    return read_frame_compressed<response_frame>(_server_addr, _compressor, in);
}
559
560 stats client::get_stats() const {
561 stats res = _stats;
562 res.wait_reply = _outstanding.size();
563 res.pending = _outgoing_queue.size();
564 return res;
565 }
566
// Registers a reply handler for message `id`, arming its timeout timer and
// wiring up the cancellation hooks when requested.
void client::wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, compat::optional<rpc_clock_type::time_point> timeout, cancellable* cancel) {
    if (timeout) {
        h->t.set_callback(std::bind(std::mem_fn(&client::wait_timed_out), this, id));
        h->t.arm(timeout.value());
    }
    if (cancel) {
        // cancel_wait drops the handler, so a late reply is simply ignored
        cancel->cancel_wait = [this, id] {
            _outstanding[id]->cancel();
            _outstanding.erase(id);
        };
        h->pcancel = cancel;
        cancel->wait_back_pointer = &h->pcancel;
    }
    _outstanding.emplace(id, std::move(h));
}
// Timer callback: fails the outstanding request `id` with a timeout and
// removes its handler.
void client::wait_timed_out(id_type id) {
    _stats.timeout++;
    _outstanding[id]->timeout();
    _outstanding.erase(id);
}
587
588 future<> client::stop() {
589 if (!_error) {
590 _error = true;
591 _socket.shutdown();
592 }
593 return _stopped.get_future();
594 }
595
596 void client::abort_all_streams() {
597 while (!_streams.empty()) {
598 auto&& s = _streams.begin();
599 assert(s->second->get_owner_shard() == engine().cpu_id()); // abort can be called only locally
600 s->second->get()->abort();
601 _streams.erase(s);
602 }
603 }
604
// Removes this stream connection from its parent's stream table, if it has one.
void client::deregister_this_stream() {
    if (_parent) {
        _parent->_streams.erase(_id);
    }
}
610
// Main client constructor: connects, negotiates the protocol, then runs the
// receive loop that dispatches replies to their waiting handlers until the
// connection drops. All other client constructors delegate here.
client::client(const logger& l, void* s, client_options ops, socket socket, ipv4_addr addr, ipv4_addr local)
: rpc::connection(l, s), _socket(std::move(socket)), _server_addr(addr), _options(ops) {
    _socket.connect(addr, local).then([this, ops = std::move(ops)] (connected_socket fd) {
        fd.set_nodelay(ops.tcp_nodelay);
        if (ops.keepalive) {
            fd.set_keepalive(true);
            fd.set_keepalive_parameters(ops.keepalive.value());
        }
        set_socket(std::move(fd));

        // advertise the features this client wants to use
        feature_map features;
        if (_options.compressor_factory) {
            features[protocol_features::COMPRESS] = _options.compressor_factory->supported();
        }
        if (_options.send_timeout_data) {
            features[protocol_features::TIMEOUT] = "";
        }
        if (_options.stream_parent) {
            features[protocol_features::STREAM_PARENT] = serialize_connection_id(_options.stream_parent);
        }
        if (!_options.isolation_cookie.empty()) {
            features[protocol_features::ISOLATION] = _options.isolation_cookie;
        }

        send_negotiation_frame(std::move(features));

        return negotiate_protocol(_read_buf).then([this] () {
            _client_negotiated->set_value();
            _client_negotiated = compat::nullopt;
            send_loop();
            return do_until([this] { return _read_buf.eof() || _error; }, [this] () mutable {
                if (is_stream()) {
                    return handle_stream_frame();
                }
                return read_response_frame_compressed(_read_buf).then([this] (int64_t msg_id, compat::optional<rcv_buf> data) {
                    // a negative msg_id means the reply carries an exception
                    auto it = _outstanding.find(std::abs(msg_id));
                    if (!data) {
                        _error = true;
                    } else if (it != _outstanding.end()) {
                        auto handler = std::move(it->second);
                        _outstanding.erase(it);
                        (*handler)(*this, msg_id, std::move(data.value()));
                    } else if (msg_id < 0) {
                        try {
                            std::rethrow_exception(unmarshal_exception(data.value()));
                        } catch(const unknown_verb_error& ex) {
                            // if this is unknown verb exception with unknown id ignore it
                            // can happen if unknown verb was used by no_wait client
                            get_logger()(peer_address(), format("unknown verb exception {:d} ignored", ex.type));
                        } catch(...) {
                            // We've got error response but handler is no longer waiting, could be timed out.
                            log_exception(*this, "ignoring error response", std::current_exception());
                        }
                    } else {
                        // we get a reply for a message id not in _outstanding
                        // this can happened if the message id is timed out already
                        // FIXME: log it but with low level, currently log levels are not supported
                    }
                });
            });
        });
    }).then_wrapped([this] (future<> f) {
        // teardown: record the failure (if any), stop the send loop, drop
        // outstanding handlers, and detach from any streams
        std::exception_ptr ep;
        if (f.failed()) {
            ep = f.get_exception();
            if (is_stream()) {
                log_exception(*this, _connected ? "client stream connection dropped" : "stream fail to connect", ep);
            } else {
                log_exception(*this, _connected ? "client connection dropped" : "fail to connect", ep);
            }
        }
        _error = true;
        _stream_queue.abort(std::make_exception_ptr(stream_closed()));
        return stop_send_loop().then_wrapped([this] (future<> f) {
            f.ignore_ready_future();
            _outstanding.clear();
            if (is_stream()) {
                deregister_this_stream();
            } else {
                abort_all_streams();
            }
        }).finally([this, ep]{
            // fail the negotiation waiter if negotiation never completed
            if (_client_negotiated && ep) {
                _client_negotiated->set_exception(ep);
            }
            _stopped.set_value();
        });
    });
}
700
// Convenience constructors; all delegate to the main constructor above.
client::client(const logger& l, void* s, ipv4_addr addr, ipv4_addr local)
: client(l, s, client_options{}, engine().net().socket(), addr, local)
{}

client::client(const logger& l, void* s, client_options options, ipv4_addr addr, ipv4_addr local)
: client(l, s, options, engine().net().socket(), addr, local)
{}

client::client(const logger& l, void* s, socket socket, ipv4_addr addr, ipv4_addr local)
: client(l, s, client_options{}, std::move(socket), addr, local)
{}
712
713
// Processes the features requested by a client and builds the map of features
// this server accepts in reply. May complete asynchronously: stream setup
// registers this connection with its parent, possibly on another shard.
future<feature_map>
server::connection::negotiate(feature_map requested) {
    feature_map ret;
    future<> f = make_ready_future<>();
    for (auto&& e : requested) {
        auto id = e.first;
        switch (id) {
        // supported features go here
        case protocol_features::COMPRESS: {
            if (_server._options.compressor_factory) {
                _compressor = _server._options.compressor_factory->negotiate(e.second, true);
                ret[protocol_features::COMPRESS] = _server._options.compressor_factory->supported();
            }
        }
        break;
        case protocol_features::TIMEOUT:
            _timeout_negotiated = true;
            ret[protocol_features::TIMEOUT] = "";
            break;
        case protocol_features::STREAM_PARENT: {
            if (!_server._options.streaming_domain) {
                f = make_exception_future<>(std::runtime_error("streaming is not configured for the server"));
            } else {
                _parent_id = deserialize_connection_id(e.second);
                _is_stream = true;
                // remove stream connection from rpc connection list
                _server._conns.erase(get_connection_id());
                // register this stream with its parent connection, which may
                // live on a different shard
                f = smp::submit_to(_parent_id.shard(), [this, c = make_foreign(static_pointer_cast<rpc::connection>(shared_from_this()))] () mutable {
                    auto sit = _servers.find(*_server._options.streaming_domain);
                    if (sit == _servers.end()) {
                        throw std::logic_error(format("Shard {:d} does not have server with streaming domain {:x}", engine().cpu_id(), *_server._options.streaming_domain).c_str());
                    }
                    auto s = sit->second;
                    auto it = s->_conns.find(_parent_id);
                    if (it == s->_conns.end()) {
                        throw std::logic_error(format("Unknown parent connection {:d} on shard {:d}", _parent_id, engine().cpu_id()).c_str());
                    }
                    auto id = c->get_connection_id();
                    it->second->register_stream(id, make_lw_shared(std::move(c)));
                });
            }
            break;
        }
        case protocol_features::ISOLATION: {
            auto&& isolation_cookie = e.second;
            _isolation_config = _server._limits.isolate_connection(isolation_cookie);
            // echo the cookie back to acknowledge isolation
            ret.emplace(e);
            break;
        }
        default:
            // nothing to do
            ;
        }
    }
    if (_server._options.streaming_domain) {
        // tell the client its server-assigned connection id
        ret[protocol_features::CONNECTION_ID] = serialize_connection_id(_id);
    }
    return f.then([ret = std::move(ret)] {
        return ret;
    });
}
775
776 future<>
777 server::connection::negotiate_protocol(input_stream<char>& in) {
778 return receive_negotiation_frame(*this, in).then([this] (feature_map requested_features) {
779 return negotiate(std::move(requested_features)).then([this] (feature_map returned_features) {
780 return send_negotiation_frame(std::move(returned_features));
781 });
782 });
783 }
784
// Frame-descriptor policy for server-side request frames without a timeout
// field. Wire format: 8-byte verb type, 8-byte message id, 4-byte payload
// size. The first tuple element (expiry) stays disengaged for this variant.
struct request_frame {
    using opt_buf_type = compat::optional<rcv_buf>;
    using return_type = future<compat::optional<uint64_t>, uint64_t, int64_t, opt_buf_type>;
    using header_type = std::tuple<compat::optional<uint64_t>, uint64_t, int64_t, uint32_t>;
    static size_t header_size() {
        return 20;
    }
    static const char* role() {
        return "server";
    }
    static auto empty_value() {
        return make_ready_future<compat::optional<uint64_t>, uint64_t, int64_t, opt_buf_type>(compat::nullopt, uint64_t(0), 0, compat::nullopt);
    }
    static header_type decode_header(const char* ptr) {
        auto type = read_le<uint64_t>(ptr);
        auto msgid = read_le<int64_t>(ptr + 8);
        auto size = read_le<uint32_t>(ptr + 16);
        return std::make_tuple(compat::nullopt, type, msgid, size);
    }
    static uint32_t get_size(const header_type& t) {
        return std::get<3>(t);
    }
    static auto make_value(const header_type& t, rcv_buf data) {
        return make_ready_future<compat::optional<uint64_t>, uint64_t, int64_t, opt_buf_type>(std::get<0>(t), std::get<1>(t), std::get<2>(t), std::move(data));
    }
};
811
// Request frame variant used when the TIMEOUT feature was negotiated: an
// 8-byte expiry value (milliseconds) precedes the regular request header.
struct request_frame_with_timeout : request_frame {
    using super = request_frame;
    static size_t header_size() {
        return 28; // 8-byte expiry + 20-byte base header
    }
    static typename super::header_type decode_header(const char* ptr) {
        auto h = super::decode_header(ptr + 8);
        std::get<0>(h) = read_le<uint64_t>(ptr);
        return h;
    }
};
823
824 future<compat::optional<uint64_t>, uint64_t, int64_t, compat::optional<rcv_buf>>
825 server::connection::read_request_frame_compressed(input_stream<char>& in) {
826 if (_timeout_negotiated) {
827 return read_frame_compressed<request_frame_with_timeout>(_info, _compressor, in);
828 } else {
829 return read_frame_compressed<request_frame>(_info, _compressor, in);
830 }
831 }
832
// Writes the response header (message id + payload size) into the reserved
// 12-byte prefix of `data` and queues it for sending.
future<>
server::connection::respond(int64_t msg_id, snd_buf&& data, compat::optional<rpc_clock_type::time_point> timeout) {
    static_assert(snd_buf::chunk_size >= 12, "send buffer chunk size is too small");
    auto p = data.front().get_write();
    write_le<int64_t>(p, msg_id);
    write_le<uint32_t>(p + 8, data.size - 12); // payload size excludes the header
    return send(std::move(data), timeout);
}
841
// Server-side connection main loop: negotiate the protocol, then read and
// dispatch request frames (or stream frames) until eof/error, then tear the
// connection down and remove it from the server's tables.
future<> server::connection::process() {
    return negotiate_protocol(_read_buf).then([this] () mutable {
        // honor the isolation-negotiated scheduling group when present
        auto sg = _isolation_config ? _isolation_config->sched_group : current_scheduling_group();
        return with_scheduling_group(sg, [this] {
            send_loop();
            return do_until([this] { return _read_buf.eof() || _error; }, [this] () mutable {
                if (is_stream()) {
                    return handle_stream_frame();
                }
                return read_request_frame_compressed(_read_buf).then([this] (compat::optional<uint64_t> expire, uint64_t type, int64_t msg_id, compat::optional<rcv_buf> data) {
                    if (!data) {
                        _error = true;
                        return make_ready_future<>();
                    } else {
                        compat::optional<rpc_clock_type::time_point> timeout;
                        if (expire && *expire) {
                            timeout = rpc_clock_type::now() + std::chrono::milliseconds(*expire);
                        }
                        auto h = _server._proto->get_handler(type);
                        if (h) {
                            // If the new method of per-connection scheduling group was used, honor it.
                            // Otherwise, use the old per-handler scheduling group.
                            auto sg = _isolation_config ? _isolation_config->sched_group : h->sg;
                            return with_scheduling_group(sg, std::ref(h->func), shared_from_this(), timeout, msg_id, std::move(data.value()));
                        } else {
                            return wait_for_resources(28, timeout).then([this, timeout, msg_id, type] (auto permit) {
                                // send unknown_verb exception back
                                snd_buf data(28);
                                static_assert(snd_buf::chunk_size >= 28, "send buffer chunk size is too small");
                                auto p = data.front().get_write() + 12;
                                write_le<uint32_t>(p, uint32_t(exception_type::UNKNOWN_VERB));
                                write_le<uint32_t>(p + 4, uint32_t(8));
                                write_le<uint64_t>(p + 8, type);
                                try {
                                    with_gate(_server._reply_gate, [this, timeout, msg_id, data = std::move(data), permit = std::move(permit)] () mutable {
                                        // workaround for https://gcc.gnu.org/bugzilla/show_bug.cgi?id=83268
                                        auto c = shared_from_this();
                                        return respond(-msg_id, std::move(data), timeout).then([c = std::move(c), permit = std::move(permit)] {});
                                    });
                                } catch(gate_closed_exception&) {/* ignore */}
                            });
                        }
                    }
                });
            });
        });
    }).then_wrapped([this] (future<> f) {
        if (f.failed()) {
            log_exception(*this, format("server{} connection dropped", is_stream() ? " stream" : "").c_str(), f.get_exception());
        }
        // teardown: stop both loops and drop the server's reference to us
        _fd.shutdown_input();
        _error = true;
        _stream_queue.abort(std::make_exception_ptr(stream_closed()));
        return stop_send_loop().then_wrapped([this] (future<> f) {
            f.ignore_ready_future();
            _server._conns.erase(get_connection_id());
            if (is_stream()) {
                return deregister_this_stream();
            } else {
                return make_ready_future<>();
            }
        }).finally([this] {
            _stopped.set_value();
        });
    }).finally([conn_ptr = shared_from_this()] {
        // hold onto connection pointer until do_until() exists
    });
}
910
// Wraps an accepted socket in a server-side connection object and records
// the peer address for logging/handler info.
server::connection::connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* serializer, connection_id id)
: rpc::connection(std::move(fd), l, serializer, id), _server(s) {
    _info.addr = std::move(addr);
}
915
// Removes this stream connection from its parent connection's stream table,
// which may live on another shard. Missing server/parent entries are ignored
// (they may already be gone during shutdown).
future<> server::connection::deregister_this_stream() {
    if (!_server._options.streaming_domain) {
        return make_ready_future<>();
    }
    return smp::submit_to(_parent_id.shard(), [this] () mutable {
        auto sit = server::_servers.find(*_server._options.streaming_domain);
        if (sit != server::_servers.end()) {
            auto s = sit->second;
            auto it = s->_conns.find(_parent_id);
            if (it != s->_conns.end()) {
                it->second->_streams.erase(get_connection_id());
            }
        }
    });
}
931
// Per-shard registry mapping streaming domains to their server instances.
thread_local std::unordered_map<streaming_domain_type, server*> server::_servers;

server::server(protocol_base* proto, ipv4_addr addr, resource_limits limits)
: server(proto, engine().listen(addr, listen_options{true}), limits, server_options{})
{}

server::server(protocol_base* proto, server_options opts, ipv4_addr addr, resource_limits limits)
: server(proto, engine().listen(addr, listen_options{true, opts.load_balancing_algorithm}), limits, opts)
{}

// Main server constructor: registers the streaming domain (if configured) and
// starts accepting connections immediately.
server::server(protocol_base* proto, server_socket ss, resource_limits limits, server_options opts)
: _proto(proto), _ss(std::move(ss)), _limits(limits), _resources_available(limits.max_memory), _options(opts)
{
    if (_options.streaming_domain) {
        _servers[*_options.streaming_domain] = this;
    }
    accept();
}

server::server(protocol_base* proto, server_options opts, server_socket ss, resource_limits limits)
: server(proto, std::move(ss), limits, opts)
{}
954
// Accept loop: keeps accepting sockets and starting connection processing
// until the listening socket is aborted, then signals _ss_stopped.
void server::accept() {
    keep_doing([this] () mutable {
        return _ss.accept().then([this] (connected_socket fd, socket_address addr) mutable {
            fd.set_nodelay(_options.tcp_nodelay);
            connection_id id = invalid_connection_id;
            if (_options.streaming_domain) {
                // encode the owning shard in the low 16 bits of the id
                id = {_next_client_id++ << 16 | uint16_t(engine().cpu_id())};
            }
            auto conn = _proto->make_server_connection(*this, std::move(fd), std::move(addr), id);
            _conns.emplace(id, conn);
            conn->process();
        });
    }).then_wrapped([this] (future<>&& f){
        try {
            f.get();
            assert(false); // keep_doing() can only exit via an exception
        } catch (...) {
            _ss_stopped.set_value();
        }
    });
}
976
// Stops the server: aborts the accept loop, breaks resource waiters,
// unregisters the streaming domain, then waits for all connections and
// in-flight replies to finish.
future<> server::stop() {
    _ss.abort_accept();
    _resources_available.broken();
    if (_options.streaming_domain) {
        _servers.erase(*_options.streaming_domain);
    }
    return when_all(_ss_stopped.get_future(),
        parallel_for_each(_conns | boost::adaptors::map_values, [] (shared_ptr<connection> conn) {
            return conn->stop();
        }),
        _reply_gate.close()
    ).discard_result();
}
990
// Formats a connection id as hexadecimal.
std::ostream& operator<<(std::ostream& os, const connection_id& id) {
    return fmt_print(os, "{:x}", id.id);
}

// Formats a streaming domain as its decimal id.
std::ostream& operator<<(std::ostream& os, const streaming_domain_type& domain) {
    return fmt_print(os, "{:d}", domain._id);
}
998
// Default isolation policy: every connection runs in the default scheduling
// group; the cookie is intentionally ignored.
isolation_config default_isolate_connection(sstring isolation_cookie) {
    return isolation_config{};
}
1002
1003 }
1004
1005 }