]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/AsyncMessenger.cc
f5dd03295e92add48cf3dd88e199c33e9589ce2b
[ceph.git] / ceph / src / msg / async / AsyncMessenger.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #include "acconfig.h"
18
19 #include <iostream>
20 #include <fstream>
21
22 #include "AsyncMessenger.h"
23
24 #include "common/config.h"
25 #include "common/Timer.h"
26 #include "common/errno.h"
27
28 #include "messages/MOSDOp.h"
29 #include "messages/MOSDOpReply.h"
30 #include "common/EventTrace.h"
31
32 #define dout_subsys ceph_subsys_ms
33 #undef dout_prefix
34 #define dout_prefix _prefix(_dout, this)
35 static std::ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) {
36 return *_dout << "-- " << m->get_myaddrs() << " ";
37 }
38
39 static std::ostream& _prefix(std::ostream *_dout, Processor *p) {
40 return *_dout << " Processor -- ";
41 }
42
43
44 /*******************
45 * Processor
46 */
47
48 class Processor::C_processor_accept : public EventCallback {
49 Processor *pro;
50
51 public:
52 explicit C_processor_accept(Processor *p): pro(p) {}
53 void do_request(uint64_t id) override {
54 pro->accept();
55 }
56 };
57
58 Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c)
59 : msgr(r), net(c), worker(w),
60 listen_handler(new C_processor_accept(this)) {}
61
62 int Processor::bind(const entity_addrvec_t &bind_addrs,
63 const std::set<int>& avoid_ports,
64 entity_addrvec_t* bound_addrs)
65 {
66 const auto& conf = msgr->cct->_conf;
67 // bind to socket(s)
68 ldout(msgr->cct, 10) << __func__ << " " << bind_addrs << dendl;
69
70 SocketOptions opts;
71 opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
72 opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
73
74 listen_sockets.resize(bind_addrs.v.size());
75 *bound_addrs = bind_addrs;
76
77 for (unsigned k = 0; k < bind_addrs.v.size(); ++k) {
78 auto& listen_addr = bound_addrs->v[k];
79
80 /* bind to port */
81 int r = -1;
82
83 for (int i = 0; i < conf->ms_bind_retry_count; i++) {
84 if (i > 0) {
85 lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in "
86 << conf->ms_bind_retry_delay << " seconds " << dendl;
87 sleep(conf->ms_bind_retry_delay);
88 }
89
90 if (listen_addr.get_port()) {
91 worker->center.submit_to(
92 worker->center.get_id(),
93 [this, k, &listen_addr, &opts, &r]() {
94 r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
95 }, false);
96 if (r < 0) {
97 lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
98 << ": " << cpp_strerror(r) << dendl;
99 continue;
100 }
101 } else {
102 // try a range of ports
103 for (int port = msgr->cct->_conf->ms_bind_port_min;
104 port <= msgr->cct->_conf->ms_bind_port_max;
105 port++) {
106 if (avoid_ports.count(port))
107 continue;
108
109 listen_addr.set_port(port);
110 worker->center.submit_to(
111 worker->center.get_id(),
112 [this, k, &listen_addr, &opts, &r]() {
113 r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
114 }, false);
115 if (r == 0)
116 break;
117 }
118 if (r < 0) {
119 lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
120 << " on any port in range "
121 << msgr->cct->_conf->ms_bind_port_min
122 << "-" << msgr->cct->_conf->ms_bind_port_max << ": "
123 << cpp_strerror(r) << dendl;
124 listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again.
125 continue;
126 }
127 ldout(msgr->cct, 10) << __func__ << " bound on random port "
128 << listen_addr << dendl;
129 }
130 if (r == 0) {
131 break;
132 }
133 }
134
135 // It seems that binding completely failed, return with that exit status
136 if (r < 0) {
137 lderr(msgr->cct) << __func__ << " was unable to bind after "
138 << conf->ms_bind_retry_count
139 << " attempts: " << cpp_strerror(r) << dendl;
140 for (unsigned j = 0; j < k; ++j) {
141 // clean up previous bind
142 listen_sockets[j].abort_accept();
143 }
144 return r;
145 }
146 }
147
148 ldout(msgr->cct, 10) << __func__ << " bound to " << *bound_addrs << dendl;
149 return 0;
150 }
151
152 void Processor::start()
153 {
154 ldout(msgr->cct, 1) << __func__ << dendl;
155
156 // start thread
157 worker->center.submit_to(worker->center.get_id(), [this]() {
158 for (auto& listen_socket : listen_sockets) {
159 if (listen_socket) {
160 if (listen_socket.fd() == -1) {
161 ldout(msgr->cct, 1) << __func__
162 << " Error: processor restart after listen_socket.fd closed. "
163 << this << dendl;
164 return;
165 }
166 worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE,
167 listen_handler); }
168 }
169 }, false);
170 }
171
172 void Processor::accept()
173 {
174 SocketOptions opts;
175 opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
176 opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
177 opts.priority = msgr->get_socket_priority();
178
179 for (auto& listen_socket : listen_sockets) {
180 ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd()
181 << dendl;
182 unsigned accept_error_num = 0;
183
184 while (true) {
185 entity_addr_t addr;
186 ConnectedSocket cli_socket;
187 Worker *w = worker;
188 if (!msgr->get_stack()->support_local_listen_table())
189 w = msgr->get_stack()->get_worker();
190 else
191 ++w->references;
192 int r = listen_socket.accept(&cli_socket, opts, &addr, w);
193 if (r == 0) {
194 ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd "
195 << cli_socket.fd() << dendl;
196
197 msgr->add_accept(
198 w, std::move(cli_socket),
199 msgr->get_myaddrs().v[listen_socket.get_addr_slot()],
200 addr);
201 accept_error_num = 0;
202 continue;
203 } else {
204 --w->references;
205 if (r == -EINTR) {
206 continue;
207 } else if (r == -EAGAIN) {
208 break;
209 } else if (r == -EMFILE || r == -ENFILE) {
210 lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd()
211 << " errno " << r << " " << cpp_strerror(r) << dendl;
212 if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
213 lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl;
214 ceph_abort();
215 }
216 continue;
217 } else if (r == -ECONNABORTED) {
218 ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd()
219 << " errno " << r << " " << cpp_strerror(r) << dendl;
220 continue;
221 } else {
222 lderr(msgr->cct) << __func__ << " no incoming connection?"
223 << " errno " << r << " " << cpp_strerror(r) << dendl;
224 if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
225 lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl;
226 ceph_abort();
227 }
228 continue;
229 }
230 }
231 }
232 }
233 }
234
235 void Processor::stop()
236 {
237 ldout(msgr->cct,10) << __func__ << dendl;
238
239 worker->center.submit_to(worker->center.get_id(), [this]() {
240 for (auto& listen_socket : listen_sockets) {
241 if (listen_socket) {
242 worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE);
243 listen_socket.abort_accept();
244 }
245 }
246 }, false);
247 }
248
249
250 struct StackSingleton {
251 CephContext *cct;
252 std::shared_ptr<NetworkStack> stack;
253
254 explicit StackSingleton(CephContext *c): cct(c) {}
255 void ready(std::string &type) {
256 if (!stack)
257 stack = NetworkStack::create(cct, type);
258 }
259 ~StackSingleton() {
260 stack->stop();
261 }
262 };
263
264
265 class C_handle_reap : public EventCallback {
266 AsyncMessenger *msgr;
267
268 public:
269 explicit C_handle_reap(AsyncMessenger *m): msgr(m) {}
270 void do_request(uint64_t id) override {
271 // judge whether is a time event
272 msgr->reap_dead();
273 }
274 };
275
276 /*******************
277 * AsyncMessenger
278 */
279
280 AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
281 const std::string &type, std::string mname, uint64_t _nonce)
282 : SimplePolicyMessenger(cct, name),
283 dispatch_queue(cct, this, mname),
284 nonce(_nonce)
285 {
286 std::string transport_type = "posix";
287 if (type.find("rdma") != std::string::npos)
288 transport_type = "rdma";
289 else if (type.find("dpdk") != std::string::npos)
290 transport_type = "dpdk";
291
292 auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
293 "AsyncMessenger::NetworkStack::" + transport_type, true, cct);
294 single->ready(transport_type);
295 stack = single->stack.get();
296 stack->start();
297 local_worker = stack->get_worker();
298 local_connection = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue,
299 local_worker, true, true);
300 init_local_connection();
301 reap_handler = new C_handle_reap(this);
302 unsigned processor_num = 1;
303 if (stack->support_local_listen_table())
304 processor_num = stack->get_num_worker();
305 for (unsigned i = 0; i < processor_num; ++i)
306 processors.push_back(new Processor(this, stack->get_worker(i), cct));
307 }
308
309 /**
310 * Destroy the AsyncMessenger. Pretty simple since all the work is done
311 * elsewhere.
312 */
313 AsyncMessenger::~AsyncMessenger()
314 {
315 delete reap_handler;
316 ceph_assert(!did_bind); // either we didn't bind or we shut down the Processor
317 for (auto &&p : processors)
318 delete p;
319 }
320
321 void AsyncMessenger::ready()
322 {
323 ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;
324
325 stack->ready();
326 if (pending_bind) {
327 int err = bindv(pending_bind_addrs);
328 if (err) {
329 lderr(cct) << __func__ << " postponed bind failed" << dendl;
330 ceph_abort();
331 }
332 }
333
334 std::lock_guard l{lock};
335 for (auto &&p : processors)
336 p->start();
337 dispatch_queue.start();
338 }
339
340 int AsyncMessenger::shutdown()
341 {
342 ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;
343
344 // done! clean up.
345 for (auto &&p : processors)
346 p->stop();
347 mark_down_all();
348 // break ref cycles on the loopback connection
349 local_connection->clear_priv();
350 local_connection->mark_down();
351 did_bind = false;
352 lock.lock();
353 stop_cond.notify_all();
354 stopped = true;
355 lock.unlock();
356 stack->drain();
357 return 0;
358 }
359
360 int AsyncMessenger::bind(const entity_addr_t &bind_addr)
361 {
362 ldout(cct,10) << __func__ << " " << bind_addr << dendl;
363 // old bind() can take entity_addr_t(). new bindv() can take a
364 // 0.0.0.0-like address but needs type and family to be set.
365 auto a = bind_addr;
366 if (a == entity_addr_t()) {
367 a.set_type(entity_addr_t::TYPE_LEGACY);
368 if (cct->_conf->ms_bind_ipv6) {
369 a.set_family(AF_INET6);
370 } else {
371 a.set_family(AF_INET);
372 }
373 }
374 return bindv(entity_addrvec_t(a));
375 }
376
377 int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
378 {
379 lock.lock();
380
381 if (!pending_bind && started) {
382 ldout(cct,10) << __func__ << " already started" << dendl;
383 lock.unlock();
384 return -1;
385 }
386
387 ldout(cct,10) << __func__ << " " << bind_addrs << dendl;
388
389 if (!stack->is_ready()) {
390 ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl;
391 pending_bind_addrs = bind_addrs;
392 pending_bind = true;
393 lock.unlock();
394 return 0;
395 }
396
397 lock.unlock();
398
399 // bind to a socket
400 std::set<int> avoid_ports;
401 entity_addrvec_t bound_addrs;
402 unsigned i = 0;
403 for (auto &&p : processors) {
404 int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
405 if (r) {
406 // Note: this is related to local tcp listen table problem.
407 // Posix(default kernel implementation) backend shares listen table
408 // in the kernel, so all threads can use the same listen table naturally
409 // and only one thread need to bind. But other backends(like dpdk) uses local
410 // listen table, we need to bind/listen tcp port for each worker. So if the
411 // first worker failed to bind, it could be think the normal error then handle
412 // it, like port is used case. But if the first worker successfully to bind
413 // but the second worker failed, it's not expected and we need to assert
414 // here
415 ceph_assert(i == 0);
416 return r;
417 }
418 ++i;
419 }
420 _finish_bind(bind_addrs, bound_addrs);
421 return 0;
422 }
423
424 int AsyncMessenger::rebind(const std::set<int>& avoid_ports)
425 {
426 ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
427 ceph_assert(did_bind);
428
429 for (auto &&p : processors)
430 p->stop();
431 mark_down_all();
432
433 // adjust the nonce; we want our entity_addr_t to be truly unique.
434 nonce += 1000000;
435 ldout(cct, 10) << __func__ << " new nonce " << nonce
436 << " and addr " << get_myaddrs() << dendl;
437
438 entity_addrvec_t bound_addrs;
439 entity_addrvec_t bind_addrs = get_myaddrs();
440 std::set<int> new_avoid(avoid_ports);
441 for (auto& a : bind_addrs.v) {
442 new_avoid.insert(a.get_port());
443 a.set_port(0);
444 }
445 ldout(cct, 10) << __func__ << " will try " << bind_addrs
446 << " and avoid ports " << new_avoid << dendl;
447 unsigned i = 0;
448 for (auto &&p : processors) {
449 int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
450 if (r) {
451 ceph_assert(i == 0);
452 return r;
453 }
454 ++i;
455 }
456 _finish_bind(bind_addrs, bound_addrs);
457 for (auto &&p : processors) {
458 p->start();
459 }
460 return 0;
461 }
462
463 int AsyncMessenger::client_bind(const entity_addr_t &bind_addr)
464 {
465 if (!cct->_conf->ms_bind_before_connect)
466 return 0;
467 std::lock_guard l{lock};
468 if (did_bind) {
469 return 0;
470 }
471 if (started) {
472 ldout(cct, 10) << __func__ << " already started" << dendl;
473 return -1;
474 }
475 ldout(cct, 10) << __func__ << " " << bind_addr << dendl;
476
477 set_myaddrs(entity_addrvec_t(bind_addr));
478 return 0;
479 }
480
481 void AsyncMessenger::_finish_bind(const entity_addrvec_t& bind_addrs,
482 const entity_addrvec_t& listen_addrs)
483 {
484 set_myaddrs(bind_addrs);
485 for (auto& a : bind_addrs.v) {
486 if (!a.is_blank_ip()) {
487 learned_addr(a);
488 }
489 }
490
491 if (get_myaddrs().front().get_port() == 0) {
492 set_myaddrs(listen_addrs);
493 }
494 entity_addrvec_t newaddrs = *my_addrs;
495 for (auto& a : newaddrs.v) {
496 a.set_nonce(nonce);
497 }
498 set_myaddrs(newaddrs);
499
500 init_local_connection();
501
502 ldout(cct,1) << __func__ << " bind my_addrs is " << get_myaddrs() << dendl;
503 did_bind = true;
504 }
505
506 int AsyncMessenger::client_reset()
507 {
508 mark_down_all();
509
510 std::scoped_lock l{lock};
511 // adjust the nonce; we want our entity_addr_t to be truly unique.
512 nonce += 1000000;
513 ldout(cct, 10) << __func__ << " new nonce " << nonce << dendl;
514
515 entity_addrvec_t newaddrs = *my_addrs;
516 for (auto& a : newaddrs.v) {
517 a.set_nonce(nonce);
518 }
519 set_myaddrs(newaddrs);
520 _init_local_connection();
521 return 0;
522 }
523
524 int AsyncMessenger::start()
525 {
526 std::scoped_lock l{lock};
527 ldout(cct,1) << __func__ << " start" << dendl;
528
529 // register at least one entity, first!
530 ceph_assert(my_name.type() >= 0);
531
532 ceph_assert(!started);
533 started = true;
534 stopped = false;
535
536 if (!did_bind) {
537 entity_addrvec_t newaddrs = *my_addrs;
538 for (auto& a : newaddrs.v) {
539 a.nonce = nonce;
540 }
541 set_myaddrs(newaddrs);
542 _init_local_connection();
543 }
544
545 return 0;
546 }
547
548 void AsyncMessenger::wait()
549 {
550 {
551 std::unique_lock locker{lock};
552 if (!started) {
553 return;
554 }
555 if (!stopped)
556 stop_cond.wait(locker);
557 }
558 dispatch_queue.shutdown();
559 if (dispatch_queue.is_started()) {
560 ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl;
561 dispatch_queue.wait();
562 dispatch_queue.discard_local();
563 ldout(cct, 10) << __func__ << ": dispatch queue is stopped" << dendl;
564 }
565
566 // close all connections
567 shutdown_connections(false);
568 stack->drain();
569
570 ldout(cct, 10) << __func__ << ": done." << dendl;
571 ldout(cct, 1) << __func__ << " complete." << dendl;
572 started = false;
573 }
574
575 void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket,
576 const entity_addr_t &listen_addr,
577 const entity_addr_t &peer_addr)
578 {
579 std::lock_guard l{lock};
580 auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w,
581 listen_addr.is_msgr2(), false);
582 conn->accept(std::move(cli_socket), listen_addr, peer_addr);
583 accepting_conns.insert(conn);
584 }
585
586 AsyncConnectionRef AsyncMessenger::create_connect(
587 const entity_addrvec_t& addrs, int type, bool anon)
588 {
589 ceph_assert(ceph_mutex_is_locked(lock));
590
591 ldout(cct, 10) << __func__ << " " << addrs
592 << ", creating connection and registering" << dendl;
593
594 // here is where we decide which of the addrs to connect to. always prefer
595 // the first one, if we support it.
596 entity_addr_t target;
597 for (auto& a : addrs.v) {
598 if (!a.is_msgr2() && !a.is_legacy()) {
599 continue;
600 }
601 // FIXME: for ipv4 vs ipv6, check whether local host can handle ipv6 before
602 // trying it? for now, just pick whichever is listed first.
603 target = a;
604 break;
605 }
606
607 // create connection
608 Worker *w = stack->get_worker();
609 auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w,
610 target.is_msgr2(), false);
611 conn->anon = anon;
612 conn->connect(addrs, type, target);
613 if (anon) {
614 anon_conns.insert(conn);
615 } else {
616 ceph_assert(!conns.count(addrs));
617 ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " "
618 << *conn->peer_addrs << dendl;
619 conns[addrs] = conn;
620 }
621 w->get_perf_counter()->inc(l_msgr_active_connections);
622
623 return conn;
624 }
625
626
627 ConnectionRef AsyncMessenger::get_loopback_connection()
628 {
629 return local_connection;
630 }
631
632 bool AsyncMessenger::should_use_msgr2()
633 {
634 // if we are bound to v1 only, and we are connecting to a v2 peer,
635 // we cannot use the peer's v2 address. otherwise the connection
636 // is assymetrical, because they would have to use v1 to connect
637 // to us, and we would use v2, and connection race detection etc
638 // would totally break down (among other things). or, the other
639 // end will be confused that we advertise ourselve with a v1
640 // address only (that we bound to) but connected with protocol v2.
641 return !did_bind || get_myaddrs().has_msgr2();
642 }
643
644 entity_addrvec_t AsyncMessenger::_filter_addrs(const entity_addrvec_t& addrs)
645 {
646 if (!should_use_msgr2()) {
647 ldout(cct, 10) << __func__ << " " << addrs << " limiting to v1 ()" << dendl;
648 entity_addrvec_t r;
649 for (auto& i : addrs.v) {
650 if (i.is_msgr2()) {
651 continue;
652 }
653 r.v.push_back(i);
654 }
655 return r;
656 } else {
657 return addrs;
658 }
659 }
660
661 int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs)
662 {
663 FUNCTRACE(cct);
664 ceph_assert(m);
665
666 #if defined(WITH_EVENTTRACE)
667 if (m->get_type() == CEPH_MSG_OSD_OP)
668 OID_EVENT_TRACE(((MOSDOp *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP");
669 else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
670 OID_EVENT_TRACE(((MOSDOpReply *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP_REPLY");
671 #endif
672
673 ldout(cct, 1) << __func__ << "--> " << ceph_entity_type_name(type) << " "
674 << addrs << " -- " << *m << " -- ?+"
675 << m->get_data().length() << " " << m << dendl;
676
677 if (addrs.empty()) {
678 ldout(cct,0) << __func__ << " message " << *m
679 << " with empty dest " << addrs << dendl;
680 m->put();
681 return -EINVAL;
682 }
683
684 if (cct->_conf->ms_dump_on_send) {
685 m->encode(-1, MSG_CRC_ALL);
686 ldout(cct, 0) << __func__ << " submit_message " << *m << "\n";
687 m->get_payload().hexdump(*_dout);
688 if (m->get_data().length() > 0) {
689 *_dout << " data:\n";
690 m->get_data().hexdump(*_dout);
691 }
692 *_dout << dendl;
693 m->clear_payload();
694 }
695
696 connect_to(type, addrs, false)->send_message(m);
697 return 0;
698 }
699
700 ConnectionRef AsyncMessenger::connect_to(int type,
701 const entity_addrvec_t& addrs,
702 bool anon, bool not_local_dest)
703 {
704 if (!not_local_dest) {
705 if (*my_addrs == addrs ||
706 (addrs.v.size() == 1 &&
707 my_addrs->contains(addrs.front()))) {
708 // local
709 return local_connection;
710 }
711 }
712
713 auto av = _filter_addrs(addrs);
714 std::lock_guard l{lock};
715 if (anon) {
716 return create_connect(av, type, anon);
717 }
718
719 AsyncConnectionRef conn = _lookup_conn(av);
720 if (conn) {
721 ldout(cct, 10) << __func__ << " " << av << " existing " << conn << dendl;
722 } else {
723 conn = create_connect(av, type, false);
724 ldout(cct, 10) << __func__ << " " << av << " new " << conn << dendl;
725 }
726
727 return conn;
728 }
729
730 /**
731 * If my_addr doesn't have an IP set, this function
732 * will fill it in from the passed addr. Otherwise it does nothing and returns.
733 */
734 bool AsyncMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
735 {
736 ldout(cct,1) << __func__ << " " << addrs << dendl;
737 bool ret = false;
738 std::lock_guard l{lock};
739
740 entity_addrvec_t newaddrs = *my_addrs;
741 for (auto& a : newaddrs.v) {
742 if (a.is_blank_ip()) {
743 int type = a.get_type();
744 int port = a.get_port();
745 uint32_t nonce = a.get_nonce();
746 for (auto& b : addrs.v) {
747 if (a.get_family() == b.get_family()) {
748 ldout(cct,1) << __func__ << " assuming my addr " << a
749 << " matches provided addr " << b << dendl;
750 a = b;
751 a.set_nonce(nonce);
752 a.set_type(type);
753 a.set_port(port);
754 ret = true;
755 break;
756 }
757 }
758 }
759 }
760 set_myaddrs(newaddrs);
761 if (ret) {
762 _init_local_connection();
763 }
764 ldout(cct,1) << __func__ << " now " << *my_addrs << dendl;
765 return ret;
766 }
767
768 void AsyncMessenger::set_addrs(const entity_addrvec_t &addrs)
769 {
770 std::lock_guard l{lock};
771 auto t = addrs;
772 for (auto& a : t.v) {
773 a.set_nonce(nonce);
774 }
775 set_myaddrs(t);
776 _init_local_connection();
777 }
778
779 void AsyncMessenger::shutdown_connections(bool queue_reset)
780 {
781 ldout(cct,1) << __func__ << " " << dendl;
782 std::lock_guard l{lock};
783 for (const auto& c : accepting_conns) {
784 ldout(cct, 5) << __func__ << " accepting_conn " << c << dendl;
785 c->stop(queue_reset);
786 }
787 accepting_conns.clear();
788
789 for (const auto& [e, c] : conns) {
790 ldout(cct, 5) << __func__ << " mark down " << e << " " << c << dendl;
791 c->stop(queue_reset);
792 }
793 conns.clear();
794
795 for (const auto& c : anon_conns) {
796 ldout(cct, 5) << __func__ << " mark down " << c << dendl;
797 c->stop(queue_reset);
798 }
799 anon_conns.clear();
800
801 {
802 std::lock_guard l{deleted_lock};
803 for (const auto& c : deleted_conns) {
804 ldout(cct, 5) << __func__ << " delete " << c << dendl;
805 c->get_perf_counter()->dec(l_msgr_active_connections);
806 }
807 deleted_conns.clear();
808 }
809 }
810
811 void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs)
812 {
813 std::lock_guard l{lock};
814 const AsyncConnectionRef& conn = _lookup_conn(addrs);
815 if (conn) {
816 ldout(cct, 1) << __func__ << " " << addrs << " -- " << conn << dendl;
817 conn->stop(true);
818 } else {
819 ldout(cct, 1) << __func__ << " " << addrs << " -- connection dne" << dendl;
820 }
821 }
822
823 int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
824 {
825 int my_type = my_name.type();
826
827 // set reply protocol version
828 if (peer_type == my_type) {
829 // internal
830 return cluster_protocol;
831 } else {
832 // public
833 switch (connect ? peer_type : my_type) {
834 case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
835 case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
836 case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
837 }
838 }
839 return 0;
840 }
841
842 int AsyncMessenger::accept_conn(const AsyncConnectionRef& conn)
843 {
844 std::lock_guard l{lock};
845 if (conn->policy.server &&
846 conn->policy.lossy &&
847 !conn->policy.register_lossy_clients) {
848 anon_conns.insert(conn);
849 conn->get_perf_counter()->inc(l_msgr_active_connections);
850 return 0;
851 }
852 auto it = conns.find(*conn->peer_addrs);
853 if (it != conns.end()) {
854 auto& existing = it->second;
855
856 // lazy delete, see "deleted_conns"
857 // If conn already in, we will return 0
858 std::lock_guard l{deleted_lock};
859 if (deleted_conns.erase(existing)) {
860 it->second->get_perf_counter()->dec(l_msgr_active_connections);
861 conns.erase(it);
862 } else if (conn != existing) {
863 return -1;
864 }
865 }
866 ldout(cct, 10) << __func__ << " " << conn << " " << *conn->peer_addrs << dendl;
867 conns[*conn->peer_addrs] = conn;
868 conn->get_perf_counter()->inc(l_msgr_active_connections);
869 accepting_conns.erase(conn);
870 return 0;
871 }
872
873
874 bool AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
875 {
876 // be careful here: multiple threads may block here, and readers of
877 // my_addr do NOT hold any lock.
878
879 // this always goes from true -> false under the protection of the
880 // mutex. if it is already false, we need not retake the mutex at
881 // all.
882 if (!need_addr)
883 return false;
884 std::lock_guard l(lock);
885 if (need_addr) {
886 if (my_addrs->empty()) {
887 auto a = peer_addr_for_me;
888 a.set_type(entity_addr_t::TYPE_ANY);
889 a.set_nonce(nonce);
890 if (!did_bind) {
891 a.set_port(0);
892 }
893 set_myaddrs(entity_addrvec_t(a));
894 ldout(cct,10) << __func__ << " had no addrs" << dendl;
895 } else {
896 // fix all addrs of the same family, regardless of type (msgr2 vs legacy)
897 entity_addrvec_t newaddrs = *my_addrs;
898 for (auto& a : newaddrs.v) {
899 if (a.is_blank_ip() &&
900 a.get_family() == peer_addr_for_me.get_family()) {
901 entity_addr_t t = peer_addr_for_me;
902 if (!did_bind) {
903 t.set_type(entity_addr_t::TYPE_ANY);
904 t.set_port(0);
905 } else {
906 t.set_type(a.get_type());
907 t.set_port(a.get_port());
908 }
909 t.set_nonce(a.get_nonce());
910 ldout(cct,10) << __func__ << " " << a << " -> " << t << dendl;
911 a = t;
912 }
913 }
914 set_myaddrs(newaddrs);
915 }
916 ldout(cct, 1) << __func__ << " learned my addr " << *my_addrs
917 << " (peer_addr_for_me " << peer_addr_for_me << ")" << dendl;
918 _init_local_connection();
919 need_addr = false;
920 return true;
921 }
922 return false;
923 }
924
925 void AsyncMessenger::reap_dead()
926 {
927 ldout(cct, 1) << __func__ << " start" << dendl;
928
929 std::lock_guard l1{lock};
930
931 {
932 std::lock_guard l2{deleted_lock};
933 for (auto& c : deleted_conns) {
934 ldout(cct, 5) << __func__ << " delete " << c << dendl;
935 auto conns_it = conns.find(*c->peer_addrs);
936 if (conns_it != conns.end() && conns_it->second == c)
937 conns.erase(conns_it);
938 accepting_conns.erase(c);
939 anon_conns.erase(c);
940 c->get_perf_counter()->dec(l_msgr_active_connections);
941 }
942 deleted_conns.clear();
943 }
944 }