1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
22 #include "AsyncMessenger.h"
24 #include "common/config.h"
25 #include "common/Timer.h"
26 #include "common/errno.h"
28 #include "messages/MOSDOp.h"
29 #include "messages/MOSDOpReply.h"
30 #include "common/EventTrace.h"
32 #define dout_subsys ceph_subsys_ms
34 #define dout_prefix _prefix(_dout, this)
35 static ostream
& _prefix(std::ostream
*_dout
, AsyncMessenger
*m
) {
36 return *_dout
<< "-- " << m
->get_myaddr() << " ";
39 static ostream
& _prefix(std::ostream
*_dout
, Processor
*p
) {
40 return *_dout
<< " Processor -- ";
48 class Processor::C_processor_accept
: public EventCallback
{
52 explicit C_processor_accept(Processor
*p
): pro(p
) {}
53 void do_request(int id
) override
{
58 Processor::Processor(AsyncMessenger
*r
, Worker
*w
, CephContext
*c
)
59 : msgr(r
), net(c
), worker(w
),
60 listen_handler(new C_processor_accept(this)) {}
62 int Processor::bind(const entity_addr_t
&bind_addr
,
63 const set
<int>& avoid_ports
,
64 entity_addr_t
* bound_addr
)
66 const md_config_t
*conf
= msgr
->cct
->_conf
;
68 ldout(msgr
->cct
, 10) << __func__
<< dendl
;
71 switch (bind_addr
.get_family()) {
74 family
= bind_addr
.get_family();
79 family
= conf
->ms_bind_ipv6
? AF_INET6
: AF_INET
;
83 opts
.nodelay
= msgr
->cct
->_conf
->ms_tcp_nodelay
;
84 opts
.rcbuf_size
= msgr
->cct
->_conf
->ms_tcp_rcvbuf
;
86 // use whatever user specified (if anything)
87 entity_addr_t listen_addr
= bind_addr
;
88 if (listen_addr
.get_type() == entity_addr_t::TYPE_NONE
) {
89 listen_addr
.set_type(entity_addr_t::TYPE_LEGACY
);
91 listen_addr
.set_family(family
);
96 for (int i
= 0; i
< conf
->ms_bind_retry_count
; i
++) {
98 lderr(msgr
->cct
) << __func__
<< " was unable to bind. Trying again in "
99 << conf
->ms_bind_retry_delay
<< " seconds " << dendl
;
100 sleep(conf
->ms_bind_retry_delay
);
103 if (listen_addr
.get_port()) {
104 worker
->center
.submit_to(worker
->center
.get_id(), [this, &listen_addr
, &opts
, &r
]() {
105 r
= worker
->listen(listen_addr
, opts
, &listen_socket
);
108 lderr(msgr
->cct
) << __func__
<< " unable to bind to " << listen_addr
109 << ": " << cpp_strerror(r
) << dendl
;
113 // try a range of ports
114 for (int port
= msgr
->cct
->_conf
->ms_bind_port_min
; port
<= msgr
->cct
->_conf
->ms_bind_port_max
; port
++) {
115 if (avoid_ports
.count(port
))
118 listen_addr
.set_port(port
);
119 worker
->center
.submit_to(worker
->center
.get_id(), [this, &listen_addr
, &opts
, &r
]() {
120 r
= worker
->listen(listen_addr
, opts
, &listen_socket
);
126 lderr(msgr
->cct
) << __func__
<< " unable to bind to " << listen_addr
127 << " on any port in range " << msgr
->cct
->_conf
->ms_bind_port_min
128 << "-" << msgr
->cct
->_conf
->ms_bind_port_max
<< ": "
129 << cpp_strerror(r
) << dendl
;
130 listen_addr
.set_port(0); // Clear port before retry, otherwise we shall fail again.
133 ldout(msgr
->cct
, 10) << __func__
<< " bound on random port " << listen_addr
<< dendl
;
138 // It seems that binding completely failed, return with that exit status
140 lderr(msgr
->cct
) << __func__
<< " was unable to bind after " << conf
->ms_bind_retry_count
141 << " attempts: " << cpp_strerror(r
) << dendl
;
145 ldout(msgr
->cct
, 10) << __func__
<< " bound to " << listen_addr
<< dendl
;
146 *bound_addr
= listen_addr
;
150 void Processor::start()
152 ldout(msgr
->cct
, 1) << __func__
<< dendl
;
156 worker
->center
.submit_to(worker
->center
.get_id(), [this]() {
157 worker
->center
.create_file_event(listen_socket
.fd(), EVENT_READABLE
, listen_handler
); }, false);
161 void Processor::accept()
163 ldout(msgr
->cct
, 10) << __func__
<< " listen_fd=" << listen_socket
.fd() << dendl
;
165 opts
.nodelay
= msgr
->cct
->_conf
->ms_tcp_nodelay
;
166 opts
.rcbuf_size
= msgr
->cct
->_conf
->ms_tcp_rcvbuf
;
167 opts
.priority
= msgr
->get_socket_priority();
170 ConnectedSocket cli_socket
;
172 if (!msgr
->get_stack()->support_local_listen_table())
173 w
= msgr
->get_stack()->get_worker();
174 int r
= listen_socket
.accept(&cli_socket
, opts
, &addr
, w
);
176 ldout(msgr
->cct
, 10) << __func__
<< " accepted incoming on sd " << cli_socket
.fd() << dendl
;
178 msgr
->add_accept(w
, std::move(cli_socket
), addr
);
183 } else if (r
== -EAGAIN
) {
185 } else if (r
== -EMFILE
|| r
== -ENFILE
) {
186 lderr(msgr
->cct
) << __func__
<< " open file descriptions limit reached sd = " << listen_socket
.fd()
187 << " errno " << r
<< " " << cpp_strerror(r
) << dendl
;
189 } else if (r
== -ECONNABORTED
) {
190 ldout(msgr
->cct
, 0) << __func__
<< " it was closed because of rst arrived sd = " << listen_socket
.fd()
191 << " errno " << r
<< " " << cpp_strerror(r
) << dendl
;
194 lderr(msgr
->cct
) << __func__
<< " no incoming connection?"
195 << " errno " << r
<< " " << cpp_strerror(r
) << dendl
;
202 void Processor::stop()
204 ldout(msgr
->cct
,10) << __func__
<< dendl
;
207 worker
->center
.submit_to(worker
->center
.get_id(), [this]() {
208 worker
->center
.delete_file_event(listen_socket
.fd(), EVENT_READABLE
);
209 listen_socket
.abort_accept();
215 struct StackSingleton
{
217 std::shared_ptr
<NetworkStack
> stack
;
219 StackSingleton(CephContext
*c
): cct(c
) {}
220 void ready(std::string
&type
) {
222 stack
= NetworkStack::create(cct
, type
);
230 class C_handle_reap
: public EventCallback
{
231 AsyncMessenger
*msgr
;
234 explicit C_handle_reap(AsyncMessenger
*m
): msgr(m
) {}
235 void do_request(int id
) override
{
236 // judge whether is a time event
245 AsyncMessenger::AsyncMessenger(CephContext
*cct
, entity_name_t name
,
246 const std::string
&type
, string mname
, uint64_t _nonce
)
247 : SimplePolicyMessenger(cct
, name
,mname
, _nonce
),
248 dispatch_queue(cct
, this, mname
),
249 lock("AsyncMessenger::lock"),
250 nonce(_nonce
), need_addr(true), did_bind(false),
251 global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
252 cluster_protocol(0), stopped(true)
254 std::string transport_type
= "posix";
255 if (type
.find("rdma") != std::string::npos
)
256 transport_type
= "rdma";
257 else if (type
.find("dpdk") != std::string::npos
)
258 transport_type
= "dpdk";
260 ceph_spin_init(&global_seq_lock
);
261 StackSingleton
*single
;
262 cct
->lookup_or_create_singleton_object
<StackSingleton
>(single
, "AsyncMessenger::NetworkStack::"+transport_type
);
263 single
->ready(transport_type
);
264 stack
= single
->stack
.get();
266 local_worker
= stack
->get_worker();
267 local_connection
= new AsyncConnection(cct
, this, &dispatch_queue
, local_worker
);
268 init_local_connection();
269 reap_handler
= new C_handle_reap(this);
270 unsigned processor_num
= 1;
271 if (stack
->support_local_listen_table())
272 processor_num
= stack
->get_num_worker();
273 for (unsigned i
= 0; i
< processor_num
; ++i
)
274 processors
.push_back(new Processor(this, stack
->get_worker(i
), cct
));
278 * Destroy the AsyncMessenger. Pretty simple since all the work is done
281 AsyncMessenger::~AsyncMessenger()
284 assert(!did_bind
); // either we didn't bind or we shut down the Processor
285 local_connection
->mark_down();
286 for (auto &&p
: processors
)
290 void AsyncMessenger::ready()
292 ldout(cct
,10) << __func__
<< " " << get_myaddr() << dendl
;
296 int err
= bind(pending_bind_addr
);
298 lderr(cct
) << __func__
<< " postponed bind failed" << dendl
;
303 Mutex::Locker
l(lock
);
304 for (auto &&p
: processors
)
306 dispatch_queue
.start();
309 int AsyncMessenger::shutdown()
311 ldout(cct
,10) << __func__
<< " " << get_myaddr() << dendl
;
314 for (auto &&p
: processors
)
317 // break ref cycles on the loopback connection
318 local_connection
->set_priv(NULL
);
329 int AsyncMessenger::bind(const entity_addr_t
&bind_addr
)
333 if (!pending_bind
&& started
) {
334 ldout(cct
,10) << __func__
<< " already started" << dendl
;
339 ldout(cct
,10) << __func__
<< " bind " << bind_addr
<< dendl
;
341 if (!stack
->is_ready()) {
342 ldout(cct
, 10) << __func__
<< " Network Stack is not ready for bind yet - postponed" << dendl
;
343 pending_bind_addr
= bind_addr
;
352 set
<int> avoid_ports
;
353 entity_addr_t bound_addr
;
355 for (auto &&p
: processors
) {
356 int r
= p
->bind(bind_addr
, avoid_ports
, &bound_addr
);
358 // Note: this is related to local tcp listen table problem.
359 // Posix(default kernel implementation) backend shares listen table
360 // in the kernel, so all threads can use the same listen table naturally
361 // and only one thread need to bind. But other backends(like dpdk) uses local
362 // listen table, we need to bind/listen tcp port for each worker. So if the
363 // first worker failed to bind, it could be think the normal error then handle
364 // it, like port is used case. But if the first worker successfully to bind
365 // but the second worker failed, it's not expected and we need to assert
372 _finish_bind(bind_addr
, bound_addr
);
376 int AsyncMessenger::rebind(const set
<int>& avoid_ports
)
378 ldout(cct
,1) << __func__
<< " rebind avoid " << avoid_ports
<< dendl
;
381 for (auto &&p
: processors
)
385 // adjust the nonce; we want our entity_addr_t to be truly unique.
387 ldout(cct
, 10) << __func__
<< " new nonce " << nonce
388 << " and inst " << get_myinst() << dendl
;
390 entity_addr_t bound_addr
;
391 entity_addr_t bind_addr
= get_myaddr();
392 bind_addr
.set_port(0);
393 set
<int> new_avoid(avoid_ports
);
394 new_avoid
.insert(bind_addr
.get_port());
395 ldout(cct
, 10) << __func__
<< " will try " << bind_addr
396 << " and avoid ports " << new_avoid
<< dendl
;
398 for (auto &&p
: processors
) {
399 int r
= p
->bind(bind_addr
, avoid_ports
, &bound_addr
);
406 _finish_bind(bind_addr
, bound_addr
);
407 for (auto &&p
: processors
) {
413 int AsyncMessenger::client_bind(const entity_addr_t
&bind_addr
)
415 if (!cct
->_conf
->ms_bind_before_connect
)
417 Mutex::Locker
l(lock
);
419 assert(my_inst
.addr
== bind_addr
);
423 ldout(cct
, 10) << __func__
<< " already started" << dendl
;
426 ldout(cct
, 10) << __func__
<< " " << bind_addr
<< dendl
;
428 set_myaddr(bind_addr
);
432 void AsyncMessenger::_finish_bind(const entity_addr_t
& bind_addr
,
433 const entity_addr_t
& listen_addr
)
435 set_myaddr(bind_addr
);
436 if (bind_addr
!= entity_addr_t())
437 learned_addr(bind_addr
);
439 if (get_myaddr().get_port() == 0) {
440 set_myaddr(listen_addr
);
442 entity_addr_t addr
= get_myaddr();
443 addr
.set_nonce(nonce
);
446 init_local_connection();
448 ldout(cct
,1) << __func__
<< " bind my_inst.addr is " << get_myaddr() << dendl
;
452 int AsyncMessenger::start()
455 ldout(cct
,1) << __func__
<< " start" << dendl
;
457 // register at least one entity, first!
458 assert(my_inst
.name
.type() >= 0);
465 my_inst
.addr
.nonce
= nonce
;
466 _init_local_connection();
473 void AsyncMessenger::wait()
481 stop_cond
.Wait(lock
);
485 dispatch_queue
.shutdown();
486 if (dispatch_queue
.is_started()) {
487 ldout(cct
, 10) << __func__
<< ": waiting for dispatch queue" << dendl
;
488 dispatch_queue
.wait();
489 dispatch_queue
.discard_local();
490 ldout(cct
, 10) << __func__
<< ": dispatch queue is stopped" << dendl
;
493 // close all connections
494 shutdown_connections(false);
497 ldout(cct
, 10) << __func__
<< ": done." << dendl
;
498 ldout(cct
, 1) << __func__
<< " complete." << dendl
;
502 void AsyncMessenger::add_accept(Worker
*w
, ConnectedSocket cli_socket
, entity_addr_t
&addr
)
505 AsyncConnectionRef conn
= new AsyncConnection(cct
, this, &dispatch_queue
, w
);
506 conn
->accept(std::move(cli_socket
), addr
);
507 accepting_conns
.insert(conn
);
511 AsyncConnectionRef
AsyncMessenger::create_connect(const entity_addr_t
& addr
, int type
)
513 assert(lock
.is_locked());
514 assert(addr
!= my_inst
.addr
);
516 ldout(cct
, 10) << __func__
<< " " << addr
517 << ", creating connection and registering" << dendl
;
520 Worker
*w
= stack
->get_worker();
521 AsyncConnectionRef conn
= new AsyncConnection(cct
, this, &dispatch_queue
, w
);
522 conn
->connect(addr
, type
);
523 assert(!conns
.count(addr
));
525 w
->get_perf_counter()->inc(l_msgr_active_connections
);
530 ConnectionRef
AsyncMessenger::get_connection(const entity_inst_t
& dest
)
532 Mutex::Locker
l(lock
);
533 if (my_inst
.addr
== dest
.addr
) {
535 return local_connection
;
538 AsyncConnectionRef conn
= _lookup_conn(dest
.addr
);
540 ldout(cct
, 10) << __func__
<< " " << dest
<< " existing " << conn
<< dendl
;
542 conn
= create_connect(dest
.addr
, dest
.name
.type());
543 ldout(cct
, 10) << __func__
<< " " << dest
<< " new " << conn
<< dendl
;
549 ConnectionRef
AsyncMessenger::get_loopback_connection()
551 return local_connection
;
554 int AsyncMessenger::_send_message(Message
*m
, const entity_inst_t
& dest
)
559 if (m
->get_type() == CEPH_MSG_OSD_OP
)
560 OID_EVENT_TRACE(((MOSDOp
*)m
)->get_oid().name
.c_str(), "SEND_MSG_OSD_OP");
561 else if (m
->get_type() == CEPH_MSG_OSD_OPREPLY
)
562 OID_EVENT_TRACE(((MOSDOpReply
*)m
)->get_oid().name
.c_str(), "SEND_MSG_OSD_OP_REPLY");
564 ldout(cct
, 1) << __func__
<< "--> " << dest
.name
<< " "
565 << dest
.addr
<< " -- " << *m
<< " -- ?+"
566 << m
->get_data().length() << " " << m
<< dendl
;
568 if (dest
.addr
== entity_addr_t()) {
569 ldout(cct
,0) << __func__
<< " message " << *m
570 << " with empty dest " << dest
.addr
<< dendl
;
575 AsyncConnectionRef conn
= _lookup_conn(dest
.addr
);
576 submit_message(m
, conn
, dest
.addr
, dest
.name
.type());
580 void AsyncMessenger::submit_message(Message
*m
, AsyncConnectionRef con
,
581 const entity_addr_t
& dest_addr
, int dest_type
)
583 if (cct
->_conf
->ms_dump_on_send
) {
584 m
->encode(-1, MSG_CRC_ALL
);
585 ldout(cct
, 0) << __func__
<< "submit_message " << *m
<< "\n";
586 m
->get_payload().hexdump(*_dout
);
587 if (m
->get_data().length() > 0) {
588 *_dout
<< " data:\n";
589 m
->get_data().hexdump(*_dout
);
595 // existing connection?
597 con
->send_message(m
);
602 if (my_inst
.addr
== dest_addr
) {
604 local_connection
->send_message(m
);
608 // remote, no existing connection.
609 const Policy
& policy
= get_policy(dest_type
);
611 ldout(cct
, 20) << __func__
<< " " << *m
<< " remote, " << dest_addr
612 << ", lossy server for target type "
613 << ceph_entity_type_name(dest_type
) << ", no session, dropping." << dendl
;
616 ldout(cct
,20) << __func__
<< " " << *m
<< " remote, " << dest_addr
<< ", new connection." << dendl
;
617 con
= create_connect(dest_addr
, dest_type
);
618 con
->send_message(m
);
623 * If my_inst.addr doesn't have an IP set, this function
624 * will fill it in from the passed addr. Otherwise it does nothing and returns.
626 void AsyncMessenger::set_addr_unknowns(const entity_addr_t
&addr
)
628 Mutex::Locker
l(lock
);
629 if (my_inst
.addr
.is_blank_ip()) {
630 int port
= my_inst
.addr
.get_port();
631 my_inst
.addr
.u
= addr
.u
;
632 my_inst
.addr
.set_port(port
);
633 _init_local_connection();
637 void AsyncMessenger::set_addr(const entity_addr_t
&addr
)
639 Mutex::Locker
l(lock
);
640 entity_addr_t t
= addr
;
643 _init_local_connection();
646 void AsyncMessenger::shutdown_connections(bool queue_reset
)
648 ldout(cct
,1) << __func__
<< " " << dendl
;
650 for (set
<AsyncConnectionRef
>::iterator q
= accepting_conns
.begin();
651 q
!= accepting_conns
.end(); ++q
) {
652 AsyncConnectionRef p
= *q
;
653 ldout(cct
, 5) << __func__
<< " accepting_conn " << p
.get() << dendl
;
654 p
->stop(queue_reset
);
656 accepting_conns
.clear();
658 while (!conns
.empty()) {
659 ceph::unordered_map
<entity_addr_t
, AsyncConnectionRef
>::iterator it
= conns
.begin();
660 AsyncConnectionRef p
= it
->second
;
661 ldout(cct
, 5) << __func__
<< " mark down " << it
->first
<< " " << p
<< dendl
;
663 p
->get_perf_counter()->dec(l_msgr_active_connections
);
664 p
->stop(queue_reset
);
668 Mutex::Locker
l(deleted_lock
);
669 while (!deleted_conns
.empty()) {
670 set
<AsyncConnectionRef
>::iterator it
= deleted_conns
.begin();
671 AsyncConnectionRef p
= *it
;
672 ldout(cct
, 5) << __func__
<< " delete " << p
<< dendl
;
673 deleted_conns
.erase(it
);
679 void AsyncMessenger::mark_down(const entity_addr_t
& addr
)
682 AsyncConnectionRef p
= _lookup_conn(addr
);
684 ldout(cct
, 1) << __func__
<< " " << addr
<< " -- " << p
<< dendl
;
687 ldout(cct
, 1) << __func__
<< " " << addr
<< " -- connection dne" << dendl
;
692 int AsyncMessenger::get_proto_version(int peer_type
, bool connect
) const
694 int my_type
= my_inst
.name
.type();
696 // set reply protocol version
697 if (peer_type
== my_type
) {
699 return cluster_protocol
;
702 switch (connect
? peer_type
: my_type
) {
703 case CEPH_ENTITY_TYPE_OSD
: return CEPH_OSDC_PROTOCOL
;
704 case CEPH_ENTITY_TYPE_MDS
: return CEPH_MDSC_PROTOCOL
;
705 case CEPH_ENTITY_TYPE_MON
: return CEPH_MONC_PROTOCOL
;
711 void AsyncMessenger::learned_addr(const entity_addr_t
&peer_addr_for_me
)
713 // be careful here: multiple threads may block here, and readers of
714 // my_inst.addr do NOT hold any lock.
716 // this always goes from true -> false under the protection of the
717 // mutex. if it is already false, we need not retake the mutex at
724 entity_addr_t t
= peer_addr_for_me
;
725 t
.set_port(my_inst
.addr
.get_port());
726 t
.set_nonce(my_inst
.addr
.get_nonce());
728 ldout(cct
, 1) << __func__
<< " learned my addr " << my_inst
.addr
<< dendl
;
729 _init_local_connection();
734 int AsyncMessenger::reap_dead()
736 ldout(cct
, 1) << __func__
<< " start" << dendl
;
739 Mutex::Locker
l1(lock
);
740 Mutex::Locker
l2(deleted_lock
);
742 while (!deleted_conns
.empty()) {
743 auto it
= deleted_conns
.begin();
744 AsyncConnectionRef p
= *it
;
745 ldout(cct
, 5) << __func__
<< " delete " << p
<< dendl
;
746 auto conns_it
= conns
.find(p
->peer_addr
);
747 if (conns_it
!= conns
.end() && conns_it
->second
== p
)
748 conns
.erase(conns_it
);
749 accepting_conns
.erase(p
);
750 deleted_conns
.erase(it
);