// ceph/src/msg/async/AsyncMessenger.cc (ceph nautilus 14.2.2)
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include "acconfig.h"

#include <iostream>
#include <fstream>

#include "AsyncMessenger.h"

#include "common/config.h"
#include "common/Timer.h"
#include "common/errno.h"

#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "common/EventTrace.h"

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) {
  return *_dout << "-- " << m->get_myaddrs() << " ";
}

static ostream& _prefix(std::ostream *_dout, Processor *p) {
  return *_dout << " Processor -- ";
}


/*******************
 * Processor
 */

class Processor::C_processor_accept : public EventCallback {
  Processor *pro;

 public:
  explicit C_processor_accept(Processor *p): pro(p) {}
  void do_request(uint64_t id) override {
    pro->accept();
  }
};

Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c)
  : msgr(r), net(c), worker(w),
    listen_handler(new C_processor_accept(this)) {}

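/*
 * Bind every address in bind_addrs on this Processor's worker.
 *
 * Addresses with an explicit port are bound directly; addresses with port 0
 * are tried across the ms_bind_port_min..ms_bind_port_max range, skipping
 * avoid_ports. Each address is retried up to ms_bind_retry_count times with
 * ms_bind_retry_delay seconds between attempts. On failure the sockets bound
 * so far are aborted and the error is returned; on success *bound_addrs
 * contains the addresses that were actually bound.
 */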
int Processor::bind(const entity_addrvec_t &bind_addrs,
                    const set<int>& avoid_ports,
                    entity_addrvec_t* bound_addrs)
{
  const auto& conf = msgr->cct->_conf;
  // bind to socket(s)
  ldout(msgr->cct, 10) << __func__ << " " << bind_addrs << dendl;

  SocketOptions opts;
  opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
  opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;

  listen_sockets.resize(bind_addrs.v.size());
  *bound_addrs = bind_addrs;

  for (unsigned k = 0; k < bind_addrs.v.size(); ++k) {
    auto& listen_addr = bound_addrs->v[k];

    /* bind to port */
    int r = -1;

    for (int i = 0; i < conf->ms_bind_retry_count; i++) {
      if (i > 0) {
        lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in "
                         << conf->ms_bind_retry_delay << " seconds " << dendl;
        sleep(conf->ms_bind_retry_delay);
      }

      if (listen_addr.get_port()) {
        worker->center.submit_to(
          worker->center.get_id(),
          [this, k, &listen_addr, &opts, &r]() {
            r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
          }, false);
        if (r < 0) {
          lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
                           << ": " << cpp_strerror(r) << dendl;
          continue;
        }
      } else {
        // try a range of ports
        for (int port = msgr->cct->_conf->ms_bind_port_min;
             port <= msgr->cct->_conf->ms_bind_port_max;
             port++) {
          if (avoid_ports.count(port))
            continue;

          listen_addr.set_port(port);
          worker->center.submit_to(
            worker->center.get_id(),
            [this, k, &listen_addr, &opts, &r]() {
              r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
            }, false);
          if (r == 0)
            break;
        }
        if (r < 0) {
          lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
                           << " on any port in range "
                           << msgr->cct->_conf->ms_bind_port_min
                           << "-" << msgr->cct->_conf->ms_bind_port_max << ": "
                           << cpp_strerror(r) << dendl;
          listen_addr.set_port(0); // clear the port before retrying, otherwise we will fail again
          continue;
        }
        ldout(msgr->cct, 10) << __func__ << " bound on random port "
                             << listen_addr << dendl;
      }
      if (r == 0) {
        break;
      }
    }

    // binding failed after all retries; clean up and return the error
    if (r < 0) {
      lderr(msgr->cct) << __func__ << " was unable to bind after "
                       << conf->ms_bind_retry_count
                       << " attempts: " << cpp_strerror(r) << dendl;
      for (unsigned j = 0; j < k; ++j) {
        // clean up previous bind
        listen_sockets[j].abort_accept();
      }
      return r;
    }
  }

  ldout(msgr->cct, 10) << __func__ << " bound to " << *bound_addrs << dendl;
  return 0;
}

void Processor::start()
{
  ldout(msgr->cct, 1) << __func__ << dendl;

  // register the accept handler for each listen socket on the worker's
  // event center
  worker->center.submit_to(worker->center.get_id(), [this]() {
    for (auto& l : listen_sockets) {
      if (l) {
        worker->center.create_file_event(l.fd(), EVENT_READABLE,
                                         listen_handler);
      }
    }
  }, false);
}

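/*
 * Accept loop for this Processor's listen sockets, invoked from the worker's
 * event center when a listen fd becomes readable. Each accepted socket is
 * handed to AsyncMessenger::add_accept() on a worker; EINTR and ECONNABORTED
 * are retried, EAGAIN ends the loop, and repeated failures (for example when
 * the file descriptor limit is hit) trigger ceph_abort() after
 * ms_max_accept_failures errors.
 */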
void Processor::accept()
{
  SocketOptions opts;
  opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
  opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
  opts.priority = msgr->get_socket_priority();

  for (auto& listen_socket : listen_sockets) {
    ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd()
                         << dendl;
    unsigned accept_error_num = 0;

    while (true) {
      entity_addr_t addr;
      ConnectedSocket cli_socket;
      Worker *w = worker;
      if (!msgr->get_stack()->support_local_listen_table())
        w = msgr->get_stack()->get_worker();
      else
        ++w->references;
      int r = listen_socket.accept(&cli_socket, opts, &addr, w);
      if (r == 0) {
        ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd "
                             << cli_socket.fd() << dendl;

        msgr->add_accept(
          w, std::move(cli_socket),
          msgr->get_myaddrs().v[listen_socket.get_addr_slot()],
          addr);
        accept_error_num = 0;
        continue;
      } else {
        if (r == -EINTR) {
          continue;
        } else if (r == -EAGAIN) {
          break;
        } else if (r == -EMFILE || r == -ENFILE) {
          lderr(msgr->cct) << __func__ << " open file descriptor limit reached, sd = " << listen_socket.fd()
                           << " errno " << r << " " << cpp_strerror(r) << dendl;
          if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
            lderr(msgr->cct) << "Processor accept has encountered too many errors, calling ceph_abort()." << dendl;
            ceph_abort();
          }
          continue;
        } else if (r == -ECONNABORTED) {
          ldout(msgr->cct, 0) << __func__ << " connection was closed by an incoming RST, sd = " << listen_socket.fd()
                              << " errno " << r << " " << cpp_strerror(r) << dendl;
          continue;
        } else {
          lderr(msgr->cct) << __func__ << " no incoming connection?"
                           << " errno " << r << " " << cpp_strerror(r) << dendl;
          if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
            lderr(msgr->cct) << "Processor accept has encountered too many errors, calling ceph_abort()." << dendl;
            ceph_abort();
          }
          continue;
        }
      }
    }
  }
}

void Processor::stop()
{
  ldout(msgr->cct,10) << __func__ << dendl;

  worker->center.submit_to(worker->center.get_id(), [this]() {
    for (auto& listen_socket : listen_sockets) {
      if (listen_socket) {
        worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE);
        listen_socket.abort_accept();
      }
    }
  }, false);
}


struct StackSingleton {
  CephContext *cct;
  std::shared_ptr<NetworkStack> stack;

  explicit StackSingleton(CephContext *c): cct(c) {}
  void ready(std::string &type) {
    if (!stack)
      stack = NetworkStack::create(cct, type);
  }
  ~StackSingleton() {
    stack->stop();
  }
};


class C_handle_reap : public EventCallback {
  AsyncMessenger *msgr;

 public:
  explicit C_handle_reap(AsyncMessenger *m): msgr(m) {}
  void do_request(uint64_t id) override {
    // reap connections that have been queued for deletion
    msgr->reap_dead();
  }
};

/*******************
 * AsyncMessenger
 */

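/*
 * Construct an AsyncMessenger of the given transport type. The type string
 * selects the NetworkStack backend ("posix" by default, "rdma" or "dpdk" if
 * the type mentions them); the stack is a per-CephContext singleton shared
 * by all messengers of the same transport. One Processor is created per
 * worker when the stack keeps a per-worker listen table, otherwise a single
 * Processor is enough.
 *
 * A rough lifecycle, as suggested by the code in this file: bind()/bindv()
 * (optional for pure clients), then start(), then ready(); wait() blocks
 * until shutdown() is called.
 */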
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                               const std::string &type, string mname, uint64_t _nonce)
  : SimplePolicyMessenger(cct, name, mname, _nonce),
    dispatch_queue(cct, this, mname),
    lock("AsyncMessenger::lock"),
    nonce(_nonce), need_addr(true), did_bind(false),
    global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
    cluster_protocol(0), stopped(true)
{
  std::string transport_type = "posix";
  if (type.find("rdma") != std::string::npos)
    transport_type = "rdma";
  else if (type.find("dpdk") != std::string::npos)
    transport_type = "dpdk";

  auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
    "AsyncMessenger::NetworkStack::" + transport_type, true, cct);
  single->ready(transport_type);
  stack = single->stack.get();
  stack->start();
  local_worker = stack->get_worker();
  local_connection = new AsyncConnection(cct, this, &dispatch_queue,
                                         local_worker, true, true);
  init_local_connection();
  reap_handler = new C_handle_reap(this);
  unsigned processor_num = 1;
  if (stack->support_local_listen_table())
    processor_num = stack->get_num_worker();
  for (unsigned i = 0; i < processor_num; ++i)
    processors.push_back(new Processor(this, stack->get_worker(i), cct));
}

/**
 * Destroy the AsyncMessenger. Pretty simple since all the work is done
 * elsewhere.
 */
AsyncMessenger::~AsyncMessenger()
{
  delete reap_handler;
  ceph_assert(!did_bind); // either we didn't bind or we shut down the Processor
  local_connection->mark_down();
  for (auto &&p : processors)
    delete p;
}

void AsyncMessenger::ready()
{
  ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;

  stack->ready();
  if (pending_bind) {
    int err = bindv(pending_bind_addrs);
    if (err) {
      lderr(cct) << __func__ << " postponed bind failed" << dendl;
      ceph_abort();
    }
  }

  Mutex::Locker l(lock);
  for (auto &&p : processors)
    p->start();
  dispatch_queue.start();
}

int AsyncMessenger::shutdown()
{
  ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;

  // done! clean up.
  for (auto &&p : processors)
    p->stop();
  mark_down_all();
  // break ref cycles on the loopback connection
  local_connection->set_priv(NULL);
  did_bind = false;
  lock.Lock();
  stop_cond.Signal();
  stopped = true;
  lock.Unlock();
  stack->drain();
  return 0;
}

int AsyncMessenger::bind(const entity_addr_t &bind_addr)
{
  ldout(cct,10) << __func__ << " " << bind_addr << dendl;
  // old bind() can take entity_addr_t(). new bindv() can take a
  // 0.0.0.0-like address but needs type and family to be set.
  auto a = bind_addr;
  if (a == entity_addr_t()) {
    a.set_type(entity_addr_t::TYPE_LEGACY);
    if (cct->_conf->ms_bind_ipv6) {
      a.set_family(AF_INET6);
    } else {
      a.set_family(AF_INET);
    }
  }
  return bindv(entity_addrvec_t(a));
}

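/*
 * Bind to a vector of addresses. If the network stack is not ready yet, the
 * addresses are remembered and the bind is retried from ready(); otherwise
 * every Processor binds its listen sockets and _finish_bind() records the
 * resulting addresses.
 */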
int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
{
  lock.Lock();

  if (!pending_bind && started) {
    ldout(cct,10) << __func__ << " already started" << dendl;
    lock.Unlock();
    return -1;
  }

  ldout(cct,10) << __func__ << " " << bind_addrs << dendl;

  if (!stack->is_ready()) {
    ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl;
    pending_bind_addrs = bind_addrs;
    pending_bind = true;
    lock.Unlock();
    return 0;
  }

  lock.Unlock();

  // bind to a socket
  set<int> avoid_ports;
  entity_addrvec_t bound_addrs;
  unsigned i = 0;
  for (auto &&p : processors) {
    int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
    if (r) {
      // Note: this is related to the local TCP listen table problem.
      // The POSIX backend (the default kernel implementation) shares the
      // listen table in the kernel, so all threads naturally use the same
      // listen table and only one thread needs to bind. Other backends
      // (like DPDK) use a local listen table, so we need to bind/listen the
      // TCP port for each worker. If the first worker fails to bind, that
      // can be treated as a normal error (e.g. the port is already in use)
      // and handled. But if the first worker binds successfully and a later
      // worker fails, that is unexpected, so we assert here.
      ceph_assert(i == 0);
      return r;
    }
    ++i;
  }
  _finish_bind(bind_addrs, bound_addrs);
  return 0;
}

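/*
 * Rebind on new ports. All processors are stopped, existing connections are
 * marked down, the nonce is bumped so the new entity_addr_t is unique, the
 * ports of the current addresses are cleared, and each Processor binds again
 * before being restarted.
 */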
int AsyncMessenger::rebind(const set<int>& avoid_ports)
{
  ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
  ceph_assert(did_bind);

  for (auto &&p : processors)
    p->stop();
  mark_down_all();

  // adjust the nonce; we want our entity_addr_t to be truly unique.
  nonce += 1000000;
  ldout(cct, 10) << __func__ << " new nonce " << nonce
                 << " and addr " << get_myaddrs() << dendl;

  entity_addrvec_t bound_addrs;
  entity_addrvec_t bind_addrs = get_myaddrs();
  set<int> new_avoid(avoid_ports);
  for (auto& a : bind_addrs.v) {
    new_avoid.insert(a.get_port());
    a.set_port(0);
  }
  ldout(cct, 10) << __func__ << " will try " << bind_addrs
                 << " and avoid ports " << new_avoid << dendl;
  unsigned i = 0;
  for (auto &&p : processors) {
    int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
    if (r) {
      ceph_assert(i == 0);
      return r;
    }
    ++i;
  }
  _finish_bind(bind_addrs, bound_addrs);
  for (auto &&p : processors) {
    p->start();
  }
  return 0;
}

int AsyncMessenger::client_bind(const entity_addr_t &bind_addr)
{
  if (!cct->_conf->ms_bind_before_connect)
    return 0;
  Mutex::Locker l(lock);
  if (did_bind) {
    return 0;
  }
  if (started) {
    ldout(cct, 10) << __func__ << " already started" << dendl;
    return -1;
  }
  ldout(cct, 10) << __func__ << " " << bind_addr << dendl;

  set_myaddrs(entity_addrvec_t(bind_addr));
  return 0;
}

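/*
 * Record the result of a successful bind: adopt the bound addresses (falling
 * back to the listen addresses when we bound to port 0), stamp them with our
 * nonce, refresh the local (loopback) connection, and mark did_bind.
 */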
void AsyncMessenger::_finish_bind(const entity_addrvec_t& bind_addrs,
                                  const entity_addrvec_t& listen_addrs)
{
  set_myaddrs(bind_addrs);
  for (auto& a : bind_addrs.v) {
    if (!a.is_blank_ip()) {
      learned_addr(a);
    }
  }

  if (get_myaddrs().front().get_port() == 0) {
    set_myaddrs(listen_addrs);
  }
  entity_addrvec_t newaddrs = *my_addrs;
  for (auto& a : newaddrs.v) {
    a.set_nonce(nonce);
  }
  set_myaddrs(newaddrs);

  init_local_connection();

  ldout(cct,1) << __func__ << " bind my_addrs is " << get_myaddrs() << dendl;
  did_bind = true;
}

int AsyncMessenger::start()
{
  lock.Lock();
  ldout(cct,1) << __func__ << " start" << dendl;

  // register at least one entity, first!
  ceph_assert(my_name.type() >= 0);

  ceph_assert(!started);
  started = true;
  stopped = false;

  if (!did_bind) {
    entity_addrvec_t newaddrs = *my_addrs;
    for (auto& a : newaddrs.v) {
      a.nonce = nonce;
    }
    set_myaddrs(newaddrs);
    _init_local_connection();
  }

  lock.Unlock();
  return 0;
}

void AsyncMessenger::wait()
{
  lock.Lock();
  if (!started) {
    lock.Unlock();
    return;
  }
  if (!stopped)
    stop_cond.Wait(lock);

  lock.Unlock();

  dispatch_queue.shutdown();
  if (dispatch_queue.is_started()) {
    ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl;
    dispatch_queue.wait();
    dispatch_queue.discard_local();
    ldout(cct, 10) << __func__ << ": dispatch queue is stopped" << dendl;
  }

  // close all connections
  shutdown_connections(false);
  stack->drain();

  ldout(cct, 10) << __func__ << ": done." << dendl;
  ldout(cct, 1) << __func__ << " complete." << dendl;
  started = false;
}

void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket,
                                const entity_addr_t &listen_addr,
                                const entity_addr_t &peer_addr)
{
  lock.Lock();
  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
                                                listen_addr.is_msgr2(), false);
  conn->accept(std::move(cli_socket), listen_addr, peer_addr);
  accepting_conns.insert(conn);
  lock.Unlock();
}

AsyncConnectionRef AsyncMessenger::create_connect(
  const entity_addrvec_t& addrs, int type)
{
  ceph_assert(lock.is_locked());

  ldout(cct, 10) << __func__ << " " << addrs
                 << ", creating connection and registering" << dendl;

  // here is where we decide which of the addrs to connect to. always prefer
  // the first one, if we support it.
  entity_addr_t target;
  for (auto& a : addrs.v) {
    if (!a.is_msgr2() && !a.is_legacy()) {
      continue;
    }
    // FIXME: for ipv4 vs ipv6, check whether local host can handle ipv6 before
    // trying it? for now, just pick whichever is listed first.
    target = a;
    break;
  }

  // create connection
  Worker *w = stack->get_worker();
  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
                                                target.is_msgr2(), false);
  conn->connect(addrs, type, target);
  ceph_assert(!conns.count(addrs));
  ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " "
                 << *conn->peer_addrs << dendl;
  conns[addrs] = conn;
  w->get_perf_counter()->inc(l_msgr_active_connections);

  return conn;
}


ConnectionRef AsyncMessenger::get_loopback_connection()
{
  return local_connection;
}

bool AsyncMessenger::should_use_msgr2()
{
  // if we are bound to v1 only, and we are connecting to a v2 peer,
  // we cannot use the peer's v2 address. otherwise the connection
  // is asymmetrical, because they would have to use v1 to connect
  // to us, and we would use v2, and connection race detection etc
  // would totally break down (among other things). or, the other
  // end will be confused that we advertise ourselves with a v1
  // address only (that we bound to) but connected with protocol v2.
  return !did_bind || get_myaddrs().has_msgr2();
}

entity_addrvec_t AsyncMessenger::_filter_addrs(int type,
                                               const entity_addrvec_t& addrs)
{
  if (!should_use_msgr2()) {
    ldout(cct, 10) << __func__ << " " << addrs << " type " << type
                   << " limiting to v1 ()" << dendl;
    entity_addrvec_t r;
    for (auto& i : addrs.v) {
      if (i.is_msgr2()) {
        continue;
      }
      r.v.push_back(i);
    }
    return r;
  } else {
    return addrs;
  }
}

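/*
 * Queue a message for the entity at the given addresses. The address vector
 * is first filtered by _filter_addrs() (dropping msgr2 addresses when we are
 * bound to v1 only); an existing connection is reused if one is registered,
 * otherwise submit_message() decides whether to create one or drop the
 * message.
 */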
int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs)
{
  Mutex::Locker l(lock);

  FUNCTRACE(cct);
  ceph_assert(m);

  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE(((MOSDOp *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP");
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE(((MOSDOpReply *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP_REPLY");

  ldout(cct, 1) << __func__ << "--> " << ceph_entity_type_name(type) << " "
                << addrs << " -- " << *m << " -- ?+"
                << m->get_data().length() << " " << m << dendl;

  if (addrs.empty()) {
    ldout(cct,0) << __func__ << " message " << *m
                 << " with empty dest " << addrs << dendl;
    m->put();
    return -EINVAL;
  }

  auto av = _filter_addrs(type, addrs);
  AsyncConnectionRef conn = _lookup_conn(av);
  submit_message(m, conn, av, type);
  return 0;
}

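/*
 * Return a connection to the given entity, creating and registering one if
 * none exists yet. If the target addresses are our own, the loopback
 * connection is returned instead.
 */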
ConnectionRef AsyncMessenger::connect_to(int type, const entity_addrvec_t& addrs)
{
  Mutex::Locker l(lock);
  if (*my_addrs == addrs ||
      (addrs.v.size() == 1 &&
       my_addrs->contains(addrs.front()))) {
    // local
    return local_connection;
  }

  auto av = _filter_addrs(type, addrs);

  AsyncConnectionRef conn = _lookup_conn(av);
  if (conn) {
    ldout(cct, 10) << __func__ << " " << av << " existing " << conn << dendl;
  } else {
    conn = create_connect(av, type);
    ldout(cct, 10) << __func__ << " " << av << " new " << conn << dendl;
  }

  return conn;
}

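/*
 * Hand a message to a connection. In order of preference: the connection
 * passed in by the caller, the loopback connection when the destination is
 * ourselves, or a freshly created connection. If our policy for the
 * destination type is a server policy (we never initiate connections) and no
 * session exists, the message is dropped.
 */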
void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
                                    const entity_addrvec_t& dest_addrs,
                                    int dest_type)
{
  if (cct->_conf->ms_dump_on_send) {
    m->encode(-1, MSG_CRC_ALL);
    ldout(cct, 0) << __func__ << " submit_message " << *m << "\n";
    m->get_payload().hexdump(*_dout);
    if (m->get_data().length() > 0) {
      *_dout << " data:\n";
      m->get_data().hexdump(*_dout);
    }
    *_dout << dendl;
    m->clear_payload();
  }

  // existing connection?
  if (con) {
    con->send_message(m);
    return;
  }

  // local?
  if (*my_addrs == dest_addrs ||
      (dest_addrs.v.size() == 1 &&
       my_addrs->contains(dest_addrs.front()))) {
    // local
    local_connection->send_message(m);
    return;
  }

  // remote, no existing connection.
  const Policy& policy = get_policy(dest_type);
  if (policy.server) {
    ldout(cct, 20) << __func__ << " " << *m << " remote, " << dest_addrs
                   << ", lossy server for target type "
                   << ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl;
    m->put();
  } else {
    ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addrs
                  << ", new connection." << dendl;
    con = create_connect(dest_addrs, dest_type);
    con->send_message(m);
  }
}

/**
 * If my_addr doesn't have an IP set, this function
 * will fill it in from the passed addr. Otherwise it does nothing and returns.
 */
bool AsyncMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
{
  ldout(cct,1) << __func__ << " " << addrs << dendl;
  bool ret = false;
  Mutex::Locker l(lock);

  entity_addrvec_t newaddrs = *my_addrs;
  for (auto& a : newaddrs.v) {
    if (a.is_blank_ip()) {
      int type = a.get_type();
      int port = a.get_port();
      uint32_t nonce = a.get_nonce();
      for (auto& b : addrs.v) {
        if (a.get_family() == b.get_family()) {
          ldout(cct,1) << __func__ << " assuming my addr " << a
                       << " matches provided addr " << b << dendl;
          a = b;
          a.set_nonce(nonce);
          a.set_type(type);
          a.set_port(port);
          ret = true;
          break;
        }
      }
    }
  }
  set_myaddrs(newaddrs);
  if (ret) {
    _init_local_connection();
  }
  ldout(cct,1) << __func__ << " now " << *my_addrs << dendl;
  return ret;
}

void AsyncMessenger::set_addrs(const entity_addrvec_t &addrs)
{
  Mutex::Locker l(lock);
  auto t = addrs;
  for (auto& a : t.v) {
    a.set_nonce(nonce);
  }
  set_myaddrs(t);
  _init_local_connection();
}

void AsyncMessenger::shutdown_connections(bool queue_reset)
{
  ldout(cct,1) << __func__ << " " << dendl;
  lock.Lock();
  for (set<AsyncConnectionRef>::iterator q = accepting_conns.begin();
       q != accepting_conns.end(); ++q) {
    AsyncConnectionRef p = *q;
    ldout(cct, 5) << __func__ << " accepting_conn " << p.get() << dendl;
    p->stop(queue_reset);
  }
  accepting_conns.clear();

  while (!conns.empty()) {
    auto it = conns.begin();
    AsyncConnectionRef p = it->second;
    ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
    conns.erase(it);
    p->get_perf_counter()->dec(l_msgr_active_connections);
    p->stop(queue_reset);
  }

  {
    Mutex::Locker l(deleted_lock);
    while (!deleted_conns.empty()) {
      set<AsyncConnectionRef>::iterator it = deleted_conns.begin();
      AsyncConnectionRef p = *it;
      ldout(cct, 5) << __func__ << " delete " << p << dendl;
      deleted_conns.erase(it);
    }
  }
  lock.Unlock();
}

void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs)
{
  lock.Lock();
  AsyncConnectionRef p = _lookup_conn(addrs);
  if (p) {
    ldout(cct, 1) << __func__ << " " << addrs << " -- " << p << dendl;
    p->stop(true);
  } else {
    ldout(cct, 1) << __func__ << " " << addrs << " -- connection dne" << dendl;
  }
  lock.Unlock();
}

int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
{
  int my_type = my_name.type();

  // set reply protocol version
  if (peer_type == my_type) {
    // internal
    return cluster_protocol;
  } else {
    // public
    switch (connect ? peer_type : my_type) {
      case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
      case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
      case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
    }
  }
  return 0;
}

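/*
 * Register an accepted connection as the canonical connection for its peer
 * addresses. If the existing connection for those addresses has already been
 * lazily deleted (see "deleted_conns"), it is dropped and replaced; if a
 * different live connection exists, -1 is returned and the caller loses the
 * race.
 */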
int AsyncMessenger::accept_conn(AsyncConnectionRef conn)
{
  Mutex::Locker l(lock);
  auto it = conns.find(*conn->peer_addrs);
  if (it != conns.end()) {
    AsyncConnectionRef existing = it->second;

    // lazy delete, see "deleted_conns"
    // if conn is already registered, we will return 0
    Mutex::Locker l(deleted_lock);
    if (deleted_conns.erase(existing)) {
      conns.erase(it);
    } else if (conn != existing) {
      return -1;
    }
  }
  ldout(cct, 10) << __func__ << " " << conn << " " << *conn->peer_addrs << dendl;
  conns[*conn->peer_addrs] = conn;
  conn->get_perf_counter()->inc(l_msgr_active_connections);
  accepting_conns.erase(conn);
  return 0;
}

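/*
 * Record the address a peer observed for us. This path is taken only while
 * need_addr is still true (it goes from true to false under the lock) and
 * fills in any of our addresses that still have a blank IP of the same
 * family. Returns true if the address set was updated.
 */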
bool AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
{
  // be careful here: multiple threads may block here, and readers of
  // my_addr do NOT hold any lock.

  // this always goes from true -> false under the protection of the
  // mutex. if it is already false, we need not retake the mutex at
  // all.
  if (!need_addr)
    return false;
  std::lock_guard l(lock);
  if (need_addr) {
    if (my_addrs->empty()) {
      auto a = peer_addr_for_me;
      a.set_type(entity_addr_t::TYPE_ANY);
      a.set_nonce(nonce);
      if (!did_bind) {
        a.set_port(0);
      }
      set_myaddrs(entity_addrvec_t(a));
      ldout(cct,10) << __func__ << " had no addrs" << dendl;
    } else {
      // fix all addrs of the same family, regardless of type (msgr2 vs legacy)
      entity_addrvec_t newaddrs = *my_addrs;
      for (auto& a : newaddrs.v) {
        if (a.is_blank_ip() &&
            a.get_family() == peer_addr_for_me.get_family()) {
          entity_addr_t t = peer_addr_for_me;
          if (!did_bind) {
            t.set_type(entity_addr_t::TYPE_ANY);
            t.set_port(0);
          } else {
            t.set_type(a.get_type());
            t.set_port(a.get_port());
          }
          t.set_nonce(a.get_nonce());
          ldout(cct,10) << __func__ << " " << a << " -> " << t << dendl;
          a = t;
        }
      }
      set_myaddrs(newaddrs);
    }
    ldout(cct, 1) << __func__ << " learned my addr " << *my_addrs
                  << " (peer_addr_for_me " << peer_addr_for_me << ")" << dendl;
    _init_local_connection();
    need_addr = false;
    return true;
  }
  return false;
}

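/*
 * Reap connections that were queued on deleted_conns: remove them from the
 * connection and accepting sets and return how many were reaped. Invoked via
 * the C_handle_reap event callback.
 */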
int AsyncMessenger::reap_dead()
{
  ldout(cct, 1) << __func__ << " start" << dendl;
  int num = 0;

  Mutex::Locker l1(lock);
  Mutex::Locker l2(deleted_lock);

  while (!deleted_conns.empty()) {
    auto it = deleted_conns.begin();
    AsyncConnectionRef p = *it;
    ldout(cct, 5) << __func__ << " delete " << p << dendl;
    auto conns_it = conns.find(*p->peer_addrs);
    if (conns_it != conns.end() && conns_it->second == p)
      conns.erase(conns_it);
    accepting_conns.erase(p);
    deleted_conns.erase(it);
    ++num;
  }

  return num;
}