// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include <unistd.h>

#include "include/Context.h"
#include "include/random.h"
#include "common/errno.h"
#include "AsyncMessenger.h"
#include "AsyncConnection.h"

#include "ProtocolV1.h"
#include "ProtocolV2.h"

#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "common/EventTrace.h"

// Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
#define SEQ_MASK  0x7fffffff

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix _conn_prefix(_dout)
ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
  return *_dout << "-- " << async_msgr->get_myaddrs() << " >> "
                << *peer_addrs << " conn(" << this
                << (msgr2 ? " msgr2=" : " legacy=")
                << protocol.get()
                << " " << ceph_con_mode_name(protocol->auth_meta->con_mode)
                << " :" << port
                << " s=" << get_state_name(state)
                << " l=" << policy.lossy
                << ").";
}

// Notes:
// 1. Don't dispatch any event when closed! It may keep the AsyncConnection
//    alive even after the AsyncMessenger has been destroyed.

const uint32_t AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;

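// The C_* classes below are thin EventCallback adapters: the EventCenter
// invokes do_request() on its worker thread, and each adapter simply forwards
// to the corresponding AsyncConnection method while holding a ref to it.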
class C_time_wakeup : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_time_wakeup(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd_or_id) override {
    conn->wakeup_from(fd_or_id);
  }
};

class C_handle_read : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_read(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd_or_id) override {
    conn->process();
  }
};

class C_handle_write : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_write(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd) override {
    conn->handle_write();
  }
};

class C_handle_write_callback : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_write_callback(AsyncConnectionRef c) : conn(c) {}
  void do_request(uint64_t fd) override { conn->handle_write_callback(); }
};

class C_clean_handler : public EventCallback {
  AsyncConnectionRef conn;
 public:
  explicit C_clean_handler(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t id) override {
    conn->cleanup();
    delete this;
  }
};

class C_tick_wakeup : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd_or_id) override {
    conn->tick(fd_or_id);
  }
};


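// The constructor wires the connection into its Worker's event center and
// picks the wire protocol up front: LoopbackProtocolV1 for local (loopback)
// connections, ProtocolV2 when msgr2 was requested, otherwise legacy ProtocolV1.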
AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
                                 Worker *w, bool m2, bool local)
  : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
    logger(w->get_perf_counter()),
    state(STATE_NONE), port(-1),
    dispatch_queue(q), recv_buf(NULL),
    recv_max_prefetch(std::max<int64_t>(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
    recv_start(0), recv_end(0),
    last_active(ceph::coarse_mono_clock::now()),
    inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
    msgr2(m2), state_offset(0),
    worker(w), center(&w->center), read_buffer(nullptr)
{
#ifdef UNIT_TESTS_BUILT
  this->interceptor = m->interceptor;
#endif
  read_handler = new C_handle_read(this);
  write_handler = new C_handle_write(this);
  write_callback_handler = new C_handle_write_callback(this);
  wakeup_handler = new C_time_wakeup(this);
  tick_handler = new C_tick_wakeup(this);
  // allocate double recv_max_prefetch; see "read_until"
  recv_buf = new char[2*recv_max_prefetch];
  if (local) {
    protocol = std::unique_ptr<Protocol>(new LoopbackProtocolV1(this));
  } else if (m2) {
    protocol = std::unique_ptr<Protocol>(new ProtocolV2(this));
  } else {
    protocol = std::unique_ptr<Protocol>(new ProtocolV1(this));
  }
  logger->inc(l_msgr_created_connections);
}

AsyncConnection::~AsyncConnection()
{
  if (recv_buf)
    delete[] recv_buf;
  ceph_assert(!delay_state);
}

int AsyncConnection::get_con_mode() const {
  return protocol->get_con_mode();
}

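// Debugging aid: if the ms_inject_delay_type config option names this peer's
// entity type, attach a DelayedDelivery queue so that incoming messages can be
// artificially delayed before dispatch.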
void AsyncConnection::maybe_start_delay_thread()
{
  if (!delay_state) {
    async_msgr->cct->_conf.with_val<std::string>(
      "ms_inject_delay_type",
      [this](const string& s) {
        if (s.find(ceph_entity_type_name(peer_type)) != string::npos) {
          ldout(msgr->cct, 1) << __func__ << " setting up a delay queue"
                              << dendl;
          delay_state = new DelayedDelivery(async_msgr, center, dispatch_queue,
                                            conn_id);
        }
      });
  }
}


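// read() is the asynchronous read entry point used by the Protocol classes:
// it returns 0 if `len` bytes were already available, a positive count of
// still-missing bytes if the read will complete later (in which case the
// callback is invoked from process() once the data arrives), or <0 on error.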
ssize_t AsyncConnection::read(unsigned len, char *buffer,
                              std::function<void(char *, ssize_t)> callback) {
  ldout(async_msgr->cct, 20) << __func__
                             << (pendingReadLen ? " continue" : " start")
                             << " len=" << len << dendl;
  ssize_t r = read_until(len, buffer);
  if (r > 0) {
    readCallback = callback;
    pendingReadLen = len;
    read_buffer = buffer;
  }
  return r;
}

// Because this function may be called multiple times to fill the requested
// buffer, the passed-in bufferptr must stay the same across calls.  Normally
// only "read_message" passes an existing bufferptr in.
//
// It uses readahead to reduce the overhead of small reads; "recv_buf" holds
// the prefetched data.
//
// Returns the number of bytes still missing: 0 means the buffer is complete,
// and a value < 0 means an error occurred.
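// For example, if recv_max_prefetch is 4 KiB, a 100-byte request triggers a
// bulk read of up to 4 KiB into recv_buf and later small requests are served
// from that buffer, while a request larger than recv_max_prefetch bypasses
// recv_buf and reads directly into the caller's buffer.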
ssize_t AsyncConnection::read_until(unsigned len, char *p)
{
  ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is "
                             << state_offset << dendl;

  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
      cs.shutdown();
    }
  }

  ssize_t r = 0;
  uint64_t left = len - state_offset;
  if (recv_end > recv_start) {
    uint64_t to_read = std::min<uint64_t>(recv_end - recv_start, left);
    memcpy(p, recv_buf+recv_start, to_read);
    recv_start += to_read;
    left -= to_read;
    ldout(async_msgr->cct, 25) << __func__ << " got " << to_read << " in buffer "
                               << " left is " << left << " buffer still has "
                               << recv_end - recv_start << dendl;
    if (left == 0) {
      return 0;
    }
    state_offset += to_read;
  }

  recv_end = recv_start = 0;
  /* nothing left in the prefetch buffer */
  if (left > (uint64_t)recv_max_prefetch) {
    /* this was a large read, we don't prefetch for these */
    do {
      r = read_bulk(p+state_offset, left);
      ldout(async_msgr->cct, 25) << __func__ << " read_bulk left is " << left << " got " << r << dendl;
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
        return -1;
      } else if (r == static_cast<int>(left)) {
        state_offset = 0;
        return 0;
      }
      state_offset += r;
      left -= r;
    } while (r > 0);
  } else {
    do {
      r = read_bulk(recv_buf+recv_end, recv_max_prefetch);
      ldout(async_msgr->cct, 25) << __func__ << " read_bulk recv_end is " << recv_end
                                 << " left is " << left << " got " << r << dendl;
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
        return -1;
      }
      recv_end += r;
      if (r >= static_cast<int>(left)) {
        recv_start = len - state_offset;
        memcpy(p+state_offset, recv_buf, recv_start);
        state_offset = 0;
        return 0;
      }
      left -= r;
    } while (r > 0);
    memcpy(p+state_offset, recv_buf, recv_end-recv_start);
    state_offset += (recv_end - recv_start);
    recv_end = recv_start = 0;
  }
  ldout(async_msgr->cct, 25) << __func__ << " need len " << len << " remaining "
                             << len - state_offset << " bytes" << dendl;
  return len - state_offset;
}

/* Returns -1 if `fd` hit an error or the peer closed it (the socket should
 * then be closed); returns 0 on EAGAIN (EINTR is retried internally). */
ssize_t AsyncConnection::read_bulk(char *buf, unsigned len)
{
  ssize_t nread;
 again:
  nread = cs.read(buf, len);
  if (nread < 0) {
    if (nread == -EAGAIN) {
      nread = 0;
    } else if (nread == -EINTR) {
      goto again;
    } else {
      ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << cs.fd()
                                << " : " << cpp_strerror(nread) << dendl;
      return -1;
    }
  } else if (nread == 0) {
    ldout(async_msgr->cct, 1) << __func__ << " peer closed file descriptor "
                              << cs.fd() << dendl;
    return -1;
  }
  return nread;
}

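// write() appends `bl` to outcoming_bl and tries to flush it immediately; if
// data remains unsent, the callback is stored and invoked (from the event
// center thread) once the socket has drained.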
ssize_t AsyncConnection::write(bufferlist &bl,
                               std::function<void(ssize_t)> callback,
                               bool more) {

  std::unique_lock<std::mutex> l(write_lock);
  outcoming_bl.claim_append(bl);
  ssize_t r = _try_send(more);
  if (r > 0) {
    writeCallback = callback;
  }
  return r;
}

// Returns the number of bytes still queued for sending; this can be larger
// than the amount just appended.  A return value < 0 indicates an error.
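// _try_send also manages the EVENT_WRITABLE registration: it arms the write
// event while data is still queued and tears it down (dispatching the pending
// write callback) once outcoming_bl has been fully flushed.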
ssize_t AsyncConnection::_try_send(bool more)
{
  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
      cs.shutdown();
    }
  }

  ceph_assert(center->in_thread());
  ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outcoming_bl.length()
                             << " bytes" << dendl;
  ssize_t r = cs.send(outcoming_bl, more);
  if (r < 0) {
    ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
    return r;
  }

  ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r
                             << " remaining bytes " << outcoming_bl.length() << dendl;

  if (!open_write && is_queued()) {
    center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
    open_write = true;
  }

  if (open_write && !is_queued()) {
    center->delete_file_event(cs.fd(), EVENT_WRITABLE);
    open_write = false;
    if (writeCallback) {
      center->dispatch_event_external(write_callback_handler);
    }
  }

  return outcoming_bl.length();
}

void AsyncConnection::inject_delay() {
  if (async_msgr->cct->_conf->ms_inject_internal_delays) {
    ldout(async_msgr->cct, 10) << __func__ << " sleep for " <<
      async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
    utime_t t;
    t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
    t.sleep();
  }
}

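// process() is the EVENT_READABLE handler.  It walks a small state machine to
// finish non-blocking connect/accept work (STATE_CONNECTING,
// STATE_CONNECTING_RE, STATE_ACCEPTING), completes any pending async read in
// STATE_CONNECTION_ESTABLISHED (returning early if one was outstanding), and
// otherwise hands control to protocol->read_event().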
void AsyncConnection::process() {
  std::lock_guard<std::mutex> l(lock);
  last_active = ceph::coarse_mono_clock::now();
  recv_start_time = ceph::mono_clock::now();

  ldout(async_msgr->cct, 20) << __func__ << dendl;

  switch (state) {
    case STATE_NONE: {
      ldout(async_msgr->cct, 20) << __func__ << " enter none state" << dendl;
      return;
    }
    case STATE_CLOSED: {
      ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl;
      return;
    }
    case STATE_CONNECTING: {
      ceph_assert(!policy.server);

      if (cs) {
        center->delete_file_event(cs.fd(), EVENT_READABLE | EVENT_WRITABLE);
        cs.close();
      }

      SocketOptions opts;
      opts.priority = async_msgr->get_socket_priority();
      opts.connect_bind_addr = msgr->get_myaddrs().front();
      ssize_t r = worker->connect(target_addr, opts, &cs);
      if (r < 0) {
        protocol->fault();
        return;
      }

      center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
      state = STATE_CONNECTING_RE;
    }
    case STATE_CONNECTING_RE: {
      ssize_t r = cs.is_connected();
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " reconnect failed to "
                                  << target_addr << dendl;
        if (r == -ECONNREFUSED) {
          ldout(async_msgr->cct, 2)
              << __func__ << " connection refused!" << dendl;
          dispatch_queue->queue_refused(this);
        }
        protocol->fault();
        return;
      } else if (r == 0) {
        ldout(async_msgr->cct, 10)
            << __func__ << " nonblock connect inprogress" << dendl;
        if (async_msgr->get_stack()->nonblock_connect_need_writable_event()) {
          center->create_file_event(cs.fd(), EVENT_WRITABLE,
                                    read_handler);
        }
        logger->tinc(l_msgr_running_recv_time,
                     ceph::mono_clock::now() - recv_start_time);
        return;
      }

      center->delete_file_event(cs.fd(), EVENT_WRITABLE);
      ldout(async_msgr->cct, 10)
          << __func__ << " connect successfully, ready to send banner" << dendl;
      state = STATE_CONNECTION_ESTABLISHED;
      break;
    }

    case STATE_ACCEPTING: {
      center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
      state = STATE_CONNECTION_ESTABLISHED;

      break;
    }

    case STATE_CONNECTION_ESTABLISHED: {
      if (pendingReadLen) {
        ssize_t r = read(*pendingReadLen, read_buffer, readCallback);
        if (r <= 0) { // read all bytes, or an error occurred
          pendingReadLen.reset();
          char *buf_tmp = read_buffer;
          read_buffer = nullptr;
          readCallback(buf_tmp, r);
        }
        return;
      }
      break;
    }
  }

  protocol->read_event();

  logger->tinc(l_msgr_running_recv_time,
               ceph::mono_clock::now() - recv_start_time);
}

bool AsyncConnection::is_connected() {
  return protocol->is_connected();
}

void AsyncConnection::connect(const entity_addrvec_t &addrs, int type,
                              entity_addr_t &target) {

  std::lock_guard<std::mutex> l(lock);
  set_peer_type(type);
  set_peer_addrs(addrs);
  policy = msgr->get_policy(type);
  target_addr = target;
  _connect();
}

void AsyncConnection::_connect()
{
  ldout(async_msgr->cct, 10) << __func__ << dendl;

  state = STATE_CONNECTING;
  protocol->connect();
  // reschedule the connection here to avoid a lock dependency;
  // this may be called from an external thread (send_message)
  center->dispatch_event_external(read_handler);
}

void AsyncConnection::accept(ConnectedSocket socket,
                             const entity_addr_t &listen_addr,
                             const entity_addr_t &peer_addr)
{
  ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd()
                             << " listen_addr " << listen_addr
                             << " peer_addr " << peer_addr << dendl;
  ceph_assert(socket.fd() >= 0);

  std::lock_guard<std::mutex> l(lock);
  cs = std::move(socket);
  socket_addr = listen_addr;
  target_addr = peer_addr; // until we know better
  state = STATE_ACCEPTING;
  protocol->accept();
  // reschedule the connection here to avoid a lock dependency
  center->dispatch_event_external(read_handler);
}

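// send_message() stamps the message with priority and source, short-circuits
// loopback connections straight into the local dispatch queue, and otherwise
// hands the message to the protocol for queueing on the wire.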
int AsyncConnection::send_message(Message *m)
{
  FUNCTRACE(async_msgr->cct);
  lgeneric_subdout(async_msgr->cct, ms,
                   1) << "-- " << async_msgr->get_myaddrs() << " --> "
                      << get_peer_addrs() << " -- "
                      << *m << " -- " << m << " con "
                      << this
                      << dendl;

  // optimistically assume it's ok to encode (it may actually be broken now)
  if (!m->get_priority())
    m->set_priority(async_msgr->get_default_send_priority());

  m->get_header().src = async_msgr->get_myname();
  m->set_connection(this);

  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_BEGIN", true);
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_BEGIN", true);

  if (async_msgr->get_myaddrs() == get_peer_addrs()) { //loopback connection
    ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
    std::lock_guard<std::mutex> l(write_lock);
    if (protocol->is_connected()) {
      dispatch_queue->local_delivery(m, m->get_priority());
    } else {
      ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
                                 << " Drop message " << m << dendl;
      m->put();
    }
    return 0;
  }

  // we don't count local messages here; they are too lightweight and
  // would skew the stats
  logger->inc(l_msgr_send_messages);

  protocol->send_message(m);
  return 0;
}

entity_addr_t AsyncConnection::_infer_target_addr(const entity_addrvec_t& av)
{
  // pick the first addr of the same address family as socket_addr. it could be
  // an any: or v2: addr, we don't care. it should not be a v1 addr.
  for (auto& i : av.v) {
    if (i.is_legacy()) {
      continue;
    }
    if (i.get_family() == socket_addr.get_family()) {
      ldout(async_msgr->cct,10) << __func__ << " " << av << " -> " << i << dendl;
      return i;
    }
  }
  ldout(async_msgr->cct,10) << __func__ << " " << av << " -> nothing to match "
                            << socket_addr << dendl;
  return {};
}

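// fault() resets the transport-level state after an error: the socket is shut
// down, any delayed messages are flushed so they are not lost, and the
// read/write buffering state is cleared.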
void AsyncConnection::fault()
{
  shutdown_socket();
  open_write = false;

  // queue delayed items immediately
  if (delay_state)
    delay_state->flush();

  recv_start = recv_end = 0;
  state_offset = 0;
  outcoming_bl.clear();
}

void AsyncConnection::_stop() {
  writeCallback.reset();
  dispatch_queue->discard_queue(conn_id);
  async_msgr->unregister_conn(this);
  worker->release_worker();

  state = STATE_CLOSED;
  open_write = false;

  state_offset = 0;
  // Make sure in-queue events will be processed
  center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
}

bool AsyncConnection::is_queued() const {
  return outcoming_bl.length();
}

void AsyncConnection::shutdown_socket() {
  for (auto &&t : register_time_events) center->delete_time_event(t);
  register_time_events.clear();
  if (last_tick_id) {
    center->delete_time_event(last_tick_id);
    last_tick_id = 0;
  }
  if (cs) {
    center->delete_file_event(cs.fd(), EVENT_READABLE | EVENT_WRITABLE);
    cs.shutdown();
    cs.close();
  }
}

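// DelayedDelivery: timer-driven re-dispatch of messages held back by the
// ms_inject_delay_type debugging option (see maybe_start_delay_thread()).
// do_request() delivers one queued message, discard() drops everything, and
// flush() delivers everything immediately.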
void AsyncConnection::DelayedDelivery::do_request(uint64_t id)
{
  Message *m = nullptr;
  {
    std::lock_guard<std::mutex> l(delay_lock);
    register_time_events.erase(id);
    if (stop_dispatch)
      return;
    if (delay_queue.empty())
      return;
    m = delay_queue.front();
    delay_queue.pop_front();
  }
  if (msgr->ms_can_fast_dispatch(m)) {
    dispatch_queue->fast_dispatch(m);
  } else {
    dispatch_queue->enqueue(m, m->get_priority(), conn_id);
  }
}

void AsyncConnection::DelayedDelivery::discard() {
  stop_dispatch = true;
  center->submit_to(center->get_id(),
                    [this]() mutable {
                      std::lock_guard<std::mutex> l(delay_lock);
                      while (!delay_queue.empty()) {
                        Message *m = delay_queue.front();
                        dispatch_queue->dispatch_throttle_release(
                            m->get_dispatch_throttle_size());
                        m->put();
                        delay_queue.pop_front();
                      }
                      for (auto i : register_time_events)
                        center->delete_time_event(i);
                      register_time_events.clear();
                      stop_dispatch = false;
                    },
                    true);
}

void AsyncConnection::DelayedDelivery::flush() {
  stop_dispatch = true;
  center->submit_to(
      center->get_id(), [this]() mutable {
        std::lock_guard<std::mutex> l(delay_lock);
        while (!delay_queue.empty()) {
          Message *m = delay_queue.front();
          if (msgr->ms_can_fast_dispatch(m)) {
            dispatch_queue->fast_dispatch(m);
          } else {
            dispatch_queue->enqueue(m, m->get_priority(), conn_id);
          }
          delay_queue.pop_front();
        }
        for (auto i : register_time_events)
          center->delete_time_event(i);
        register_time_events.clear();
        stop_dispatch = false;
      }, true);
}

void AsyncConnection::send_keepalive()
{
  protocol->send_keepalive();
}

void AsyncConnection::mark_down()
{
  ldout(async_msgr->cct, 1) << __func__ << dendl;
  std::lock_guard<std::mutex> l(lock);
  protocol->stop();
}

void AsyncConnection::handle_write()
{
  ldout(async_msgr->cct, 10) << __func__ << dendl;
  protocol->write_event();
}

void AsyncConnection::handle_write_callback() {
  std::lock_guard<std::mutex> l(lock);
  last_active = ceph::coarse_mono_clock::now();
  recv_start_time = ceph::mono_clock::now();
  write_lock.lock();
  if (writeCallback) {
    auto callback = *writeCallback;
    writeCallback.reset();
    write_lock.unlock();
    callback(0);
    return;
  }
  write_lock.unlock();
}

void AsyncConnection::stop(bool queue_reset) {
  lock.lock();
  bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
  protocol->stop();
  lock.unlock();
  if (need_queue_reset) dispatch_queue->queue_reset(this);
}

void AsyncConnection::cleanup() {
  shutdown_socket();
  delete read_handler;
  delete write_handler;
  delete write_callback_handler;
  delete wakeup_handler;
  delete tick_handler;
  if (delay_state) {
    delete delay_state;
    delay_state = NULL;
  }
}

void AsyncConnection::wakeup_from(uint64_t id)
{
  lock.lock();
  register_time_events.erase(id);
  lock.unlock();
  process();
}

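// tick() implements the idle-connection timeout: if the connection has been
// inactive for longer than inactive_timeout_us (derived from
// ms_tcp_read_timeout), the protocol is faulted; otherwise the tick timer is
// re-armed.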
void AsyncConnection::tick(uint64_t id)
{
  auto now = ceph::coarse_mono_clock::now();
  ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
                             << " last_active=" << last_active << dendl;
  std::lock_guard<std::mutex> l(lock);
  last_tick_id = 0;
  auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count();
  if (inactive_timeout_us < (uint64_t)idle_period) {
    ldout(async_msgr->cct, 1) << __func__ << " idle(" << idle_period << ") more than "
                              << inactive_timeout_us
                              << " us, mark self fault." << dendl;
    protocol->fault();
  } else if (is_connected()) {
    last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
  }
}