]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/AsyncMessenger.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / msg / async / AsyncMessenger.cc
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
7c673cae
FG
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17#include "acconfig.h"
18
19#include <iostream>
20#include <fstream>
21
22#include "AsyncMessenger.h"
23
24#include "common/config.h"
25#include "common/Timer.h"
26#include "common/errno.h"
27
28#include "messages/MOSDOp.h"
29#include "messages/MOSDOpReply.h"
30#include "common/EventTrace.h"
31
32#define dout_subsys ceph_subsys_ms
33#undef dout_prefix
34#define dout_prefix _prefix(_dout, this)
f67539c2 35static std::ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) {
11fdf7f2 36 return *_dout << "-- " << m->get_myaddrs() << " ";
7c673cae
FG
37}
38
f67539c2 39static std::ostream& _prefix(std::ostream *_dout, Processor *p) {
7c673cae
FG
40 return *_dout << " Processor -- ";
41}
42
43
44/*******************
45 * Processor
46 */
47
48class Processor::C_processor_accept : public EventCallback {
49 Processor *pro;
50
51 public:
52 explicit C_processor_accept(Processor *p): pro(p) {}
11fdf7f2 53 void do_request(uint64_t id) override {
7c673cae
FG
54 pro->accept();
55 }
56};
57
58Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c)
59 : msgr(r), net(c), worker(w),
60 listen_handler(new C_processor_accept(this)) {}
61
11fdf7f2 62int Processor::bind(const entity_addrvec_t &bind_addrs,
f67539c2 63 const std::set<int>& avoid_ports,
11fdf7f2 64 entity_addrvec_t* bound_addrs)
7c673cae 65{
11fdf7f2
TL
66 const auto& conf = msgr->cct->_conf;
67 // bind to socket(s)
68 ldout(msgr->cct, 10) << __func__ << " " << bind_addrs << dendl;
7c673cae
FG
69
70 SocketOptions opts;
71 opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
72 opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
73
11fdf7f2
TL
74 listen_sockets.resize(bind_addrs.v.size());
75 *bound_addrs = bind_addrs;
7c673cae 76
11fdf7f2
TL
77 for (unsigned k = 0; k < bind_addrs.v.size(); ++k) {
78 auto& listen_addr = bound_addrs->v[k];
7c673cae 79
11fdf7f2
TL
80 /* bind to port */
81 int r = -1;
7c673cae 82
11fdf7f2
TL
83 for (int i = 0; i < conf->ms_bind_retry_count; i++) {
84 if (i > 0) {
85 lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in "
86 << conf->ms_bind_retry_delay << " seconds " << dendl;
87 sleep(conf->ms_bind_retry_delay);
7c673cae 88 }
11fdf7f2
TL
89
90 if (listen_addr.get_port()) {
91 worker->center.submit_to(
92 worker->center.get_id(),
93 [this, k, &listen_addr, &opts, &r]() {
94 r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
95 }, false);
96 if (r < 0) {
97 lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
98 << ": " << cpp_strerror(r) << dendl;
99 continue;
100 }
101 } else {
102 // try a range of ports
103 for (int port = msgr->cct->_conf->ms_bind_port_min;
104 port <= msgr->cct->_conf->ms_bind_port_max;
105 port++) {
106 if (avoid_ports.count(port))
107 continue;
108
109 listen_addr.set_port(port);
110 worker->center.submit_to(
111 worker->center.get_id(),
112 [this, k, &listen_addr, &opts, &r]() {
113 r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
114 }, false);
115 if (r == 0)
116 break;
117 }
118 if (r < 0) {
119 lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
120 << " on any port in range "
121 << msgr->cct->_conf->ms_bind_port_min
122 << "-" << msgr->cct->_conf->ms_bind_port_max << ": "
123 << cpp_strerror(r) << dendl;
124 listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again.
125 continue;
126 }
127 ldout(msgr->cct, 10) << __func__ << " bound on random port "
128 << listen_addr << dendl;
7c673cae 129 }
11fdf7f2
TL
130 if (r == 0) {
131 break;
7c673cae 132 }
7c673cae 133 }
11fdf7f2
TL
134
135 // It seems that binding completely failed, return with that exit status
136 if (r < 0) {
137 lderr(msgr->cct) << __func__ << " was unable to bind after "
138 << conf->ms_bind_retry_count
139 << " attempts: " << cpp_strerror(r) << dendl;
140 for (unsigned j = 0; j < k; ++j) {
141 // clean up previous bind
142 listen_sockets[j].abort_accept();
143 }
144 return r;
145 }
7c673cae
FG
146 }
147
11fdf7f2 148 ldout(msgr->cct, 10) << __func__ << " bound to " << *bound_addrs << dendl;
7c673cae
FG
149 return 0;
150}
151
152void Processor::start()
153{
154 ldout(msgr->cct, 1) << __func__ << dendl;
155
156 // start thread
11fdf7f2 157 worker->center.submit_to(worker->center.get_id(), [this]() {
9f95a23c
TL
158 for (auto& listen_socket : listen_sockets) {
159 if (listen_socket) {
160 if (listen_socket.fd() == -1) {
161 ldout(msgr->cct, 1) << __func__
162 << " Error: processor restart after listen_socket.fd closed. "
163 << this << dendl;
164 return;
165 }
166 worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE,
11fdf7f2
TL
167 listen_handler); }
168 }
169 }, false);
7c673cae
FG
170}
171
172void Processor::accept()
173{
7c673cae
FG
174 SocketOptions opts;
175 opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
176 opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
177 opts.priority = msgr->get_socket_priority();
11fdf7f2
TL
178
179 for (auto& listen_socket : listen_sockets) {
180 ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd()
181 << dendl;
182 unsigned accept_error_num = 0;
183
184 while (true) {
185 entity_addr_t addr;
186 ConnectedSocket cli_socket;
187 Worker *w = worker;
188 if (!msgr->get_stack()->support_local_listen_table())
189 w = msgr->get_stack()->get_worker();
190 else
191 ++w->references;
192 int r = listen_socket.accept(&cli_socket, opts, &addr, w);
193 if (r == 0) {
194 ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd "
195 << cli_socket.fd() << dendl;
196
197 msgr->add_accept(
198 w, std::move(cli_socket),
199 msgr->get_myaddrs().v[listen_socket.get_addr_slot()],
200 addr);
201 accept_error_num = 0;
91327a77 202 continue;
7c673cae 203 } else {
9f95a23c 204 --w->references;
11fdf7f2
TL
205 if (r == -EINTR) {
206 continue;
207 } else if (r == -EAGAIN) {
208 break;
209 } else if (r == -EMFILE || r == -ENFILE) {
210 lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd()
211 << " errno " << r << " " << cpp_strerror(r) << dendl;
212 if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
213 lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl;
214 ceph_abort();
215 }
216 continue;
217 } else if (r == -ECONNABORTED) {
218 ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd()
219 << " errno " << r << " " << cpp_strerror(r) << dendl;
220 continue;
221 } else {
222 lderr(msgr->cct) << __func__ << " no incoming connection?"
223 << " errno " << r << " " << cpp_strerror(r) << dendl;
224 if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
225 lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl;
226 ceph_abort();
227 }
228 continue;
91327a77 229 }
7c673cae
FG
230 }
231 }
232 }
233}
234
235void Processor::stop()
236{
237 ldout(msgr->cct,10) << __func__ << dendl;
238
11fdf7f2
TL
239 worker->center.submit_to(worker->center.get_id(), [this]() {
240 for (auto& listen_socket : listen_sockets) {
241 if (listen_socket) {
242 worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE);
243 listen_socket.abort_accept();
244 }
245 }
7c673cae 246 }, false);
7c673cae
FG
247}
248
249
250struct StackSingleton {
251 CephContext *cct;
252 std::shared_ptr<NetworkStack> stack;
253
11fdf7f2 254 explicit StackSingleton(CephContext *c): cct(c) {}
7c673cae
FG
255 void ready(std::string &type) {
256 if (!stack)
257 stack = NetworkStack::create(cct, type);
258 }
259 ~StackSingleton() {
260 stack->stop();
261 }
262};
263
264
265class C_handle_reap : public EventCallback {
266 AsyncMessenger *msgr;
267
268 public:
269 explicit C_handle_reap(AsyncMessenger *m): msgr(m) {}
11fdf7f2 270 void do_request(uint64_t id) override {
7c673cae
FG
271 // judge whether is a time event
272 msgr->reap_dead();
273 }
274};
275
276/*******************
277 * AsyncMessenger
278 */
279
280AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
f67539c2 281 const std::string &type, std::string mname, uint64_t _nonce)
9f95a23c 282 : SimplePolicyMessenger(cct, name),
7c673cae 283 dispatch_queue(cct, this, mname),
9f95a23c 284 nonce(_nonce)
7c673cae
FG
285{
286 std::string transport_type = "posix";
287 if (type.find("rdma") != std::string::npos)
288 transport_type = "rdma";
289 else if (type.find("dpdk") != std::string::npos)
290 transport_type = "dpdk";
291
11fdf7f2
TL
292 auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
293 "AsyncMessenger::NetworkStack::" + transport_type, true, cct);
7c673cae
FG
294 single->ready(transport_type);
295 stack = single->stack.get();
296 stack->start();
297 local_worker = stack->get_worker();
9f95a23c 298 local_connection = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue,
11fdf7f2 299 local_worker, true, true);
7c673cae
FG
300 init_local_connection();
301 reap_handler = new C_handle_reap(this);
302 unsigned processor_num = 1;
303 if (stack->support_local_listen_table())
304 processor_num = stack->get_num_worker();
305 for (unsigned i = 0; i < processor_num; ++i)
306 processors.push_back(new Processor(this, stack->get_worker(i), cct));
307}
308
309/**
310 * Destroy the AsyncMessenger. Pretty simple since all the work is done
311 * elsewhere.
312 */
313AsyncMessenger::~AsyncMessenger()
314{
315 delete reap_handler;
11fdf7f2 316 ceph_assert(!did_bind); // either we didn't bind or we shut down the Processor
7c673cae
FG
317 for (auto &&p : processors)
318 delete p;
319}
320
321void AsyncMessenger::ready()
322{
11fdf7f2 323 ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;
7c673cae
FG
324
325 stack->ready();
326 if (pending_bind) {
39ae355f 327 int err = bindv(pending_bind_addrs, saved_public_addrs);
7c673cae
FG
328 if (err) {
329 lderr(cct) << __func__ << " postponed bind failed" << dendl;
330 ceph_abort();
331 }
332 }
333
9f95a23c 334 std::lock_guard l{lock};
7c673cae
FG
335 for (auto &&p : processors)
336 p->start();
337 dispatch_queue.start();
338}
339
340int AsyncMessenger::shutdown()
341{
11fdf7f2 342 ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;
7c673cae
FG
343
344 // done! clean up.
345 for (auto &&p : processors)
346 p->stop();
347 mark_down_all();
348 // break ref cycles on the loopback connection
9f95a23c
TL
349 local_connection->clear_priv();
350 local_connection->mark_down();
7c673cae 351 did_bind = false;
9f95a23c
TL
352 lock.lock();
353 stop_cond.notify_all();
7c673cae 354 stopped = true;
9f95a23c 355 lock.unlock();
7c673cae
FG
356 stack->drain();
357 return 0;
358}
359
39ae355f
TL
360int AsyncMessenger::bind(const entity_addr_t &bind_addr,
361 std::optional<entity_addrvec_t> public_addrs)
11fdf7f2 362{
39ae355f
TL
363 ldout(cct, 10) << __func__ << " " << bind_addr
364 << " public " << public_addrs << dendl;
11fdf7f2
TL
365 // old bind() can take entity_addr_t(). new bindv() can take a
366 // 0.0.0.0-like address but needs type and family to be set.
367 auto a = bind_addr;
368 if (a == entity_addr_t()) {
369 a.set_type(entity_addr_t::TYPE_LEGACY);
370 if (cct->_conf->ms_bind_ipv6) {
371 a.set_family(AF_INET6);
372 } else {
373 a.set_family(AF_INET);
374 }
375 }
39ae355f 376 return bindv(entity_addrvec_t(a), public_addrs);
11fdf7f2
TL
377}
378
39ae355f
TL
379int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs,
380 std::optional<entity_addrvec_t> public_addrs)
7c673cae 381{
9f95a23c 382 lock.lock();
7c673cae
FG
383
384 if (!pending_bind && started) {
385 ldout(cct,10) << __func__ << " already started" << dendl;
9f95a23c 386 lock.unlock();
7c673cae
FG
387 return -1;
388 }
389
39ae355f
TL
390 ldout(cct, 10) << __func__ << " " << bind_addrs
391 << " public " << public_addrs << dendl;
392 if (public_addrs && bind_addrs != public_addrs) {
393 // for the sake of rebind() and the is-not-ready case let's
394 // store public_addrs. there is no point in that if public
395 // addrs are indifferent from bind_addrs.
396 saved_public_addrs = std::move(public_addrs);
397 }
7c673cae
FG
398
399 if (!stack->is_ready()) {
400 ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl;
11fdf7f2 401 pending_bind_addrs = bind_addrs;
7c673cae 402 pending_bind = true;
9f95a23c 403 lock.unlock();
7c673cae
FG
404 return 0;
405 }
406
9f95a23c 407 lock.unlock();
7c673cae
FG
408
409 // bind to a socket
f67539c2 410 std::set<int> avoid_ports;
11fdf7f2 411 entity_addrvec_t bound_addrs;
7c673cae
FG
412 unsigned i = 0;
413 for (auto &&p : processors) {
11fdf7f2 414 int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
7c673cae
FG
415 if (r) {
416 // Note: this is related to local tcp listen table problem.
417 // Posix(default kernel implementation) backend shares listen table
418 // in the kernel, so all threads can use the same listen table naturally
419 // and only one thread need to bind. But other backends(like dpdk) uses local
420 // listen table, we need to bind/listen tcp port for each worker. So if the
421 // first worker failed to bind, it could be think the normal error then handle
422 // it, like port is used case. But if the first worker successfully to bind
423 // but the second worker failed, it's not expected and we need to assert
424 // here
11fdf7f2 425 ceph_assert(i == 0);
7c673cae
FG
426 return r;
427 }
428 ++i;
429 }
11fdf7f2 430 _finish_bind(bind_addrs, bound_addrs);
7c673cae
FG
431 return 0;
432}
433
f67539c2 434int AsyncMessenger::rebind(const std::set<int>& avoid_ports)
7c673cae
FG
435{
436 ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
11fdf7f2 437 ceph_assert(did_bind);
7c673cae
FG
438
439 for (auto &&p : processors)
440 p->stop();
441 mark_down_all();
442
443 // adjust the nonce; we want our entity_addr_t to be truly unique.
444 nonce += 1000000;
445 ldout(cct, 10) << __func__ << " new nonce " << nonce
11fdf7f2 446 << " and addr " << get_myaddrs() << dendl;
7c673cae 447
11fdf7f2
TL
448 entity_addrvec_t bound_addrs;
449 entity_addrvec_t bind_addrs = get_myaddrs();
f67539c2 450 std::set<int> new_avoid(avoid_ports);
11fdf7f2
TL
451 for (auto& a : bind_addrs.v) {
452 new_avoid.insert(a.get_port());
453 a.set_port(0);
454 }
455 ldout(cct, 10) << __func__ << " will try " << bind_addrs
7c673cae
FG
456 << " and avoid ports " << new_avoid << dendl;
457 unsigned i = 0;
458 for (auto &&p : processors) {
11fdf7f2 459 int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
7c673cae 460 if (r) {
11fdf7f2 461 ceph_assert(i == 0);
7c673cae
FG
462 return r;
463 }
464 ++i;
465 }
11fdf7f2 466 _finish_bind(bind_addrs, bound_addrs);
7c673cae
FG
467 for (auto &&p : processors) {
468 p->start();
469 }
470 return 0;
471}
472
473int AsyncMessenger::client_bind(const entity_addr_t &bind_addr)
474{
31f18b77
FG
475 if (!cct->_conf->ms_bind_before_connect)
476 return 0;
9f95a23c 477 std::lock_guard l{lock};
7c673cae 478 if (did_bind) {
7c673cae
FG
479 return 0;
480 }
481 if (started) {
482 ldout(cct, 10) << __func__ << " already started" << dendl;
483 return -1;
484 }
485 ldout(cct, 10) << __func__ << " " << bind_addr << dendl;
486
11fdf7f2 487 set_myaddrs(entity_addrvec_t(bind_addr));
7c673cae
FG
488 return 0;
489}
490
11fdf7f2
TL
491void AsyncMessenger::_finish_bind(const entity_addrvec_t& bind_addrs,
492 const entity_addrvec_t& listen_addrs)
7c673cae 493{
11fdf7f2
TL
494 set_myaddrs(bind_addrs);
495 for (auto& a : bind_addrs.v) {
496 if (!a.is_blank_ip()) {
497 learned_addr(a);
498 }
499 }
7c673cae 500
11fdf7f2
TL
501 if (get_myaddrs().front().get_port() == 0) {
502 set_myaddrs(listen_addrs);
7c673cae 503 }
05a536ef
TL
504
505 entity_addrvec_t newaddrs;
506 if (saved_public_addrs) {
507 newaddrs = *saved_public_addrs;
508 for (auto& public_addr : newaddrs.v) {
509 public_addr.set_nonce(nonce);
510 if (public_addr.is_ip() && public_addr.get_port() == 0) {
511 // port is not explicitly set. This is fine as it can be figured
512 // out by msgr. For instance, the low-level `Processor::bind`
513 // scans for free ports in a range controlled by ms_bind_port_min
514 // and ms_bind_port_max.
515 for (const auto& a : my_addrs->v) {
516 if (public_addr.get_type() == a.get_type() && a.is_ip()) {
517 public_addr.set_port(a.get_port());
518 }
519 }
520 }
521 }
522 } else {
523 newaddrs = *my_addrs;
524 for (auto& a : newaddrs.v) {
525 a.set_nonce(nonce);
39ae355f 526 }
11fdf7f2
TL
527 }
528 set_myaddrs(newaddrs);
7c673cae
FG
529
530 init_local_connection();
531
11fdf7f2 532 ldout(cct,1) << __func__ << " bind my_addrs is " << get_myaddrs() << dendl;
7c673cae
FG
533 did_bind = true;
534}
535
f6b5b4d7
TL
536int AsyncMessenger::client_reset()
537{
538 mark_down_all();
539
540 std::scoped_lock l{lock};
541 // adjust the nonce; we want our entity_addr_t to be truly unique.
542 nonce += 1000000;
543 ldout(cct, 10) << __func__ << " new nonce " << nonce << dendl;
544
545 entity_addrvec_t newaddrs = *my_addrs;
546 for (auto& a : newaddrs.v) {
547 a.set_nonce(nonce);
548 }
549 set_myaddrs(newaddrs);
550 _init_local_connection();
551 return 0;
552}
553
7c673cae
FG
554int AsyncMessenger::start()
555{
9f95a23c 556 std::scoped_lock l{lock};
7c673cae
FG
557 ldout(cct,1) << __func__ << " start" << dendl;
558
559 // register at least one entity, first!
11fdf7f2 560 ceph_assert(my_name.type() >= 0);
7c673cae 561
11fdf7f2 562 ceph_assert(!started);
7c673cae
FG
563 started = true;
564 stopped = false;
565
566 if (!did_bind) {
11fdf7f2
TL
567 entity_addrvec_t newaddrs = *my_addrs;
568 for (auto& a : newaddrs.v) {
569 a.nonce = nonce;
570 }
571 set_myaddrs(newaddrs);
7c673cae
FG
572 _init_local_connection();
573 }
574
7c673cae
FG
575 return 0;
576}
577
578void AsyncMessenger::wait()
579{
9f95a23c
TL
580 {
581 std::unique_lock locker{lock};
582 if (!started) {
583 return;
584 }
585 if (!stopped)
586 stop_cond.wait(locker);
7c673cae 587 }
7c673cae
FG
588 dispatch_queue.shutdown();
589 if (dispatch_queue.is_started()) {
590 ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl;
591 dispatch_queue.wait();
592 dispatch_queue.discard_local();
593 ldout(cct, 10) << __func__ << ": dispatch queue is stopped" << dendl;
594 }
595
596 // close all connections
597 shutdown_connections(false);
598 stack->drain();
599
600 ldout(cct, 10) << __func__ << ": done." << dendl;
601 ldout(cct, 1) << __func__ << " complete." << dendl;
602 started = false;
603}
604
11fdf7f2
TL
605void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket,
606 const entity_addr_t &listen_addr,
607 const entity_addr_t &peer_addr)
7c673cae 608{
9f95a23c
TL
609 std::lock_guard l{lock};
610 auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w,
11fdf7f2
TL
611 listen_addr.is_msgr2(), false);
612 conn->accept(std::move(cli_socket), listen_addr, peer_addr);
7c673cae 613 accepting_conns.insert(conn);
7c673cae
FG
614}
615
11fdf7f2 616AsyncConnectionRef AsyncMessenger::create_connect(
9f95a23c 617 const entity_addrvec_t& addrs, int type, bool anon)
7c673cae 618{
9f95a23c 619 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae 620
11fdf7f2 621 ldout(cct, 10) << __func__ << " " << addrs
7c673cae
FG
622 << ", creating connection and registering" << dendl;
623
11fdf7f2
TL
624 // here is where we decide which of the addrs to connect to. always prefer
625 // the first one, if we support it.
626 entity_addr_t target;
627 for (auto& a : addrs.v) {
628 if (!a.is_msgr2() && !a.is_legacy()) {
629 continue;
630 }
631 // FIXME: for ipv4 vs ipv6, check whether local host can handle ipv6 before
632 // trying it? for now, just pick whichever is listed first.
633 target = a;
634 break;
635 }
636
7c673cae
FG
637 // create connection
638 Worker *w = stack->get_worker();
9f95a23c 639 auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w,
11fdf7f2 640 target.is_msgr2(), false);
9f95a23c 641 conn->anon = anon;
11fdf7f2 642 conn->connect(addrs, type, target);
9f95a23c
TL
643 if (anon) {
644 anon_conns.insert(conn);
645 } else {
646 ceph_assert(!conns.count(addrs));
647 ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " "
648 << *conn->peer_addrs << dendl;
649 conns[addrs] = conn;
650 }
7c673cae
FG
651 w->get_perf_counter()->inc(l_msgr_active_connections);
652
653 return conn;
654}
655
7c673cae 656
11fdf7f2
TL
657ConnectionRef AsyncMessenger::get_loopback_connection()
658{
659 return local_connection;
660}
7c673cae 661
11fdf7f2
TL
662bool AsyncMessenger::should_use_msgr2()
663{
664 // if we are bound to v1 only, and we are connecting to a v2 peer,
665 // we cannot use the peer's v2 address. otherwise the connection
666 // is assymetrical, because they would have to use v1 to connect
667 // to us, and we would use v2, and connection race detection etc
668 // would totally break down (among other things). or, the other
669 // end will be confused that we advertise ourselve with a v1
670 // address only (that we bound to) but connected with protocol v2.
671 return !did_bind || get_myaddrs().has_msgr2();
7c673cae
FG
672}
673
9f95a23c 674entity_addrvec_t AsyncMessenger::_filter_addrs(const entity_addrvec_t& addrs)
7c673cae 675{
11fdf7f2 676 if (!should_use_msgr2()) {
9f95a23c 677 ldout(cct, 10) << __func__ << " " << addrs << " limiting to v1 ()" << dendl;
11fdf7f2
TL
678 entity_addrvec_t r;
679 for (auto& i : addrs.v) {
680 if (i.is_msgr2()) {
681 continue;
682 }
683 r.v.push_back(i);
684 }
685 return r;
686 } else {
687 return addrs;
688 }
7c673cae
FG
689}
690
11fdf7f2 691int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs)
7c673cae 692{
11fdf7f2
TL
693 FUNCTRACE(cct);
694 ceph_assert(m);
7c673cae 695
9f95a23c 696#if defined(WITH_EVENTTRACE)
7c673cae
FG
697 if (m->get_type() == CEPH_MSG_OSD_OP)
698 OID_EVENT_TRACE(((MOSDOp *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP");
699 else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
700 OID_EVENT_TRACE(((MOSDOpReply *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP_REPLY");
9f95a23c 701#endif
7c673cae 702
11fdf7f2
TL
703 ldout(cct, 1) << __func__ << "--> " << ceph_entity_type_name(type) << " "
704 << addrs << " -- " << *m << " -- ?+"
7c673cae
FG
705 << m->get_data().length() << " " << m << dendl;
706
11fdf7f2 707 if (addrs.empty()) {
7c673cae 708 ldout(cct,0) << __func__ << " message " << *m
11fdf7f2 709 << " with empty dest " << addrs << dendl;
7c673cae
FG
710 m->put();
711 return -EINVAL;
712 }
713
7c673cae
FG
714 if (cct->_conf->ms_dump_on_send) {
715 m->encode(-1, MSG_CRC_ALL);
11fdf7f2 716 ldout(cct, 0) << __func__ << " submit_message " << *m << "\n";
7c673cae
FG
717 m->get_payload().hexdump(*_dout);
718 if (m->get_data().length() > 0) {
719 *_dout << " data:\n";
720 m->get_data().hexdump(*_dout);
721 }
722 *_dout << dendl;
723 m->clear_payload();
724 }
725
9f95a23c
TL
726 connect_to(type, addrs, false)->send_message(m);
727 return 0;
728}
729
730ConnectionRef AsyncMessenger::connect_to(int type,
731 const entity_addrvec_t& addrs,
732 bool anon, bool not_local_dest)
733{
734 if (!not_local_dest) {
735 if (*my_addrs == addrs ||
736 (addrs.v.size() == 1 &&
737 my_addrs->contains(addrs.front()))) {
738 // local
739 return local_connection;
740 }
7c673cae
FG
741 }
742
9f95a23c
TL
743 auto av = _filter_addrs(addrs);
744 std::lock_guard l{lock};
745 if (anon) {
746 return create_connect(av, type, anon);
7c673cae
FG
747 }
748
9f95a23c
TL
749 AsyncConnectionRef conn = _lookup_conn(av);
750 if (conn) {
751 ldout(cct, 10) << __func__ << " " << av << " existing " << conn << dendl;
7c673cae 752 } else {
9f95a23c
TL
753 conn = create_connect(av, type, false);
754 ldout(cct, 10) << __func__ << " " << av << " new " << conn << dendl;
7c673cae 755 }
9f95a23c
TL
756
757 return conn;
7c673cae
FG
758}
759
760/**
11fdf7f2 761 * If my_addr doesn't have an IP set, this function
7c673cae
FG
762 * will fill it in from the passed addr. Otherwise it does nothing and returns.
763 */
11fdf7f2 764bool AsyncMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
7c673cae 765{
11fdf7f2
TL
766 ldout(cct,1) << __func__ << " " << addrs << dendl;
767 bool ret = false;
9f95a23c 768 std::lock_guard l{lock};
11fdf7f2
TL
769
770 entity_addrvec_t newaddrs = *my_addrs;
771 for (auto& a : newaddrs.v) {
772 if (a.is_blank_ip()) {
773 int type = a.get_type();
774 int port = a.get_port();
775 uint32_t nonce = a.get_nonce();
776 for (auto& b : addrs.v) {
777 if (a.get_family() == b.get_family()) {
778 ldout(cct,1) << __func__ << " assuming my addr " << a
779 << " matches provided addr " << b << dendl;
780 a = b;
781 a.set_nonce(nonce);
782 a.set_type(type);
783 a.set_port(port);
784 ret = true;
785 break;
786 }
787 }
788 }
789 }
790 set_myaddrs(newaddrs);
791 if (ret) {
7c673cae
FG
792 _init_local_connection();
793 }
11fdf7f2
TL
794 ldout(cct,1) << __func__ << " now " << *my_addrs << dendl;
795 return ret;
7c673cae
FG
796}
797
798void AsyncMessenger::shutdown_connections(bool queue_reset)
799{
800 ldout(cct,1) << __func__ << " " << dendl;
9f95a23c
TL
801 std::lock_guard l{lock};
802 for (const auto& c : accepting_conns) {
803 ldout(cct, 5) << __func__ << " accepting_conn " << c << dendl;
804 c->stop(queue_reset);
7c673cae
FG
805 }
806 accepting_conns.clear();
807
9f95a23c
TL
808 for (const auto& [e, c] : conns) {
809 ldout(cct, 5) << __func__ << " mark down " << e << " " << c << dendl;
9f95a23c 810 c->stop(queue_reset);
7c673cae 811 }
9f95a23c
TL
812 conns.clear();
813
814 for (const auto& c : anon_conns) {
815 ldout(cct, 5) << __func__ << " mark down " << c << dendl;
9f95a23c
TL
816 c->stop(queue_reset);
817 }
818 anon_conns.clear();
7c673cae
FG
819
820 {
9f95a23c 821 std::lock_guard l{deleted_lock};
a4b75251
TL
822 for (const auto& c : deleted_conns) {
823 ldout(cct, 5) << __func__ << " delete " << c << dendl;
824 c->get_perf_counter()->dec(l_msgr_active_connections);
7c673cae 825 }
9f95a23c 826 deleted_conns.clear();
7c673cae 827 }
7c673cae
FG
828}
829
11fdf7f2 830void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs)
7c673cae 831{
9f95a23c
TL
832 std::lock_guard l{lock};
833 const AsyncConnectionRef& conn = _lookup_conn(addrs);
834 if (conn) {
835 ldout(cct, 1) << __func__ << " " << addrs << " -- " << conn << dendl;
836 conn->stop(true);
7c673cae 837 } else {
11fdf7f2 838 ldout(cct, 1) << __func__ << " " << addrs << " -- connection dne" << dendl;
7c673cae 839 }
7c673cae
FG
840}
841
842int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
843{
11fdf7f2 844 int my_type = my_name.type();
7c673cae
FG
845
846 // set reply protocol version
847 if (peer_type == my_type) {
848 // internal
849 return cluster_protocol;
850 } else {
851 // public
852 switch (connect ? peer_type : my_type) {
853 case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
854 case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
855 case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
856 }
857 }
858 return 0;
859}
860
9f95a23c 861int AsyncMessenger::accept_conn(const AsyncConnectionRef& conn)
11fdf7f2 862{
9f95a23c
TL
863 std::lock_guard l{lock};
864 if (conn->policy.server &&
865 conn->policy.lossy &&
866 !conn->policy.register_lossy_clients) {
867 anon_conns.insert(conn);
868 conn->get_perf_counter()->inc(l_msgr_active_connections);
869 return 0;
870 }
11fdf7f2
TL
871 auto it = conns.find(*conn->peer_addrs);
872 if (it != conns.end()) {
9f95a23c 873 auto& existing = it->second;
11fdf7f2
TL
874
875 // lazy delete, see "deleted_conns"
876 // If conn already in, we will return 0
9f95a23c 877 std::lock_guard l{deleted_lock};
11fdf7f2 878 if (deleted_conns.erase(existing)) {
a4b75251 879 it->second->get_perf_counter()->dec(l_msgr_active_connections);
11fdf7f2
TL
880 conns.erase(it);
881 } else if (conn != existing) {
882 return -1;
883 }
884 }
885 ldout(cct, 10) << __func__ << " " << conn << " " << *conn->peer_addrs << dendl;
886 conns[*conn->peer_addrs] = conn;
887 conn->get_perf_counter()->inc(l_msgr_active_connections);
888 accepting_conns.erase(conn);
889 return 0;
890}
891
892
893bool AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
7c673cae
FG
894{
895 // be careful here: multiple threads may block here, and readers of
11fdf7f2 896 // my_addr do NOT hold any lock.
7c673cae
FG
897
898 // this always goes from true -> false under the protection of the
899 // mutex. if it is already false, we need not retake the mutex at
900 // all.
901 if (!need_addr)
11fdf7f2
TL
902 return false;
903 std::lock_guard l(lock);
7c673cae 904 if (need_addr) {
11fdf7f2
TL
905 if (my_addrs->empty()) {
906 auto a = peer_addr_for_me;
907 a.set_type(entity_addr_t::TYPE_ANY);
908 a.set_nonce(nonce);
909 if (!did_bind) {
910 a.set_port(0);
911 }
912 set_myaddrs(entity_addrvec_t(a));
913 ldout(cct,10) << __func__ << " had no addrs" << dendl;
914 } else {
915 // fix all addrs of the same family, regardless of type (msgr2 vs legacy)
916 entity_addrvec_t newaddrs = *my_addrs;
917 for (auto& a : newaddrs.v) {
918 if (a.is_blank_ip() &&
919 a.get_family() == peer_addr_for_me.get_family()) {
920 entity_addr_t t = peer_addr_for_me;
921 if (!did_bind) {
922 t.set_type(entity_addr_t::TYPE_ANY);
923 t.set_port(0);
924 } else {
925 t.set_type(a.get_type());
926 t.set_port(a.get_port());
927 }
928 t.set_nonce(a.get_nonce());
929 ldout(cct,10) << __func__ << " " << a << " -> " << t << dendl;
930 a = t;
931 }
932 }
933 set_myaddrs(newaddrs);
934 }
935 ldout(cct, 1) << __func__ << " learned my addr " << *my_addrs
936 << " (peer_addr_for_me " << peer_addr_for_me << ")" << dendl;
7c673cae 937 _init_local_connection();
11fdf7f2
TL
938 need_addr = false;
939 return true;
7c673cae 940 }
11fdf7f2 941 return false;
7c673cae
FG
942}
943
9f95a23c 944void AsyncMessenger::reap_dead()
7c673cae
FG
945{
946 ldout(cct, 1) << __func__ << " start" << dendl;
7c673cae 947
9f95a23c 948 std::lock_guard l1{lock};
7c673cae 949
9f95a23c
TL
950 {
951 std::lock_guard l2{deleted_lock};
952 for (auto& c : deleted_conns) {
953 ldout(cct, 5) << __func__ << " delete " << c << dendl;
954 auto conns_it = conns.find(*c->peer_addrs);
955 if (conns_it != conns.end() && conns_it->second == c)
956 conns.erase(conns_it);
957 accepting_conns.erase(c);
958 anon_conns.erase(c);
a4b75251 959 c->get_perf_counter()->dec(l_msgr_active_connections);
9f95a23c
TL
960 }
961 deleted_conns.clear();
7c673cae 962 }
7c673cae 963}