]>
Commit | Line | Data |
---|---|---|
1e59de90 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "io_handler.h" | |
5 | ||
6 | #include "auth/Auth.h" | |
7 | ||
8 | #include "crimson/common/formatter.h" | |
9 | #include "crimson/common/log.h" | |
10 | #include "crimson/net/Errors.h" | |
11 | #include "crimson/net/chained_dispatchers.h" | |
12 | #include "crimson/net/SocketMessenger.h" | |
13 | #include "msg/Message.h" | |
14 | #include "msg/msg_fmt.h" | |
15 | ||
16 | using namespace ceph::msgr::v2; | |
17 | using crimson::common::local_conf; | |
18 | ||
19 | namespace { | |
20 | ||
21 | seastar::logger& logger() { | |
22 | return crimson::get_logger(ceph_subsys_ms); | |
23 | } | |
24 | ||
25 | [[noreturn]] void abort_in_fault() { | |
26 | throw std::system_error(make_error_code(crimson::net::error::negotiation_failure)); | |
27 | } | |
28 | ||
29 | [[noreturn]] void abort_protocol() { | |
30 | throw std::system_error(make_error_code(crimson::net::error::protocol_aborted)); | |
31 | } | |
32 | ||
33 | std::size_t get_msg_size(const FrameAssembler &rx_frame_asm) | |
34 | { | |
35 | ceph_assert(rx_frame_asm.get_num_segments() > 0); | |
36 | size_t sum = 0; | |
37 | // we don't include SegmentIndex::Msg::HEADER. | |
38 | for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) { | |
39 | sum += rx_frame_asm.get_segment_logical_len(idx); | |
40 | } | |
41 | return sum; | |
42 | } | |
43 | ||
44 | } // namespace anonymous | |
45 | ||
46 | namespace crimson::net { | |
47 | ||
// Construct the IO handler bound to the current shard; the io_state starts
// at none until the protocol promotes it (see do_set_io_state()).
IOHandler::IOHandler(ChainedDispatchers &dispatchers,
                     SocketConnection &conn)
  : shard_states(shard_states_t::create(
      seastar::this_shard_id(), io_state_t::none)),
    dispatchers(dispatchers),
    conn(conn),
    // keep a local shared foreign ref to the owning connection
    conn_ref(conn.get_local_shared_foreign_from_this())
{}

IOHandler::~IOHandler()
{
  // close_io() must be finished
  ceph_assert_always(maybe_prv_shard_states == nullptr);
  // should be true in the according shard
  // ceph_assert_always(shard_states->assert_closed_and_exit());
  // conn_ref must have been released by close_io()/to_new_sid()
  assert(!conn_ref);
}
65 | ||
aee94f69 TL |
66 | #ifdef UNIT_TESTS_BUILT |
67 | IOHandler::sweep_ret | |
68 | #else | |
69 | ceph::bufferlist | |
70 | #endif | |
71 | IOHandler::sweep_out_pending_msgs_to_sent( | |
1e59de90 TL |
72 | bool require_keepalive, |
73 | std::optional<utime_t> maybe_keepalive_ack, | |
74 | bool require_ack) | |
75 | { | |
76 | std::size_t num_msgs = out_pending_msgs.size(); | |
77 | ceph::bufferlist bl; | |
78 | ||
aee94f69 TL |
79 | #ifdef UNIT_TESTS_BUILT |
80 | std::vector<Tag> tags; | |
81 | #endif | |
82 | ||
1e59de90 TL |
83 | if (unlikely(require_keepalive)) { |
84 | auto keepalive_frame = KeepAliveFrame::Encode(); | |
85 | bl.append(frame_assembler->get_buffer(keepalive_frame)); | |
aee94f69 TL |
86 | #ifdef UNIT_TESTS_BUILT |
87 | auto tag = KeepAliveFrame::tag; | |
88 | tags.push_back(tag); | |
89 | #endif | |
1e59de90 TL |
90 | } |
91 | ||
92 | if (unlikely(maybe_keepalive_ack.has_value())) { | |
93 | auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack); | |
94 | bl.append(frame_assembler->get_buffer(keepalive_ack_frame)); | |
aee94f69 TL |
95 | #ifdef UNIT_TESTS_BUILT |
96 | auto tag = KeepAliveFrameAck::tag; | |
97 | tags.push_back(tag); | |
98 | #endif | |
1e59de90 TL |
99 | } |
100 | ||
101 | if (require_ack && num_msgs == 0u) { | |
aee94f69 | 102 | auto ack_frame = AckFrame::Encode(in_seq); |
1e59de90 | 103 | bl.append(frame_assembler->get_buffer(ack_frame)); |
aee94f69 TL |
104 | #ifdef UNIT_TESTS_BUILT |
105 | auto tag = AckFrame::tag; | |
106 | tags.push_back(tag); | |
107 | #endif | |
1e59de90 TL |
108 | } |
109 | ||
110 | std::for_each( | |
111 | out_pending_msgs.begin(), | |
112 | out_pending_msgs.begin()+num_msgs, | |
aee94f69 TL |
113 | [this, &bl |
114 | #ifdef UNIT_TESTS_BUILT | |
115 | , &tags | |
116 | #endif | |
117 | ](const MessageFRef& msg) { | |
1e59de90 TL |
118 | // set priority |
119 | msg->get_header().src = conn.messenger.get_myname(); | |
120 | ||
121 | msg->encode(conn.features, 0); | |
122 | ||
123 | ceph_assert(!msg->get_seq() && "message already has seq"); | |
124 | msg->set_seq(++out_seq); | |
125 | ||
126 | ceph_msg_header &header = msg->get_header(); | |
127 | ceph_msg_footer &footer = msg->get_footer(); | |
128 | ||
129 | ceph_msg_header2 header2{header.seq, header.tid, | |
130 | header.type, header.priority, | |
131 | header.version, | |
132 | ceph_le32(0), header.data_off, | |
aee94f69 | 133 | ceph_le64(in_seq), |
1e59de90 TL |
134 | footer.flags, header.compat_version, |
135 | header.reserved}; | |
136 | ||
137 | auto message = MessageFrame::Encode(header2, | |
138 | msg->get_payload(), msg->get_middle(), msg->get_data()); | |
139 | logger().debug("{} --> #{} === {} ({})", | |
140 | conn, msg->get_seq(), *msg, msg->get_type()); | |
141 | bl.append(frame_assembler->get_buffer(message)); | |
aee94f69 TL |
142 | #ifdef UNIT_TESTS_BUILT |
143 | auto tag = MessageFrame::tag; | |
144 | tags.push_back(tag); | |
145 | #endif | |
1e59de90 TL |
146 | }); |
147 | ||
148 | if (!conn.policy.lossy) { | |
149 | out_sent_msgs.insert( | |
150 | out_sent_msgs.end(), | |
151 | std::make_move_iterator(out_pending_msgs.begin()), | |
152 | std::make_move_iterator(out_pending_msgs.end())); | |
153 | } | |
154 | out_pending_msgs.clear(); | |
aee94f69 TL |
155 | |
156 | #ifdef UNIT_TESTS_BUILT | |
157 | return sweep_ret{std::move(bl), tags}; | |
158 | #else | |
1e59de90 | 159 | return bl; |
aee94f69 | 160 | #endif |
1e59de90 TL |
161 | } |
162 | ||
aee94f69 | 163 | seastar::future<> IOHandler::send(MessageFRef msg) |
1e59de90 | 164 | { |
aee94f69 TL |
165 | // sid may be changed on-the-fly during the submission |
166 | if (seastar::this_shard_id() == get_shard_id()) { | |
167 | return do_send(std::move(msg)); | |
168 | } else { | |
169 | logger().trace("{} send() is directed to {} -- {}", | |
170 | conn, get_shard_id(), *msg); | |
171 | return seastar::smp::submit_to( | |
172 | get_shard_id(), [this, msg=std::move(msg)]() mutable { | |
173 | return send_redirected(std::move(msg)); | |
174 | }); | |
175 | } | |
176 | } | |
177 | ||
178 | seastar::future<> IOHandler::send_redirected(MessageFRef msg) | |
179 | { | |
180 | // sid may be changed on-the-fly during the submission | |
181 | if (seastar::this_shard_id() == get_shard_id()) { | |
182 | return do_send(std::move(msg)); | |
183 | } else { | |
184 | logger().debug("{} send() is redirected to {} -- {}", | |
185 | conn, get_shard_id(), *msg); | |
186 | return seastar::smp::submit_to( | |
187 | get_shard_id(), [this, msg=std::move(msg)]() mutable { | |
188 | return send_redirected(std::move(msg)); | |
189 | }); | |
190 | } | |
191 | } | |
192 | ||
193 | seastar::future<> IOHandler::do_send(MessageFRef msg) | |
194 | { | |
195 | assert(seastar::this_shard_id() == get_shard_id()); | |
196 | logger().trace("{} do_send() got message -- {}", conn, *msg); | |
197 | if (get_io_state() != io_state_t::drop) { | |
1e59de90 TL |
198 | out_pending_msgs.push_back(std::move(msg)); |
199 | notify_out_dispatch(); | |
200 | } | |
201 | return seastar::now(); | |
202 | } | |
203 | ||
204 | seastar::future<> IOHandler::send_keepalive() | |
205 | { | |
aee94f69 TL |
206 | // sid may be changed on-the-fly during the submission |
207 | if (seastar::this_shard_id() == get_shard_id()) { | |
208 | return do_send_keepalive(); | |
209 | } else { | |
210 | logger().trace("{} send_keepalive() is directed to {}", conn, get_shard_id()); | |
211 | return seastar::smp::submit_to( | |
212 | get_shard_id(), [this] { | |
213 | return send_keepalive_redirected(); | |
214 | }); | |
215 | } | |
216 | } | |
217 | ||
218 | seastar::future<> IOHandler::send_keepalive_redirected() | |
219 | { | |
220 | // sid may be changed on-the-fly during the submission | |
221 | if (seastar::this_shard_id() == get_shard_id()) { | |
222 | return do_send_keepalive(); | |
223 | } else { | |
224 | logger().debug("{} send_keepalive() is redirected to {}", conn, get_shard_id()); | |
225 | return seastar::smp::submit_to( | |
226 | get_shard_id(), [this] { | |
227 | return send_keepalive_redirected(); | |
228 | }); | |
229 | } | |
230 | } | |
231 | ||
232 | seastar::future<> IOHandler::do_send_keepalive() | |
233 | { | |
234 | assert(seastar::this_shard_id() == get_shard_id()); | |
235 | logger().trace("{} do_send_keeplive(): need_keepalive={}", conn, need_keepalive); | |
1e59de90 TL |
236 | if (!need_keepalive) { |
237 | need_keepalive = true; | |
238 | notify_out_dispatch(); | |
239 | } | |
240 | return seastar::now(); | |
241 | } | |
242 | ||
// User-initiated teardown: switch to the drop state and notify the
// protocol (handshake_listener) on the messenger shard in the background.
// Must run on the io shard. Idempotent if already dropping.
void IOHandler::mark_down()
{
  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
  ceph_assert_always(get_io_state() != io_state_t::none);
  // an explicit mark_down() suppresses the reset dispatch to the user
  need_dispatch_reset = false;
  if (get_io_state() == io_state_t::drop) {
    return;
  }

  // reserve a crosscore seq so the messenger shard processes this
  // notification in submission order
  auto cc_seq = crosscore.prepare_submit();
  logger().info("{} mark_down() at {}, send {} notify_mark_down()",
                conn, io_stat_printer{*this}, cc_seq);
  do_set_io_state(io_state_t::drop);
  shard_states->dispatch_in_background(
      "notify_mark_down", conn, [this, cc_seq] {
    return seastar::smp::submit_to(
        conn.get_messenger_shard_id(), [this, cc_seq] {
      return handshake_listener->notify_mark_down(cc_seq);
    });
  });
}
264 | ||
// Render the current io statistics for logging (used by io_stat_printer).
// Must run on the io shard since it reads shard-local state.
void IOHandler::print_io_stat(std::ostream &out) const
{
  assert(seastar::this_shard_id() == get_shard_id());
  out << "io_stat("
      << "io_state=" << fmt::format("{}", get_io_state())
      << ", in_seq=" << in_seq
      << ", out_seq=" << out_seq
      << ", out_pending_msgs_size=" << out_pending_msgs.size()
      << ", out_sent_msgs_size=" << out_sent_msgs.size()
      << ", need_ack=" << (ack_left > 0)
      << ", need_keepalive=" << need_keepalive
      << ", need_keepalive_ack=" << bool(next_keepalive_ack)
      << ")";
}

// Take ownership of the frame assembler handed over by the protocol.
// The assembler (and its socket) must already live on this io shard.
void IOHandler::assign_frame_assembler(FrameAssemblerV2Ref fa)
{
  assert(fa != nullptr);
  ceph_assert_always(frame_assembler == nullptr);
  frame_assembler = std::move(fa);
  ceph_assert_always(
      frame_assembler->get_shard_id() == get_shard_id());
  // should have been set through dispatch_accept/connect()
  ceph_assert_always(
      frame_assembler->get_socket_shard_id() == get_shard_id());
  ceph_assert_always(frame_assembler->is_socket_valid());
}
292 | ||
// Core io state transition, must run on the io shard.
// Legal transitions: none->open/delay/drop, open->delay/drop/switched,
// delay->open/drop, etc.; re-entering none or open->open is forbidden.
// @param new_state      target state
// @param cc_seq         optional crosscore seq (logging only here)
// @param fa             frame assembler, required iff entering open
// @param set_notify_out whether delay state should keep notifying out
void IOHandler::do_set_io_state(
    io_state_t new_state,
    std::optional<crosscore_t::seq_t> cc_seq,
    FrameAssemblerV2Ref fa,
    bool set_notify_out)
{
  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
  auto prv_state = get_io_state();
  logger().debug("{} got {}do_set_io_state(): prv_state={}, new_state={}, "
                 "fa={}, set_notify_out={}, at {}",
                 conn,
                 cc_seq.has_value() ? fmt::format("{} ", *cc_seq) : "",
                 prv_state, new_state,
                 fa ? "present" : "N/A", set_notify_out,
                 io_stat_printer{*this});
  ceph_assert_always(!(
    (new_state == io_state_t::none && prv_state != io_state_t::none) ||
    (new_state == io_state_t::open && prv_state == io_state_t::open)
  ));

  if (prv_state == io_state_t::drop) {
    // only possible due to a racing mark_down() from user
    if (new_state == io_state_t::open) {
      // adopt the assembler so it is owned/cleaned here, but immediately
      // shut its socket down since we are already dropping
      assign_frame_assembler(std::move(fa));
      frame_assembler->shutdown_socket<false>(nullptr);
    } else {
      assert(fa == nullptr);
    }
    return;
  }

  bool dispatch_in = false;
  if (new_state == io_state_t::open) {
    // to open
    ceph_assert_always(protocol_is_connected == true);
    assign_frame_assembler(std::move(fa));
    // start the in-dispatch loop after the state is switched below
    dispatch_in = true;
  } else if (prv_state == io_state_t::open) {
    // from open
    ceph_assert_always(protocol_is_connected == true);
    protocol_is_connected = false;
    assert(fa == nullptr);
    ceph_assert_always(frame_assembler->is_socket_valid());
    frame_assembler->shutdown_socket<false>(nullptr);
  } else {
    assert(fa == nullptr);
  }

  if (new_state == io_state_t::delay) {
    // in delay, optionally keep notifying the protocol about pending output
    need_notify_out = set_notify_out;
    if (need_notify_out) {
      maybe_notify_out_dispatch();
    }
  } else {
    assert(set_notify_out == false);
    need_notify_out = false;
  }

  // FIXME: simplify and drop the prv_state == new_state case
  if (prv_state != new_state) {
    shard_states->set_io_state(new_state);
  }

  /*
   * not atomic below
   */

  if (dispatch_in) {
    do_in_dispatch();
  }
}
364 | ||
aee94f69 TL |
// Cross-core entry to do_set_io_state(), called from the messenger shard
// via submit_to. Uses the crosscore seq to preserve submission order:
// if this call arrived early, wait and retry until it is its turn.
seastar::future<> IOHandler::set_io_state(
    crosscore_t::seq_t cc_seq,
    io_state_t new_state,
    FrameAssemblerV2Ref fa,
    bool set_notify_out)
{
  assert(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    logger().debug("{} got {} set_io_state(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq, new_state,
            fa=std::move(fa), set_notify_out]() mutable {
      // retry; ordering is re-checked on re-entry
      return set_io_state(cc_seq, new_state, std::move(fa), set_notify_out);
    });
  }

  do_set_io_state(new_state, cc_seq, std::move(fa), set_notify_out);
  return seastar::now();
}
385 | ||
// Wait until both in/out dispatching loops have exited, then hand the
// frame assembler (retargeted to the messenger shard) and the io states
// back to the protocol. Ordered by the crosscore seq like set_io_state().
seastar::future<IOHandler::exit_dispatching_ret>
IOHandler::wait_io_exit_dispatching(
    crosscore_t::seq_t cc_seq)
{
  assert(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    logger().debug("{} got {} wait_io_exit_dispatching(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq] {
      return wait_io_exit_dispatching(cc_seq);
    });
  }

  logger().debug("{} got {} wait_io_exit_dispatching()",
                 conn, cc_seq);
  // dispatching can only be exited when not open and the socket is gone
  ceph_assert_always(get_io_state() != io_state_t::open);
  ceph_assert_always(frame_assembler != nullptr);
  ceph_assert_always(!frame_assembler->is_socket_valid());
  return seastar::futurize_invoke([this] {
    // cannot be running in parallel with to_new_sid()
    if (maybe_dropped_sid.has_value()) {
      // the connection was dropped mid-switch; the dispatching loops are
      // still owned by the previous shard's states
      ceph_assert_always(get_io_state() == io_state_t::drop);
      assert(shard_states->assert_closed_and_exit());
      auto prv_sid = *maybe_dropped_sid;
      return seastar::smp::submit_to(prv_sid, [this] {
        logger().debug("{} got wait_io_exit_dispatching from prv_sid", conn);
        assert(maybe_prv_shard_states != nullptr);
        return maybe_prv_shard_states->wait_io_exit_dispatching();
      });
    } else {
      return shard_states->wait_io_exit_dispatching();
    }
  }).then([this] {
    logger().debug("{} finish wait_io_exit_dispatching at {}",
                   conn, io_stat_printer{*this});
    ceph_assert_always(frame_assembler != nullptr);
    ceph_assert_always(!frame_assembler->is_socket_valid());
    // the assembler is handed back to the messenger shard
    frame_assembler->set_shard_id(conn.get_messenger_shard_id());
    return exit_dispatching_ret{
      std::move(frame_assembler),
      get_states()};
  });
}
430 | ||
aee94f69 TL |
// Reset the session state, ordered by the crosscore seq.
// @param full  when true, also discard the out queues and dispatch a
//              remote-reset to the user; otherwise only the in side.
seastar::future<> IOHandler::reset_session(
    crosscore_t::seq_t cc_seq,
    bool full)
{
  assert(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    logger().debug("{} got {} reset_session(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq, full] {
      return reset_session(cc_seq, full);
    });
  }

  logger().debug("{} got {} reset_session({})",
                 conn, cc_seq, full);
  assert(get_io_state() != io_state_t::open);
  reset_in();
  if (full) {
    reset_out();
    dispatch_remote_reset();
  }
  return seastar::now();
}

// Reset the peer-visible state upon reconnect: clear the in side, requeue
// everything that was sent but never acked (seq 0 requeues all), and drop
// the sent list. Ordered by the crosscore seq.
seastar::future<> IOHandler::reset_peer_state(
    crosscore_t::seq_t cc_seq)
{
  assert(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    logger().debug("{} got {} reset_peer_state(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq] {
      return reset_peer_state(cc_seq);
    });
  }

  logger().debug("{} got {} reset_peer_state()",
                 conn, cc_seq);
  assert(get_io_state() != io_state_t::open);
  reset_in();
  do_requeue_out_sent_up_to(0);
  discard_out_sent();
  return seastar::now();
}
477 | ||
// Cross-core entry to do_requeue_out_sent(), ordered by the crosscore seq.
seastar::future<> IOHandler::requeue_out_sent(
    crosscore_t::seq_t cc_seq)
{
  assert(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    logger().debug("{} got {} requeue_out_sent(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq] {
      return requeue_out_sent(cc_seq);
    });
  }

  logger().debug("{} got {} requeue_out_sent()",
                 conn, cc_seq);
  do_requeue_out_sent();
  return seastar::now();
}

// Move every sent-but-unacked message back to the front of the pending
// queue for re-transmission: revert out_seq, strip the stale payload/seq
// from each message (they are re-encoded at the next sweep), and wake the
// out-dispatch loop.
void IOHandler::do_requeue_out_sent()
{
  assert(get_io_state() != io_state_t::open);
  if (out_sent_msgs.empty()) {
    return;
  }

  out_seq -= out_sent_msgs.size();
  logger().debug("{} requeue {} items, revert out_seq to {}",
                 conn, out_sent_msgs.size(), out_seq);
  for (MessageFRef& msg : out_sent_msgs) {
    msg->clear_payload();
    msg->set_seq(0);
  }
  out_pending_msgs.insert(
      out_pending_msgs.begin(),
      std::make_move_iterator(out_sent_msgs.begin()),
      std::make_move_iterator(out_sent_msgs.end()));
  out_sent_msgs.clear();
  maybe_notify_out_dispatch();
}

// Cross-core entry to do_requeue_out_sent_up_to(), ordered by the
// crosscore seq.
seastar::future<> IOHandler::requeue_out_sent_up_to(
    crosscore_t::seq_t cc_seq,
    seq_num_t msg_seq)
{
  assert(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    logger().debug("{} got {} requeue_out_sent_up_to(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq, msg_seq] {
      return requeue_out_sent_up_to(cc_seq, msg_seq);
    });
  }

  logger().debug("{} got {} requeue_out_sent_up_to({})",
                 conn, cc_seq, msg_seq);
  do_requeue_out_sent_up_to(msg_seq);
  return seastar::now();
}

// Drop the sent messages the peer has acknowledged (seq <= the given seq),
// then requeue the remainder. With nothing queued at all, simply reset
// out_seq to the peer-acknowledged seq.
void IOHandler::do_requeue_out_sent_up_to(seq_num_t seq)
{
  assert(get_io_state() != io_state_t::open);
  if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
    logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
                   conn, out_seq, seq);
    out_seq = seq;
    return;
  }
  logger().debug("{} discarding sent msgs by seq {} (sent_len={}, out_seq={})",
                 conn, seq, out_sent_msgs.size(), out_seq);
  while (!out_sent_msgs.empty()) {
    auto cur_seq = out_sent_msgs.front()->get_seq();
    // seq 0 means the message was already stripped for requeue — keep it
    if (cur_seq == 0 || cur_seq > seq) {
      break;
    } else {
      out_sent_msgs.pop_front();
    }
  }
  do_requeue_out_sent();
}
560 | ||
561 | void IOHandler::reset_in() | |
562 | { | |
563 | assert(get_io_state() != io_state_t::open); | |
564 | in_seq = 0; | |
1e59de90 TL |
565 | } |
566 | ||
567 | void IOHandler::reset_out() | |
568 | { | |
aee94f69 TL |
569 | assert(get_io_state() != io_state_t::open); |
570 | discard_out_sent(); | |
1e59de90 | 571 | out_pending_msgs.clear(); |
1e59de90 TL |
572 | need_keepalive = false; |
573 | next_keepalive_ack = std::nullopt; | |
574 | ack_left = 0; | |
575 | } | |
576 | ||
aee94f69 | 577 | void IOHandler::discard_out_sent() |
1e59de90 | 578 | { |
aee94f69 TL |
579 | assert(get_io_state() != io_state_t::open); |
580 | out_seq = 0; | |
581 | out_sent_msgs.clear(); | |
1e59de90 TL |
582 | } |
583 | ||
aee94f69 TL |
// Accept path: switch this io handler to the new shard.
// is_replace carries whether an existing connection is being replaced.
seastar::future<>
IOHandler::dispatch_accept(
    crosscore_t::seq_t cc_seq,
    seastar::shard_id new_sid,
    ConnectionFRef conn_fref,
    bool is_replace)
{
  return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
}

// Connect path: same shard switch, but with no replace semantics
// (nullopt distinguishes connect from accept in to_new_sid()).
seastar::future<>
IOHandler::dispatch_connect(
    crosscore_t::seq_t cc_seq,
    seastar::shard_id new_sid,
    ConnectionFRef conn_fref)
{
  return to_new_sid(cc_seq, new_sid, std::move(conn_fref), std::nullopt);
}

// Close and destroy the shard states left behind on the previous shard
// after a shard switch; must complete before this handler is destroyed.
seastar::future<>
IOHandler::cleanup_prv_shard(seastar::shard_id prv_sid)
{
  assert(seastar::this_shard_id() == get_shard_id());
  return seastar::smp::submit_to(prv_sid, [this] {
    logger().debug("{} got cleanup_prv_shard()", conn);
    assert(maybe_prv_shard_states != nullptr);
    // take ownership so the states stay alive until close() resolves
    auto ref_prv_states = std::move(maybe_prv_shard_states);
    auto &prv_states = *ref_prv_states;
    return prv_states.close(
    ).then([ref_prv_states=std::move(ref_prv_states)] {
      ceph_assert_always(ref_prv_states->assert_closed_and_exit());
    });
  }).then([this] {
    ceph_assert_always(maybe_prv_shard_states == nullptr);
  });
}
620 | ||
// Switch this io handler from the current shard to new_sid, ordered by the
// crosscore seq. The switch itself (conn_ref reset + shard_states swap) is
// applied atomically on the old shard, then finalized on the new shard.
// @param is_replace  accept(replace)/accept(!replace) when set, connect
//                    when nullopt.
seastar::future<>
IOHandler::to_new_sid(
    crosscore_t::seq_t cc_seq,
    seastar::shard_id new_sid,
    ConnectionFRef conn_fref,
    std::optional<bool> is_replace)
{
  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
  if (!crosscore.proceed_or_wait(cc_seq)) {
    logger().debug("{} got {} to_new_sid(), wait at {}",
                   conn, cc_seq, crosscore.get_in_seq());
    return crosscore.wait(cc_seq
    ).then([this, cc_seq, new_sid, is_replace,
            conn_fref=std::move(conn_fref)]() mutable {
      return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
    });
  }

  bool is_accept_or_connect = is_replace.has_value();
  logger().debug("{} got {} to_new_sid_1(new_sid={}, {}) at {}",
                 conn, cc_seq, new_sid,
                 fmt::format("{}",
                             is_accept_or_connect ?
                             (*is_replace ? "accept(replace)" : "accept(!replace)") :
                             "connect"),
                 io_stat_printer{*this});
  // the follow-up submit_to below consumes the next crosscore seq
  auto next_cc_seq = ++cc_seq;

  if (get_io_state() != io_state_t::drop) {
    ceph_assert_always(conn_ref);
    if (new_sid != seastar::this_shard_id()) {
      dispatchers.ms_handle_shard_change(conn_ref, new_sid, is_accept_or_connect);
      // user can make changes
    }
  } else {
    // it is possible that both io_handler and protocolv2 are
    // trying to close each other from different cores simultaneously.
    assert(!protocol_is_connected);
  }

  if (get_io_state() != io_state_t::drop) {
    if (is_accept_or_connect) {
      // protocol_is_connected can be from true to true here if the replacing is
      // happening to a connected connection.
    } else {
      ceph_assert_always(protocol_is_connected == false);
    }
    protocol_is_connected = true;
  } else {
    assert(!protocol_is_connected);
  }

  bool is_dropped = false;
  if (get_io_state() == io_state_t::drop) {
    is_dropped = true;
  }
  ceph_assert_always(get_io_state() != io_state_t::open);

  // apply the switching atomically
  ceph_assert_always(conn_ref);
  conn_ref.reset();
  auto prv_sid = get_shard_id();
  ceph_assert_always(maybe_prv_shard_states == nullptr);
  maybe_prv_shard_states = std::move(shard_states);
  shard_states = shard_states_t::create_from_previous(
      *maybe_prv_shard_states, new_sid);
  assert(new_sid == get_shard_id());

  return seastar::smp::submit_to(new_sid,
      [this, next_cc_seq, is_dropped, prv_sid, is_replace, conn_fref=std::move(conn_fref)]() mutable {
    logger().debug("{} got {} to_new_sid_2(prv_sid={}, is_dropped={}, {}) at {}",
                   conn, next_cc_seq, prv_sid, is_dropped,
                   fmt::format("{}",
                               is_replace.has_value() ?
                               (*is_replace ? "accept(replace)" : "accept(!replace)") :
                               "connect"),
                   io_stat_printer{*this});

    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
    ceph_assert_always(get_io_state() != io_state_t::open);
    ceph_assert_always(!maybe_dropped_sid.has_value());
    // must be the first crosscore task to land on the new shard
    ceph_assert_always(crosscore.proceed_or_wait(next_cc_seq));

    if (is_dropped) {
      ceph_assert_always(get_io_state() == io_state_t::drop);
      ceph_assert_always(shard_states->assert_closed_and_exit());
      maybe_dropped_sid = prv_sid;
      // cleanup_prv_shard() will be done in a follow-up close_io()
    } else {
      // possible at io_state_t::drop

      // previous shard is not cleaned,
      // but close_io() is responsible to clean up the current shard,
      // so cleanup the previous shard here.
      shard_states->dispatch_in_background(
          "cleanup_prv_sid", conn, [this, prv_sid] {
        return cleanup_prv_shard(prv_sid);
      });
      maybe_notify_out_dispatch();
    }

    ceph_assert_always(!conn_ref);
    // assign even if already dropping
    conn_ref = make_local_shared_foreign(std::move(conn_fref));

    if (get_io_state() != io_state_t::drop) {
      if (is_replace.has_value()) {
        dispatchers.ms_handle_accept(conn_ref, prv_sid, *is_replace);
      } else {
        dispatchers.ms_handle_connect(conn_ref, prv_sid);
      }
      // user can make changes
    }
  });
}
736 | ||
// Bind a freshly-accepted connection to its designated shard before any
// io state exists: rebuild shard_states for the target sid and re-anchor
// conn_ref on that shard.
seastar::future<> IOHandler::set_accepted_sid(
    crosscore_t::seq_t cc_seq,
    seastar::shard_id sid,
    ConnectionFRef conn_fref)
{
  assert(seastar::this_shard_id() == get_shard_id());
  assert(get_io_state() == io_state_t::none);
  ceph_assert_always(conn_ref);
  // the foreign ref is re-created on the target shard below
  conn_ref.reset();
  assert(maybe_prv_shard_states == nullptr);
  shard_states.reset();
  shard_states = shard_states_t::create(sid, io_state_t::none);
  return seastar::smp::submit_to(sid,
      [this, cc_seq, conn_fref=std::move(conn_fref)]() mutable {
    // must be the first to proceed
    ceph_assert_always(crosscore.proceed_or_wait(cc_seq));

    logger().debug("{} set accepted sid", conn);
    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
    ceph_assert_always(get_io_state() == io_state_t::none);
    assert(maybe_prv_shard_states == nullptr);
    ceph_assert_always(!conn_ref);
    conn_ref = make_local_shared_foreign(std::move(conn_fref));
  });
}
762 | ||
763 | void IOHandler::dispatch_reset(bool is_replace) | |
764 | { | |
aee94f69 | 765 | ceph_assert_always(get_io_state() == io_state_t::drop); |
1e59de90 TL |
766 | if (!need_dispatch_reset) { |
767 | return; | |
768 | } | |
769 | need_dispatch_reset = false; | |
aee94f69 TL |
770 | ceph_assert_always(conn_ref); |
771 | ||
1e59de90 | 772 | dispatchers.ms_handle_reset(conn_ref, is_replace); |
aee94f69 | 773 | // user can make changes |
1e59de90 TL |
774 | } |
775 | ||
776 | void IOHandler::dispatch_remote_reset() | |
777 | { | |
aee94f69 | 778 | if (get_io_state() == io_state_t::drop) { |
1e59de90 TL |
779 | return; |
780 | } | |
aee94f69 TL |
781 | ceph_assert_always(conn_ref); |
782 | ||
1e59de90 | 783 | dispatchers.ms_handle_remote_reset(conn_ref); |
aee94f69 | 784 | // user can make changes |
1e59de90 TL |
785 | } |
786 | ||
787 | void IOHandler::ack_out_sent(seq_num_t seq) | |
788 | { | |
789 | if (conn.policy.lossy) { // lossy connections don't keep sent messages | |
790 | return; | |
791 | } | |
792 | while (!out_sent_msgs.empty() && | |
793 | out_sent_msgs.front()->get_seq() <= seq) { | |
794 | logger().trace("{} got ack seq {} >= {}, pop {}", | |
795 | conn, seq, out_sent_msgs.front()->get_seq(), | |
796 | *out_sent_msgs.front()); | |
797 | out_sent_msgs.pop_front(); | |
798 | } | |
799 | } | |
800 | ||
aee94f69 TL |
// Background write loop: repeatedly sweeps the pending out queue
// (messages, keepalive, keepalive-ack, acks) into frames and writes them,
// until the queue drains in the open state, or the io_state becomes
// drop/switched. On a socket fault in the open state it transitions to
// delay and notifies the handshake listener cross-core; otherwise it
// re-enters itself so the non-open states can finish cleanly.
seastar::future<>
IOHandler::do_out_dispatch(shard_states_t &ctx)
{
  return seastar::repeat([this, &ctx] {
    switch (ctx.get_io_state()) {
     case io_state_t::open: {
      if (unlikely(!is_out_queued())) {
        // try exit open dispatching
        return frame_assembler->flush<false>(
        ).then([this, &ctx] {
          // re-check: the state may have changed, or more output may have
          // been queued, while the flush was in flight
          if (ctx.get_io_state() != io_state_t::open || is_out_queued()) {
            return seastar::make_ready_future<stop_t>(stop_t::no);
          }
          // still nothing pending to send after flush,
          // open dispatching can ONLY stop now
          ctx.exit_out_dispatching("exit-open", conn);
          return seastar::make_ready_future<stop_t>(stop_t::yes);
        });
      }

      // snapshot-and-clear the pending keepalive/ack flags so that this
      // sweep owns them exclusively
      auto require_keepalive = need_keepalive;
      need_keepalive = false;
      auto maybe_keepalive_ack = next_keepalive_ack;
      next_keepalive_ack = std::nullopt;
      auto to_ack = ack_left;
      assert(to_ack == 0 || in_seq > 0);
      ack_left = 0;
#ifdef UNIT_TESTS_BUILT
      // test builds additionally report the swept frame tags to the
      // interceptor before writing
      auto ret = sweep_out_pending_msgs_to_sent(
          require_keepalive, maybe_keepalive_ack, to_ack > 0);
      return frame_assembler->intercept_frames(ret.tags, true
      ).then([this, bl=std::move(ret.bl)]() mutable {
        return frame_assembler->write<false>(std::move(bl));
      }
#else
      auto bl = sweep_out_pending_msgs_to_sent(
          require_keepalive, maybe_keepalive_ack, to_ack > 0);
      return frame_assembler->write<false>(std::move(bl)
#endif
      ).then([this, &ctx] {
        if (ctx.get_io_state() != io_state_t::open) {
          // no longer open: flush what was written and let the switch
          // handle the new state on the next iteration
          return frame_assembler->flush<false>(
          ).then([] {
            return seastar::make_ready_future<stop_t>(stop_t::no);
          });
        }

        // FIXME: may leak a flush if state is changed after return and before
        // the next repeat body.
        return seastar::make_ready_future<stop_t>(stop_t::no);
      });
     }
     case io_state_t::delay:
      // delay out dispatching until open
      ctx.notify_out_dispatching_stopped("delay...", conn);
      return ctx.wait_state_change(
      ).then([] { return stop_t::no; });
     case io_state_t::drop:
      ctx.exit_out_dispatching("dropped", conn);
      return seastar::make_ready_future<stop_t>(stop_t::yes);
     case io_state_t::switched:
      ctx.exit_out_dispatching("switched", conn);
      return seastar::make_ready_future<stop_t>(stop_t::yes);
     default:
      ceph_abort("impossible");
    }
  }).handle_exception_type([this, &ctx](const std::system_error& e) {
    auto io_state = ctx.get_io_state();
    // only expected socket/negotiation failures are tolerated; anything
    // else is a bug
    if (e.code() != std::errc::broken_pipe &&
        e.code() != std::errc::connection_reset &&
        e.code() != error::negotiation_failure) {
      logger().error("{} do_out_dispatch(): unexpected error at {} -- {}",
                     conn, io_state, e.what());
      ceph_abort();
    }

    if (io_state == io_state_t::open) {
      auto cc_seq = crosscore.prepare_submit();
      logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, "
                    "send {} notify_out_fault()",
                    conn, io_state, io_stat_printer{*this}, e.what(), cc_seq);
      // capture the error as an exception_ptr to hand to the listener
      std::exception_ptr eptr;
      try {
        throw e;
      } catch(...) {
        eptr = std::current_exception();
      }
      do_set_io_state(io_state_t::delay);
      // notify the handshake listener on the messenger shard, in the
      // background, with a crosscore sequence for ordering
      shard_states->dispatch_in_background(
          "notify_out_fault(out)", conn, [this, cc_seq, eptr] {
        auto states = get_states();
        return seastar::smp::submit_to(
            conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
          return handshake_listener->notify_out_fault(
              cc_seq, "do_out_dispatch", eptr, states);
        });
      });
    } else {
      if (io_state != io_state_t::switched) {
        logger().info("{} do_out_dispatch(): fault at {}, {} -- {}",
                      conn, io_state, io_stat_printer{*this}, e.what());
      } else {
        logger().info("{} do_out_dispatch(): fault at {} -- {}",
                      conn, io_state, e.what());
      }
    }

    // loop again so the non-open branches can run their exit logic
    return do_out_dispatch(ctx);
  });
}
911 | ||
aee94f69 TL |
912 | void IOHandler::maybe_notify_out_dispatch() |
913 | { | |
914 | ceph_assert_always(seastar::this_shard_id() == get_shard_id()); | |
915 | if (is_out_queued()) { | |
916 | notify_out_dispatch(); | |
917 | } | |
918 | } | |
919 | ||
1e59de90 TL |
920 | void IOHandler::notify_out_dispatch() |
921 | { | |
aee94f69 TL |
922 | ceph_assert_always(seastar::this_shard_id() == get_shard_id()); |
923 | assert(is_out_queued()); | |
924 | if (need_notify_out) { | |
925 | auto cc_seq = crosscore.prepare_submit(); | |
926 | logger().debug("{} send {} notify_out()", | |
927 | conn, cc_seq); | |
928 | shard_states->dispatch_in_background( | |
929 | "notify_out", conn, [this, cc_seq] { | |
930 | return seastar::smp::submit_to( | |
931 | conn.get_messenger_shard_id(), [this, cc_seq] { | |
932 | return handshake_listener->notify_out(cc_seq); | |
933 | }); | |
934 | }); | |
1e59de90 | 935 | } |
aee94f69 TL |
936 | if (shard_states->try_enter_out_dispatching()) { |
937 | shard_states->dispatch_in_background( | |
938 | "do_out_dispatch", conn, [this] { | |
939 | return do_out_dispatch(*shard_states); | |
1e59de90 | 940 | }); |
1e59de90 TL |
941 | } |
942 | } | |
943 | ||
// Read and decode one MESSAGE frame payload, validate its sequence
// number against in_seq, queue an ack for lossless connections, and
// deliver the message to the dispatchers. msg_size is the throttle
// reservation taken before this read; throttle_stamp records when the
// throttle was acquired. The returned future throttles further reading.
seastar::future<>
IOHandler::read_message(
  shard_states_t &ctx,
  utime_t throttle_stamp,
  std::size_t msg_size)
{
  return frame_assembler->read_frame_payload<false>(
  ).then([this, throttle_stamp, msg_size, &ctx](auto payload) {
    // the state may have changed while the payload read was in flight
    if (unlikely(ctx.get_io_state() != io_state_t::open)) {
      logger().debug("{} triggered {} during read_message()",
                     conn, ctx.get_io_state());
      abort_protocol();
    }

    utime_t recv_stamp{seastar::lowres_system_clock::now()};

    // we need to get the size before std::moving segments data
    auto msg_frame = MessageFrame::Decode(*payload);
    // XXX: paranoid copy just to avoid oops
    ceph_msg_header2 current_header = msg_frame.header();

    logger().trace("{} got {} + {} + {} byte message,"
                   " envelope type={} src={} off={} seq={}",
                   conn,
                   msg_frame.front_len(),
                   msg_frame.middle_len(),
                   msg_frame.data_len(),
                   current_header.type,
                   conn.get_peer_name(),
                   current_header.data_off,
                   current_header.seq);

    // rebuild a v1-style ceph_msg_header from the v2 header plus the
    // frame segment lengths; crc fields are zeroed (frame-level checks
    // are handled by the frame assembler)
    ceph_msg_header header{current_header.seq,
                           current_header.tid,
                           current_header.type,
                           current_header.priority,
                           current_header.version,
                           ceph_le32(msg_frame.front_len()),
                           ceph_le32(msg_frame.middle_len()),
                           ceph_le32(msg_frame.data_len()),
                           current_header.data_off,
                           conn.get_peer_name(),
                           current_header.compat_version,
                           current_header.reserved,
                           ceph_le32(0)};
    ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
                           ceph_le32(0), ceph_le64(0), current_header.flags};

    Message *message = decode_message(nullptr, 0, header, footer,
        msg_frame.front(), msg_frame.middle(), msg_frame.data(), nullptr);
    if (!message) {
      logger().warn("{} decode message failed", conn);
      abort_in_fault();
    }

    // store reservation size in message, so we don't get confused
    // by messages entering the dispatch queue through other paths.
    message->set_dispatch_throttle_size(msg_size);

    message->set_throttle_stamp(throttle_stamp);
    message->set_recv_stamp(recv_stamp);
    message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});

    // check received seq#. if it is old, drop the message.
    // note that incoming messages may skip ahead. this is convenient for the
    // client side queueing because messages can't be renumbered, but the (kernel)
    // client will occasionally pull a message out of the sent queue to send
    // elsewhere. in that case it doesn't matter if we "got" it or not.
    uint64_t cur_seq = in_seq;
    if (message->get_seq() <= cur_seq) {
      logger().error("{} got old message {} <= {} {}, discarding",
                     conn, message->get_seq(), cur_seq, *message);
      if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
          local_conf()->ms_die_on_old_message) {
        ceph_assert(0 == "old msgs despite reconnect_seq feature");
      }
      // duplicate/old message: drop it without dispatching
      return seastar::now();
    } else if (message->get_seq() > cur_seq + 1) {
      logger().error("{} missed message? skipped from seq {} to {}",
                     conn, cur_seq, message->get_seq());
      if (local_conf()->ms_die_on_skipped_message) {
        ceph_assert(0 == "skipped incoming seq");
      }
    }

    // note last received message.
    in_seq = message->get_seq();
    if (conn.policy.lossy) {
      logger().debug("{} <== #{} === {} ({})",
                     conn,
                     message->get_seq(),
                     *message,
                     message->get_type());
    } else {
      logger().debug("{} <== #{},{} === {} ({})",
                     conn,
                     message->get_seq(),
                     current_header.ack_seq,
                     *message,
                     message->get_type());
    }

    // notify ack
    if (!conn.policy.lossy) {
      ++ack_left;
      notify_out_dispatch();
    }

    // the peer's header carries its latest ack; trim our resend queue
    ack_out_sent(current_header.ack_seq);

    // TODO: change MessageRef with seastar::shared_ptr
    auto msg_ref = MessageRef{message, false};
    assert(ctx.get_io_state() == io_state_t::open);
    assert(get_io_state() == io_state_t::open);
    ceph_assert_always(conn_ref);

    // throttle the reading process by the returned future
    return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
    // user can make changes
  });
}
1065 | ||
// Background read loop: repeatedly reads a frame preamble and handles it
// by tag (message with throttling, ack, keepalive, keepalive-ack). Runs
// until a failure; on a fault in the open state it transitions to delay
// and notifies the handshake listener cross-core, then always marks
// in-dispatching as exited.
void IOHandler::do_in_dispatch()
{
  shard_states->enter_in_dispatching();
  shard_states->dispatch_in_background(
      "do_in_dispatch", conn, [this, &ctx=*shard_states] {
    return seastar::keep_doing([this, &ctx] {
      return frame_assembler->read_main_preamble<false>(
      ).then([this, &ctx](auto ret) {
        switch (ret.tag) {
          case Tag::MESSAGE: {
            size_t msg_size = get_msg_size(*ret.rx_frame_asm);
            return seastar::futurize_invoke([this] {
              // throttle_message() logic
              if (!conn.policy.throttler_messages) {
                return seastar::now();
              }
              // TODO: message throttler
              ceph_abort("TODO");
              return seastar::now();
            }).then([this, msg_size] {
              // throttle_bytes() logic
              if (!conn.policy.throttler_bytes) {
                return seastar::now();
              }
              if (!msg_size) {
                return seastar::now();
              }
              logger().trace("{} wants {} bytes from policy throttler {}/{}",
                             conn, msg_size,
                             conn.policy.throttler_bytes->get_current(),
                             conn.policy.throttler_bytes->get_max());
              // may block here until the byte reservation is available
              return conn.policy.throttler_bytes->get(msg_size);
            }).then([this, msg_size, &ctx] {
              // TODO: throttle_dispatch_queue() logic
              utime_t throttle_stamp{seastar::lowres_system_clock::now()};
              return read_message(ctx, throttle_stamp, msg_size);
            });
          }
          case Tag::ACK:
            return frame_assembler->read_frame_payload<false>(
            ).then([this](auto payload) {
              // handle_message_ack() logic
              auto ack = AckFrame::Decode(payload->back());
              logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
              ack_out_sent(ack.seq());
            });
          case Tag::KEEPALIVE2:
            return frame_assembler->read_frame_payload<false>(
            ).then([this](auto payload) {
              // handle_keepalive2() logic
              auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
              logger().debug("{} GOT KeepAliveFrame: timestamp={}",
                             conn, keepalive_frame.timestamp());
              // notify keepalive ack
              next_keepalive_ack = keepalive_frame.timestamp();
              // only kick the write loop when already on the I/O shard
              if (seastar::this_shard_id() == get_shard_id()) {
                notify_out_dispatch();
              }

              last_keepalive = seastar::lowres_system_clock::now();
            });
          case Tag::KEEPALIVE2_ACK:
            return frame_assembler->read_frame_payload<false>(
            ).then([this](auto payload) {
              // handle_keepalive2_ack() logic
              auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
              auto _last_keepalive_ack =
                seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
              set_last_keepalive_ack(_last_keepalive_ack);
              logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
                             conn, _last_keepalive_ack);
            });
          default: {
            logger().warn("{} do_in_dispatch() received unexpected tag: {}",
                          conn, static_cast<uint32_t>(ret.tag));
            abort_in_fault();
          }
        }
      });
    }).handle_exception([this, &ctx](std::exception_ptr eptr) {
      // extract a printable description; the exception object stays alive
      // via eptr, so the pointer returned by what() remains valid below.
      // NOTE(review): if the exception is not derived from std::exception,
      // the catch below does not match, e_what stays uninitialized and the
      // exception escapes this handler -- confirm that cannot happen here.
      const char *e_what;
      try {
        std::rethrow_exception(eptr);
      } catch (std::exception &e) {
        e_what = e.what();
      }

      auto io_state = ctx.get_io_state();
      if (io_state == io_state_t::open) {
        auto cc_seq = crosscore.prepare_submit();
        logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, "
                      "send {} notify_out_fault()",
                      conn, io_state, io_stat_printer{*this}, e_what, cc_seq);
        do_set_io_state(io_state_t::delay);
        // notify the handshake listener on the messenger shard, in the
        // background, with a crosscore sequence for ordering
        shard_states->dispatch_in_background(
            "notify_out_fault(in)", conn, [this, cc_seq, eptr] {
          auto states = get_states();
          return seastar::smp::submit_to(
              conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
            return handshake_listener->notify_out_fault(
                cc_seq, "do_in_dispatch", eptr, states);
          });
        });
      } else {
        if (io_state != io_state_t::switched) {
          logger().info("{} do_in_dispatch(): fault at {}, {} -- {}",
                        conn, io_state, io_stat_printer{*this}, e_what);
        } else {
          logger().info("{} do_in_dispatch(): fault at {} -- {}",
                        conn, io_state, e_what);
        }
      }
    }).finally([&ctx] {
      ctx.exit_in_dispatching();
    });
  });
}
1183 | ||
aee94f69 TL |
1184 | seastar::future<> |
1185 | IOHandler::close_io( | |
1186 | crosscore_t::seq_t cc_seq, | |
1187 | bool is_dispatch_reset, | |
1188 | bool is_replace) | |
1189 | { | |
1190 | ceph_assert_always(seastar::this_shard_id() == get_shard_id()); | |
1191 | if (!crosscore.proceed_or_wait(cc_seq)) { | |
1192 | logger().debug("{} got {} close_io(), wait at {}", | |
1193 | conn, cc_seq, crosscore.get_in_seq()); | |
1194 | return crosscore.wait(cc_seq | |
1195 | ).then([this, cc_seq, is_dispatch_reset, is_replace] { | |
1196 | return close_io(cc_seq, is_dispatch_reset, is_replace); | |
1197 | }); | |
1198 | } | |
1199 | ||
1200 | logger().debug("{} got {} close_io(reset={}, replace={})", | |
1201 | conn, cc_seq, is_dispatch_reset, is_replace); | |
1202 | ceph_assert_always(get_io_state() == io_state_t::drop); | |
1203 | ||
1204 | if (is_dispatch_reset) { | |
1205 | dispatch_reset(is_replace); | |
1206 | } | |
1207 | ||
1208 | ceph_assert_always(conn_ref); | |
1209 | conn_ref.reset(); | |
1210 | ||
1211 | // cannot be running in parallel with to_new_sid() | |
1212 | if (maybe_dropped_sid.has_value()) { | |
1213 | assert(shard_states->assert_closed_and_exit()); | |
1214 | auto prv_sid = *maybe_dropped_sid; | |
1215 | return cleanup_prv_shard(prv_sid); | |
1216 | } else { | |
1217 | return shard_states->close( | |
1218 | ).then([this] { | |
1219 | assert(shard_states->assert_closed_and_exit()); | |
1220 | }); | |
1221 | } | |
1222 | } | |
1223 | ||
1224 | /* | |
1225 | * IOHandler::shard_states_t | |
1226 | */ | |
1227 | ||
1228 | void | |
1229 | IOHandler::shard_states_t::notify_out_dispatching_stopped( | |
1230 | const char *what, SocketConnection &conn) | |
1231 | { | |
1232 | assert(seastar::this_shard_id() == sid); | |
1233 | if (unlikely(out_exit_dispatching.has_value())) { | |
1234 | out_exit_dispatching->set_value(); | |
1235 | out_exit_dispatching = std::nullopt; | |
1236 | logger().info("{} do_out_dispatch: stop({}) at {}, set out_exit_dispatching", | |
1237 | conn, what, io_state); | |
1238 | } else { | |
1239 | if (unlikely(io_state != io_state_t::open)) { | |
1240 | logger().info("{} do_out_dispatch: stop({}) at {}, no out_exit_dispatching", | |
1241 | conn, what, io_state); | |
1242 | } | |
1243 | } | |
1244 | } | |
1245 | ||
1246 | seastar::future<> | |
1247 | IOHandler::shard_states_t::wait_io_exit_dispatching() | |
1248 | { | |
1249 | assert(seastar::this_shard_id() == sid); | |
1250 | assert(io_state != io_state_t::open); | |
1251 | assert(!gate.is_closed()); | |
1252 | return seastar::when_all( | |
1253 | [this] { | |
1254 | if (out_exit_dispatching) { | |
1255 | return out_exit_dispatching->get_future(); | |
1256 | } else { | |
1257 | return seastar::now(); | |
1258 | } | |
1259 | }(), | |
1260 | [this] { | |
1261 | if (in_exit_dispatching) { | |
1262 | return in_exit_dispatching->get_future(); | |
1263 | } else { | |
1264 | return seastar::now(); | |
1265 | } | |
1266 | }() | |
1267 | ).discard_result(); | |
1268 | } | |
1269 | ||
1270 | IOHandler::shard_states_ref_t | |
1271 | IOHandler::shard_states_t::create_from_previous( | |
1272 | shard_states_t &prv_states, | |
1273 | seastar::shard_id new_sid) | |
1274 | { | |
1275 | auto io_state = prv_states.io_state; | |
1276 | assert(io_state != io_state_t::open); | |
1277 | auto ret = shard_states_t::create(new_sid, io_state); | |
1278 | if (io_state == io_state_t::drop) { | |
1279 | // the new gate should not never be used | |
1280 | auto fut = ret->gate.close(); | |
1281 | ceph_assert_always(fut.available()); | |
1282 | } | |
1283 | prv_states.set_io_state(io_state_t::switched); | |
1284 | return ret; | |
1285 | } | |
1286 | ||
1e59de90 | 1287 | } // namespace crimson::net |