]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
7c673cae FG |
2 | // vim: ts=8 sw=2 smarttab |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include "acconfig.h" | |
18 | ||
19 | #include <iostream> | |
20 | #include <fstream> | |
21 | ||
22 | #include "AsyncMessenger.h" | |
23 | ||
24 | #include "common/config.h" | |
25 | #include "common/Timer.h" | |
26 | #include "common/errno.h" | |
27 | ||
28 | #include "messages/MOSDOp.h" | |
29 | #include "messages/MOSDOpReply.h" | |
30 | #include "common/EventTrace.h" | |
31 | ||
32 | #define dout_subsys ceph_subsys_ms | |
33 | #undef dout_prefix | |
34 | #define dout_prefix _prefix(_dout, this) | |
f67539c2 | 35 | static std::ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) { |
11fdf7f2 | 36 | return *_dout << "-- " << m->get_myaddrs() << " "; |
7c673cae FG |
37 | } |
38 | ||
f67539c2 | 39 | static std::ostream& _prefix(std::ostream *_dout, Processor *p) { |
7c673cae FG |
40 | return *_dout << " Processor -- "; |
41 | } | |
42 | ||
43 | ||
44 | /******************* | |
45 | * Processor | |
46 | */ | |
47 | ||
48 | class Processor::C_processor_accept : public EventCallback { | |
49 | Processor *pro; | |
50 | ||
51 | public: | |
52 | explicit C_processor_accept(Processor *p): pro(p) {} | |
11fdf7f2 | 53 | void do_request(uint64_t id) override { |
7c673cae FG |
54 | pro->accept(); |
55 | } | |
56 | }; | |
57 | ||
58 | Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c) | |
59 | : msgr(r), net(c), worker(w), | |
60 | listen_handler(new C_processor_accept(this)) {} | |
61 | ||
11fdf7f2 | 62 | int Processor::bind(const entity_addrvec_t &bind_addrs, |
f67539c2 | 63 | const std::set<int>& avoid_ports, |
11fdf7f2 | 64 | entity_addrvec_t* bound_addrs) |
7c673cae | 65 | { |
11fdf7f2 TL |
66 | const auto& conf = msgr->cct->_conf; |
67 | // bind to socket(s) | |
68 | ldout(msgr->cct, 10) << __func__ << " " << bind_addrs << dendl; | |
7c673cae FG |
69 | |
70 | SocketOptions opts; | |
71 | opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay; | |
72 | opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf; | |
73 | ||
11fdf7f2 TL |
74 | listen_sockets.resize(bind_addrs.v.size()); |
75 | *bound_addrs = bind_addrs; | |
7c673cae | 76 | |
11fdf7f2 TL |
77 | for (unsigned k = 0; k < bind_addrs.v.size(); ++k) { |
78 | auto& listen_addr = bound_addrs->v[k]; | |
7c673cae | 79 | |
11fdf7f2 TL |
80 | /* bind to port */ |
81 | int r = -1; | |
7c673cae | 82 | |
11fdf7f2 TL |
83 | for (int i = 0; i < conf->ms_bind_retry_count; i++) { |
84 | if (i > 0) { | |
85 | lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in " | |
86 | << conf->ms_bind_retry_delay << " seconds " << dendl; | |
87 | sleep(conf->ms_bind_retry_delay); | |
7c673cae | 88 | } |
11fdf7f2 TL |
89 | |
90 | if (listen_addr.get_port()) { | |
91 | worker->center.submit_to( | |
92 | worker->center.get_id(), | |
93 | [this, k, &listen_addr, &opts, &r]() { | |
94 | r = worker->listen(listen_addr, k, opts, &listen_sockets[k]); | |
95 | }, false); | |
96 | if (r < 0) { | |
97 | lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr | |
98 | << ": " << cpp_strerror(r) << dendl; | |
99 | continue; | |
100 | } | |
101 | } else { | |
102 | // try a range of ports | |
103 | for (int port = msgr->cct->_conf->ms_bind_port_min; | |
104 | port <= msgr->cct->_conf->ms_bind_port_max; | |
105 | port++) { | |
106 | if (avoid_ports.count(port)) | |
107 | continue; | |
108 | ||
109 | listen_addr.set_port(port); | |
110 | worker->center.submit_to( | |
111 | worker->center.get_id(), | |
112 | [this, k, &listen_addr, &opts, &r]() { | |
113 | r = worker->listen(listen_addr, k, opts, &listen_sockets[k]); | |
114 | }, false); | |
115 | if (r == 0) | |
116 | break; | |
117 | } | |
118 | if (r < 0) { | |
119 | lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr | |
120 | << " on any port in range " | |
121 | << msgr->cct->_conf->ms_bind_port_min | |
122 | << "-" << msgr->cct->_conf->ms_bind_port_max << ": " | |
123 | << cpp_strerror(r) << dendl; | |
124 | listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again. | |
125 | continue; | |
126 | } | |
127 | ldout(msgr->cct, 10) << __func__ << " bound on random port " | |
128 | << listen_addr << dendl; | |
7c673cae | 129 | } |
11fdf7f2 TL |
130 | if (r == 0) { |
131 | break; | |
7c673cae | 132 | } |
7c673cae | 133 | } |
11fdf7f2 TL |
134 | |
135 | // It seems that binding completely failed, return with that exit status | |
136 | if (r < 0) { | |
137 | lderr(msgr->cct) << __func__ << " was unable to bind after " | |
138 | << conf->ms_bind_retry_count | |
139 | << " attempts: " << cpp_strerror(r) << dendl; | |
140 | for (unsigned j = 0; j < k; ++j) { | |
141 | // clean up previous bind | |
142 | listen_sockets[j].abort_accept(); | |
143 | } | |
144 | return r; | |
145 | } | |
7c673cae FG |
146 | } |
147 | ||
11fdf7f2 | 148 | ldout(msgr->cct, 10) << __func__ << " bound to " << *bound_addrs << dendl; |
7c673cae FG |
149 | return 0; |
150 | } | |
151 | ||
152 | void Processor::start() | |
153 | { | |
154 | ldout(msgr->cct, 1) << __func__ << dendl; | |
155 | ||
156 | // start thread | |
11fdf7f2 | 157 | worker->center.submit_to(worker->center.get_id(), [this]() { |
9f95a23c TL |
158 | for (auto& listen_socket : listen_sockets) { |
159 | if (listen_socket) { | |
160 | if (listen_socket.fd() == -1) { | |
161 | ldout(msgr->cct, 1) << __func__ | |
162 | << " Error: processor restart after listen_socket.fd closed. " | |
163 | << this << dendl; | |
164 | return; | |
165 | } | |
166 | worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, | |
11fdf7f2 TL |
167 | listen_handler); } |
168 | } | |
169 | }, false); | |
7c673cae FG |
170 | } |
171 | ||
172 | void Processor::accept() | |
173 | { | |
7c673cae FG |
174 | SocketOptions opts; |
175 | opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay; | |
176 | opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf; | |
177 | opts.priority = msgr->get_socket_priority(); | |
11fdf7f2 TL |
178 | |
179 | for (auto& listen_socket : listen_sockets) { | |
180 | ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd() | |
181 | << dendl; | |
182 | unsigned accept_error_num = 0; | |
183 | ||
184 | while (true) { | |
185 | entity_addr_t addr; | |
186 | ConnectedSocket cli_socket; | |
187 | Worker *w = worker; | |
188 | if (!msgr->get_stack()->support_local_listen_table()) | |
189 | w = msgr->get_stack()->get_worker(); | |
190 | else | |
191 | ++w->references; | |
192 | int r = listen_socket.accept(&cli_socket, opts, &addr, w); | |
193 | if (r == 0) { | |
194 | ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " | |
195 | << cli_socket.fd() << dendl; | |
196 | ||
197 | msgr->add_accept( | |
198 | w, std::move(cli_socket), | |
199 | msgr->get_myaddrs().v[listen_socket.get_addr_slot()], | |
200 | addr); | |
201 | accept_error_num = 0; | |
91327a77 | 202 | continue; |
7c673cae | 203 | } else { |
9f95a23c | 204 | --w->references; |
11fdf7f2 TL |
205 | if (r == -EINTR) { |
206 | continue; | |
207 | } else if (r == -EAGAIN) { | |
208 | break; | |
209 | } else if (r == -EMFILE || r == -ENFILE) { | |
210 | lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd() | |
211 | << " errno " << r << " " << cpp_strerror(r) << dendl; | |
212 | if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) { | |
213 | lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl; | |
214 | ceph_abort(); | |
215 | } | |
216 | continue; | |
217 | } else if (r == -ECONNABORTED) { | |
218 | ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd() | |
219 | << " errno " << r << " " << cpp_strerror(r) << dendl; | |
220 | continue; | |
221 | } else { | |
222 | lderr(msgr->cct) << __func__ << " no incoming connection?" | |
223 | << " errno " << r << " " << cpp_strerror(r) << dendl; | |
224 | if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) { | |
225 | lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl; | |
226 | ceph_abort(); | |
227 | } | |
228 | continue; | |
91327a77 | 229 | } |
7c673cae FG |
230 | } |
231 | } | |
232 | } | |
233 | } | |
234 | ||
235 | void Processor::stop() | |
236 | { | |
237 | ldout(msgr->cct,10) << __func__ << dendl; | |
238 | ||
11fdf7f2 TL |
239 | worker->center.submit_to(worker->center.get_id(), [this]() { |
240 | for (auto& listen_socket : listen_sockets) { | |
241 | if (listen_socket) { | |
242 | worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE); | |
243 | listen_socket.abort_accept(); | |
244 | } | |
245 | } | |
7c673cae | 246 | }, false); |
7c673cae FG |
247 | } |
248 | ||
249 | ||
250 | struct StackSingleton { | |
251 | CephContext *cct; | |
252 | std::shared_ptr<NetworkStack> stack; | |
253 | ||
11fdf7f2 | 254 | explicit StackSingleton(CephContext *c): cct(c) {} |
7c673cae FG |
255 | void ready(std::string &type) { |
256 | if (!stack) | |
257 | stack = NetworkStack::create(cct, type); | |
258 | } | |
259 | ~StackSingleton() { | |
260 | stack->stop(); | |
261 | } | |
262 | }; | |
263 | ||
264 | ||
265 | class C_handle_reap : public EventCallback { | |
266 | AsyncMessenger *msgr; | |
267 | ||
268 | public: | |
269 | explicit C_handle_reap(AsyncMessenger *m): msgr(m) {} | |
11fdf7f2 | 270 | void do_request(uint64_t id) override { |
7c673cae FG |
271 | // judge whether is a time event |
272 | msgr->reap_dead(); | |
273 | } | |
274 | }; | |
275 | ||
276 | /******************* | |
277 | * AsyncMessenger | |
278 | */ | |
279 | ||
280 | AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, | |
f67539c2 | 281 | const std::string &type, std::string mname, uint64_t _nonce) |
9f95a23c | 282 | : SimplePolicyMessenger(cct, name), |
7c673cae | 283 | dispatch_queue(cct, this, mname), |
9f95a23c | 284 | nonce(_nonce) |
7c673cae FG |
285 | { |
286 | std::string transport_type = "posix"; | |
287 | if (type.find("rdma") != std::string::npos) | |
288 | transport_type = "rdma"; | |
289 | else if (type.find("dpdk") != std::string::npos) | |
290 | transport_type = "dpdk"; | |
291 | ||
11fdf7f2 TL |
292 | auto single = &cct->lookup_or_create_singleton_object<StackSingleton>( |
293 | "AsyncMessenger::NetworkStack::" + transport_type, true, cct); | |
7c673cae FG |
294 | single->ready(transport_type); |
295 | stack = single->stack.get(); | |
296 | stack->start(); | |
297 | local_worker = stack->get_worker(); | |
9f95a23c | 298 | local_connection = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, |
11fdf7f2 | 299 | local_worker, true, true); |
7c673cae FG |
300 | init_local_connection(); |
301 | reap_handler = new C_handle_reap(this); | |
302 | unsigned processor_num = 1; | |
303 | if (stack->support_local_listen_table()) | |
304 | processor_num = stack->get_num_worker(); | |
305 | for (unsigned i = 0; i < processor_num; ++i) | |
306 | processors.push_back(new Processor(this, stack->get_worker(i), cct)); | |
307 | } | |
308 | ||
309 | /** | |
310 | * Destroy the AsyncMessenger. Pretty simple since all the work is done | |
311 | * elsewhere. | |
312 | */ | |
313 | AsyncMessenger::~AsyncMessenger() | |
314 | { | |
315 | delete reap_handler; | |
11fdf7f2 | 316 | ceph_assert(!did_bind); // either we didn't bind or we shut down the Processor |
7c673cae FG |
317 | for (auto &&p : processors) |
318 | delete p; | |
319 | } | |
320 | ||
321 | void AsyncMessenger::ready() | |
322 | { | |
11fdf7f2 | 323 | ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl; |
7c673cae FG |
324 | |
325 | stack->ready(); | |
326 | if (pending_bind) { | |
39ae355f | 327 | int err = bindv(pending_bind_addrs, saved_public_addrs); |
7c673cae FG |
328 | if (err) { |
329 | lderr(cct) << __func__ << " postponed bind failed" << dendl; | |
330 | ceph_abort(); | |
331 | } | |
332 | } | |
333 | ||
9f95a23c | 334 | std::lock_guard l{lock}; |
7c673cae FG |
335 | for (auto &&p : processors) |
336 | p->start(); | |
337 | dispatch_queue.start(); | |
338 | } | |
339 | ||
340 | int AsyncMessenger::shutdown() | |
341 | { | |
11fdf7f2 | 342 | ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl; |
7c673cae FG |
343 | |
344 | // done! clean up. | |
345 | for (auto &&p : processors) | |
346 | p->stop(); | |
347 | mark_down_all(); | |
348 | // break ref cycles on the loopback connection | |
9f95a23c TL |
349 | local_connection->clear_priv(); |
350 | local_connection->mark_down(); | |
7c673cae | 351 | did_bind = false; |
9f95a23c TL |
352 | lock.lock(); |
353 | stop_cond.notify_all(); | |
7c673cae | 354 | stopped = true; |
9f95a23c | 355 | lock.unlock(); |
7c673cae FG |
356 | stack->drain(); |
357 | return 0; | |
358 | } | |
359 | ||
39ae355f TL |
360 | int AsyncMessenger::bind(const entity_addr_t &bind_addr, |
361 | std::optional<entity_addrvec_t> public_addrs) | |
11fdf7f2 | 362 | { |
39ae355f TL |
363 | ldout(cct, 10) << __func__ << " " << bind_addr |
364 | << " public " << public_addrs << dendl; | |
11fdf7f2 TL |
365 | // old bind() can take entity_addr_t(). new bindv() can take a |
366 | // 0.0.0.0-like address but needs type and family to be set. | |
367 | auto a = bind_addr; | |
368 | if (a == entity_addr_t()) { | |
369 | a.set_type(entity_addr_t::TYPE_LEGACY); | |
370 | if (cct->_conf->ms_bind_ipv6) { | |
371 | a.set_family(AF_INET6); | |
372 | } else { | |
373 | a.set_family(AF_INET); | |
374 | } | |
375 | } | |
39ae355f | 376 | return bindv(entity_addrvec_t(a), public_addrs); |
11fdf7f2 TL |
377 | } |
378 | ||
39ae355f TL |
379 | int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs, |
380 | std::optional<entity_addrvec_t> public_addrs) | |
7c673cae | 381 | { |
9f95a23c | 382 | lock.lock(); |
7c673cae FG |
383 | |
384 | if (!pending_bind && started) { | |
385 | ldout(cct,10) << __func__ << " already started" << dendl; | |
9f95a23c | 386 | lock.unlock(); |
7c673cae FG |
387 | return -1; |
388 | } | |
389 | ||
39ae355f TL |
390 | ldout(cct, 10) << __func__ << " " << bind_addrs |
391 | << " public " << public_addrs << dendl; | |
392 | if (public_addrs && bind_addrs != public_addrs) { | |
393 | // for the sake of rebind() and the is-not-ready case let's | |
394 | // store public_addrs. there is no point in that if public | |
395 | // addrs are indifferent from bind_addrs. | |
396 | saved_public_addrs = std::move(public_addrs); | |
397 | } | |
7c673cae FG |
398 | |
399 | if (!stack->is_ready()) { | |
400 | ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl; | |
11fdf7f2 | 401 | pending_bind_addrs = bind_addrs; |
7c673cae | 402 | pending_bind = true; |
9f95a23c | 403 | lock.unlock(); |
7c673cae FG |
404 | return 0; |
405 | } | |
406 | ||
9f95a23c | 407 | lock.unlock(); |
7c673cae FG |
408 | |
409 | // bind to a socket | |
f67539c2 | 410 | std::set<int> avoid_ports; |
11fdf7f2 | 411 | entity_addrvec_t bound_addrs; |
7c673cae FG |
412 | unsigned i = 0; |
413 | for (auto &&p : processors) { | |
11fdf7f2 | 414 | int r = p->bind(bind_addrs, avoid_ports, &bound_addrs); |
7c673cae FG |
415 | if (r) { |
416 | // Note: this is related to local tcp listen table problem. | |
417 | // Posix(default kernel implementation) backend shares listen table | |
418 | // in the kernel, so all threads can use the same listen table naturally | |
419 | // and only one thread need to bind. But other backends(like dpdk) uses local | |
420 | // listen table, we need to bind/listen tcp port for each worker. So if the | |
421 | // first worker failed to bind, it could be think the normal error then handle | |
422 | // it, like port is used case. But if the first worker successfully to bind | |
423 | // but the second worker failed, it's not expected and we need to assert | |
424 | // here | |
11fdf7f2 | 425 | ceph_assert(i == 0); |
7c673cae FG |
426 | return r; |
427 | } | |
428 | ++i; | |
429 | } | |
11fdf7f2 | 430 | _finish_bind(bind_addrs, bound_addrs); |
7c673cae FG |
431 | return 0; |
432 | } | |
433 | ||
f67539c2 | 434 | int AsyncMessenger::rebind(const std::set<int>& avoid_ports) |
7c673cae FG |
435 | { |
436 | ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl; | |
11fdf7f2 | 437 | ceph_assert(did_bind); |
7c673cae FG |
438 | |
439 | for (auto &&p : processors) | |
440 | p->stop(); | |
441 | mark_down_all(); | |
442 | ||
443 | // adjust the nonce; we want our entity_addr_t to be truly unique. | |
444 | nonce += 1000000; | |
445 | ldout(cct, 10) << __func__ << " new nonce " << nonce | |
11fdf7f2 | 446 | << " and addr " << get_myaddrs() << dendl; |
7c673cae | 447 | |
11fdf7f2 TL |
448 | entity_addrvec_t bound_addrs; |
449 | entity_addrvec_t bind_addrs = get_myaddrs(); | |
f67539c2 | 450 | std::set<int> new_avoid(avoid_ports); |
11fdf7f2 TL |
451 | for (auto& a : bind_addrs.v) { |
452 | new_avoid.insert(a.get_port()); | |
453 | a.set_port(0); | |
454 | } | |
455 | ldout(cct, 10) << __func__ << " will try " << bind_addrs | |
7c673cae FG |
456 | << " and avoid ports " << new_avoid << dendl; |
457 | unsigned i = 0; | |
458 | for (auto &&p : processors) { | |
11fdf7f2 | 459 | int r = p->bind(bind_addrs, avoid_ports, &bound_addrs); |
7c673cae | 460 | if (r) { |
11fdf7f2 | 461 | ceph_assert(i == 0); |
7c673cae FG |
462 | return r; |
463 | } | |
464 | ++i; | |
465 | } | |
11fdf7f2 | 466 | _finish_bind(bind_addrs, bound_addrs); |
7c673cae FG |
467 | for (auto &&p : processors) { |
468 | p->start(); | |
469 | } | |
470 | return 0; | |
471 | } | |
472 | ||
473 | int AsyncMessenger::client_bind(const entity_addr_t &bind_addr) | |
474 | { | |
31f18b77 FG |
475 | if (!cct->_conf->ms_bind_before_connect) |
476 | return 0; | |
9f95a23c | 477 | std::lock_guard l{lock}; |
7c673cae | 478 | if (did_bind) { |
7c673cae FG |
479 | return 0; |
480 | } | |
481 | if (started) { | |
482 | ldout(cct, 10) << __func__ << " already started" << dendl; | |
483 | return -1; | |
484 | } | |
485 | ldout(cct, 10) << __func__ << " " << bind_addr << dendl; | |
486 | ||
11fdf7f2 | 487 | set_myaddrs(entity_addrvec_t(bind_addr)); |
7c673cae FG |
488 | return 0; |
489 | } | |
490 | ||
11fdf7f2 TL |
491 | void AsyncMessenger::_finish_bind(const entity_addrvec_t& bind_addrs, |
492 | const entity_addrvec_t& listen_addrs) | |
7c673cae | 493 | { |
11fdf7f2 TL |
494 | set_myaddrs(bind_addrs); |
495 | for (auto& a : bind_addrs.v) { | |
496 | if (!a.is_blank_ip()) { | |
497 | learned_addr(a); | |
498 | } | |
499 | } | |
7c673cae | 500 | |
11fdf7f2 TL |
501 | if (get_myaddrs().front().get_port() == 0) { |
502 | set_myaddrs(listen_addrs); | |
7c673cae | 503 | } |
05a536ef TL |
504 | |
505 | entity_addrvec_t newaddrs; | |
506 | if (saved_public_addrs) { | |
507 | newaddrs = *saved_public_addrs; | |
508 | for (auto& public_addr : newaddrs.v) { | |
509 | public_addr.set_nonce(nonce); | |
510 | if (public_addr.is_ip() && public_addr.get_port() == 0) { | |
511 | // port is not explicitly set. This is fine as it can be figured | |
512 | // out by msgr. For instance, the low-level `Processor::bind` | |
513 | // scans for free ports in a range controlled by ms_bind_port_min | |
514 | // and ms_bind_port_max. | |
515 | for (const auto& a : my_addrs->v) { | |
516 | if (public_addr.get_type() == a.get_type() && a.is_ip()) { | |
517 | public_addr.set_port(a.get_port()); | |
518 | } | |
519 | } | |
520 | } | |
521 | } | |
522 | } else { | |
523 | newaddrs = *my_addrs; | |
524 | for (auto& a : newaddrs.v) { | |
525 | a.set_nonce(nonce); | |
39ae355f | 526 | } |
11fdf7f2 TL |
527 | } |
528 | set_myaddrs(newaddrs); | |
7c673cae FG |
529 | |
530 | init_local_connection(); | |
531 | ||
11fdf7f2 | 532 | ldout(cct,1) << __func__ << " bind my_addrs is " << get_myaddrs() << dendl; |
7c673cae FG |
533 | did_bind = true; |
534 | } | |
535 | ||
f6b5b4d7 TL |
536 | int AsyncMessenger::client_reset() |
537 | { | |
538 | mark_down_all(); | |
539 | ||
540 | std::scoped_lock l{lock}; | |
541 | // adjust the nonce; we want our entity_addr_t to be truly unique. | |
542 | nonce += 1000000; | |
543 | ldout(cct, 10) << __func__ << " new nonce " << nonce << dendl; | |
544 | ||
545 | entity_addrvec_t newaddrs = *my_addrs; | |
546 | for (auto& a : newaddrs.v) { | |
547 | a.set_nonce(nonce); | |
548 | } | |
549 | set_myaddrs(newaddrs); | |
550 | _init_local_connection(); | |
551 | return 0; | |
552 | } | |
553 | ||
7c673cae FG |
554 | int AsyncMessenger::start() |
555 | { | |
9f95a23c | 556 | std::scoped_lock l{lock}; |
7c673cae FG |
557 | ldout(cct,1) << __func__ << " start" << dendl; |
558 | ||
559 | // register at least one entity, first! | |
11fdf7f2 | 560 | ceph_assert(my_name.type() >= 0); |
7c673cae | 561 | |
11fdf7f2 | 562 | ceph_assert(!started); |
7c673cae FG |
563 | started = true; |
564 | stopped = false; | |
565 | ||
566 | if (!did_bind) { | |
11fdf7f2 TL |
567 | entity_addrvec_t newaddrs = *my_addrs; |
568 | for (auto& a : newaddrs.v) { | |
569 | a.nonce = nonce; | |
570 | } | |
571 | set_myaddrs(newaddrs); | |
7c673cae FG |
572 | _init_local_connection(); |
573 | } | |
574 | ||
7c673cae FG |
575 | return 0; |
576 | } | |
577 | ||
578 | void AsyncMessenger::wait() | |
579 | { | |
9f95a23c TL |
580 | { |
581 | std::unique_lock locker{lock}; | |
582 | if (!started) { | |
583 | return; | |
584 | } | |
585 | if (!stopped) | |
586 | stop_cond.wait(locker); | |
7c673cae | 587 | } |
7c673cae FG |
588 | dispatch_queue.shutdown(); |
589 | if (dispatch_queue.is_started()) { | |
590 | ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl; | |
591 | dispatch_queue.wait(); | |
592 | dispatch_queue.discard_local(); | |
593 | ldout(cct, 10) << __func__ << ": dispatch queue is stopped" << dendl; | |
594 | } | |
595 | ||
596 | // close all connections | |
597 | shutdown_connections(false); | |
598 | stack->drain(); | |
599 | ||
600 | ldout(cct, 10) << __func__ << ": done." << dendl; | |
601 | ldout(cct, 1) << __func__ << " complete." << dendl; | |
602 | started = false; | |
603 | } | |
604 | ||
11fdf7f2 TL |
605 | void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, |
606 | const entity_addr_t &listen_addr, | |
607 | const entity_addr_t &peer_addr) | |
7c673cae | 608 | { |
9f95a23c TL |
609 | std::lock_guard l{lock}; |
610 | auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w, | |
11fdf7f2 TL |
611 | listen_addr.is_msgr2(), false); |
612 | conn->accept(std::move(cli_socket), listen_addr, peer_addr); | |
7c673cae | 613 | accepting_conns.insert(conn); |
7c673cae FG |
614 | } |
615 | ||
11fdf7f2 | 616 | AsyncConnectionRef AsyncMessenger::create_connect( |
9f95a23c | 617 | const entity_addrvec_t& addrs, int type, bool anon) |
7c673cae | 618 | { |
9f95a23c | 619 | ceph_assert(ceph_mutex_is_locked(lock)); |
7c673cae | 620 | |
11fdf7f2 | 621 | ldout(cct, 10) << __func__ << " " << addrs |
7c673cae FG |
622 | << ", creating connection and registering" << dendl; |
623 | ||
11fdf7f2 TL |
624 | // here is where we decide which of the addrs to connect to. always prefer |
625 | // the first one, if we support it. | |
626 | entity_addr_t target; | |
627 | for (auto& a : addrs.v) { | |
628 | if (!a.is_msgr2() && !a.is_legacy()) { | |
629 | continue; | |
630 | } | |
631 | // FIXME: for ipv4 vs ipv6, check whether local host can handle ipv6 before | |
632 | // trying it? for now, just pick whichever is listed first. | |
633 | target = a; | |
634 | break; | |
635 | } | |
636 | ||
7c673cae FG |
637 | // create connection |
638 | Worker *w = stack->get_worker(); | |
9f95a23c | 639 | auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w, |
11fdf7f2 | 640 | target.is_msgr2(), false); |
9f95a23c | 641 | conn->anon = anon; |
11fdf7f2 | 642 | conn->connect(addrs, type, target); |
9f95a23c TL |
643 | if (anon) { |
644 | anon_conns.insert(conn); | |
645 | } else { | |
646 | ceph_assert(!conns.count(addrs)); | |
647 | ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " " | |
648 | << *conn->peer_addrs << dendl; | |
649 | conns[addrs] = conn; | |
650 | } | |
7c673cae FG |
651 | w->get_perf_counter()->inc(l_msgr_active_connections); |
652 | ||
653 | return conn; | |
654 | } | |
655 | ||
7c673cae | 656 | |
11fdf7f2 TL |
657 | ConnectionRef AsyncMessenger::get_loopback_connection() |
658 | { | |
659 | return local_connection; | |
660 | } | |
7c673cae | 661 | |
11fdf7f2 TL |
662 | bool AsyncMessenger::should_use_msgr2() |
663 | { | |
664 | // if we are bound to v1 only, and we are connecting to a v2 peer, | |
665 | // we cannot use the peer's v2 address. otherwise the connection | |
666 | // is assymetrical, because they would have to use v1 to connect | |
667 | // to us, and we would use v2, and connection race detection etc | |
668 | // would totally break down (among other things). or, the other | |
669 | // end will be confused that we advertise ourselve with a v1 | |
670 | // address only (that we bound to) but connected with protocol v2. | |
671 | return !did_bind || get_myaddrs().has_msgr2(); | |
7c673cae FG |
672 | } |
673 | ||
9f95a23c | 674 | entity_addrvec_t AsyncMessenger::_filter_addrs(const entity_addrvec_t& addrs) |
7c673cae | 675 | { |
11fdf7f2 | 676 | if (!should_use_msgr2()) { |
9f95a23c | 677 | ldout(cct, 10) << __func__ << " " << addrs << " limiting to v1 ()" << dendl; |
11fdf7f2 TL |
678 | entity_addrvec_t r; |
679 | for (auto& i : addrs.v) { | |
680 | if (i.is_msgr2()) { | |
681 | continue; | |
682 | } | |
683 | r.v.push_back(i); | |
684 | } | |
685 | return r; | |
686 | } else { | |
687 | return addrs; | |
688 | } | |
7c673cae FG |
689 | } |
690 | ||
11fdf7f2 | 691 | int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs) |
7c673cae | 692 | { |
11fdf7f2 TL |
693 | FUNCTRACE(cct); |
694 | ceph_assert(m); | |
7c673cae | 695 | |
9f95a23c | 696 | #if defined(WITH_EVENTTRACE) |
7c673cae FG |
697 | if (m->get_type() == CEPH_MSG_OSD_OP) |
698 | OID_EVENT_TRACE(((MOSDOp *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP"); | |
699 | else if (m->get_type() == CEPH_MSG_OSD_OPREPLY) | |
700 | OID_EVENT_TRACE(((MOSDOpReply *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP_REPLY"); | |
9f95a23c | 701 | #endif |
7c673cae | 702 | |
11fdf7f2 TL |
703 | ldout(cct, 1) << __func__ << "--> " << ceph_entity_type_name(type) << " " |
704 | << addrs << " -- " << *m << " -- ?+" | |
7c673cae FG |
705 | << m->get_data().length() << " " << m << dendl; |
706 | ||
11fdf7f2 | 707 | if (addrs.empty()) { |
7c673cae | 708 | ldout(cct,0) << __func__ << " message " << *m |
11fdf7f2 | 709 | << " with empty dest " << addrs << dendl; |
7c673cae FG |
710 | m->put(); |
711 | return -EINVAL; | |
712 | } | |
713 | ||
7c673cae FG |
714 | if (cct->_conf->ms_dump_on_send) { |
715 | m->encode(-1, MSG_CRC_ALL); | |
11fdf7f2 | 716 | ldout(cct, 0) << __func__ << " submit_message " << *m << "\n"; |
7c673cae FG |
717 | m->get_payload().hexdump(*_dout); |
718 | if (m->get_data().length() > 0) { | |
719 | *_dout << " data:\n"; | |
720 | m->get_data().hexdump(*_dout); | |
721 | } | |
722 | *_dout << dendl; | |
723 | m->clear_payload(); | |
724 | } | |
725 | ||
9f95a23c TL |
726 | connect_to(type, addrs, false)->send_message(m); |
727 | return 0; | |
728 | } | |
729 | ||
730 | ConnectionRef AsyncMessenger::connect_to(int type, | |
731 | const entity_addrvec_t& addrs, | |
732 | bool anon, bool not_local_dest) | |
733 | { | |
734 | if (!not_local_dest) { | |
735 | if (*my_addrs == addrs || | |
736 | (addrs.v.size() == 1 && | |
737 | my_addrs->contains(addrs.front()))) { | |
738 | // local | |
739 | return local_connection; | |
740 | } | |
7c673cae FG |
741 | } |
742 | ||
9f95a23c TL |
743 | auto av = _filter_addrs(addrs); |
744 | std::lock_guard l{lock}; | |
745 | if (anon) { | |
746 | return create_connect(av, type, anon); | |
7c673cae FG |
747 | } |
748 | ||
9f95a23c TL |
749 | AsyncConnectionRef conn = _lookup_conn(av); |
750 | if (conn) { | |
751 | ldout(cct, 10) << __func__ << " " << av << " existing " << conn << dendl; | |
7c673cae | 752 | } else { |
9f95a23c TL |
753 | conn = create_connect(av, type, false); |
754 | ldout(cct, 10) << __func__ << " " << av << " new " << conn << dendl; | |
7c673cae | 755 | } |
9f95a23c TL |
756 | |
757 | return conn; | |
7c673cae FG |
758 | } |
759 | ||
760 | /** | |
11fdf7f2 | 761 | * If my_addr doesn't have an IP set, this function |
7c673cae FG |
762 | * will fill it in from the passed addr. Otherwise it does nothing and returns. |
763 | */ | |
11fdf7f2 | 764 | bool AsyncMessenger::set_addr_unknowns(const entity_addrvec_t &addrs) |
7c673cae | 765 | { |
11fdf7f2 TL |
766 | ldout(cct,1) << __func__ << " " << addrs << dendl; |
767 | bool ret = false; | |
9f95a23c | 768 | std::lock_guard l{lock}; |
11fdf7f2 TL |
769 | |
770 | entity_addrvec_t newaddrs = *my_addrs; | |
771 | for (auto& a : newaddrs.v) { | |
772 | if (a.is_blank_ip()) { | |
773 | int type = a.get_type(); | |
774 | int port = a.get_port(); | |
775 | uint32_t nonce = a.get_nonce(); | |
776 | for (auto& b : addrs.v) { | |
777 | if (a.get_family() == b.get_family()) { | |
778 | ldout(cct,1) << __func__ << " assuming my addr " << a | |
779 | << " matches provided addr " << b << dendl; | |
780 | a = b; | |
781 | a.set_nonce(nonce); | |
782 | a.set_type(type); | |
783 | a.set_port(port); | |
784 | ret = true; | |
785 | break; | |
786 | } | |
787 | } | |
788 | } | |
789 | } | |
790 | set_myaddrs(newaddrs); | |
791 | if (ret) { | |
7c673cae FG |
792 | _init_local_connection(); |
793 | } | |
11fdf7f2 TL |
794 | ldout(cct,1) << __func__ << " now " << *my_addrs << dendl; |
795 | return ret; | |
7c673cae FG |
796 | } |
797 | ||
798 | void AsyncMessenger::shutdown_connections(bool queue_reset) | |
799 | { | |
800 | ldout(cct,1) << __func__ << " " << dendl; | |
9f95a23c TL |
801 | std::lock_guard l{lock}; |
802 | for (const auto& c : accepting_conns) { | |
803 | ldout(cct, 5) << __func__ << " accepting_conn " << c << dendl; | |
804 | c->stop(queue_reset); | |
7c673cae FG |
805 | } |
806 | accepting_conns.clear(); | |
807 | ||
9f95a23c TL |
808 | for (const auto& [e, c] : conns) { |
809 | ldout(cct, 5) << __func__ << " mark down " << e << " " << c << dendl; | |
9f95a23c | 810 | c->stop(queue_reset); |
7c673cae | 811 | } |
9f95a23c TL |
812 | conns.clear(); |
813 | ||
814 | for (const auto& c : anon_conns) { | |
815 | ldout(cct, 5) << __func__ << " mark down " << c << dendl; | |
9f95a23c TL |
816 | c->stop(queue_reset); |
817 | } | |
818 | anon_conns.clear(); | |
7c673cae FG |
819 | |
820 | { | |
9f95a23c | 821 | std::lock_guard l{deleted_lock}; |
a4b75251 TL |
822 | for (const auto& c : deleted_conns) { |
823 | ldout(cct, 5) << __func__ << " delete " << c << dendl; | |
824 | c->get_perf_counter()->dec(l_msgr_active_connections); | |
7c673cae | 825 | } |
9f95a23c | 826 | deleted_conns.clear(); |
7c673cae | 827 | } |
7c673cae FG |
828 | } |
829 | ||
11fdf7f2 | 830 | void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs) |
7c673cae | 831 | { |
9f95a23c TL |
832 | std::lock_guard l{lock}; |
833 | const AsyncConnectionRef& conn = _lookup_conn(addrs); | |
834 | if (conn) { | |
835 | ldout(cct, 1) << __func__ << " " << addrs << " -- " << conn << dendl; | |
836 | conn->stop(true); | |
7c673cae | 837 | } else { |
11fdf7f2 | 838 | ldout(cct, 1) << __func__ << " " << addrs << " -- connection dne" << dendl; |
7c673cae | 839 | } |
7c673cae FG |
840 | } |
841 | ||
842 | int AsyncMessenger::get_proto_version(int peer_type, bool connect) const | |
843 | { | |
11fdf7f2 | 844 | int my_type = my_name.type(); |
7c673cae FG |
845 | |
846 | // set reply protocol version | |
847 | if (peer_type == my_type) { | |
848 | // internal | |
849 | return cluster_protocol; | |
850 | } else { | |
851 | // public | |
852 | switch (connect ? peer_type : my_type) { | |
853 | case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; | |
854 | case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; | |
855 | case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; | |
856 | } | |
857 | } | |
858 | return 0; | |
859 | } | |
860 | ||
9f95a23c | 861 | int AsyncMessenger::accept_conn(const AsyncConnectionRef& conn) |
11fdf7f2 | 862 | { |
9f95a23c TL |
863 | std::lock_guard l{lock}; |
864 | if (conn->policy.server && | |
865 | conn->policy.lossy && | |
866 | !conn->policy.register_lossy_clients) { | |
867 | anon_conns.insert(conn); | |
868 | conn->get_perf_counter()->inc(l_msgr_active_connections); | |
869 | return 0; | |
870 | } | |
11fdf7f2 TL |
871 | auto it = conns.find(*conn->peer_addrs); |
872 | if (it != conns.end()) { | |
9f95a23c | 873 | auto& existing = it->second; |
11fdf7f2 TL |
874 | |
875 | // lazy delete, see "deleted_conns" | |
876 | // If conn already in, we will return 0 | |
9f95a23c | 877 | std::lock_guard l{deleted_lock}; |
11fdf7f2 | 878 | if (deleted_conns.erase(existing)) { |
a4b75251 | 879 | it->second->get_perf_counter()->dec(l_msgr_active_connections); |
11fdf7f2 TL |
880 | conns.erase(it); |
881 | } else if (conn != existing) { | |
882 | return -1; | |
883 | } | |
884 | } | |
885 | ldout(cct, 10) << __func__ << " " << conn << " " << *conn->peer_addrs << dendl; | |
886 | conns[*conn->peer_addrs] = conn; | |
887 | conn->get_perf_counter()->inc(l_msgr_active_connections); | |
888 | accepting_conns.erase(conn); | |
889 | return 0; | |
890 | } | |
891 | ||
892 | ||
893 | bool AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) | |
7c673cae FG |
894 | { |
895 | // be careful here: multiple threads may block here, and readers of | |
11fdf7f2 | 896 | // my_addr do NOT hold any lock. |
7c673cae FG |
897 | |
898 | // this always goes from true -> false under the protection of the | |
899 | // mutex. if it is already false, we need not retake the mutex at | |
900 | // all. | |
901 | if (!need_addr) | |
11fdf7f2 TL |
902 | return false; |
903 | std::lock_guard l(lock); | |
7c673cae | 904 | if (need_addr) { |
11fdf7f2 TL |
905 | if (my_addrs->empty()) { |
906 | auto a = peer_addr_for_me; | |
907 | a.set_type(entity_addr_t::TYPE_ANY); | |
908 | a.set_nonce(nonce); | |
909 | if (!did_bind) { | |
910 | a.set_port(0); | |
911 | } | |
912 | set_myaddrs(entity_addrvec_t(a)); | |
913 | ldout(cct,10) << __func__ << " had no addrs" << dendl; | |
914 | } else { | |
915 | // fix all addrs of the same family, regardless of type (msgr2 vs legacy) | |
916 | entity_addrvec_t newaddrs = *my_addrs; | |
917 | for (auto& a : newaddrs.v) { | |
918 | if (a.is_blank_ip() && | |
919 | a.get_family() == peer_addr_for_me.get_family()) { | |
920 | entity_addr_t t = peer_addr_for_me; | |
921 | if (!did_bind) { | |
922 | t.set_type(entity_addr_t::TYPE_ANY); | |
923 | t.set_port(0); | |
924 | } else { | |
925 | t.set_type(a.get_type()); | |
926 | t.set_port(a.get_port()); | |
927 | } | |
928 | t.set_nonce(a.get_nonce()); | |
929 | ldout(cct,10) << __func__ << " " << a << " -> " << t << dendl; | |
930 | a = t; | |
931 | } | |
932 | } | |
933 | set_myaddrs(newaddrs); | |
934 | } | |
935 | ldout(cct, 1) << __func__ << " learned my addr " << *my_addrs | |
936 | << " (peer_addr_for_me " << peer_addr_for_me << ")" << dendl; | |
7c673cae | 937 | _init_local_connection(); |
11fdf7f2 TL |
938 | need_addr = false; |
939 | return true; | |
7c673cae | 940 | } |
11fdf7f2 | 941 | return false; |
7c673cae FG |
942 | } |
943 | ||
9f95a23c | 944 | void AsyncMessenger::reap_dead() |
7c673cae FG |
945 | { |
946 | ldout(cct, 1) << __func__ << " start" << dendl; | |
7c673cae | 947 | |
9f95a23c | 948 | std::lock_guard l1{lock}; |
7c673cae | 949 | |
9f95a23c TL |
950 | { |
951 | std::lock_guard l2{deleted_lock}; | |
952 | for (auto& c : deleted_conns) { | |
953 | ldout(cct, 5) << __func__ << " delete " << c << dendl; | |
954 | auto conns_it = conns.find(*c->peer_addrs); | |
955 | if (conns_it != conns.end() && conns_it->second == c) | |
956 | conns.erase(conns_it); | |
957 | accepting_conns.erase(c); | |
958 | anon_conns.erase(c); | |
a4b75251 | 959 | c->get_perf_counter()->dec(l_msgr_active_connections); |
9f95a23c TL |
960 | } |
961 | deleted_conns.clear(); | |
7c673cae | 962 | } |
7c673cae | 963 | } |