// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include <unistd.h>

#include "include/Context.h"
#include "include/random.h"
#include "common/errno.h"
#include "AsyncMessenger.h"
#include "AsyncConnection.h"

#include "ProtocolV1.h"
#include "ProtocolV2.h"

#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "common/EventTrace.h"

// Constant to limit starting sequence number to 2^31.  Nothing special
// about it, just a big number.  PLR
#define SEQ_MASK  0x7fffffff

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix _conn_prefix(_dout)
std::ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
  return *_dout << "-- " << async_msgr->get_myaddrs() << " >> "
                << *peer_addrs << " conn(" << this
                << (msgr2 ? " msgr2=" : " legacy=")
                << protocol.get()
                << " " << ceph_con_mode_name(protocol->auth_meta->con_mode)
                << " :" << port
                << " s=" << get_state_name(state)
                << " l=" << policy.lossy
                << ").";
}

// Notes:
// 1. Don't dispatch any event when closed! It may keep the AsyncConnection
//    alive even after the AsyncMessenger has been destroyed.

const uint32_t AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;
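
// EventCallback adapters queued on the owning worker's EventCenter: each one
// holds a reference to the connection and simply forwards the event to the
// corresponding AsyncConnection method.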
class C_time_wakeup : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_time_wakeup(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd_or_id) override {
    conn->wakeup_from(fd_or_id);
  }
};

class C_handle_read : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_read(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd_or_id) override {
    conn->process();
  }
};

class C_handle_write : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_write(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd) override {
    conn->handle_write();
  }
};

class C_handle_write_callback : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_write_callback(AsyncConnectionRef c) : conn(c) {}
  void do_request(uint64_t fd) override { conn->handle_write_callback(); }
};

class C_clean_handler : public EventCallback {
  AsyncConnectionRef conn;
 public:
  explicit C_clean_handler(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t id) override {
    conn->cleanup();
    delete this;
  }
};

class C_tick_wakeup : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd_or_id) override {
    conn->tick(fd_or_id);
  }
};

AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
                                 Worker *w, bool m2, bool local)
  : Connection(cct, m),
    delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
    logger(w->get_perf_counter()),
    labeled_logger(w->get_labeled_perf_counter()),
    state(STATE_NONE), port(-1),
    dispatch_queue(q), recv_buf(NULL),
    recv_max_prefetch(std::max<int64_t>(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
    recv_start(0), recv_end(0),
    last_active(ceph::coarse_mono_clock::now()),
    connect_timeout_us(cct->_conf->ms_connection_ready_timeout*1000*1000),
    inactive_timeout_us(cct->_conf->ms_connection_idle_timeout*1000*1000),
    msgr2(m2), state_offset(0),
    worker(w), center(&w->center), read_buffer(nullptr)
{
#ifdef UNIT_TESTS_BUILT
  this->interceptor = m->interceptor;
#endif
  read_handler = new C_handle_read(this);
  write_handler = new C_handle_write(this);
  write_callback_handler = new C_handle_write_callback(this);
  wakeup_handler = new C_time_wakeup(this);
  tick_handler = new C_tick_wakeup(this);
  // recv_buf is twice recv_max_prefetch because read_until() may append a
  // second prefetch-sized chunk at recv_end before the buffer is drained
  recv_buf = new char[2*recv_max_prefetch];
  if (local) {
    protocol = std::unique_ptr<Protocol>(new LoopbackProtocolV1(this));
  } else if (m2) {
    protocol = std::unique_ptr<Protocol>(new ProtocolV2(this));
  } else {
    protocol = std::unique_ptr<Protocol>(new ProtocolV1(this));
  }
  logger->inc(l_msgr_created_connections);
}

AsyncConnection::~AsyncConnection()
{
  if (recv_buf)
    delete[] recv_buf;
  ceph_assert(!delay_state);
}

int AsyncConnection::get_con_mode() const
{
  return protocol->get_con_mode();
}

bool AsyncConnection::is_msgr2() const
{
  return protocol->proto_type == 2;
}
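
// If ms_inject_delay_type names this peer's entity type, attach a
// DelayedDelivery queue so received messages can be artificially delayed
// (a testing aid; a no-op in normal operation).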
void AsyncConnection::maybe_start_delay_thread()
{
  if (!delay_state) {
    async_msgr->cct->_conf.with_val<std::string>(
      "ms_inject_delay_type",
      [this](const std::string& s) {
        if (s.find(ceph_entity_type_name(peer_type)) != std::string::npos) {
          ldout(msgr->cct, 1) << __func__ << " setting up a delay queue"
                              << dendl;
          delay_state = new DelayedDelivery(async_msgr, center, dispatch_queue,
                                            conn_id);
        }
      });
  }
}
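
// Callback-style read: if read_until() cannot satisfy the request from data
// already available, the outstanding length and destination are recorded and
// process() retries, firing `callback` on completion or error.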
ssize_t AsyncConnection::read(unsigned len, char *buffer,
                              std::function<void(char *, ssize_t)> callback) {
  ldout(async_msgr->cct, 20) << __func__
                             << (pendingReadLen ? " continue" : " start")
                             << " len=" << len << dendl;
  ssize_t r = read_until(len, buffer);
  if (r > 0) {
    readCallback = callback;
    pendingReadLen = len;
    read_buffer = buffer;
  }
  return r;
}

// This function may be called multiple times to populate the same target
// buffer, so the caller must pass in the same bufferptr on each call.
// Normally only "read_message" passes an existing bufferptr in.
//
// It uses a readahead strategy to amortize the overhead of small reads;
// "recv_buf" stores the prefetched data.
//
// Returns the number of bytes still missing: 0 means the buffer is complete,
// < 0 means an error occurred.
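//
// Illustrative example (numbers are hypothetical): with recv_max_prefetch =
// 4096, a read_until(64, p) that finds recv_buf empty prefetches up to 4096
// bytes; if 1000 arrive, 64 are copied into p (returning 0) and 936 remain
// buffered for later calls. A 1 MiB read instead bypasses recv_buf and reads
// directly into p.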
ssize_t AsyncConnection::read_until(unsigned len, char *p)
{
  ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is "
                             << state_offset << dendl;

  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
      cs.shutdown();
    }
  }

  ssize_t r = 0;
  uint64_t left = len - state_offset;
  if (recv_end > recv_start) {
    uint64_t to_read = std::min<uint64_t>(recv_end - recv_start, left);
    memcpy(p, recv_buf+recv_start, to_read);
    recv_start += to_read;
    left -= to_read;
    ldout(async_msgr->cct, 25) << __func__ << " got " << to_read << " in buffer "
                               << " left is " << left << " buffer still has "
                               << recv_end - recv_start << dendl;
    if (left == 0) {
      state_offset = 0;
      return 0;
    }
    state_offset += to_read;
  }

  recv_end = recv_start = 0;
  /* nothing left in the prefetch buffer */
  if (left > (uint64_t)recv_max_prefetch) {
    /* this was a large read, we don't prefetch for these */
    do {
      r = read_bulk(p+state_offset, left);
      ldout(async_msgr->cct, 25) << __func__ << " read_bulk left is " << left << " got " << r << dendl;
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
        return -1;
      } else if (r == static_cast<int>(left)) {
        state_offset = 0;
        return 0;
      }
      state_offset += r;
      left -= r;
    } while (r > 0);
  } else {
    do {
      r = read_bulk(recv_buf+recv_end, recv_max_prefetch);
      ldout(async_msgr->cct, 25) << __func__ << " read_bulk recv_end is " << recv_end
                                 << " left is " << left << " got " << r << dendl;
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
        return -1;
      }
      recv_end += r;
      if (r >= static_cast<int>(left)) {
        recv_start = len - state_offset;
        memcpy(p+state_offset, recv_buf, recv_start);
        state_offset = 0;
        return 0;
      }
      left -= r;
    } while (r > 0);
    memcpy(p+state_offset, recv_buf, recv_end-recv_start);
    state_offset += (recv_end - recv_start);
    recv_end = recv_start = 0;
  }
  ldout(async_msgr->cct, 25) << __func__ << " need len " << len << " remaining "
                             << len - state_offset << " bytes" << dendl;
  return len - state_offset;
}

/* Returns -1 if `fd` hit an error or was closed by the peer (the socket
 * should then be closed); returns 0 on EAGAIN; retries internally on EINTR. */
ssize_t AsyncConnection::read_bulk(char *buf, unsigned len)
{
  ssize_t nread;
 again:
  nread = cs.read(buf, len);
  if (nread < 0) {
    if (nread == -EAGAIN) {
      nread = 0;
    } else if (nread == -EINTR) {
      goto again;
    } else {
      ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << cs.fd()
                                << " : " << nread << " " << cpp_strerror(nread) << dendl;
      return -1;
    }
  } else if (nread == 0) {
    ldout(async_msgr->cct, 1) << __func__ << " peer closed file descriptor "
                              << cs.fd() << dendl;
    return -1;
  }
  return nread;
}
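
// Queue `bl` for transmission. If _try_send() cannot flush everything at
// once, the callback is remembered and later invoked with 0 from
// handle_write_callback() once the backlog fully drains.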
ssize_t AsyncConnection::write(ceph::buffer::list &bl,
                               std::function<void(ssize_t)> callback,
                               bool more) {

  std::unique_lock<std::mutex> l(write_lock);
  outgoing_bl.claim_append(bl);
  ssize_t r = _try_send(more);
  if (r > 0) {
    writeCallback = callback;
  }
  return r;
}

// Returns the number of bytes still queued (which may exceed the length of
// the data just appended); returns < 0 on error.
ssize_t AsyncConnection::_try_send(bool more)
{
  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
      cs.shutdown();
    }
  }

  ceph_assert(center->in_thread());
  ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outgoing_bl.length()
                             << " bytes" << dendl;
  // a blocked network would make ::send return EAGAIN; simulate that here by
  // skipping cs.send() and leaving r = 0
  ssize_t r = 0;
  if (likely(!inject_network_congestion())) {
    r = cs.send(outgoing_bl, more);
  }
  if (r < 0) {
    ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
    return r;
  }

  ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r
                             << " remaining bytes " << outgoing_bl.length() << dendl;

  if (!open_write && is_queued()) {
    center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
    open_write = true;
  }

  if (open_write && !is_queued()) {
    center->delete_file_event(cs.fd(), EVENT_WRITABLE);
    open_write = false;
    if (writeCallback) {
      center->dispatch_event_external(write_callback_handler);
    }
  }

  return outgoing_bl.length();
}

void AsyncConnection::inject_delay() {
  if (async_msgr->cct->_conf->ms_inject_internal_delays) {
    ldout(async_msgr->cct, 10) << __func__ << " sleep for " <<
      async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
    utime_t t;
    t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
    t.sleep();
  }
}
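
// With ms_inject_network_congestion set to N > 0, a send attempt is
// suppressed with probability (N-1)/N, mimicking a socket that keeps
// returning EAGAIN.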
bool AsyncConnection::inject_network_congestion() const {
  return (async_msgr->cct->_conf->ms_inject_network_congestion > 0 &&
          rand() % async_msgr->cct->_conf->ms_inject_network_congestion != 0);
}
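
// Read-side state machine, run on the event thread whenever the socket
// becomes readable or a connect/accept step needs to advance. Note that
// STATE_CONNECTING deliberately falls through into STATE_CONNECTING_RE.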
void AsyncConnection::process() {
  std::lock_guard<std::mutex> l(lock);
  last_active = ceph::coarse_mono_clock::now();
  recv_start_time = ceph::mono_clock::now();

  ldout(async_msgr->cct, 20) << __func__ << dendl;

  switch (state) {
    case STATE_NONE: {
      ldout(async_msgr->cct, 20) << __func__ << " enter none state" << dendl;
      return;
    }
    case STATE_CLOSED: {
      ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl;
      return;
    }
    case STATE_CONNECTING: {
      ceph_assert(!policy.server);

      // clear timer (if any) since we are connecting/re-connecting
      if (last_tick_id) {
        center->delete_time_event(last_tick_id);
      }
      last_connect_started = ceph::coarse_mono_clock::now();
      last_tick_id = center->create_time_event(
        connect_timeout_us, tick_handler);

      if (cs) {
        center->delete_file_event(cs.fd(), EVENT_READABLE | EVENT_WRITABLE);
        cs.close();
      }

      SocketOptions opts;
      opts.priority = async_msgr->get_socket_priority();
      if (async_msgr->cct->_conf->mon_use_min_delay_socket) {
        if (async_msgr->get_mytype() == CEPH_ENTITY_TYPE_MON &&
            peer_is_mon()) {
          opts.priority = SOCKET_PRIORITY_MIN_DELAY;
        }
      }
      opts.connect_bind_addr = msgr->get_myaddrs().front();
      ssize_t r = worker->connect(target_addr, opts, &cs);
      if (r < 0) {
        protocol->fault();
        return;
      }

      center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
      state = STATE_CONNECTING_RE;
    }
    case STATE_CONNECTING_RE: {
      ssize_t r = cs.is_connected();
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " reconnect failed to "
                                  << target_addr << dendl;
        if (r == -ECONNREFUSED) {
          ldout(async_msgr->cct, 2)
            << __func__ << " connection refused!" << dendl;
          dispatch_queue->queue_refused(this);
        }
        protocol->fault();
        return;
      } else if (r == 0) {
        ldout(async_msgr->cct, 10)
          << __func__ << " nonblock connect inprogress" << dendl;
        if (async_msgr->get_stack()->nonblock_connect_need_writable_event()) {
          center->create_file_event(cs.fd(), EVENT_WRITABLE,
                                    read_handler);
        }
        logger->tinc(l_msgr_running_recv_time,
                     ceph::mono_clock::now() - recv_start_time);
        return;
      }

      center->delete_file_event(cs.fd(), EVENT_WRITABLE);
      ldout(async_msgr->cct, 10)
        << __func__ << " connected successfully, ready to send banner" << dendl;
      state = STATE_CONNECTION_ESTABLISHED;
      break;
    }

    case STATE_ACCEPTING: {
      center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
      state = STATE_CONNECTION_ESTABLISHED;
      if (async_msgr->cct->_conf->mon_use_min_delay_socket) {
        if (async_msgr->get_mytype() == CEPH_ENTITY_TYPE_MON &&
            peer_is_mon()) {
          cs.set_priority(cs.fd(), SOCKET_PRIORITY_MIN_DELAY,
                          target_addr.get_family());
        }
      }
      break;
    }

    case STATE_CONNECTION_ESTABLISHED: {
      if (pendingReadLen) {
        ssize_t r = read(*pendingReadLen, read_buffer, readCallback);
        if (r <= 0) { // all bytes read, or an error occurred
          pendingReadLen.reset();
          char *buf_tmp = read_buffer;
          read_buffer = nullptr;
          readCallback(buf_tmp, r);
        }
        logger->tinc(l_msgr_running_recv_time,
                     ceph::mono_clock::now() - recv_start_time);
        return;
      }
      break;
    }
  }

  protocol->read_event();

  logger->tinc(l_msgr_running_recv_time,
               ceph::mono_clock::now() - recv_start_time);
}

bool AsyncConnection::is_connected() {
  return protocol->is_connected();
}
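
// Client side: record the peer's identity and policy, then kick off the
// protocol's connect sequence; the actual socket work happens later in
// process() on the event thread.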
void AsyncConnection::connect(const entity_addrvec_t &addrs, int type,
                              entity_addr_t &target) {

  std::lock_guard<std::mutex> l(lock);
  set_peer_type(type);
  set_peer_addrs(addrs);
  policy = msgr->get_policy(type);
  target_addr = target;
  _connect();
}

void AsyncConnection::_connect()
{
  ldout(async_msgr->cct, 10) << __func__ << dendl;

  state = STATE_CONNECTING;
  protocol->connect();
  // reschedule the connection on the event thread to avoid a lock
  // dependency; this may be called from an external thread (send_message)
  center->dispatch_event_external(read_handler);
}
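
// Server side: adopt an already-accepted socket and let the protocol drive
// the handshake from STATE_ACCEPTING.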
void AsyncConnection::accept(ConnectedSocket socket,
                             const entity_addr_t &listen_addr,
                             const entity_addr_t &peer_addr)
{
  ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd()
                             << " listen_addr " << listen_addr
                             << " peer_addr " << peer_addr << dendl;
  ceph_assert(socket.fd() >= 0);

  std::lock_guard<std::mutex> l(lock);
  cs = std::move(socket);
  socket_addr = listen_addr;
  target_addr = peer_addr; // until we know better
  state = STATE_ACCEPTING;
  protocol->accept();
  // reschedule the connection on the event thread to avoid a lock dependency
  center->dispatch_event_external(read_handler);
}

int AsyncConnection::send_message(Message *m)
{
  FUNCTRACE(async_msgr->cct);
  lgeneric_subdout(async_msgr->cct, ms,
                   1) << "-- " << async_msgr->get_myaddrs() << " --> "
                      << get_peer_addrs() << " -- "
                      << *m << " -- " << m << " con "
                      << this
                      << dendl;

  if (is_blackhole()) {
    lgeneric_subdout(async_msgr->cct, ms, 0) << __func__ << ceph_entity_type_name(peer_type)
                                             << " blackhole " << *m << dendl;
    m->put();
    return 0;
  }

  // optimistically assume it is ok to encode (it may actually be broken by now)
  if (!m->get_priority())
    m->set_priority(async_msgr->get_default_send_priority());

  m->get_header().src = async_msgr->get_myname();
  m->set_connection(this);

#if defined(WITH_EVENTTRACE)
  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_BEGIN", true);
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_BEGIN", true);
#endif

  if (is_loopback) { // loopback connection
    ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
    std::lock_guard<std::mutex> l(write_lock);
    if (protocol->is_connected()) {
      dispatch_queue->local_delivery(m, m->get_priority());
    } else {
      ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
                                 << " Drop message " << m << dendl;
      m->put();
    }
    return 0;
  }

  // don't count local messages here: they are too lightweight, and counting
  // them would skew the metric for users
  logger->inc(l_msgr_send_messages);

  protocol->send_message(m);
  return 0;
}

entity_addr_t AsyncConnection::_infer_target_addr(const entity_addrvec_t& av)
{
  // pick the first addr of the same address family as socket_addr. it could be
  // an any: or v2: addr, we don't care. it should not be a v1 addr.
  for (auto& i : av.v) {
    if (i.is_legacy()) {
      continue;
    }
    if (i.get_family() == socket_addr.get_family()) {
      ldout(async_msgr->cct,10) << __func__ << " " << av << " -> " << i << dendl;
      return i;
    }
  }
  ldout(async_msgr->cct,10) << __func__ << " " << av << " -> nothing to match "
                            << socket_addr << dendl;
  return {};
}
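
// Called by the protocol on a transport error: tear down the socket, flush
// any artificially delayed messages, and drop partially read/written state
// so the protocol can decide whether to reconnect.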
void AsyncConnection::fault()
{
  shutdown_socket();
  open_write = false;

  // queue delayed items immediately
  if (delay_state)
    delay_state->flush();

  recv_start = recv_end = 0;
  state_offset = 0;
  outgoing_bl.clear();
}
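
// Final teardown: discard anything queued for dispatch, unregister from the
// messenger, and defer handler/socket cleanup to the event thread via
// C_clean_handler.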
void AsyncConnection::_stop() {
  writeCallback.reset();
  dispatch_queue->discard_queue(conn_id);
  async_msgr->unregister_conn(this);
  worker->release_worker();

  state = STATE_CLOSED;
  open_write = false;

  state_offset = 0;
  // make sure in-queue events will be processed
  center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
}

bool AsyncConnection::is_queued() const {
  return outgoing_bl.length();
}

void AsyncConnection::shutdown_socket() {
  for (auto &&t : register_time_events) center->delete_time_event(t);
  register_time_events.clear();
  if (last_tick_id) {
    center->delete_time_event(last_tick_id);
    last_tick_id = 0;
  }
  if (cs) {
    center->delete_file_event(cs.fd(), EVENT_READABLE | EVENT_WRITABLE);
    cs.shutdown();
    cs.close();
  }
}
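
// DelayedDelivery implements the ms_inject_delay_* debugging hooks: received
// messages are parked in delay_queue and dispatched later from a time event
// (do_request), flushed immediately on fault, or dropped by discard().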
void AsyncConnection::DelayedDelivery::do_request(uint64_t id)
{
  Message *m = nullptr;
  {
    std::lock_guard<std::mutex> l(delay_lock);
    register_time_events.erase(id);
    if (stop_dispatch)
      return;
    if (delay_queue.empty())
      return;
    m = delay_queue.front();
    delay_queue.pop_front();
  }
  if (msgr->ms_can_fast_dispatch(m)) {
    dispatch_queue->fast_dispatch(m);
  } else {
    dispatch_queue->enqueue(m, m->get_priority(), conn_id);
  }
}

void AsyncConnection::DelayedDelivery::discard() {
  stop_dispatch = true;
  center->submit_to(center->get_id(),
                    [this]() mutable {
                      std::lock_guard<std::mutex> l(delay_lock);
                      while (!delay_queue.empty()) {
                        Message *m = delay_queue.front();
                        dispatch_queue->dispatch_throttle_release(
                            m->get_dispatch_throttle_size());
                        m->put();
                        delay_queue.pop_front();
                      }
                      for (auto i : register_time_events)
                        center->delete_time_event(i);
                      register_time_events.clear();
                      stop_dispatch = false;
                    },
                    true);
}

void AsyncConnection::DelayedDelivery::flush() {
  stop_dispatch = true;
  center->submit_to(
      center->get_id(), [this] () mutable {
        std::lock_guard<std::mutex> l(delay_lock);
        while (!delay_queue.empty()) {
          Message *m = delay_queue.front();
          if (msgr->ms_can_fast_dispatch(m)) {
            dispatch_queue->fast_dispatch(m);
          } else {
            dispatch_queue->enqueue(m, m->get_priority(), conn_id);
          }
          delay_queue.pop_front();
        }
        for (auto i : register_time_events)
          center->delete_time_event(i);
        register_time_events.clear();
        stop_dispatch = false;
      }, true);
}

void AsyncConnection::send_keepalive()
{
  protocol->send_keepalive();
}

void AsyncConnection::mark_down()
{
  ldout(async_msgr->cct, 1) << __func__ << dendl;
  std::lock_guard<std::mutex> l(lock);
  protocol->stop();
}

void AsyncConnection::handle_write()
{
  ldout(async_msgr->cct, 10) << __func__ << dendl;
  protocol->write_event();
}
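
// Invoked on the event thread once _try_send() has fully drained
// outgoing_bl: delivers the completion registered by write() with a
// status of 0.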
void AsyncConnection::handle_write_callback() {
  std::lock_guard<std::mutex> l(lock);
  last_active = ceph::coarse_mono_clock::now();
  recv_start_time = ceph::mono_clock::now();
  write_lock.lock();
  if (writeCallback) {
    auto callback = *writeCallback;
    writeCallback.reset();
    write_lock.unlock();
    callback(0);
    return;
  }
  write_lock.unlock();
}

void AsyncConnection::stop(bool queue_reset) {
  lock.lock();
  bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
  protocol->stop();
  lock.unlock();
  if (need_queue_reset) dispatch_queue->queue_reset(this);
}

void AsyncConnection::cleanup() {
  shutdown_socket();
  delete read_handler;
  delete write_handler;
  delete write_callback_handler;
  delete wakeup_handler;
  delete tick_handler;
  if (delay_state) {
    delete delay_state;
    delay_state = NULL;
  }
}

void AsyncConnection::wakeup_from(uint64_t id)
{
  lock.lock();
  register_time_events.erase(id);
  lock.unlock();
  process();
}
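
// Periodic watchdog: while connecting, fault if no progress is made within
// connect_timeout_us; once connected, fault if the connection has been idle
// longer than inactive_timeout_us. Each pass re-arms the timer via
// tick_handler unless a timeout fired.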
void AsyncConnection::tick(uint64_t id)
{
  auto now = ceph::coarse_mono_clock::now();
  ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
                             << " last_active=" << last_active << dendl;
  std::lock_guard<std::mutex> l(lock);
  last_tick_id = 0;
  if (!is_connected()) {
    if (connect_timeout_us <=
        (uint64_t)std::chrono::duration_cast<std::chrono::microseconds>
          (now - last_connect_started).count()) {
      ldout(async_msgr->cct, 1) << __func__ << " no progress in more than "
                                << connect_timeout_us
                                << " us while connecting to "
                                << target_addr << ", fault."
                                << dendl;
      protocol->fault();
      labeled_logger->inc(l_msgr_connection_ready_timeouts);
    } else {
      last_tick_id = center->create_time_event(connect_timeout_us, tick_handler);
    }
  } else {
    auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>
      (now - last_active).count();
    if (inactive_timeout_us < (uint64_t)idle_period) {
      ldout(async_msgr->cct, 1) << __func__ << " idle (" << idle_period
                                << ") for more than " << inactive_timeout_us
                                << " us, fault."
                                << dendl;
      protocol->fault();
      labeled_logger->inc(l_msgr_connection_idle_timeouts);
    } else {
      last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
    }
  }
}
813 | } |