// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include <unistd.h>

#include "include/Context.h"
#include "common/errno.h"
#include "AsyncMessenger.h"
#include "AsyncConnection.h"

#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "common/EventTrace.h"

// Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
#define SEQ_MASK 0x7fffffff
30 | ||
31 | #define dout_subsys ceph_subsys_ms | |
32 | #undef dout_prefix | |
33 | #define dout_prefix _conn_prefix(_dout) | |
34 | ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) { | |
35 | return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this | |
36 | << " :" << port | |
37 | << " s=" << get_state_name(state) | |
38 | << " pgs=" << peer_global_seq | |
39 | << " cs=" << connect_seq | |
40 | << " l=" << policy.lossy | |
41 | << ")."; | |
42 | } | |
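
// Example of the prefix rendered by _conn_prefix above (values purely
// illustrative):
//   -- 10.0.0.1:6800/1234 >> 10.0.0.2:6801/5678 conn(0x55d3b7e0 :6800
//      s=STATE_OPEN pgs=4 cs=1 l=0).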
43 | ||
44 | // Notes: | |
45 | // 1. Don't dispatch any event when closed! It may cause AsyncConnection alive even if AsyncMessenger dead | |
46 | ||
47 | const int AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512; | |
48 | const int ASYNC_COALESCE_THRESHOLD = 256; | |
49 | ||
50 | class C_time_wakeup : public EventCallback { | |
51 | AsyncConnectionRef conn; | |
52 | ||
53 | public: | |
54 | explicit C_time_wakeup(AsyncConnectionRef c): conn(c) {} | |
55 | void do_request(int fd_or_id) override { | |
56 | conn->wakeup_from(fd_or_id); | |
57 | } | |
58 | }; | |
59 | ||
60 | class C_handle_read : public EventCallback { | |
61 | AsyncConnectionRef conn; | |
62 | ||
63 | public: | |
64 | explicit C_handle_read(AsyncConnectionRef c): conn(c) {} | |
65 | void do_request(int fd_or_id) override { | |
66 | conn->process(); | |
67 | } | |
68 | }; | |
69 | ||
70 | class C_handle_write : public EventCallback { | |
71 | AsyncConnectionRef conn; | |
72 | ||
73 | public: | |
74 | explicit C_handle_write(AsyncConnectionRef c): conn(c) {} | |
75 | void do_request(int fd) override { | |
76 | conn->handle_write(); | |
77 | } | |
78 | }; | |
79 | ||
80 | class C_clean_handler : public EventCallback { | |
81 | AsyncConnectionRef conn; | |
82 | public: | |
83 | explicit C_clean_handler(AsyncConnectionRef c): conn(c) {} | |
84 | void do_request(int id) override { | |
85 | conn->cleanup(); | |
86 | delete this; | |
87 | } | |
88 | }; | |
89 | ||
90 | class C_tick_wakeup : public EventCallback { | |
91 | AsyncConnectionRef conn; | |
92 | ||
93 | public: | |
94 | explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {} | |
95 | void do_request(int fd_or_id) override { | |
96 | conn->tick(fd_or_id); | |
97 | } | |
98 | }; | |
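
// Note: C_clean_handler is one-shot and deletes itself after running, while
// the other callbacks above are allocated once per connection (see the
// constructor below) and reused for every event they service.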
99 | ||
100 | static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) | |
101 | { | |
102 | // create a buffer to read into that matches the data alignment | |
103 | unsigned left = len; | |
104 | if (off & ~CEPH_PAGE_MASK) { | |
105 | // head | |
106 | unsigned head = 0; | |
107 | head = MIN(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left); | |
108 | data.push_back(buffer::create(head)); | |
109 | left -= head; | |
110 | } | |
111 | unsigned middle = left & CEPH_PAGE_MASK; | |
112 | if (middle > 0) { | |
113 | data.push_back(buffer::create_page_aligned(middle)); | |
114 | left -= middle; | |
115 | } | |
116 | if (left) { | |
117 | data.push_back(buffer::create(left)); | |
118 | } | |
119 | } | |
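
// Worked example (a sketch, assuming CEPH_PAGE_SIZE == 4096): for
// len = 10000 and off = 1000 the bufferlist above is built from three pieces:
//   head   = min(4096 - 1000, 10000) = 3096 bytes  (ordinary buffer)
//   middle = (10000 - 3096) & ~4095  = 4096 bytes  (page-aligned buffer)
//   tail   = 10000 - 3096 - 4096     = 2808 bytes  (ordinary buffer)
// so the bulk of the payload lands page-aligned in memory, mirroring its
// page offset on the wire.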
120 | ||
121 | AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, | |
122 | Worker *w) | |
123 | : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()), | |
124 | logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0), | |
125 | out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), port(-1), | |
126 | dispatch_queue(q), can_write(WriteStatus::NOWRITE), | |
127 | open_write(false), keepalive(false), recv_buf(NULL), | |
128 | recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)), | |
129 | recv_start(0), recv_end(0), | |
130 | last_active(ceph::coarse_mono_clock::now()), | |
131 | inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000), | |
132 | got_bad_auth(false), authorizer(NULL), replacing(false), | |
133 | is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), | |
134 | worker(w), center(&w->center) | |
135 | { | |
136 | read_handler = new C_handle_read(this); | |
137 | write_handler = new C_handle_write(this); | |
138 | wakeup_handler = new C_time_wakeup(this); | |
139 | tick_handler = new C_tick_wakeup(this); | |
140 | memset(msgvec, 0, sizeof(msgvec)); | |
141 | // double recv_max_prefetch see "read_until" | |
142 | recv_buf = new char[2*recv_max_prefetch]; | |
143 | state_buffer = new char[4096]; | |
144 | logger->inc(l_msgr_created_connections); | |
145 | } | |
146 | ||
147 | AsyncConnection::~AsyncConnection() | |
148 | { | |
149 | assert(out_q.empty()); | |
150 | assert(sent.empty()); | |
151 | delete authorizer; | |
152 | if (recv_buf) | |
153 | delete[] recv_buf; | |
154 | if (state_buffer) | |
155 | delete[] state_buffer; | |
156 | assert(!delay_state); | |
157 | } | |
158 | ||
159 | void AsyncConnection::maybe_start_delay_thread() | |
160 | { | |
161 | if (!delay_state) { | |
162 | auto pos = async_msgr->cct->_conf->get_val<std::string>("ms_inject_delay_type").find(ceph_entity_type_name(peer_type)); | |
163 | if (pos != string::npos) { | |
164 | ldout(msgr->cct, 1) << __func__ << " setting up a delay queue" << dendl; | |
165 | delay_state = new DelayedDelivery(async_msgr, center, dispatch_queue, conn_id); | |
166 | } | |
167 | } | |
168 | } | |
169 | ||
170 | /* return -1 means `fd` occurs error or closed, it should be closed | |
171 | * return 0 means EAGAIN or EINTR */ | |
172 | ssize_t AsyncConnection::read_bulk(char *buf, unsigned len) | |
173 | { | |
174 | ssize_t nread; | |
175 | again: | |
176 | nread = cs.read(buf, len); | |
177 | if (nread < 0) { | |
178 | if (nread == -EAGAIN) { | |
179 | nread = 0; | |
180 | } else if (nread == -EINTR) { | |
181 | goto again; | |
182 | } else { | |
183 | ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << cs.fd() | |
184 | << " : "<< strerror(nread) << dendl; | |
185 | return -1; | |
186 | } | |
187 | } else if (nread == 0) { | |
188 | ldout(async_msgr->cct, 1) << __func__ << " peer close file descriptor " | |
189 | << cs.fd() << dendl; | |
190 | return -1; | |
191 | } | |
192 | return nread; | |
193 | } | |
194 | ||
195 | // return the remaining bytes, it may larger than the length of ptr | |
196 | // else return < 0 means error | |
197 | ssize_t AsyncConnection::_try_send(bool more) | |
198 | { | |
199 | if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) { | |
200 | if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) { | |
201 | ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl; | |
202 | cs.shutdown(); | |
203 | } | |
204 | } | |
205 | ||
206 | ssize_t r = cs.send(outcoming_bl, more); | |
207 | if (r < 0) { | |
208 | ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl; | |
209 | return r; | |
210 | } | |
211 | ||
212 | ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r | |
213 | << " remaining bytes " << outcoming_bl.length() << dendl; | |
214 | ||
215 | if (!open_write && is_queued()) { | |
216 | if (center->in_thread()) { | |
217 | center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler); | |
218 | open_write = true; | |
219 | } else { | |
220 | center->dispatch_event_external(write_handler); | |
221 | } | |
222 | } | |
223 | ||
224 | if (open_write && !is_queued()) { | |
225 | if (center->in_thread()) { | |
226 | center->delete_file_event(cs.fd(), EVENT_WRITABLE); | |
227 | open_write = false; | |
228 | } else { | |
229 | center->dispatch_event_external(write_handler); | |
230 | } | |
231 | if (state_after_send != STATE_NONE) | |
232 | center->dispatch_event_external(read_handler); | |
233 | } | |
234 | ||
235 | return outcoming_bl.length(); | |
236 | } | |
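
// Note on the event juggling above: EVENT_WRITABLE is only registered while
// outcoming_bl still holds data and is dropped as soon as the queue drains,
// so idle connections don't wake the event loop. Callers outside the owning
// thread can't touch the event center directly, so they punt to write_handler
// via dispatch_event_external and let the owning thread (de)register the
// event on its next pass.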
237 | ||
238 | // Because this func will be called multi times to populate | |
239 | // the needed buffer, so the passed in bufferptr must be the same. | |
240 | // Normally, only "read_message" will pass existing bufferptr in | |
241 | // | |
242 | // And it will uses readahead method to reduce small read overhead, | |
243 | // "recv_buf" is used to store read buffer | |
244 | // | |
245 | // return the remaining bytes, 0 means this buffer is finished | |
246 | // else return < 0 means error | |
247 | ssize_t AsyncConnection::read_until(unsigned len, char *p) | |
248 | { | |
249 | ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is " | |
250 | << state_offset << dendl; | |
251 | ||
252 | if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) { | |
253 | if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) { | |
254 | ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl; | |
255 | cs.shutdown(); | |
256 | } | |
257 | } | |
258 | ||
259 | ssize_t r = 0; | |
260 | uint64_t left = len - state_offset; | |
261 | if (recv_end > recv_start) { | |
262 | uint64_t to_read = MIN(recv_end - recv_start, left); | |
263 | memcpy(p, recv_buf+recv_start, to_read); | |
264 | recv_start += to_read; | |
265 | left -= to_read; | |
266 | ldout(async_msgr->cct, 25) << __func__ << " got " << to_read << " in buffer " | |
267 | << " left is " << left << " buffer still has " | |
268 | << recv_end - recv_start << dendl; | |
269 | if (left == 0) { | |
270 | return 0; | |
271 | } | |
272 | state_offset += to_read; | |
273 | } | |
274 | ||
275 | recv_end = recv_start = 0; | |
276 | /* nothing left in the prefetch buffer */ | |
277 | if (len > recv_max_prefetch) { | |
278 | /* this was a large read, we don't prefetch for these */ | |
279 | do { | |
280 | r = read_bulk(p+state_offset, left); | |
281 | ldout(async_msgr->cct, 25) << __func__ << " read_bulk left is " << left << " got " << r << dendl; | |
282 | if (r < 0) { | |
283 | ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl; | |
284 | return -1; | |
285 | } else if (r == static_cast<int>(left)) { | |
286 | state_offset = 0; | |
287 | return 0; | |
288 | } | |
289 | state_offset += r; | |
290 | left -= r; | |
291 | } while (r > 0); | |
292 | } else { | |
293 | do { | |
294 | r = read_bulk(recv_buf+recv_end, recv_max_prefetch); | |
295 | ldout(async_msgr->cct, 25) << __func__ << " read_bulk recv_end is " << recv_end | |
296 | << " left is " << left << " got " << r << dendl; | |
297 | if (r < 0) { | |
298 | ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl; | |
299 | return -1; | |
300 | } | |
301 | recv_end += r; | |
302 | if (r >= static_cast<int>(left)) { | |
303 | recv_start = len - state_offset; | |
304 | memcpy(p+state_offset, recv_buf, recv_start); | |
305 | state_offset = 0; | |
306 | return 0; | |
307 | } | |
308 | left -= r; | |
309 | } while (r > 0); | |
310 | memcpy(p+state_offset, recv_buf, recv_end-recv_start); | |
311 | state_offset += (recv_end - recv_start); | |
312 | recv_end = recv_start = 0; | |
313 | } | |
314 | ldout(async_msgr->cct, 25) << __func__ << " need len " << len << " remaining " | |
315 | << len - state_offset << " bytes" << dendl; | |
316 | return len - state_offset; | |
317 | } | |
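
// Caller pattern (a sketch of how the state machine below consumes this):
//   ssize_t r = read_until(sizeof(tag), &tag);
//   if (r < 0)       // hard error: fail the connection
//     goto fail;
//   else if (r > 0)  // r bytes still missing: state_offset keeps the
//     break;         // progress; retry on the next readable event
//   // r == 0: `tag` is fully populated and state_offset has been reset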
318 | ||
319 | void AsyncConnection::inject_delay() { | |
320 | if (async_msgr->cct->_conf->ms_inject_internal_delays) { | |
321 | ldout(async_msgr->cct, 10) << __func__ << " sleep for " << | |
322 | async_msgr->cct->_conf->ms_inject_internal_delays << dendl; | |
323 | utime_t t; | |
324 | t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays); | |
325 | t.sleep(); | |
326 | } | |
327 | } | |
328 | ||
329 | void AsyncConnection::process() | |
330 | { | |
331 | ssize_t r = 0; | |
332 | int prev_state = state; | |
333 | #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) | |
334 | utime_t ltt_recv_stamp = ceph_clock_now(); | |
335 | #endif | |
336 | bool need_dispatch_writer = false; | |
337 | std::lock_guard<std::mutex> l(lock); | |
338 | last_active = ceph::coarse_mono_clock::now(); | |
339 | do { | |
340 | ldout(async_msgr->cct, 20) << __func__ << " prev state is " << get_state_name(prev_state) << dendl; | |
341 | prev_state = state; | |
342 | switch (state) { | |
343 | case STATE_OPEN: | |
344 | { | |
345 | char tag = -1; | |
346 | r = read_until(sizeof(tag), &tag); | |
347 | if (r < 0) { | |
348 | ldout(async_msgr->cct, 1) << __func__ << " read tag failed" << dendl; | |
349 | goto fail; | |
350 | } else if (r > 0) { | |
351 | break; | |
352 | } | |
353 | ||
354 | if (tag == CEPH_MSGR_TAG_KEEPALIVE) { | |
355 | ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl; | |
356 | set_last_keepalive(ceph_clock_now()); | |
357 | } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2) { | |
358 | state = STATE_OPEN_KEEPALIVE2; | |
359 | } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { | |
360 | state = STATE_OPEN_KEEPALIVE2_ACK; | |
361 | } else if (tag == CEPH_MSGR_TAG_ACK) { | |
362 | state = STATE_OPEN_TAG_ACK; | |
363 | } else if (tag == CEPH_MSGR_TAG_MSG) { | |
364 | state = STATE_OPEN_MESSAGE_HEADER; | |
365 | } else if (tag == CEPH_MSGR_TAG_CLOSE) { | |
366 | state = STATE_OPEN_TAG_CLOSE; | |
367 | } else { | |
368 | ldout(async_msgr->cct, 0) << __func__ << " bad tag " << (int)tag << dendl; | |
369 | goto fail; | |
370 | } | |
371 | ||
372 | break; | |
373 | } | |
374 | ||
375 | case STATE_OPEN_KEEPALIVE2: | |
376 | { | |
377 | ceph_timespec *t; | |
378 | r = read_until(sizeof(*t), state_buffer); | |
379 | if (r < 0) { | |
380 | ldout(async_msgr->cct, 1) << __func__ << " read keeplive timespec failed" << dendl; | |
381 | goto fail; | |
382 | } else if (r > 0) { | |
383 | break; | |
384 | } | |
385 | ||
386 | ldout(async_msgr->cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl; | |
387 | t = (ceph_timespec*)state_buffer; | |
388 | utime_t kp_t = utime_t(*t); | |
389 | write_lock.lock(); | |
390 | _append_keepalive_or_ack(true, &kp_t); | |
391 | write_lock.unlock(); | |
392 | ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl; | |
393 | set_last_keepalive(ceph_clock_now()); | |
394 | need_dispatch_writer = true; | |
395 | state = STATE_OPEN; | |
396 | break; | |
397 | } | |
398 | ||
399 | case STATE_OPEN_KEEPALIVE2_ACK: | |
400 | { | |
401 | ceph_timespec *t; | |
402 | r = read_until(sizeof(*t), state_buffer); | |
403 | if (r < 0) { | |
404 | ldout(async_msgr->cct, 1) << __func__ << " read keeplive timespec failed" << dendl; | |
405 | goto fail; | |
406 | } else if (r > 0) { | |
407 | break; | |
408 | } | |
409 | ||
410 | t = (ceph_timespec*)state_buffer; | |
411 | set_last_keepalive_ack(utime_t(*t)); | |
412 | ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl; | |
413 | state = STATE_OPEN; | |
414 | break; | |
415 | } | |
416 | ||
417 | case STATE_OPEN_TAG_ACK: | |
418 | { | |
419 | ceph_le64 *seq; | |
420 | r = read_until(sizeof(*seq), state_buffer); | |
421 | if (r < 0) { | |
422 | ldout(async_msgr->cct, 1) << __func__ << " read ack seq failed" << dendl; | |
423 | goto fail; | |
424 | } else if (r > 0) { | |
425 | break; | |
426 | } | |
427 | ||
428 | seq = (ceph_le64*)state_buffer; | |
429 | ldout(async_msgr->cct, 20) << __func__ << " got ACK" << dendl; | |
430 | handle_ack(*seq); | |
431 | state = STATE_OPEN; | |
432 | break; | |
433 | } | |
434 | ||
435 | case STATE_OPEN_MESSAGE_HEADER: | |
436 | { | |
437 | #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) | |
438 | ltt_recv_stamp = ceph_clock_now(); | |
439 | #endif | |
440 | ldout(async_msgr->cct, 20) << __func__ << " begin MSG" << dendl; | |
441 | ceph_msg_header header; | |
442 | ceph_msg_header_old oldheader; | |
443 | __u32 header_crc = 0; | |
444 | unsigned len; | |
445 | if (has_feature(CEPH_FEATURE_NOSRCADDR)) | |
446 | len = sizeof(header); | |
447 | else | |
448 | len = sizeof(oldheader); | |
449 | ||
450 | r = read_until(len, state_buffer); | |
451 | if (r < 0) { | |
452 | ldout(async_msgr->cct, 1) << __func__ << " read message header failed" << dendl; | |
453 | goto fail; | |
454 | } else if (r > 0) { | |
455 | break; | |
456 | } | |
457 | ||
458 | ldout(async_msgr->cct, 20) << __func__ << " got MSG header" << dendl; | |
459 | ||
460 | if (has_feature(CEPH_FEATURE_NOSRCADDR)) { | |
461 | header = *((ceph_msg_header*)state_buffer); | |
462 | if (msgr->crcflags & MSG_CRC_HEADER) | |
463 | header_crc = ceph_crc32c(0, (unsigned char *)&header, | |
464 | sizeof(header) - sizeof(header.crc)); | |
465 | } else { | |
466 | oldheader = *((ceph_msg_header_old*)state_buffer); | |
467 | // this is fugly | |
468 | memcpy(&header, &oldheader, sizeof(header)); | |
469 | header.src = oldheader.src.name; | |
470 | header.reserved = oldheader.reserved; | |
471 | if (msgr->crcflags & MSG_CRC_HEADER) { | |
472 | header.crc = oldheader.crc; | |
473 | header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc)); | |
474 | } | |
475 | } | |
476 | ||
477 | ldout(async_msgr->cct, 20) << __func__ << " got envelope type=" << header.type | |
478 | << " src " << entity_name_t(header.src) | |
479 | << " front=" << header.front_len | |
480 | << " data=" << header.data_len | |
481 | << " off " << header.data_off << dendl; | |
482 | ||
483 | // verify header crc | |
484 | if (msgr->crcflags & MSG_CRC_HEADER && header_crc != header.crc) { | |
485 | ldout(async_msgr->cct,0) << __func__ << " got bad header crc " | |
486 | << header_crc << " != " << header.crc << dendl; | |
487 | goto fail; | |
488 | } | |
489 | ||
490 | // Reset state | |
491 | data_buf.clear(); | |
492 | front.clear(); | |
493 | middle.clear(); | |
494 | data.clear(); | |
495 | recv_stamp = ceph_clock_now(); | |
496 | current_header = header; | |
497 | state = STATE_OPEN_MESSAGE_THROTTLE_MESSAGE; | |
498 | break; | |
499 | } | |
500 | ||
501 | case STATE_OPEN_MESSAGE_THROTTLE_MESSAGE: | |
502 | { | |
503 | if (policy.throttler_messages) { | |
504 | ldout(async_msgr->cct, 10) << __func__ << " wants " << 1 << " message from policy throttler " | |
505 | << policy.throttler_messages->get_current() << "/" | |
506 | << policy.throttler_messages->get_max() << dendl; | |
507 | if (!policy.throttler_messages->get_or_fail()) { | |
508 | ldout(async_msgr->cct, 10) << __func__ << " wants 1 message from policy throttle " | |
509 | << policy.throttler_messages->get_current() << "/" | |
510 | << policy.throttler_messages->get_max() << " failed, just wait." << dendl; | |
511 | // following thread pool deal with th full message queue isn't a | |
512 | // short time, so we can wait a ms. | |
513 | if (register_time_events.empty()) | |
514 | register_time_events.insert(center->create_time_event(1000, wakeup_handler)); | |
515 | break; | |
516 | } | |
517 | } | |
518 | ||
519 | state = STATE_OPEN_MESSAGE_THROTTLE_BYTES; | |
520 | break; | |
521 | } | |
522 | ||
523 | case STATE_OPEN_MESSAGE_THROTTLE_BYTES: | |
524 | { | |
525 | cur_msg_size = current_header.front_len + current_header.middle_len + current_header.data_len; | |
526 | if (cur_msg_size) { | |
527 | if (policy.throttler_bytes) { | |
528 | ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from policy throttler " | |
529 | << policy.throttler_bytes->get_current() << "/" | |
530 | << policy.throttler_bytes->get_max() << dendl; | |
531 | if (!policy.throttler_bytes->get_or_fail(cur_msg_size)) { | |
532 | ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from policy throttler " | |
533 | << policy.throttler_bytes->get_current() << "/" | |
534 | << policy.throttler_bytes->get_max() << " failed, just wait." << dendl; | |
535 | // following thread pool deal with th full message queue isn't a | |
536 | // short time, so we can wait a ms. | |
537 | if (register_time_events.empty()) | |
538 | register_time_events.insert(center->create_time_event(1000, wakeup_handler)); | |
539 | break; | |
540 | } | |
541 | } | |
542 | } | |
543 | ||
544 | state = STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE; | |
545 | break; | |
546 | } | |
547 | ||
548 | case STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE: | |
549 | { | |
550 | if (cur_msg_size) { | |
551 | if (!dispatch_queue->dispatch_throttler.get_or_fail(cur_msg_size)) { | |
552 | ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from dispatch throttle " | |
553 | << dispatch_queue->dispatch_throttler.get_current() << "/" | |
554 | << dispatch_queue->dispatch_throttler.get_max() << " failed, just wait." << dendl; | |
555 | // following thread pool deal with th full message queue isn't a | |
556 | // short time, so we can wait a ms. | |
557 | if (register_time_events.empty()) | |
558 | register_time_events.insert(center->create_time_event(1000, wakeup_handler)); | |
559 | break; | |
560 | } | |
561 | } | |
562 | ||
563 | throttle_stamp = ceph_clock_now(); | |
564 | state = STATE_OPEN_MESSAGE_READ_FRONT; | |
565 | break; | |
566 | } | |
567 | ||
568 | case STATE_OPEN_MESSAGE_READ_FRONT: | |
569 | { | |
570 | // read front | |
571 | unsigned front_len = current_header.front_len; | |
572 | if (front_len) { | |
573 | if (!front.length()) | |
574 | front.push_back(buffer::create(front_len)); | |
575 | ||
576 | r = read_until(front_len, front.c_str()); | |
577 | if (r < 0) { | |
578 | ldout(async_msgr->cct, 1) << __func__ << " read message front failed" << dendl; | |
579 | goto fail; | |
580 | } else if (r > 0) { | |
581 | break; | |
582 | } | |
583 | ||
584 | ldout(async_msgr->cct, 20) << __func__ << " got front " << front.length() << dendl; | |
585 | } | |
586 | state = STATE_OPEN_MESSAGE_READ_MIDDLE; | |
587 | } | |
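        // no break: falls through to STATE_OPEN_MESSAGE_READ_MIDDLE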
588 | ||
589 | case STATE_OPEN_MESSAGE_READ_MIDDLE: | |
590 | { | |
591 | // read middle | |
592 | unsigned middle_len = current_header.middle_len; | |
593 | if (middle_len) { | |
594 | if (!middle.length()) | |
595 | middle.push_back(buffer::create(middle_len)); | |
596 | ||
597 | r = read_until(middle_len, middle.c_str()); | |
598 | if (r < 0) { | |
599 | ldout(async_msgr->cct, 1) << __func__ << " read message middle failed" << dendl; | |
600 | goto fail; | |
601 | } else if (r > 0) { | |
602 | break; | |
603 | } | |
604 | ldout(async_msgr->cct, 20) << __func__ << " got middle " << middle.length() << dendl; | |
605 | } | |
606 | ||
607 | state = STATE_OPEN_MESSAGE_READ_DATA_PREPARE; | |
608 | } | |
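        // no break: falls through to STATE_OPEN_MESSAGE_READ_DATA_PREPARE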
609 | ||
610 | case STATE_OPEN_MESSAGE_READ_DATA_PREPARE: | |
611 | { | |
612 | // read data | |
613 | unsigned data_len = le32_to_cpu(current_header.data_len); | |
614 | unsigned data_off = le32_to_cpu(current_header.data_off); | |
615 | if (data_len) { | |
616 | // get a buffer | |
617 | map<ceph_tid_t,pair<bufferlist,int> >::iterator p = rx_buffers.find(current_header.tid); | |
618 | if (p != rx_buffers.end()) { | |
619 | ldout(async_msgr->cct,10) << __func__ << " seleting rx buffer v " << p->second.second | |
620 | << " at offset " << data_off | |
621 | << " len " << p->second.first.length() << dendl; | |
622 | data_buf = p->second.first; | |
623 | // make sure it's big enough | |
624 | if (data_buf.length() < data_len) | |
625 | data_buf.push_back(buffer::create(data_len - data_buf.length())); | |
626 | data_blp = data_buf.begin(); | |
627 | } else { | |
628 | ldout(async_msgr->cct,20) << __func__ << " allocating new rx buffer at offset " << data_off << dendl; | |
629 | alloc_aligned_buffer(data_buf, data_len, data_off); | |
630 | data_blp = data_buf.begin(); | |
631 | } | |
632 | } | |
633 | ||
634 | msg_left = data_len; | |
635 | state = STATE_OPEN_MESSAGE_READ_DATA; | |
636 | } | |
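        // no break: falls through to STATE_OPEN_MESSAGE_READ_DATA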
637 | ||
638 | case STATE_OPEN_MESSAGE_READ_DATA: | |
639 | { | |
640 | while (msg_left > 0) { | |
641 | bufferptr bp = data_blp.get_current_ptr(); | |
642 | unsigned read = MIN(bp.length(), msg_left); | |
643 | r = read_until(read, bp.c_str()); | |
644 | if (r < 0) { | |
645 | ldout(async_msgr->cct, 1) << __func__ << " read data error " << dendl; | |
646 | goto fail; | |
647 | } else if (r > 0) { | |
648 | break; | |
649 | } | |
650 | ||
651 | data_blp.advance(read); | |
652 | data.append(bp, 0, read); | |
653 | msg_left -= read; | |
654 | } | |
655 | ||
656 | if (msg_left > 0) | |
657 | break; | |
658 | ||
659 | state = STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH; | |
660 | } | |
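        // no break: falls through to STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH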
661 | ||
662 | case STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH: | |
663 | { | |
664 | ceph_msg_footer footer; | |
665 | ceph_msg_footer_old old_footer; | |
666 | unsigned len; | |
667 | // footer | |
668 | if (has_feature(CEPH_FEATURE_MSG_AUTH)) | |
669 | len = sizeof(footer); | |
670 | else | |
671 | len = sizeof(old_footer); | |
672 | ||
673 | r = read_until(len, state_buffer); | |
674 | if (r < 0) { | |
675 | ldout(async_msgr->cct, 1) << __func__ << " read footer data error " << dendl; | |
676 | goto fail; | |
677 | } else if (r > 0) { | |
678 | break; | |
679 | } | |
680 | ||
681 | if (has_feature(CEPH_FEATURE_MSG_AUTH)) { | |
682 | footer = *((ceph_msg_footer*)state_buffer); | |
683 | } else { | |
684 | old_footer = *((ceph_msg_footer_old*)state_buffer); | |
685 | footer.front_crc = old_footer.front_crc; | |
686 | footer.middle_crc = old_footer.middle_crc; | |
687 | footer.data_crc = old_footer.data_crc; | |
688 | footer.sig = 0; | |
689 | footer.flags = old_footer.flags; | |
690 | } | |
691 | int aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0; | |
692 | ldout(async_msgr->cct, 10) << __func__ << " aborted = " << aborted << dendl; | |
693 | if (aborted) { | |
694 | ldout(async_msgr->cct, 0) << __func__ << " got " << front.length() << " + " << middle.length() << " + " << data.length() | |
695 | << " byte message.. ABORTED" << dendl; | |
696 | goto fail; | |
697 | } | |
698 | ||
699 | ldout(async_msgr->cct, 20) << __func__ << " got " << front.length() << " + " << middle.length() | |
700 | << " + " << data.length() << " byte message" << dendl; | |
701 | Message *message = decode_message(async_msgr->cct, async_msgr->crcflags, current_header, footer, | |
702 | front, middle, data, this); | |
703 | if (!message) { | |
704 | ldout(async_msgr->cct, 1) << __func__ << " decode message failed " << dendl; | |
705 | goto fail; | |
706 | } | |
707 | ||
708 | // | |
709 | // Check the signature if one should be present. A zero return indicates success. PLR | |
710 | // | |
711 | ||
712 | if (session_security.get() == NULL) { | |
713 | ldout(async_msgr->cct, 10) << __func__ << " no session security set" << dendl; | |
714 | } else { | |
715 | if (session_security->check_message_signature(message)) { | |
716 | ldout(async_msgr->cct, 0) << __func__ << " Signature check failed" << dendl; | |
717 | message->put(); | |
718 | goto fail; | |
719 | } | |
720 | } | |
721 | message->set_byte_throttler(policy.throttler_bytes); | |
722 | message->set_message_throttler(policy.throttler_messages); | |
723 | ||
724 | // store reservation size in message, so we don't get confused | |
725 | // by messages entering the dispatch queue through other paths. | |
726 | message->set_dispatch_throttle_size(cur_msg_size); | |
727 | ||
728 | message->set_recv_stamp(recv_stamp); | |
729 | message->set_throttle_stamp(throttle_stamp); | |
730 | message->set_recv_complete_stamp(ceph_clock_now()); | |
731 | ||
732 | // check received seq#. if it is old, drop the message. | |
733 | // note that incoming messages may skip ahead. this is convenient for the client | |
734 | // side queueing because messages can't be renumbered, but the (kernel) client will | |
735 | // occasionally pull a message out of the sent queue to send elsewhere. in that case | |
736 | // it doesn't matter if we "got" it or not. | |
737 | uint64_t cur_seq = in_seq.read(); | |
738 | if (message->get_seq() <= cur_seq) { | |
739 | ldout(async_msgr->cct,0) << __func__ << " got old message " | |
740 | << message->get_seq() << " <= " << cur_seq << " " << message << " " << *message | |
741 | << ", discarding" << dendl; | |
742 | message->put(); | |
743 | if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) && async_msgr->cct->_conf->ms_die_on_old_message) | |
744 | assert(0 == "old msgs despite reconnect_seq feature"); | |
745 | break; | |
746 | } | |
747 | if (message->get_seq() > cur_seq + 1) { | |
748 | ldout(async_msgr->cct, 0) << __func__ << " missed message? skipped from seq " | |
749 | << cur_seq << " to " << message->get_seq() << dendl; | |
750 | if (async_msgr->cct->_conf->ms_die_on_skipped_message) | |
751 | assert(0 == "skipped incoming seq"); | |
752 | } | |
753 | ||
754 | message->set_connection(this); | |
755 | ||
756 | #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) | |
757 | if (message->get_type() == CEPH_MSG_OSD_OP || message->get_type() == CEPH_MSG_OSD_OPREPLY) { | |
758 | utime_t ltt_processed_stamp = ceph_clock_now(); | |
759 | double usecs_elapsed = (ltt_processed_stamp.to_nsec()-ltt_recv_stamp.to_nsec())/1000; | |
760 | ostringstream buf; | |
761 | if (message->get_type() == CEPH_MSG_OSD_OP) | |
762 | OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP", false); | |
763 | else | |
764 | OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY", false); | |
765 | } | |
766 | #endif | |
767 | ||
768 | // note last received message. | |
769 | in_seq.set(message->get_seq()); | |
770 | ldout(async_msgr->cct, 5) << " rx " << message->get_source() << " seq " | |
771 | << message->get_seq() << " " << message | |
772 | << " " << *message << dendl; | |
773 | ||
774 | if (!policy.lossy) { | |
775 | ack_left.inc(); | |
776 | need_dispatch_writer = true; | |
777 | } | |
778 | state = STATE_OPEN; | |
779 | ||
780 | logger->inc(l_msgr_recv_messages); | |
781 | logger->inc(l_msgr_recv_bytes, cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer)); | |
782 | ||
783 | async_msgr->ms_fast_preprocess(message); | |
784 | if (delay_state) { | |
785 | utime_t release = message->get_recv_stamp(); | |
786 | double delay_period = 0; | |
787 | if (rand() % 10000 < async_msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { | |
788 | delay_period = async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; | |
789 | release += delay_period; | |
790 | ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on " | |
791 | << message << " " << *message << dendl; | |
792 | } | |
793 | delay_state->queue(delay_period, release, message); | |
794 | } else if (async_msgr->ms_can_fast_dispatch(message)) { | |
795 | lock.unlock(); | |
796 | dispatch_queue->fast_dispatch(message); | |
797 | lock.lock(); | |
798 | } else { | |
799 | dispatch_queue->enqueue(message, message->get_priority(), conn_id); | |
800 | } | |
801 | ||
802 | break; | |
803 | } | |
804 | ||
805 | case STATE_OPEN_TAG_CLOSE: | |
806 | { | |
807 | ldout(async_msgr->cct, 20) << __func__ << " got CLOSE" << dendl; | |
808 | _stop(); | |
809 | return ; | |
810 | } | |
811 | ||
812 | case STATE_STANDBY: | |
813 | { | |
814 | ldout(async_msgr->cct, 20) << __func__ << " enter STANDY" << dendl; | |
815 | ||
816 | break; | |
817 | } | |
818 | ||
819 | case STATE_NONE: | |
820 | { | |
821 | ldout(async_msgr->cct, 20) << __func__ << " enter none state" << dendl; | |
822 | break; | |
823 | } | |
824 | ||
825 | case STATE_CLOSED: | |
826 | { | |
827 | ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl; | |
828 | break; | |
829 | } | |
830 | ||
831 | case STATE_WAIT: | |
832 | { | |
833 | ldout(async_msgr->cct, 1) << __func__ << " enter wait state, failing" << dendl; | |
834 | goto fail; | |
835 | } | |
836 | ||
837 | default: | |
838 | { | |
839 | if (_process_connection() < 0) | |
840 | goto fail; | |
841 | break; | |
842 | } | |
843 | } | |
844 | } while (prev_state != state); | |
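  // The loop above runs until a full pass leaves `state` unchanged, i.e. we
  // are blocked waiting for more bytes (read_until returned > 0) or the
  // connection has settled into a stable state.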
845 | ||
846 | if (need_dispatch_writer && is_connected()) | |
847 | center->dispatch_event_external(write_handler); | |
848 | return; | |
849 | ||
850 | fail: | |
851 | fault(); | |
852 | } | |
853 | ||
854 | ssize_t AsyncConnection::_process_connection() | |
855 | { | |
856 | ssize_t r = 0; | |
857 | ||
858 | switch(state) { | |
859 | case STATE_WAIT_SEND: | |
860 | { | |
861 | std::lock_guard<std::mutex> l(write_lock); | |
862 | if (!outcoming_bl.length()) { | |
863 | assert(state_after_send); | |
864 | state = state_after_send; | |
865 | state_after_send = STATE_NONE; | |
866 | } | |
867 | break; | |
868 | } | |
869 | ||
870 | case STATE_CONNECTING: | |
871 | { | |
872 | assert(!policy.server); | |
873 | ||
874 | // reset connect state variables | |
875 | got_bad_auth = false; | |
876 | delete authorizer; | |
877 | authorizer = NULL; | |
878 | authorizer_buf.clear(); | |
879 | memset(&connect_msg, 0, sizeof(connect_msg)); | |
880 | memset(&connect_reply, 0, sizeof(connect_reply)); | |
881 | ||
882 | global_seq = async_msgr->get_global_seq(); | |
883 | // close old socket. this is safe because we stopped the reader thread above. | |
884 | if (cs) { | |
885 | center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE); | |
886 | cs.close(); | |
887 | } | |
888 | ||
889 | SocketOptions opts; | |
890 | opts.priority = async_msgr->get_socket_priority(); | |
891 | opts.connect_bind_addr = msgr->get_myaddr(); | |
892 | r = worker->connect(get_peer_addr(), opts, &cs); | |
893 | if (r < 0) | |
894 | goto fail; | |
895 | ||
896 | center->create_file_event(cs.fd(), EVENT_READABLE, read_handler); | |
897 | state = STATE_CONNECTING_RE; | |
898 | break; | |
899 | } | |
900 | ||
901 | case STATE_CONNECTING_RE: | |
902 | { | |
903 | r = cs.is_connected(); | |
904 | if (r < 0) { | |
905 | ldout(async_msgr->cct, 1) << __func__ << " reconnect failed " << dendl; | |
906 | if (r == -ECONNREFUSED) { | |
907 | ldout(async_msgr->cct, 2) << __func__ << " connection refused!" << dendl; | |
908 | dispatch_queue->queue_refused(this); | |
909 | } | |
910 | goto fail; | |
911 | } else if (r == 0) { | |
912 | ldout(async_msgr->cct, 10) << __func__ << " nonblock connect inprogress" << dendl; | |
913 | if (async_msgr->get_stack()->nonblock_connect_need_writable_event()) | |
914 | center->create_file_event(cs.fd(), EVENT_WRITABLE, read_handler); | |
915 | break; | |
916 | } | |
917 | ||
918 | center->delete_file_event(cs.fd(), EVENT_WRITABLE); | |
919 | ldout(async_msgr->cct, 10) << __func__ << " connect successfully, ready to send banner" << dendl; | |
920 | ||
921 | bufferlist bl; | |
922 | bl.append(CEPH_BANNER, strlen(CEPH_BANNER)); | |
923 | r = try_send(bl); | |
924 | if (r == 0) { | |
925 | state = STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY; | |
926 | ldout(async_msgr->cct, 10) << __func__ << " connect write banner done: " | |
927 | << get_peer_addr() << dendl; | |
928 | } else if (r > 0) { | |
929 | state = STATE_WAIT_SEND; | |
930 | state_after_send = STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY; | |
931 | ldout(async_msgr->cct, 10) << __func__ << " connect wait for write banner: " | |
932 | << get_peer_addr() << dendl; | |
933 | } else { | |
934 | goto fail; | |
935 | } | |
936 | ||
937 | break; | |
938 | } | |
939 | ||
940 | case STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY: | |
941 | { | |
942 | entity_addr_t paddr, peer_addr_for_me; | |
943 | bufferlist myaddrbl; | |
944 | unsigned banner_len = strlen(CEPH_BANNER); | |
945 | unsigned need_len = banner_len + sizeof(ceph_entity_addr)*2; | |
946 | r = read_until(need_len, state_buffer); | |
947 | if (r < 0) { | |
948 | ldout(async_msgr->cct, 1) << __func__ << " read banner and identify addresses failed" << dendl; | |
949 | goto fail; | |
950 | } else if (r > 0) { | |
951 | break; | |
952 | } | |
953 | ||
954 | if (memcmp(state_buffer, CEPH_BANNER, banner_len)) { | |
955 | ldout(async_msgr->cct, 0) << __func__ << " connect protocol error (bad banner) on peer " | |
956 | << get_peer_addr() << dendl; | |
957 | goto fail; | |
958 | } | |
959 | ||
960 | bufferlist bl; | |
961 | bl.append(state_buffer+banner_len, sizeof(ceph_entity_addr)*2); | |
962 | bufferlist::iterator p = bl.begin(); | |
963 | try { | |
964 | ::decode(paddr, p); | |
965 | ::decode(peer_addr_for_me, p); | |
966 | } catch (const buffer::error& e) { | |
967 | lderr(async_msgr->cct) << __func__ << " decode peer addr failed " << dendl; | |
968 | goto fail; | |
969 | } | |
970 | ldout(async_msgr->cct, 20) << __func__ << " connect read peer addr " | |
971 | << paddr << " on socket " << cs.fd() << dendl; | |
972 | if (peer_addr != paddr) { | |
973 | if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() && | |
974 | peer_addr.get_nonce() == paddr.get_nonce()) { | |
975 | ldout(async_msgr->cct, 0) << __func__ << " connect claims to be " << paddr | |
976 | << " not " << peer_addr | |
977 | << " - presumably this is the same node!" << dendl; | |
978 | } else { | |
979 | ldout(async_msgr->cct, 0) << __func__ << " connect claims to be " | |
980 | << paddr << " not " << peer_addr << " - wrong node!" << dendl; | |
981 | goto fail; | |
982 | } | |
983 | } | |
984 | ||
985 | ldout(async_msgr->cct, 20) << __func__ << " connect peer addr for me is " << peer_addr_for_me << dendl; | |
986 | lock.unlock(); | |
987 | async_msgr->learned_addr(peer_addr_for_me); | |
988 | if (async_msgr->cct->_conf->ms_inject_internal_delays) { | |
989 | if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) { | |
990 | ldout(msgr->cct, 10) << __func__ << " sleep for " | |
991 | << async_msgr->cct->_conf->ms_inject_internal_delays << dendl; | |
992 | utime_t t; | |
993 | t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays); | |
994 | t.sleep(); | |
995 | } | |
996 | } | |
997 | ||
998 | lock.lock(); | |
999 | if (state != STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY) { | |
1000 | ldout(async_msgr->cct, 1) << __func__ << " state changed while learned_addr, mark_down or " | |
1001 | << " replacing must be happened just now" << dendl; | |
1002 | return 0; | |
1003 | } | |
1004 | ||
1005 | ::encode(async_msgr->get_myaddr(), myaddrbl, 0); // legacy | |
1006 | r = try_send(myaddrbl); | |
1007 | if (r == 0) { | |
1008 | state = STATE_CONNECTING_SEND_CONNECT_MSG; | |
1009 | ldout(async_msgr->cct, 10) << __func__ << " connect sent my addr " | |
1010 | << async_msgr->get_myaddr() << dendl; | |
1011 | } else if (r > 0) { | |
1012 | state = STATE_WAIT_SEND; | |
1013 | state_after_send = STATE_CONNECTING_SEND_CONNECT_MSG; | |
1014 | ldout(async_msgr->cct, 10) << __func__ << " connect send my addr done: " | |
1015 | << async_msgr->get_myaddr() << dendl; | |
1016 | } else { | |
1017 | ldout(async_msgr->cct, 2) << __func__ << " connect couldn't write my addr, " | |
1018 | << cpp_strerror(r) << dendl; | |
1019 | goto fail; | |
1020 | } | |
1021 | ||
1022 | break; | |
1023 | } | |
1024 | ||
1025 | case STATE_CONNECTING_SEND_CONNECT_MSG: | |
1026 | { | |
1027 | if (!got_bad_auth) { | |
1028 | delete authorizer; | |
1029 | authorizer = async_msgr->get_authorizer(peer_type, false); | |
1030 | } | |
1031 | bufferlist bl; | |
1032 | ||
1033 | connect_msg.features = policy.features_supported; | |
1034 | connect_msg.host_type = async_msgr->get_myinst().name.type(); | |
1035 | connect_msg.global_seq = global_seq; | |
1036 | connect_msg.connect_seq = connect_seq; | |
1037 | connect_msg.protocol_version = async_msgr->get_proto_version(peer_type, true); | |
1038 | connect_msg.authorizer_protocol = authorizer ? authorizer->protocol : 0; | |
1039 | connect_msg.authorizer_len = authorizer ? authorizer->bl.length() : 0; | |
1040 | if (authorizer) | |
1041 | ldout(async_msgr->cct, 10) << __func__ << " connect_msg.authorizer_len=" | |
1042 | << connect_msg.authorizer_len << " protocol=" | |
1043 | << connect_msg.authorizer_protocol << dendl; | |
1044 | connect_msg.flags = 0; | |
1045 | if (policy.lossy) | |
1046 | connect_msg.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides! | |
1047 | bl.append((char*)&connect_msg, sizeof(connect_msg)); | |
1048 | if (authorizer) { | |
1049 | bl.append(authorizer->bl.c_str(), authorizer->bl.length()); | |
1050 | } | |
1051 | ldout(async_msgr->cct, 10) << __func__ << " connect sending gseq=" << global_seq << " cseq=" | |
1052 | << connect_seq << " proto=" << connect_msg.protocol_version << dendl; | |
1053 | ||
1054 | r = try_send(bl); | |
1055 | if (r == 0) { | |
1056 | state = STATE_CONNECTING_WAIT_CONNECT_REPLY; | |
1057 | ldout(async_msgr->cct,20) << __func__ << " connect wrote (self +) cseq, waiting for reply" << dendl; | |
1058 | } else if (r > 0) { | |
1059 | state = STATE_WAIT_SEND; | |
1060 | state_after_send = STATE_CONNECTING_WAIT_CONNECT_REPLY; | |
1061 | ldout(async_msgr->cct, 10) << __func__ << " continue send reply " << dendl; | |
1062 | } else { | |
1063 | ldout(async_msgr->cct, 2) << __func__ << " connect couldn't send reply " | |
1064 | << cpp_strerror(r) << dendl; | |
1065 | goto fail; | |
1066 | } | |
1067 | ||
1068 | break; | |
1069 | } | |
1070 | ||
1071 | case STATE_CONNECTING_WAIT_CONNECT_REPLY: | |
1072 | { | |
1073 | r = read_until(sizeof(connect_reply), state_buffer); | |
1074 | if (r < 0) { | |
1075 | ldout(async_msgr->cct, 1) << __func__ << " read connect reply failed" << dendl; | |
1076 | goto fail; | |
1077 | } else if (r > 0) { | |
1078 | break; | |
1079 | } | |
1080 | ||
1081 | connect_reply = *((ceph_msg_connect_reply*)state_buffer); | |
1082 | ||
1083 | ldout(async_msgr->cct, 20) << __func__ << " connect got reply tag " << (int)connect_reply.tag | |
1084 | << " connect_seq " << connect_reply.connect_seq << " global_seq " | |
1085 | << connect_reply.global_seq << " proto " << connect_reply.protocol_version | |
1086 | << " flags " << (int)connect_reply.flags << " features " | |
1087 | << connect_reply.features << dendl; | |
1088 | state = STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH; | |
1089 | ||
1090 | break; | |
1091 | } | |
1092 | ||
1093 | case STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH: | |
1094 | { | |
1095 | bufferlist authorizer_reply; | |
1096 | if (connect_reply.authorizer_len) { | |
1097 | ldout(async_msgr->cct, 10) << __func__ << " reply.authorizer_len=" << connect_reply.authorizer_len << dendl; | |
1098 | assert(connect_reply.authorizer_len < 4096); | |
1099 | r = read_until(connect_reply.authorizer_len, state_buffer); | |
1100 | if (r < 0) { | |
1101 | ldout(async_msgr->cct, 1) << __func__ << " read connect reply authorizer failed" << dendl; | |
1102 | goto fail; | |
1103 | } else if (r > 0) { | |
1104 | break; | |
1105 | } | |
1106 | ||
1107 | authorizer_reply.append(state_buffer, connect_reply.authorizer_len); | |
1108 | bufferlist::iterator iter = authorizer_reply.begin(); | |
1109 | if (authorizer && !authorizer->verify_reply(iter)) { | |
1110 | ldout(async_msgr->cct, 0) << __func__ << " failed verifying authorize reply" << dendl; | |
1111 | goto fail; | |
1112 | } | |
1113 | } | |
1114 | r = handle_connect_reply(connect_msg, connect_reply); | |
1115 | if (r < 0) | |
1116 | goto fail; | |
1117 | ||
1118 | // state must be changed! | |
1119 | assert(state != STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH); | |
1120 | break; | |
1121 | } | |
1122 | ||
1123 | case STATE_CONNECTING_WAIT_ACK_SEQ: | |
1124 | { | |
1125 | uint64_t newly_acked_seq = 0; | |
1126 | ||
1127 | r = read_until(sizeof(newly_acked_seq), state_buffer); | |
1128 | if (r < 0) { | |
1129 | ldout(async_msgr->cct, 1) << __func__ << " read connect ack seq failed" << dendl; | |
1130 | goto fail; | |
1131 | } else if (r > 0) { | |
1132 | break; | |
1133 | } | |
1134 | ||
1135 | newly_acked_seq = *((uint64_t*)state_buffer); | |
1136 | ldout(async_msgr->cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq | |
1137 | << " vs out_seq " << out_seq.read() << dendl; | |
1138 | discard_requeued_up_to(newly_acked_seq); | |
1139 | //while (newly_acked_seq > out_seq.read()) { | |
1140 | // Message *m = _get_next_outgoing(NULL); | |
1141 | // assert(m); | |
1142 | // ldout(async_msgr->cct, 2) << __func__ << " discarding previously sent " << m->get_seq() | |
1143 | // << " " << *m << dendl; | |
1144 | // assert(m->get_seq() <= newly_acked_seq); | |
1145 | // m->put(); | |
1146 | // out_seq.inc(); | |
1147 | //} | |
1148 | ||
1149 | bufferlist bl; | |
1150 | uint64_t s = in_seq.read(); | |
1151 | bl.append((char*)&s, sizeof(s)); | |
1152 | r = try_send(bl); | |
1153 | if (r == 0) { | |
1154 | state = STATE_CONNECTING_READY; | |
1155 | ldout(async_msgr->cct, 10) << __func__ << " send in_seq done " << dendl; | |
1156 | } else if (r > 0) { | |
1157 | state_after_send = STATE_CONNECTING_READY; | |
1158 | state = STATE_WAIT_SEND; | |
1159 | ldout(async_msgr->cct, 10) << __func__ << " continue send in_seq " << dendl; | |
1160 | } else { | |
1161 | goto fail; | |
1162 | } | |
1163 | break; | |
1164 | } | |
1165 | ||
1166 | case STATE_CONNECTING_READY: | |
1167 | { | |
1168 | // hooray! | |
1169 | peer_global_seq = connect_reply.global_seq; | |
1170 | policy.lossy = connect_reply.flags & CEPH_MSG_CONNECT_LOSSY; | |
1171 | state = STATE_OPEN; | |
1172 | once_ready = true; | |
1173 | connect_seq += 1; | |
1174 | assert(connect_seq == connect_reply.connect_seq); | |
1175 | backoff = utime_t(); | |
1176 | set_features((uint64_t)connect_reply.features & (uint64_t)connect_msg.features); | |
1177 | ldout(async_msgr->cct, 10) << __func__ << " connect success " << connect_seq | |
1178 | << ", lossy = " << policy.lossy << ", features " | |
1179 | << get_features() << dendl; | |
1180 | ||
1181 | // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the | |
1182 | // connection. PLR | |
1183 | if (authorizer != NULL) { | |
1184 | session_security.reset( | |
1185 | get_auth_session_handler(async_msgr->cct, | |
1186 | authorizer->protocol, | |
1187 | authorizer->session_key, | |
1188 | get_features())); | |
1189 | } else { | |
1190 | // We have no authorizer, so we shouldn't be applying security to messages in this AsyncConnection. PLR | |
1191 | session_security.reset(); | |
1192 | } | |
1193 | ||
1194 | if (delay_state) | |
1195 | assert(delay_state->ready()); | |
1196 | dispatch_queue->queue_connect(this); | |
1197 | async_msgr->ms_deliver_handle_fast_connect(this); | |
1198 | ||
1199 | // make sure no pending tick timer | |
1200 | if (last_tick_id) | |
1201 | center->delete_time_event(last_tick_id); | |
1202 | last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler); | |
1203 | ||
1204 | // message may in queue between last _try_send and connection ready | |
1205 | // write event may already notify and we need to force scheduler again | |
1206 | write_lock.lock(); | |
1207 | can_write = WriteStatus::CANWRITE; | |
1208 | if (is_queued()) | |
1209 | center->dispatch_event_external(write_handler); | |
1210 | write_lock.unlock(); | |
1211 | maybe_start_delay_thread(); | |
1212 | break; | |
1213 | } | |
1214 | ||
1215 | case STATE_ACCEPTING: | |
1216 | { | |
1217 | bufferlist bl; | |
1218 | center->create_file_event(cs.fd(), EVENT_READABLE, read_handler); | |
1219 | ||
1220 | bl.append(CEPH_BANNER, strlen(CEPH_BANNER)); | |
1221 | ||
1222 | ::encode(async_msgr->get_myaddr(), bl, 0); // legacy | |
1223 | port = async_msgr->get_myaddr().get_port(); | |
1224 | ::encode(socket_addr, bl, 0); // legacy | |
1225 | ldout(async_msgr->cct, 1) << __func__ << " sd=" << cs.fd() << " " << socket_addr << dendl; | |
1226 | ||
1227 | r = try_send(bl); | |
1228 | if (r == 0) { | |
1229 | state = STATE_ACCEPTING_WAIT_BANNER_ADDR; | |
1230 | ldout(async_msgr->cct, 10) << __func__ << " write banner and addr done: " | |
1231 | << get_peer_addr() << dendl; | |
1232 | } else if (r > 0) { | |
1233 | state = STATE_WAIT_SEND; | |
1234 | state_after_send = STATE_ACCEPTING_WAIT_BANNER_ADDR; | |
1235 | ldout(async_msgr->cct, 10) << __func__ << " wait for write banner and addr: " | |
1236 | << get_peer_addr() << dendl; | |
1237 | } else { | |
1238 | goto fail; | |
1239 | } | |
1240 | ||
1241 | break; | |
1242 | } | |
    case STATE_ACCEPTING_WAIT_BANNER_ADDR:
      {
        bufferlist addr_bl;
        entity_addr_t peer_addr;

        r = read_until(strlen(CEPH_BANNER) + sizeof(ceph_entity_addr), state_buffer);
        if (r < 0) {
          ldout(async_msgr->cct, 1) << __func__ << " read peer banner and addr failed" << dendl;
          goto fail;
        } else if (r > 0) {
          break;
        }

        if (memcmp(state_buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
          ldout(async_msgr->cct, 1) << __func__ << " accept peer sent bad banner '" << state_buffer
                                    << "' (should be '" << CEPH_BANNER << "')" << dendl;
          goto fail;
        }

        addr_bl.append(state_buffer+strlen(CEPH_BANNER), sizeof(ceph_entity_addr));
        {
          bufferlist::iterator ti = addr_bl.begin();
          ::decode(peer_addr, ti);
        }

        ldout(async_msgr->cct, 10) << __func__ << " accept peer addr is " << peer_addr << dendl;
        if (peer_addr.is_blank_ip()) {
          // peer apparently doesn't know what ip they have; figure it out for them.
          int port = peer_addr.get_port();
          peer_addr.u = socket_addr.u;
          peer_addr.set_port(port);
          ldout(async_msgr->cct, 0) << __func__ << " accept peer addr is really " << peer_addr
                                    << " (socket is " << socket_addr << ")" << dendl;
        }
        set_peer_addr(peer_addr);  // so that connection_state gets set up
        state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
        break;
      }
1281 | ||
1282 | case STATE_ACCEPTING_WAIT_CONNECT_MSG: | |
1283 | { | |
1284 | r = read_until(sizeof(connect_msg), state_buffer); | |
1285 | if (r < 0) { | |
1286 | ldout(async_msgr->cct, 1) << __func__ << " read connect msg failed" << dendl; | |
1287 | goto fail; | |
1288 | } else if (r > 0) { | |
1289 | break; | |
1290 | } | |
1291 | ||
1292 | connect_msg = *((ceph_msg_connect*)state_buffer); | |
1293 | state = STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH; | |
1294 | break; | |
1295 | } | |
1296 | ||
1297 | case STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH: | |
1298 | { | |
1299 | bufferlist authorizer_reply; | |
1300 | ||
1301 | if (connect_msg.authorizer_len) { | |
1302 | if (!authorizer_buf.length()) | |
1303 | authorizer_buf.push_back(buffer::create(connect_msg.authorizer_len)); | |
1304 | ||
1305 | r = read_until(connect_msg.authorizer_len, authorizer_buf.c_str()); | |
1306 | if (r < 0) { | |
1307 | ldout(async_msgr->cct, 1) << __func__ << " read connect authorizer failed" << dendl; | |
1308 | goto fail; | |
1309 | } else if (r > 0) { | |
1310 | break; | |
1311 | } | |
1312 | } | |
1313 | ||
1314 | ldout(async_msgr->cct, 20) << __func__ << " accept got peer connect_seq " | |
1315 | << connect_msg.connect_seq << " global_seq " | |
1316 | << connect_msg.global_seq << dendl; | |
1317 | set_peer_type(connect_msg.host_type); | |
1318 | policy = async_msgr->get_policy(connect_msg.host_type); | |
1319 | ldout(async_msgr->cct, 10) << __func__ << " accept of host_type " << connect_msg.host_type | |
1320 | << ", policy.lossy=" << policy.lossy << " policy.server=" | |
1321 | << policy.server << " policy.standby=" << policy.standby | |
1322 | << " policy.resetcheck=" << policy.resetcheck << dendl; | |
1323 | ||
1324 | r = handle_connect_msg(connect_msg, authorizer_buf, authorizer_reply); | |
1325 | if (r < 0) | |
1326 | goto fail; | |
1327 | ||
1328 | // state is changed by "handle_connect_msg" | |
1329 | assert(state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH); | |
1330 | break; | |
1331 | } | |
1332 | ||
1333 | case STATE_ACCEPTING_WAIT_SEQ: | |
1334 | { | |
1335 | uint64_t newly_acked_seq; | |
1336 | r = read_until(sizeof(newly_acked_seq), state_buffer); | |
1337 | if (r < 0) { | |
1338 | ldout(async_msgr->cct, 1) << __func__ << " read ack seq failed" << dendl; | |
1339 | goto fail_registered; | |
1340 | } else if (r > 0) { | |
1341 | break; | |
1342 | } | |
1343 | ||
1344 | newly_acked_seq = *((uint64_t*)state_buffer); | |
1345 | ldout(async_msgr->cct, 2) << __func__ << " accept get newly_acked_seq " << newly_acked_seq << dendl; | |
1346 | discard_requeued_up_to(newly_acked_seq); | |
1347 | state = STATE_ACCEPTING_READY; | |
1348 | break; | |
1349 | } | |
1350 | ||
1351 | case STATE_ACCEPTING_READY: | |
1352 | { | |
1353 | ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl; | |
1354 | state = STATE_OPEN; | |
1355 | memset(&connect_msg, 0, sizeof(connect_msg)); | |
1356 | ||
1357 | if (delay_state) | |
1358 | assert(delay_state->ready()); | |
1359 | // make sure no pending tick timer | |
1360 | if (last_tick_id) | |
1361 | center->delete_time_event(last_tick_id); | |
1362 | last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler); | |
1363 | ||
1364 | write_lock.lock(); | |
1365 | can_write = WriteStatus::CANWRITE; | |
1366 | if (is_queued()) | |
1367 | center->dispatch_event_external(write_handler); | |
1368 | write_lock.unlock(); | |
1369 | maybe_start_delay_thread(); | |
1370 | break; | |
1371 | } | |
1372 | ||
1373 | default: | |
1374 | { | |
1375 | lderr(async_msgr->cct) << __func__ << " bad state: " << state << dendl; | |
1376 | ceph_abort(); | |
1377 | } | |
1378 | } | |
1379 | ||
1380 | return 0; | |
1381 | ||
1382 | fail_registered: | |
1383 | ldout(async_msgr->cct, 10) << "accept fault after register" << dendl; | |
1384 | inject_delay(); | |
1385 | ||
1386 | fail: | |
1387 | return -1; | |
1388 | } | |
1389 | ||
1390 | int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &reply) | |
1391 | { | |
1392 | uint64_t feat_missing; | |
1393 | if (reply.tag == CEPH_MSGR_TAG_FEATURES) { | |
1394 | ldout(async_msgr->cct, 0) << __func__ << " connect protocol feature mismatch, my " | |
1395 | << std::hex << connect.features << " < peer " | |
1396 | << reply.features << " missing " | |
1397 | << (reply.features & ~policy.features_supported) | |
1398 | << std::dec << dendl; | |
1399 | goto fail; | |
1400 | } | |
1401 | ||
1402 | if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) { | |
1403 | ldout(async_msgr->cct, 0) << __func__ << " connect protocol version mismatch, my " | |
1404 | << connect.protocol_version << " != " << reply.protocol_version | |
1405 | << dendl; | |
1406 | goto fail; | |
1407 | } | |
1408 | ||
1409 | if (reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) { | |
1410 | ldout(async_msgr->cct,0) << __func__ << " connect got BADAUTHORIZER" << dendl; | |
1411 | if (got_bad_auth) | |
1412 | goto fail; | |
1413 | got_bad_auth = true; | |
1414 | delete authorizer; | |
1415 | authorizer = async_msgr->get_authorizer(peer_type, true); // try harder | |
1416 | state = STATE_CONNECTING_SEND_CONNECT_MSG; | |
1417 | } | |
1418 | if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) { | |
1419 | ldout(async_msgr->cct, 0) << __func__ << " connect got RESETSESSION" << dendl; | |
1420 | was_session_reset(); | |
1421 | state = STATE_CONNECTING_SEND_CONNECT_MSG; | |
1422 | } | |
1423 | if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) { | |
1424 | global_seq = async_msgr->get_global_seq(reply.global_seq); | |
1425 | ldout(async_msgr->cct, 5) << __func__ << " connect got RETRY_GLOBAL " | |
1426 | << reply.global_seq << " chose new " | |
1427 | << global_seq << dendl; | |
1428 | state = STATE_CONNECTING_SEND_CONNECT_MSG; | |
1429 | } | |
1430 | if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) { | |
1431 | assert(reply.connect_seq > connect_seq); | |
1432 | ldout(async_msgr->cct, 5) << __func__ << " connect got RETRY_SESSION " | |
1433 | << connect_seq << " -> " | |
1434 | << reply.connect_seq << dendl; | |
1435 | connect_seq = reply.connect_seq; | |
1436 | state = STATE_CONNECTING_SEND_CONNECT_MSG; | |
1437 | } | |
1438 | if (reply.tag == CEPH_MSGR_TAG_WAIT) { | |
1439 | ldout(async_msgr->cct, 1) << __func__ << " connect got WAIT (connection race)" << dendl; | |
1440 | state = STATE_WAIT; | |
1441 | } | |
1442 | ||
1443 | feat_missing = policy.features_required & ~(uint64_t)connect_reply.features; | |
1444 | if (feat_missing) { | |
1445 | ldout(async_msgr->cct, 1) << __func__ << " missing required features " << std::hex | |
1446 | << feat_missing << std::dec << dendl; | |
1447 | goto fail; | |
1448 | } | |
1449 | ||
1450 | if (reply.tag == CEPH_MSGR_TAG_SEQ) { | |
1451 | ldout(async_msgr->cct, 10) << __func__ << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl; | |
1452 | state = STATE_CONNECTING_WAIT_ACK_SEQ; | |
1453 | } | |
1454 | if (reply.tag == CEPH_MSGR_TAG_READY) { | |
1455 | ldout(async_msgr->cct, 10) << __func__ << " got CEPH_MSGR_TAG_READY " << dendl; | |
1456 | state = STATE_CONNECTING_READY; | |
1457 | } | |
1458 | ||
1459 | return 0; | |
1460 | ||
1461 | fail: | |
1462 | return -1; | |
1463 | } | |
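// Editorial summary (not part of the original source): the reply tags
// handled above drive the client-side state machine roughly as follows:
//   FEATURES, BADPROTOVER       -> fail (close this attempt)
//   BADAUTHORIZER               -> retry once with a fresh authorizer
//   RESETSESSION                -> was_session_reset(), resend connect msg
//   RETRY_GLOBAL, RETRY_SESSION -> bump gseq/cseq, resend connect msg
//   WAIT                        -> STATE_WAIT (lost a connection race)
//   SEQ                         -> STATE_CONNECTING_WAIT_ACK_SEQ
//   READY                       -> STATE_CONNECTING_READY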
1464 | ||
1465 | ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &authorizer_bl, | |
1466 | bufferlist &authorizer_reply) | |
1467 | { | |
1468 | ssize_t r = 0; | |
1469 | ceph_msg_connect_reply reply; | |
1470 | bufferlist reply_bl; | |
1471 | ||
1472 | memset(&reply, 0, sizeof(reply)); | |
1473 | reply.protocol_version = async_msgr->get_proto_version(peer_type, false); | |
1474 | ||
1475 | // mismatch? | |
1476 | ldout(async_msgr->cct, 10) << __func__ << " accept my proto " << reply.protocol_version | |
1477 | << ", their proto " << connect.protocol_version << dendl; | |
1478 | if (connect.protocol_version != reply.protocol_version) { | |
1479 | return _reply_accept(CEPH_MSGR_TAG_BADPROTOVER, connect, reply, authorizer_reply); | |
1480 | } | |
1481 | // require signatures for cephx? | |
1482 | if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) { | |
1483 | if (peer_type == CEPH_ENTITY_TYPE_OSD || | |
1484 | peer_type == CEPH_ENTITY_TYPE_MDS) { | |
1485 | if (async_msgr->cct->_conf->cephx_require_signatures || | |
1486 | async_msgr->cct->_conf->cephx_cluster_require_signatures) { | |
1487 | ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for cluster" << dendl; | |
1488 | policy.features_required |= CEPH_FEATURE_MSG_AUTH; | |
1489 | } | |
1490 | } else { | |
1491 | if (async_msgr->cct->_conf->cephx_require_signatures || | |
1492 | async_msgr->cct->_conf->cephx_service_require_signatures) { | |
1493 | ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for service" << dendl; | |
1494 | policy.features_required |= CEPH_FEATURE_MSG_AUTH; | |
1495 | } | |
1496 | } | |
1497 | } | |
1498 | uint64_t feat_missing = policy.features_required & ~(uint64_t)connect.features; | |
1499 | if (feat_missing) { | |
1500 | ldout(async_msgr->cct, 1) << __func__ << " peer missing required features " | |
1501 | << std::hex << feat_missing << std::dec << dendl; | |
1502 | return _reply_accept(CEPH_MSGR_TAG_FEATURES, connect, reply, authorizer_reply); | |
1503 | } | |
1504 | ||
1505 | lock.unlock(); | |
1506 | ||
1507 | bool authorizer_valid; | |
1508 | if (!async_msgr->verify_authorizer(this, peer_type, connect.authorizer_protocol, authorizer_bl, | |
1509 | authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) { | |
1510 | lock.lock(); | |
1511 | ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl; | |
1512 | session_security.reset(); | |
1513 | return _reply_accept(CEPH_MSGR_TAG_BADAUTHORIZER, connect, reply, authorizer_reply); | |
1514 | } | |
1515 | ||
1516 | // We've verified the authorizer for this AsyncConnection, so set up the session security structure. PLR | |
1517 | ldout(async_msgr->cct, 10) << __func__ << " accept setting up session_security." << dendl; | |
1518 | ||
1519 | // existing? | |
1520 | AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr); | |
1521 | ||
1522 | inject_delay(); | |
1523 | ||
1524 | lock.lock(); | |
1525 | if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) { | |
1526 | ldout(async_msgr->cct, 1) << __func__ << " state changed while accepting, it must be mark_down" << dendl; | |
1527 | assert(state == STATE_CLOSED); | |
1528 | goto fail; | |
1529 | } | |
1530 | ||
1531 | if (existing == this) | |
1532 | existing = NULL; | |
1533 | if (existing) { | |
1534 | // The existing connection cannot possibly be trying to acquire this | |
1535 | // connection's lock, so locking it here cannot deadlock | |
1536 | existing->lock.lock(); // skip lockdep check (we are locking a second AsyncConnection here) | |
1537 | ||
1538 | if (existing->replacing || existing->state == STATE_CLOSED) { | |
1539 | ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing." | |
1540 | << " existing_state=" << get_state_name(existing->state) << dendl; | |
1541 | reply.global_seq = existing->peer_global_seq; | |
1542 | r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply); | |
1543 | existing->lock.unlock(); | |
1544 | if (r < 0) | |
1545 | goto fail; | |
1546 | return 0; | |
1547 | } | |
1548 | ||
1549 | if (connect.global_seq < existing->peer_global_seq) { | |
1550 | ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing | |
1551 | << ".gseq " << existing->peer_global_seq << " > " | |
1552 | << connect.global_seq << ", RETRY_GLOBAL" << dendl; | |
1553 | reply.global_seq = existing->peer_global_seq; // so we can send it below.. | |
1554 | existing->lock.unlock(); | |
1555 | return _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply); | |
1556 | } else { | |
1557 | ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing | |
1558 | << ".gseq " << existing->peer_global_seq | |
1559 | << " <= " << connect.global_seq << ", looks ok" << dendl; | |
1560 | } | |
1561 | ||
1562 | if (existing->policy.lossy) { | |
1563 | ldout(async_msgr->cct, 0) << __func__ << " accept replacing existing (lossy) channel (new one lossy=" | |
1564 | << policy.lossy << ")" << dendl; | |
1565 | existing->was_session_reset(); | |
1566 | goto replace; | |
1567 | } | |
1568 | ||
1569 | ldout(async_msgr->cct, 0) << __func__ << " accept connect_seq " << connect.connect_seq | |
1570 | << " vs existing csq=" << existing->connect_seq << " existing_state=" | |
1571 | << get_state_name(existing->state) << dendl; | |
1572 | ||
1573 | if (connect.connect_seq == 0 && existing->connect_seq > 0) { | |
1574 | ldout(async_msgr->cct,0) << __func__ << " accept peer reset, then tried to connect to us, replacing" << dendl; | |
1575 | // this is a hard reset from peer | |
1576 | is_reset_from_peer = true; | |
1577 | if (policy.resetcheck) | |
1578 | existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s | |
1579 | goto replace; | |
1580 | } | |
1581 | ||
1582 | if (connect.connect_seq < existing->connect_seq) { | |
1583 | // old attempt, or we sent READY but they didn't get it. | |
1584 | ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing << ".cseq " | |
1585 | << existing->connect_seq << " > " << connect.connect_seq | |
1586 | << ", RETRY_SESSION" << dendl; | |
1587 | reply.connect_seq = existing->connect_seq + 1; | |
1588 | existing->lock.unlock(); | |
1589 | return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply); | |
1590 | } | |
1591 | ||
1592 | if (connect.connect_seq == existing->connect_seq) { | |
1593 | // if the existing connection successfully opened, and/or | |
1594 | // subsequently went to standby, then the peer should bump | |
1595 | // their connect_seq and retry: this is not a connection race | |
1596 | // we need to resolve here. | |
1597 | if (existing->state == STATE_OPEN || | |
1598 | existing->state == STATE_STANDBY) { | |
1599 | ldout(async_msgr->cct, 10) << __func__ << " accept connection race, existing " << existing | |
1600 | << ".cseq " << existing->connect_seq << " == " | |
1601 | << connect.connect_seq << ", OPEN|STANDBY, RETRY_SESSION" << dendl; | |
1602 | reply.connect_seq = existing->connect_seq + 1; | |
1603 | existing->lock.unlock(); | |
1604 | return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply); | |
1605 | } | |
1606 | ||
1607 | // connection race? | |
1608 | if (peer_addr < async_msgr->get_myaddr() || existing->policy.server) { | |
1609 | // incoming wins | |
1610 | ldout(async_msgr->cct, 10) << __func__ << " accept connection race, existing " << existing | |
1611 | << ".cseq " << existing->connect_seq << " == " << connect.connect_seq | |
1612 | << ", or we are server, replacing my attempt" << dendl; | |
1613 | goto replace; | |
1614 | } else { | |
1615 | // our existing outgoing wins | |
1616 | ldout(async_msgr->cct,10) << __func__ << " accept connection race, existing " | |
1617 | << existing << ".cseq " << existing->connect_seq | |
1618 | << " == " << connect.connect_seq << ", sending WAIT" << dendl; | |
1619 | assert(peer_addr > async_msgr->get_myaddr()); | |
1620 | existing->lock.unlock(); | |
1621 | return _reply_accept(CEPH_MSGR_TAG_WAIT, connect, reply, authorizer_reply); | |
1622 | } | |
1623 | } | |
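// Editorial illustration (hypothetical addresses, not part of the original
// source): if A (10.0.0.1:6800) and B (10.0.0.2:6800) connect to each other
// simultaneously with equal cseq, each acceptor compares peer_addr with its
// own address. B (the higher address) replaces its own attempt with A's
// incoming one, while A replies WAIT to B's incoming attempt and keeps its
// outgoing one. Exactly one session survives on both ends: the connection
// initiated by the lower address.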
1624 | ||
1625 | assert(connect.connect_seq > existing->connect_seq); | |
1626 | assert(connect.global_seq >= existing->peer_global_seq); | |
1627 | if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other | |
1628 | existing->connect_seq == 0) { | |
1629 | ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq " | |
1630 | << connect.connect_seq << ", " << existing << ".cseq = " | |
1631 | << existing->connect_seq << "), sending RESETSESSION" << dendl; | |
1632 | existing->lock.unlock(); | |
1633 | return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply); | |
1634 | } | |
1635 | ||
1636 | // reconnect | |
1637 | ldout(async_msgr->cct, 10) << __func__ << " accept peer sent cseq " << connect.connect_seq | |
1638 | << " > " << existing->connect_seq << dendl; | |
1639 | goto replace; | |
1640 | } // existing | |
1641 | else if (!replacing && connect.connect_seq > 0) { | |
1642 | // we reset, and they are opening a new session | |
1643 | ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq " | |
1644 | << connect.connect_seq << "), sending RESETSESSION" << dendl; | |
1645 | return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply); | |
1646 | } else { | |
1647 | // new session | |
1648 | ldout(async_msgr->cct, 10) << __func__ << " accept new session" << dendl; | |
1649 | existing = NULL; | |
1650 | goto open; | |
1651 | } | |
1652 | ceph_abort(); | |
1653 | ||
1654 | replace: | |
1655 | ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl; | |
1656 | ||
1657 | inject_delay(); | |
1658 | if (existing->policy.lossy) { | |
1659 | // lossy channel: just stop the existing connection and queue a reset | |
1660 | ldout(async_msgr->cct, 1) << __func__ << " replacing on lossy channel, failing existing" << dendl; | |
1661 | existing->_stop(); | |
1662 | existing->dispatch_queue->queue_reset(existing.get()); | |
1663 | } else { | |
1664 | assert(can_write == WriteStatus::NOWRITE); | |
1665 | existing->write_lock.lock(); | |
1666 | ||
1667 | // reset the in_seq if this is a hard reset from peer, | |
1668 | // otherwise we respect our original connection's value | |
1669 | if (is_reset_from_peer) { | |
1670 | existing->is_reset_from_peer = true; | |
1671 | } | |
1672 | ||
1673 | center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE); | |
1674 | ||
1675 | // Clean up output buffer | |
1676 | existing->outcoming_bl.clear(); | |
1677 | if (existing->delay_state) { | |
1678 | existing->delay_state->flush(); | |
1679 | assert(!delay_state); | |
1680 | } | |
1681 | existing->requeue_sent(); | |
1682 | existing->reset_recv_state(); | |
1683 | ||
1684 | auto temp_cs = std::move(cs); | |
1685 | EventCenter *new_center = center; | |
1686 | Worker *new_worker = worker; | |
1687 | // cs was moved out above so _stop() won't shut down the replacing socket | |
1688 | // queue a reset on this (new) connection, which we're dumping for the old | |
1689 | _stop(); | |
1690 | ||
1691 | dispatch_queue->queue_reset(this); | |
1692 | ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl; | |
1693 | existing->can_write = WriteStatus::REPLACING; | |
1694 | existing->open_write = false; | |
1695 | existing->replacing = true; | |
1696 | existing->state_offset = 0; | |
1697 | // keep the previous thread from modifying events | |
1698 | existing->state = STATE_NONE; | |
1699 | // Discard existing prefetch buffer in `recv_buf` | |
1700 | existing->recv_start = existing->recv_end = 0; | |
1701 | // there shouldn't be any buffered data left | |
1702 | assert(recv_start == recv_end); | |
1703 | ||
1704 | auto deactivate_existing = std::bind( | |
1705 | [existing, new_worker, new_center, connect, reply, authorizer_reply](ConnectedSocket &cs) mutable { | |
1706 | // we need to delete time event in original thread | |
1707 | { | |
1708 | std::lock_guard<std::mutex> l(existing->lock); | |
1709 | if (existing->state == STATE_NONE) { | |
1710 | existing->shutdown_socket(); | |
1711 | existing->cs = std::move(cs); | |
1712 | existing->worker->references--; | |
1713 | new_worker->references++; | |
1714 | existing->logger = new_worker->get_perf_counter(); | |
1715 | existing->worker = new_worker; | |
1716 | existing->center = new_center; | |
1717 | if (existing->delay_state) | |
1718 | existing->delay_state->set_center(new_center); | |
1719 | } else if (existing->state == STATE_CLOSED) { | |
1720 | auto back_to_close = std::bind( | |
1721 | [](ConnectedSocket &cs) mutable { cs.close(); }, std::move(cs)); | |
1722 | new_center->submit_to( | |
1723 | new_center->get_id(), std::move(back_to_close), true); | |
1724 | return ; | |
1725 | } else { | |
1726 | ceph_abort(); | |
1727 | } | |
1728 | } | |
1729 | ||
1730 | // Before changing existing->center, there may already be events queued in existing->center. | |
1731 | // If we then mark down `existing`, it will execute in another thread and clean up the connection, | |
1732 | // and a previously queued event touching the dead connection would cause a segfault | |
1733 | auto transfer_existing = [existing, connect, reply, authorizer_reply]() mutable { | |
1734 | std::lock_guard<std::mutex> l(existing->lock); | |
1735 | if (existing->state == STATE_CLOSED) | |
1736 | return ; | |
1737 | assert(existing->state == STATE_NONE); | |
1738 | ||
1739 | existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG; | |
1740 | existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE, existing->read_handler); | |
1741 | reply.global_seq = existing->peer_global_seq; | |
1742 | if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) { | |
1743 | // handle error | |
1744 | existing->fault(); | |
1745 | } | |
1746 | }; | |
1747 | if (existing->center->in_thread()) | |
1748 | transfer_existing(); | |
1749 | else | |
1750 | existing->center->submit_to( | |
1751 | existing->center->get_id(), std::move(transfer_existing), true); | |
1752 | }, std::move(temp_cs)); | |
1753 | ||
1754 | existing->center->submit_to( | |
1755 | existing->center->get_id(), std::move(deactivate_existing), true); | |
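// Editorial note (not part of the original source): the handoff above is
// deliberately two-phase. deactivate_existing runs in the *old* center's
// thread, where it is safe to tear down that center's events and move the
// accepted socket over; transfer_existing then runs in the *new* center's
// thread, re-registers the read event there and replies RETRY_GLOBAL so
// the peer reconnects against the migrated connection.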
1756 | existing->write_lock.unlock(); | |
1757 | existing->lock.unlock(); | |
1758 | return 0; | |
1759 | } | |
1760 | existing->lock.unlock(); | |
1761 | ||
1762 | open: | |
1763 | connect_seq = connect.connect_seq + 1; | |
1764 | peer_global_seq = connect.global_seq; | |
1765 | ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = " | |
1766 | << connect_seq << " in_seq=" << in_seq.read() << ", sending READY" << dendl; | |
1767 | ||
1768 | int next_state; | |
1769 | ||
1770 | // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence | |
1771 | if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) { | |
1772 | reply.tag = CEPH_MSGR_TAG_SEQ; | |
1773 | next_state = STATE_ACCEPTING_WAIT_SEQ; | |
1774 | } else { | |
1775 | reply.tag = CEPH_MSGR_TAG_READY; | |
1776 | next_state = STATE_ACCEPTING_READY; | |
1777 | discard_requeued_up_to(0); | |
1778 | is_reset_from_peer = false; | |
1779 | in_seq.set(0); | |
1780 | } | |
1781 | ||
1782 | // send READY reply | |
1783 | reply.features = policy.features_supported; | |
1784 | reply.global_seq = async_msgr->get_global_seq(); | |
1785 | reply.connect_seq = connect_seq; | |
1786 | reply.flags = 0; | |
1787 | reply.authorizer_len = authorizer_reply.length(); | |
1788 | if (policy.lossy) | |
1789 | reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY; | |
1790 | ||
1791 | set_features((uint64_t)reply.features & (uint64_t)connect.features); | |
1792 | ldout(async_msgr->cct, 10) << __func__ << " accept features " << get_features() << dendl; | |
1793 | ||
1794 | session_security.reset( | |
1795 | get_auth_session_handler(async_msgr->cct, connect.authorizer_protocol, | |
1796 | session_key, get_features())); | |
1797 | ||
1798 | reply_bl.append((char*)&reply, sizeof(reply)); | |
1799 | ||
1800 | if (reply.authorizer_len) | |
1801 | reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length()); | |
1802 | ||
1803 | if (reply.tag == CEPH_MSGR_TAG_SEQ) { | |
1804 | uint64_t s = in_seq.read(); | |
1805 | reply_bl.append((char*)&s, sizeof(s)); | |
1806 | } | |
1807 | ||
1808 | lock.unlock(); | |
1809 | // Because "replacing" prevents other connections from preempting this addr, | |
1810 | // it's safe not to hold the Connection's lock here | |
1811 | r = async_msgr->accept_conn(this); | |
1812 | ||
1813 | inject_delay(); | |
1814 | ||
1815 | lock.lock(); | |
1816 | replacing = false; | |
1817 | if (r < 0) { | |
1818 | ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr | |
1819 | << " just fail later one(this)" << dendl; | |
1820 | goto fail_registered; | |
1821 | } | |
1822 | if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) { | |
1823 | ldout(async_msgr->cct, 1) << __func__ << " state changed during accept_conn, it must be mark_down" << dendl; | |
1824 | assert(state == STATE_CLOSED); | |
1825 | goto fail_registered; | |
1826 | } | |
1827 | ||
1828 | r = try_send(reply_bl); | |
1829 | if (r < 0) | |
1830 | goto fail_registered; | |
1831 | ||
1832 | // notify | |
1833 | dispatch_queue->queue_accept(this); | |
1834 | async_msgr->ms_deliver_handle_fast_accept(this); | |
1835 | once_ready = true; | |
1836 | ||
1837 | if (r == 0) { | |
1838 | state = next_state; | |
1839 | ldout(async_msgr->cct, 2) << __func__ << " accept write reply msg done" << dendl; | |
1840 | } else { | |
1841 | state = STATE_WAIT_SEND; | |
1842 | state_after_send = next_state; | |
1843 | } | |
1844 | ||
1845 | return 0; | |
1846 | ||
1847 | fail_registered: | |
1848 | ldout(async_msgr->cct, 10) << __func__ << " accept fault after register" << dendl; | |
1849 | inject_delay(); | |
1850 | ||
1851 | fail: | |
1852 | ldout(async_msgr->cct, 10) << __func__ << " failed to accept." << dendl; | |
1853 | return -1; | |
1854 | } | |
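// Editorial sketch (not part of the original source): the accept reply
// assembled into reply_bl above is laid out as
//   ceph_msg_connect_reply   (fixed-size struct, appended raw)
//   authorizer_reply bytes   (only if reply.authorizer_len > 0)
//   uint64_t in_seq          (only when reply.tag == CEPH_MSGR_TAG_SEQ)
// The peer consumes the trailing sequence number in its
// STATE_CONNECTING_WAIT_ACK_SEQ state.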
1855 | ||
1856 | void AsyncConnection::_connect() | |
1857 | { | |
1858 | ldout(async_msgr->cct, 10) << __func__ << " csq=" << connect_seq << dendl; | |
1859 | ||
1860 | state = STATE_CONNECTING; | |
1861 | // reschedule the connection on the event center to avoid lock dependencies, | |
1862 | // as this may be called by an external thread (e.g. send_message) | |
1863 | center->dispatch_event_external(read_handler); | |
1864 | } | |
1865 | ||
1866 | void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr) | |
1867 | { | |
1868 | ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd() << dendl; | |
1869 | assert(socket.fd() >= 0); | |
1870 | ||
1871 | std::lock_guard<std::mutex> l(lock); | |
1872 | cs = std::move(socket); | |
1873 | socket_addr = addr; | |
1874 | state = STATE_ACCEPTING; | |
1875 | // reschedule the connection on the event center to avoid lock dependencies | |
1876 | center->dispatch_event_external(read_handler); | |
1877 | } | |
1878 | ||
1879 | int AsyncConnection::send_message(Message *m) | |
1880 | { | |
1881 | FUNCTRACE(); | |
1882 | lgeneric_subdout(async_msgr->cct, ms, | |
1883 | 1) << "-- " << async_msgr->get_myaddr() << " --> " | |
1884 | << get_peer_addr() << " -- " | |
1885 | << *m << " -- " << m << " con " | |
1886 | << m->get_connection().get() | |
1887 | << dendl; | |
1888 | ||
1889 | // optimistically assume it's ok to encode (it may actually be broken now) | |
1890 | if (!m->get_priority()) | |
1891 | m->set_priority(async_msgr->get_default_send_priority()); | |
1892 | ||
1893 | m->get_header().src = async_msgr->get_myname(); | |
1894 | m->set_connection(this); | |
1895 | ||
1896 | if (m->get_type() == CEPH_MSG_OSD_OP) | |
1897 | OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_BEGIN", true); | |
1898 | else if (m->get_type() == CEPH_MSG_OSD_OPREPLY) | |
1899 | OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_BEGIN", true); | |
1900 | ||
1901 | if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection | |
1902 | ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl; | |
1903 | std::lock_guard<std::mutex> l(write_lock); | |
1904 | if (can_write != WriteStatus::CLOSED) { | |
1905 | dispatch_queue->local_delivery(m, m->get_priority()); | |
1906 | } else { | |
1907 | ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed." | |
1908 | << " Drop message " << m << dendl; | |
1909 | m->put(); | |
1910 | } | |
1911 | return 0; | |
1912 | } | |
1913 | ||
1914 | last_active = ceph::coarse_mono_clock::now(); | |
1915 | // we don't count local messages here; they are too lightweight and | |
1916 | // would skew the numbers for users | |
1917 | logger->inc(l_msgr_send_messages); | |
1918 | ||
1919 | bufferlist bl; | |
1920 | uint64_t f = get_features(); | |
1921 | ||
1922 | // TODO: not all messages support re-encoding (e.g. MOSDMap), so for now | |
1923 | // only pre-encode messages that support fast dispatch | |
1924 | bool can_fast_prepare = async_msgr->ms_can_fast_dispatch(m); | |
1925 | if (can_fast_prepare) | |
1926 | prepare_send_message(f, m, bl); | |
1927 | ||
1928 | std::lock_guard<std::mutex> l(write_lock); | |
1929 | // "features" changes will change the payload encoding | |
1930 | if (can_fast_prepare && (can_write == WriteStatus::NOWRITE || get_features() != f)) { | |
1931 | // ensure the correctness of message encoding | |
1932 | bl.clear(); | |
1933 | m->get_payload().clear(); | |
1934 | ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer previous " | |
1935 | << f << " != " << get_features() << dendl; | |
1936 | } | |
1937 | if (!is_queued() && can_write == WriteStatus::CANWRITE && async_msgr->cct->_conf->ms_async_send_inline) { | |
1938 | if (!bl.length()) | |
1939 | prepare_send_message(get_features(), m, bl); | |
1940 | logger->inc(l_msgr_send_messages_inline); | |
1941 | if (write_message(m, bl, false) < 0) { | |
1942 | ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; | |
1943 | // we want to handle fault within internal thread | |
1944 | center->dispatch_event_external(write_handler); | |
1945 | } | |
1946 | } else if (can_write == WriteStatus::CLOSED) { | |
1947 | ldout(async_msgr->cct, 10) << __func__ << " connection closed." | |
1948 | << " Drop message " << m << dendl; | |
1949 | m->put(); | |
1950 | } else { | |
1951 | m->trace.event("async enqueueing message"); | |
1952 | out_q[m->get_priority()].emplace_back(std::move(bl), m); | |
1953 | ldout(async_msgr->cct, 15) << __func__ << " inline write is denied, reschedule m=" << m << dendl; | |
1954 | if (can_write != WriteStatus::REPLACING) | |
1955 | center->dispatch_event_external(write_handler); | |
1956 | } | |
1957 | return 0; | |
1958 | } | |
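// Editorial usage sketch (hypothetical caller, not part of the original
// source):
//
//   ConnectionRef con = msgr->get_connection(dest_inst);
//   con->send_message(new MPing());  // send_message takes over the ref
//
// send_message() never blocks on the wire: it delivers loopback messages
// locally, writes inline only when ms_async_send_inline is set and the
// connection is writable, and otherwise queues the message on out_q for
// the event center's write_handler.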
1959 | ||
1960 | void AsyncConnection::requeue_sent() | |
1961 | { | |
1962 | if (sent.empty()) | |
1963 | return; | |
1964 | ||
1965 | list<pair<bufferlist, Message*> >& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; | |
1966 | while (!sent.empty()) { | |
1967 | Message* m = sent.back(); | |
1968 | sent.pop_back(); | |
1969 | ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend " | |
1970 | << " (" << m->get_seq() << ")" << dendl; | |
1971 | rq.push_front(make_pair(bufferlist(), m)); | |
1972 | out_seq.dec(); | |
1973 | } | |
1974 | } | |
1975 | ||
1976 | void AsyncConnection::discard_requeued_up_to(uint64_t seq) | |
1977 | { | |
1978 | ldout(async_msgr->cct, 10) << __func__ << " " << seq << dendl; | |
1979 | std::lock_guard<std::mutex> l(write_lock); | |
1980 | if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) | |
1981 | return; | |
1982 | list<pair<bufferlist, Message*> >& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; | |
1983 | while (!rq.empty()) { | |
1984 | pair<bufferlist, Message*> p = rq.front(); | |
1985 | if (p.second->get_seq() == 0 || p.second->get_seq() > seq) | |
1986 | break; | |
1987 | ldout(async_msgr->cct, 10) << __func__ << " " << *(p.second) << " for resend seq " << p.second->get_seq() | |
1988 | << " <= " << seq << ", discarding" << dendl; | |
1989 | p.second->put(); | |
1990 | rq.pop_front(); | |
1991 | out_seq.inc(); | |
1992 | } | |
1993 | if (rq.empty()) | |
1994 | out_q.erase(CEPH_MSG_PRIO_HIGHEST); | |
1995 | } | |
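// Editorial illustration (not part of the original source): if messages
// with seq 5..8 were sent but not yet acked when the session faulted,
// requeue_sent() pushes them back onto out_q (rewinding out_seq by 4);
// if the peer later reports acked_seq=6 via TAG_SEQ,
// discard_requeued_up_to(6) drops 5 and 6 and leaves 7 and 8 to be resent.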
1996 | ||
1997 | /* | |
1998 | * Tears down the AsyncConnection's message queues, and removes them from the DispatchQueue | |
1999 | * Caller must hold write_lock prior to calling. | |
2000 | */ | |
2001 | void AsyncConnection::discard_out_queue() | |
2002 | { | |
2003 | ldout(async_msgr->cct, 10) << __func__ << " started" << dendl; | |
2004 | ||
2005 | for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) { | |
2006 | ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl; | |
2007 | (*p)->put(); | |
2008 | } | |
2009 | sent.clear(); | |
2010 | for (map<int, list<pair<bufferlist, Message*> > >::iterator p = out_q.begin(); p != out_q.end(); ++p) | |
2011 | for (list<pair<bufferlist, Message*> >::iterator r = p->second.begin(); r != p->second.end(); ++r) { | |
2012 | ldout(async_msgr->cct, 20) << __func__ << " discard " << r->second << dendl; | |
2013 | r->second->put(); | |
2014 | } | |
2015 | out_q.clear(); | |
2016 | outcoming_bl.clear(); | |
2017 | } | |
2018 | ||
2019 | int AsyncConnection::randomize_out_seq() | |
2020 | { | |
2021 | if (get_features() & CEPH_FEATURE_MSG_AUTH) { | |
2022 | // Set out_seq to a random value, so CRC won't be predictable. Don't bother checking seq_error | |
2023 | // here; we'll check it at the call site. PLR | |
2024 | uint64_t rand_seq; | |
2025 | int seq_error = get_random_bytes((char *)&rand_seq, sizeof(rand_seq)); | |
2026 | rand_seq &= SEQ_MASK; | |
2027 | lsubdout(async_msgr->cct, ms, 10) << __func__ << " out_seq set to " << rand_seq << dendl; | |
2028 | out_seq.set(rand_seq); | |
2029 | return seq_error; | |
2030 | } else { | |
2031 | // previously, seq #'s always started at 0. | |
2032 | out_seq.set(0); | |
2033 | return 0; | |
2034 | } | |
2035 | } | |
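// Editorial note (not part of the original source): SEQ_MASK confines the
// random starting point to [0, 2^31), e.g. 0xDEADBEEFCAFEF00D & SEQ_MASK
// == 0x4AFEF00D, keeping sequence numbers positive and far from wrap-around.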
2036 | ||
2037 | void AsyncConnection::fault() | |
2038 | { | |
2039 | if (state == STATE_CLOSED || state == STATE_NONE) { | |
2040 | ldout(async_msgr->cct, 10) << __func__ << " connection is already closed" << dendl; | |
2041 | return ; | |
2042 | } | |
2043 | ||
2044 | if (policy.lossy && !(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY)) { | |
2045 | ldout(async_msgr->cct, 1) << __func__ << " on lossy channel, failing" << dendl; | |
2046 | _stop(); | |
2047 | dispatch_queue->queue_reset(this); | |
2048 | return ; | |
2049 | } | |
2050 | ||
2051 | write_lock.lock(); | |
2052 | can_write = WriteStatus::NOWRITE; | |
2053 | shutdown_socket(); | |
2054 | open_write = false; | |
2055 | ||
2056 | // queue delayed items immediately | |
2057 | if (delay_state) | |
2058 | delay_state->flush(); | |
2059 | // requeue sent items | |
2060 | requeue_sent(); | |
2061 | recv_start = recv_end = 0; | |
2062 | state_offset = 0; | |
2063 | replacing = false; | |
2064 | is_reset_from_peer = false; | |
2065 | outcoming_bl.clear(); | |
2066 | if (!once_ready && !is_queued() && | |
2067 | state >=STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) { | |
2068 | ldout(async_msgr->cct, 0) << __func__ << " with nothing to send and in the" | |
2069 | << " half-accept state, just closed" << dendl; | |
2070 | write_lock.unlock(); | |
2071 | _stop(); | |
2072 | dispatch_queue->queue_reset(this); | |
2073 | return ; | |
2074 | } | |
2075 | reset_recv_state(); | |
2076 | if (policy.standby && !is_queued() && state != STATE_WAIT) { | |
2077 | ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl; | |
2078 | state = STATE_STANDBY; | |
2079 | write_lock.unlock(); | |
2080 | return; | |
2081 | } | |
2082 | ||
2083 | write_lock.unlock(); | |
2084 | if (!(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY) && | |
2085 | state != STATE_WAIT) { // STATE_WAIT is coming from STATE_CONNECTING_* | |
2086 | // policy may be empty while the state is still in accept | |
2087 | if (policy.server) { | |
2088 | ldout(async_msgr->cct, 0) << __func__ << " server, going to standby" << dendl; | |
2089 | state = STATE_STANDBY; | |
2090 | } else { | |
2091 | ldout(async_msgr->cct, 0) << __func__ << " initiating reconnect" << dendl; | |
2092 | connect_seq++; | |
2093 | state = STATE_CONNECTING; | |
2094 | } | |
2095 | backoff = utime_t(); | |
2096 | center->dispatch_event_external(read_handler); | |
2097 | } else { | |
2098 | if (state == STATE_WAIT) { | |
2099 | backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff); | |
2100 | } else if (backoff == utime_t()) { | |
2101 | backoff.set_from_double(async_msgr->cct->_conf->ms_initial_backoff); | |
2102 | } else { | |
2103 | backoff += backoff; | |
2104 | if (backoff > async_msgr->cct->_conf->ms_max_backoff) | |
2105 | backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff); | |
2106 | } | |
2107 | ||
2108 | state = STATE_CONNECTING; | |
2109 | ldout(async_msgr->cct, 10) << __func__ << " waiting " << backoff << dendl; | |
2110 | // schedule a wakeup after the backoff interval | |
2111 | register_time_events.insert(center->create_time_event( | |
2112 | backoff.to_nsec()/1000, wakeup_handler)); | |
2113 | } | |
2114 | } | |
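// Editorial illustration (hypothetical config values, not part of the
// original source): with ms_initial_backoff=0.2 and ms_max_backoff=15,
// repeated faults on a reconnecting lossless connection wait 0.2s, 0.4s,
// 0.8s, ... doubling until capped at 15s, while a connection leaving
// STATE_WAIT jumps straight to the 15s cap before retrying.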
2115 | ||
2116 | void AsyncConnection::was_session_reset() | |
2117 | { | |
2118 | ldout(async_msgr->cct,10) << __func__ << " started" << dendl; | |
2119 | std::lock_guard<std::mutex> l(write_lock); | |
2120 | if (delay_state) | |
2121 | delay_state->discard(); | |
2122 | dispatch_queue->discard_queue(conn_id); | |
2123 | discard_out_queue(); | |
2124 | ||
2125 | dispatch_queue->queue_remote_reset(this); | |
2126 | ||
2127 | if (randomize_out_seq()) { | |
2128 | ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq.read() << dendl; | |
2129 | } | |
2130 | ||
2131 | in_seq.set(0); | |
2132 | connect_seq = 0; | |
2133 | // it's safe to set this to 0 directly; we hold both locks | |
2134 | ack_left.set(0); | |
2135 | once_ready = false; | |
2136 | can_write = WriteStatus::NOWRITE; | |
2137 | } | |
2138 | ||
2139 | void AsyncConnection::_stop() | |
2140 | { | |
2141 | if (state == STATE_CLOSED) | |
2142 | return ; | |
2143 | ||
2144 | if (delay_state) | |
2145 | delay_state->flush(); | |
2146 | ||
2147 | ldout(async_msgr->cct, 2) << __func__ << dendl; | |
2148 | std::lock_guard<std::mutex> l(write_lock); | |
2149 | ||
2150 | reset_recv_state(); | |
2151 | dispatch_queue->discard_queue(conn_id); | |
2152 | discard_out_queue(); | |
2153 | async_msgr->unregister_conn(this); | |
2154 | worker->release_worker(); | |
2155 | ||
2156 | state = STATE_CLOSED; | |
2157 | open_write = false; | |
2158 | can_write = WriteStatus::CLOSED; | |
2159 | state_offset = 0; | |
2160 | // Make sure in-queue events will be processed | |
2161 | center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this))); | |
2162 | } | |
2163 | ||
2164 | void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl) | |
2165 | { | |
2166 | ldout(async_msgr->cct, 20) << __func__ << " " << *m << dendl; | |
2167 | ||
2168 | // associate message with Connection (for benefit of encode_payload) | |
2169 | if (m->empty_payload()) | |
2170 | ldout(async_msgr->cct, 20) << __func__ << " encoding features " | |
2171 | << features << " " << m << " " << *m << dendl; | |
2172 | else | |
2173 | ldout(async_msgr->cct, 20) << __func__ << " half-reencoding features " | |
2174 | << features << " " << m << " " << *m << dendl; | |
2175 | ||
2176 | // encode and copy out of *m | |
2177 | m->encode(features, msgr->crcflags); | |
2178 | ||
2179 | bl.append(m->get_payload()); | |
2180 | bl.append(m->get_middle()); | |
2181 | bl.append(m->get_data()); | |
2182 | } | |
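// Editorial note (not part of the original source): after encode(), bl
// carries the three wire segments in order -- front (payload), middle,
// data -- matching the front_len/middle_len/data_len fields the peer
// reads back from the message header.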
2183 | ||
2184 | ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more) | |
2185 | { | |
2186 | FUNCTRACE(); | |
2187 | assert(can_write == WriteStatus::CANWRITE); | |
2188 | m->set_seq(out_seq.inc()); | |
2189 | ||
2190 | if (!policy.lossy) { | |
2191 | // put on sent list | |
2192 | sent.push_back(m); | |
2193 | m->get(); | |
2194 | } | |
2195 | ||
2196 | if (msgr->crcflags & MSG_CRC_HEADER) | |
2197 | m->calc_header_crc(); | |
2198 | ||
2199 | ceph_msg_header& header = m->get_header(); | |
2200 | ceph_msg_footer& footer = m->get_footer(); | |
2201 | ||
2202 | // TODO: make sign_message reentrant? | |
2203 | // Now that we have all the crcs calculated, handle the | |
2204 | // digital signature for the message, if the AsyncConnection has session | |
2205 | // security set up. Some session security options do not | |
2206 | // actually calculate and check the signature, but they should | |
2207 | // handle the calls to sign_message and check_signature. PLR | |
2208 | if (session_security.get() == NULL) { | |
2209 | ldout(async_msgr->cct, 20) << __func__ << " no session security" << dendl; | |
2210 | } else { | |
2211 | if (session_security->sign_message(m)) { | |
2212 | ldout(async_msgr->cct, 20) << __func__ << " failed to sign m=" | |
2213 | << m << ": sig = " << footer.sig << dendl; | |
2214 | } else { | |
2215 | ldout(async_msgr->cct, 20) << __func__ << " signed m=" << m | |
2216 | << "): sig = " << footer.sig << dendl; | |
2217 | } | |
2218 | } | |
2219 | ||
2220 | unsigned original_bl_len = outcoming_bl.length(); | |
2221 | ||
2222 | outcoming_bl.append(CEPH_MSGR_TAG_MSG); | |
2223 | ||
2224 | if (has_feature(CEPH_FEATURE_NOSRCADDR)) { | |
2225 | outcoming_bl.append((char*)&header, sizeof(header)); | |
2226 | } else { | |
2227 | ceph_msg_header_old oldheader; | |
2228 | memcpy(&oldheader, &header, sizeof(header)); | |
2229 | oldheader.src.name = header.src; | |
2230 | oldheader.src.addr = get_peer_addr(); | |
2231 | oldheader.orig_src = oldheader.src; | |
2232 | oldheader.reserved = header.reserved; | |
2233 | oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader, | |
2234 | sizeof(oldheader) - sizeof(oldheader.crc)); | |
2235 | outcoming_bl.append((char*)&oldheader, sizeof(oldheader)); | |
2236 | } | |
2237 | ||
2238 | ldout(async_msgr->cct, 20) << __func__ << " sending message type=" << header.type | |
2239 | << " src " << entity_name_t(header.src) | |
2240 | << " front=" << header.front_len | |
2241 | << " data=" << header.data_len | |
2242 | << " off " << header.data_off << dendl; | |
2243 | ||
2244 | if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) { | |
2245 | std::list<buffer::ptr>::const_iterator pb; | |
2246 | for (pb = bl.buffers().begin(); pb != bl.buffers().end(); ++pb) { | |
2247 | outcoming_bl.append((char*)pb->c_str(), pb->length()); | |
2248 | } | |
2249 | } else { | |
2250 | outcoming_bl.claim_append(bl); | |
2251 | } | |
2252 | ||
2253 | // send footer; if receiver doesn't support signatures, use the old footer format | |
2254 | ceph_msg_footer_old old_footer; | |
2255 | if (has_feature(CEPH_FEATURE_MSG_AUTH)) { | |
2256 | outcoming_bl.append((char*)&footer, sizeof(footer)); | |
2257 | } else { | |
2258 | if (msgr->crcflags & MSG_CRC_HEADER) { | |
2259 | old_footer.front_crc = footer.front_crc; | |
2260 | old_footer.middle_crc = footer.middle_crc; | |
2261 | old_footer.data_crc = footer.data_crc; | |
2262 | } else { | |
2263 | old_footer.front_crc = old_footer.middle_crc = 0; | |
2264 | } | |
2265 | old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0; | |
2266 | old_footer.flags = footer.flags; | |
2267 | outcoming_bl.append((char*)&old_footer, sizeof(old_footer)); | |
2268 | } | |
2269 | ||
2270 | m->trace.event("async writing message"); | |
2271 | logger->inc(l_msgr_send_bytes, outcoming_bl.length() - original_bl_len); | |
2272 | ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq() | |
2273 | << " " << m << dendl; | |
2274 | ssize_t rc = _try_send(more); | |
2275 | if (rc < 0) { | |
2276 | ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", " | |
2277 | << cpp_strerror(rc) << dendl; | |
2278 | } else if (rc == 0) { | |
2279 | ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl; | |
2280 | } else { | |
2281 | ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " still in progress." << dendl; | |
2282 | } | |
2283 | if (m->get_type() == CEPH_MSG_OSD_OP) | |
2284 | OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false); | |
2285 | else if (m->get_type() == CEPH_MSG_OSD_OPREPLY) | |
2286 | OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false); | |
2287 | m->put(); | |
2288 | ||
2289 | return rc; | |
2290 | } | |
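// Editorial sketch (not part of the original source): one message on the
// wire, as assembled above:
//   CEPH_MSGR_TAG_MSG  (1 byte)
//   ceph_msg_header    (ceph_msg_header_old when NOSRCADDR is missing)
//   front|middle|data  (fragments copied into outcoming_bl when the total
//                       is <= ASYNC_COALESCE_THRESHOLD, else claimed)
//   ceph_msg_footer    (ceph_msg_footer_old when MSG_AUTH is missing)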
2291 | ||
2292 | void AsyncConnection::reset_recv_state() | |
2293 | { | |
2294 | // clean up internal state variables | |
2295 | if (state >= STATE_CONNECTING_SEND_CONNECT_MSG && | |
2296 | state <= STATE_CONNECTING_READY) { | |
2297 | delete authorizer; | |
2298 | authorizer = NULL; | |
2299 | got_bad_auth = false; | |
2300 | } | |
2301 | ||
2302 | if (state > STATE_OPEN_MESSAGE_THROTTLE_MESSAGE && | |
2303 | state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH | |
2304 | && policy.throttler_messages) { | |
2305 | ldout(async_msgr->cct, 10) << __func__ << " releasing " << 1 | |
2306 | << " message to policy throttler " | |
2307 | << policy.throttler_messages->get_current() << "/" | |
2308 | << policy.throttler_messages->get_max() << dendl; | |
2309 | policy.throttler_messages->put(); | |
2310 | } | |
2311 | if (state > STATE_OPEN_MESSAGE_THROTTLE_BYTES && | |
2312 | state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) { | |
2313 | if (policy.throttler_bytes) { | |
2314 | ldout(async_msgr->cct, 10) << __func__ << " releasing " << cur_msg_size | |
2315 | << " bytes to policy throttler " | |
2316 | << policy.throttler_bytes->get_current() << "/" | |
2317 | << policy.throttler_bytes->get_max() << dendl; | |
2318 | policy.throttler_bytes->put(cur_msg_size); | |
2319 | } | |
2320 | } | |
2321 | if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE && | |
2322 | state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) { | |
2323 | ldout(async_msgr->cct, 10) << __func__ << " releasing " << cur_msg_size | |
2324 | << " bytes to dispatch_queue throttler " | |
2325 | << dispatch_queue->dispatch_throttler.get_current() << "/" | |
2326 | << dispatch_queue->dispatch_throttler.get_max() << dendl; | |
2327 | dispatch_queue->dispatch_throttle_release(cur_msg_size); | |
2328 | } | |
2329 | } | |
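// Editorial note (not part of the original source): the three releases
// above mirror the acquisition order of the read path -- policy message
// throttle, policy byte throttle, then dispatch-queue throttle -- so a
// connection torn down mid-message returns exactly what it had claimed.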
2330 | ||
2331 | void AsyncConnection::handle_ack(uint64_t seq) | |
2332 | { | |
2333 | ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl; | |
2334 | // trim sent list | |
2335 | std::lock_guard<std::mutex> l(write_lock); | |
2336 | while (!sent.empty() && sent.front()->get_seq() <= seq) { | |
2337 | Message* m = sent.front(); | |
2338 | sent.pop_front(); | |
2339 | ldout(async_msgr->cct, 10) << __func__ << " got ack seq " | |
2340 | << seq << " >= " << m->get_seq() << " on " | |
2341 | << m << " " << *m << dendl; | |
2342 | m->put(); | |
2343 | } | |
2344 | } | |
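// Editorial illustration (not part of the original source): on ack seq=42,
// every entry of `sent` with get_seq() <= 42 is popped and its reference
// dropped; anything newer stays on `sent` in case requeue_sent() is needed
// after a later fault.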
2345 | ||
2346 | void AsyncConnection::DelayedDelivery::do_request(int id) | |
2347 | { | |
2348 | Message *m = nullptr; | |
2349 | { | |
2350 | std::lock_guard<std::mutex> l(delay_lock); | |
2351 | register_time_events.erase(id); | |
2352 | if (stop_dispatch) | |
2353 | return ; | |
2354 | if (delay_queue.empty()) | |
2355 | return ; | |
2356 | utime_t release = delay_queue.front().first; | |
2357 | m = delay_queue.front().second; | |
2358 | string delay_msg_type = msgr->cct->_conf->ms_inject_delay_msg_type; | |
2359 | utime_t now = ceph_clock_now(); | |
2360 | if ((release > now && | |
2361 | (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) { | |
2362 | utime_t t = release - now; | |
2363 | t.sleep(); | |
2364 | } | |
2365 | delay_queue.pop_front(); | |
2366 | } | |
2367 | if (msgr->ms_can_fast_dispatch(m)) { | |
2368 | dispatch_queue->fast_dispatch(m); | |
2369 | } else { | |
2370 | dispatch_queue->enqueue(m, m->get_priority(), conn_id); | |
2371 | } | |
2372 | } | |
2373 | ||
2374 | void AsyncConnection::DelayedDelivery::flush() { | |
2375 | stop_dispatch = true; | |
2376 | center->submit_to( | |
2377 | center->get_id(), [this] () mutable { | |
2378 | std::lock_guard<std::mutex> l(delay_lock); | |
2379 | while (!delay_queue.empty()) { | |
2380 | Message *m = delay_queue.front().second; | |
2381 | if (msgr->ms_can_fast_dispatch(m)) { | |
2382 | dispatch_queue->fast_dispatch(m); | |
2383 | } else { | |
2384 | dispatch_queue->enqueue(m, m->get_priority(), conn_id); | |
2385 | } | |
2386 | delay_queue.pop_front(); | |
2387 | } | |
2388 | for (auto i : register_time_events) | |
2389 | center->delete_time_event(i); | |
2390 | register_time_events.clear(); | |
2391 | stop_dispatch = false; | |
2392 | }, true); | |
2393 | } | |
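// Editorial note (not part of the original source): flush() drains the
// delay queue synchronously in the owning center's thread; stop_dispatch
// keeps a concurrent do_request() from sleeping on new items while the
// backlog is dispatched in order, and is cleared once the queue and its
// pending time events are gone.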
2394 | ||
2395 | void AsyncConnection::send_keepalive() | |
2396 | { | |
2397 | ldout(async_msgr->cct, 10) << __func__ << dendl; | |
2398 | std::lock_guard<std::mutex> l(write_lock); | |
2399 | if (can_write != WriteStatus::CLOSED) { | |
2400 | keepalive = true; | |
2401 | center->dispatch_event_external(write_handler); | |
2402 | } | |
2403 | } | |
2404 | ||
2405 | void AsyncConnection::mark_down() | |
2406 | { | |
2407 | ldout(async_msgr->cct, 1) << __func__ << dendl; | |
2408 | std::lock_guard<std::mutex> l(lock); | |
2409 | _stop(); | |
2410 | } | |
2411 | ||
2412 | void AsyncConnection::_append_keepalive_or_ack(bool ack, utime_t *tp) | |
2413 | { | |
2414 | ldout(async_msgr->cct, 10) << __func__ << dendl; | |
2415 | if (ack) { | |
2416 | assert(tp); | |
2417 | struct ceph_timespec ts; | |
2418 | tp->encode_timeval(&ts); | |
2419 | outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK); | |
2420 | outcoming_bl.append((char*)&ts, sizeof(ts)); | |
2421 | } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) { | |
2422 | struct ceph_timespec ts; | |
2423 | utime_t t = ceph_clock_now(); | |
2424 | t.encode_timeval(&ts); | |
2425 | outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2); | |
2426 | outcoming_bl.append((char*)&ts, sizeof(ts)); | |
2427 | } else { | |
2428 | outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE); | |
2429 | } | |
2430 | } | |
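// Editorial sketch (not part of the original source): the three keepalive
// wire forms appended above are
//   TAG_KEEPALIVE2_ACK + ceph_timespec  (echo of the peer's timestamp)
//   TAG_KEEPALIVE2     + ceph_timespec  (our current time, to be echoed)
//   TAG_KEEPALIVE                       (legacy peers, no timestamp)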
2431 | ||
2432 | void AsyncConnection::handle_write() | |
2433 | { | |
2434 | ldout(async_msgr->cct, 10) << __func__ << dendl; | |
2435 | ssize_t r = 0; | |
2436 | ||
2437 | write_lock.lock(); | |
2438 | if (can_write == WriteStatus::CANWRITE) { | |
2439 | if (keepalive) { | |
2440 | _append_keepalive_or_ack(); | |
2441 | keepalive = false; | |
2442 | } | |
2443 | ||
2444 | while (1) { | |
2445 | bufferlist data; | |
2446 | Message *m = _get_next_outgoing(&data); | |
2447 | if (!m) | |
2448 | break; | |
2449 | ||
2450 | // send_message or requeued messages may not have been encoded yet | |
2451 | if (!data.length()) | |
2452 | prepare_send_message(get_features(), m, data); | |
2453 | ||
2454 | r = write_message(m, data, _has_next_outgoing()); | |
2455 | if (r < 0) { | |
2456 | ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; | |
2457 | write_lock.unlock(); | |
2458 | goto fail; | |
2459 | } else if (r > 0) { | |
2460 | break; | |
2461 | } | |
2462 | } | |
2463 | ||
2464 | uint64_t left = ack_left.read(); | |
2465 | if (left) { | |
2466 | ceph_le64 s; | |
2467 | s = in_seq.read(); | |
2468 | outcoming_bl.append(CEPH_MSGR_TAG_ACK); | |
2469 | outcoming_bl.append((char*)&s, sizeof(s)); | |
2470 | ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl; | |
2471 | ack_left.sub(left); | |
2472 | left = ack_left.read(); | |
2473 | r = _try_send(left); | |
2474 | } else if (is_queued()) { | |
2475 | r = _try_send(); | |
2476 | } | |
2477 | ||
2478 | write_lock.unlock(); | |
2479 | if (r < 0) { | |
2480 | ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; | |
2481 | goto fail; | |
2482 | } | |
2483 | } else { | |
2484 | write_lock.unlock(); | |
2485 | lock.lock(); | |
2486 | write_lock.lock(); | |
2487 | if (state == STATE_STANDBY && !policy.server && is_queued()) { | |
2488 | ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl; | |
2489 | _connect(); | |
2490 | } else if (cs && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) { | |
2491 | r = _try_send(); | |
2492 | if (r < 0) { | |
2493 | ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl; | |
2494 | write_lock.unlock(); | |
2495 | fault(); | |
2496 | lock.unlock(); | |
2497 | return ; | |
2498 | } | |
2499 | } | |
2500 | write_lock.unlock(); | |
2501 | lock.unlock(); | |
2502 | } | |
2503 | ||
2504 | return ; | |
2505 | ||
2506 | fail: | |
2507 | lock.lock(); | |
2508 | fault(); | |
2509 | lock.unlock(); | |
2510 | } | |
2511 | ||
2512 | void AsyncConnection::wakeup_from(uint64_t id) | |
2513 | { | |
2514 | lock.lock(); | |
2515 | register_time_events.erase(id); | |
2516 | lock.unlock(); | |
2517 | process(); | |
2518 | } | |
2519 | ||
2520 | void AsyncConnection::tick(uint64_t id) | |
2521 | { | |
2522 | auto now = ceph::coarse_mono_clock::now(); | |
2523 | ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id | |
2524 | << " last_active" << last_active << dendl; | |
2525 | assert(last_tick_id == id); | |
2526 | std::lock_guard<std::mutex> l(lock); | |
2527 | last_tick_id = 0; | |
2528 | auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count(); | |
2529 | if (inactive_timeout_us < (uint64_t)idle_period) { | |
2530 | ldout(async_msgr->cct, 1) << __func__ << " idle(" << idle_period << ") more than " | |
2531 | << inactive_timeout_us | |
2532 | << " us, mark self fault." << dendl; | |
2533 | fault(); | |
2534 | } else if (is_connected()) { | |
2535 | last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler); | |
2536 | } | |
2537 | } |
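// Editorial illustration (assumed config mapping, not part of the original
// source): if inactive_timeout_us derives from ms_tcp_read_timeout=900 as
// 900 * 1000 * 1000 us, tick() faults any connection idle longer than
// that; otherwise, while still connected, it re-arms itself for another
// period via create_time_event().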