]>
git.proxmox.com Git - ceph.git/blob - ceph/src/msg/simple/SimpleMessenger.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
20 #include "SimpleMessenger.h"
22 #include "common/config.h"
23 #include "common/Timer.h"
24 #include "common/errno.h"
25 #include "common/valgrind.h"
26 #include "auth/Crypto.h"
27 #include "include/Spinlock.h"
// Debug output from this file is logged under the ceph_subsys_ms subsystem.
29 #define dout_subsys ceph_subsys_ms
// Every ldout() line in this file starts with the prefix written by _prefix().
31 #define dout_prefix _prefix(_dout, this)
// Writes the log prefix "-- <my address> " to *_dout and returns the stream
// so the ldout() expression can keep chaining output.
// @param _dout stream being written; it is dereferenced, so must be non-null
// @param msgr  messenger whose get_myaddr() is printed
32 static ostream
& _prefix(std::ostream
*_dout
, SimpleMessenger
*msgr
) {
33 return *_dout
<< "-- " << msgr
->get_myaddr() << " ";
// Constructs the messenger.
// The SimplePolicyMessenger base receives cct, name, mname and _nonce.
// The accepter and dispatch_queue are bound to this messenger.
// need_addr starts true; did_bind, reaper_started and reaper_stop start false.
// A PipeConnection back to this messenger (local_connection) is allocated here
// and initialized at the end of the body by init_local_connection().
// NOTE(review): some initializer/body lines are not visible in this view.
41 SimpleMessenger::SimpleMessenger(CephContext
*cct
, entity_name_t name
,
42 string mname
, uint64_t _nonce
)
43 : SimplePolicyMessenger(cct
, name
,mname
, _nonce
),
44 accepter(this, _nonce
),
45 dispatch_queue(cct
, this, mname
),
48 lock("SimpleMessenger::lock"), need_addr(true), did_bind(false),
51 reaper_started(false), reaper_stop(false),
53 local_connection(new PipeConnection(cct
, this))
// Unsynchronized access to `timeout` is declared a benign race for
// race-detection tooling (annotation macro from common/valgrind.h).
55 ANNOTATE_BENIGN_RACE_SIZED(&timeout
, sizeof(timeout
),
56 "SimpleMessenger read timeout");
// Spinlock presumably guarding global_seq; destroyed in ~SimpleMessenger().
57 ceph_spin_init(&global_seq_lock
);
58 init_local_connection();
62 * Destroy the SimpleMessenger. Pretty simple since all the work is done
// Checks that teardown already happened, then frees the spinlock created in
// the constructor:
//  - the accepter is not bound (did_bind is false);
//  - no Pipes remain in rank_pipe;
//  - the reaper thread has been stopped (reaper_started is false).
65 SimpleMessenger::~SimpleMessenger()
67 assert(!did_bind
); // either we didn't bind or we shut down the Accepter
68 assert(rank_pipe
.empty()); // we don't have any running Pipes.
69 assert(!reaper_started
); // the reaper thread is stopped
70 ceph_spin_destroy(&global_seq_lock
);
// ready(): logs the current address at level 10 and starts the dispatch queue.
// NOTE(review): any further steps of this function lie outside the visible lines.
73 void SimpleMessenger::ready()
75 ldout(cct
,10) << "ready " << get_myaddr() << dendl
;
76 dispatch_queue
.start();
// shutdown(): logs at level 10, then clears the loopback connection's private
// pointer with set_priv(NULL), so that connection no longer keeps a reference
// that could form a cycle.
// NOTE(review): the remaining shutdown steps and the return value are not
// visible in this view.
85 int SimpleMessenger::shutdown()
87 ldout(cct
,10) << "shutdown " << get_myaddr() << dendl
;
90 // break ref cycles on the loopback connection
91 local_connection
->set_priv(NULL
);
// Sends m to the entity `dest`, routing by dest.addr.
//  - Stamps our own name as the message source.
//  - Applies the default send priority when m has none.
//  - Logs the send at level 1.
//  - A default-constructed (empty) dest.addr is logged at level 0 as an error.
//  - Otherwise any existing Pipe for dest.addr is looked up, and its
//    Connection (or NULL if there is no Pipe) is handed to submit_message().
// The trailing `true` is submit_message()'s already_locked flag, so `lock` is
// presumably held here; the locking statement is not visible (TODO confirm).
101 int SimpleMessenger::_send_message(Message
*m
, const entity_inst_t
& dest
)
104 m
->get_header().src
= get_myname();
107 if (!m
->get_priority()) m
->set_priority(get_default_send_priority());
109 ldout(cct
,1) <<"--> " << dest
.name
<< " "
110 << dest
.addr
<< " -- " << *m
111 << " -- ?+" << m
->get_data().length()
// A blank destination address cannot be routed anywhere.
115 if (dest
.addr
== entity_addr_t()) {
116 ldout(cct
,0) << "send_message message " << *m
117 << " with empty dest " << dest
.addr
<< dendl
;
123 Pipe
*pipe
= _lookup_pipe(dest
.addr
);
124 submit_message(m
, (pipe
? pipe
->connection_state
.get() : NULL
),
125 dest
.addr
, dest
.name
.type(), true);
// Sends m over an existing Connection.
// Header source and priority are handled as in the entity_inst_t overload.
// The destination address and type come from con itself.
// submit_message() is called with already_locked = false.
// con is cast to PipeConnection with an unchecked static_cast, so it must
// come from this messenger type.
130 int SimpleMessenger::_send_message(Message
*m
, Connection
*con
)
133 m
->get_header().src
= get_myname();
135 if (!m
->get_priority()) m
->set_priority(get_default_send_priority());
137 ldout(cct
,1) << "--> " << con
->get_peer_addr()
139 << " -- ?+" << m
->get_data().length()
140 << " " << m
<< " con " << con
143 submit_message(m
, static_cast<PipeConnection
*>(con
),
144 con
->get_peer_addr(), con
->get_peer_type(), false);
149 * If my_inst.addr doesn't have an IP set, this function
150 * will fill it in from the passed addr. Otherwise it does nothing and returns.
// Only the address field `u` is taken from addr; our existing port is saved
// and restored. The loopback connection is then refreshed to match.
152 void SimpleMessenger::set_addr_unknowns(const entity_addr_t
&addr
)
154 if (my_inst
.addr
.is_blank_ip()) {
155 int port
= my_inst
.addr
.get_port();
156 my_inst
.addr
.u
= addr
.u
;
157 my_inst
.addr
.set_port(port
);
158 init_local_connection();
// set_addr(): takes a working copy `t` of addr and refreshes the loopback
// connection afterwards.
// NOTE(review): the statements between the copy and init_local_connection()
// are not visible here; presumably they adjust t and store it as our address.
162 void SimpleMessenger::set_addr(const entity_addr_t
&addr
)
164 entity_addr_t t
= addr
;
167 init_local_connection();
// Picks the protocol version to use with a peer of type peer_type.
// A peer with our own entity type gets cluster_protocol. Otherwise an
// OSD/MDS/MON client protocol constant is returned from one of two switch
// tables. Which table applies presumably depends on `connect`; that selection
// is not visible here. In the visible lines, both tables map OSD, MDS and MON
// to the same constants.
170 int SimpleMessenger::get_proto_version(int peer_type
, bool connect
)
172 int my_type
= my_inst
.name
.type();
174 // set reply protocol version
175 if (peer_type
== my_type
) {
177 return cluster_protocol
;
182 case CEPH_ENTITY_TYPE_OSD
: return CEPH_OSDC_PROTOCOL
;
183 case CEPH_ENTITY_TYPE_MDS
: return CEPH_MDSC_PROTOCOL
;
184 case CEPH_ENTITY_TYPE_MON
: return CEPH_MONC_PROTOCOL
;
188 case CEPH_ENTITY_TYPE_OSD
: return CEPH_OSDC_PROTOCOL
;
189 case CEPH_ENTITY_TYPE_MDS
: return CEPH_MDSC_PROTOCOL
;
190 case CEPH_ENTITY_TYPE_MON
: return CEPH_MONC_PROTOCOL
;
203 /********************************************
207 #define dout_prefix _prefix(_dout, this)
// Loop presumably run by reaper_thread (created as "ms_reaper" in start()).
// Until reaper_stop is set, it reaps whatever is queued, then sleeps on
// reaper_cond. reaper_cond.Wait(lock) requires `lock` to be held in this loop.
209 void SimpleMessenger::reaper_entry()
211 ldout(cct
,10) << "reaper_entry start" << dendl
;
213 while (!reaper_stop
) {
214 reaper(); // may drop and retake the lock
217 reaper_cond
.Wait(lock
);
220 ldout(cct
,10) << "reaper_entry done" << dendl
;
224 * note: assumes lock is held
// Drains pipe_reap_queue. For each queued Pipe it:
//  - discards the Pipe's outgoing queue;
//  - detaches the Pipe from its Connection if still attached;
//  - unregisters the Pipe (it must still be in `pipes`).
// Per the comment further down, `lock` is dropped while joining the Pipe's
// threads. Requires `lock` to be held on entry (asserted).
226 void SimpleMessenger::reaper()
228 ldout(cct
,10) << "reaper" << dendl
;
229 assert(lock
.is_locked());
231 while (!pipe_reap_queue
.empty()) {
232 Pipe
*p
= pipe_reap_queue
.front();
233 pipe_reap_queue
.pop_front();
234 ldout(cct
,10) << "reaper reaping pipe " << p
<< " " <<
235 p
->get_peer_addr() << dendl
;
237 p
->discard_out_queue();
238 if (p
->connection_state
) {
239 // mark_down, mark_down_all, or fault() should have done this,
240 // or accept() may have switched the Connection to a different
241 // Pipe... but make sure!
242 bool cleared
= p
->connection_state
->clear_pipe(p
);
245 p
->pipe_lock
.Unlock();
246 p
->unregister_pipe();
247 assert(pipes
.count(p
));
250 // drop msgr lock while joining thread; the delay through could be
251 // trying to fast dispatch, preventing it from joining without
252 // blocking and deadlocking.
259 ldout(cct
,10) << "reaper reaped pipe " << p
<< " " << p
->get_peer_addr() << dendl
;
261 ldout(cct
,10) << "reaper deleted pipe " << p
<< dendl
;
263 ldout(cct
,10) << "reaper done" << dendl
;
// Hands `pipe` to the reaper: appends it to pipe_reap_queue and signals
// reaper_cond so the waiting reaper loop wakes up.
// NOTE(review): reaper() asserts `lock` is held while it reads this queue, so
// presumably `lock` is held here too (that line is not visible).
266 void SimpleMessenger::queue_reap(Pipe
*pipe
)
268 ldout(cct
,10) << "queue_reap " << pipe
<< dendl
;
270 pipe_reap_queue
.push_back(pipe
);
271 reaper_cond
.Signal();
// Reports whether con's Pipe considers itself connected (p->is_connected()).
// con is cast unchecked to PipeConnection, and its Pipe must belong to this
// messenger (asserted).
// NOTE(review): the declaration of `r`, any null-Pipe handling and the
// return are not visible in this view.
275 bool SimpleMessenger::is_connected(Connection
*con
)
279 Pipe
*p
= static_cast<Pipe
*>(static_cast<PipeConnection
*>(con
)->get_pipe());
281 assert(p
->msgr
== this);
282 r
= p
->is_connected();
// Binds the accepter to bind_addr, passing an empty set of ports to avoid.
// Logs "rank.bind already started" under a condition that is not visible
// here, as is the handling of the accepter's result `r`.
289 int SimpleMessenger::bind(const entity_addr_t
&bind_addr
)
293 ldout(cct
,10) << "rank.bind already started" << dendl
;
297 ldout(cct
,10) << "rank.bind " << bind_addr
<< dendl
;
301 set
<int> avoid_ports
;
302 int r
= accepter
.bind(bind_addr
, avoid_ports
);
// Re-binds the accepter while avoiding the ports in avoid_ports, and returns
// accepter.rebind()'s result.
// NOTE(review): lines between the log and the return are not visible here.
308 int SimpleMessenger::rebind(const set
<int>& avoid_ports
)
310 ldout(cct
,1) << "rebind avoid " << avoid_ports
<< dendl
;
314 return accepter
.rebind(avoid_ports
);
// Records bind_addr as this messenger's address (set_myaddr) for client use.
// Only relevant when the ms_bind_before_connect option is set; the early-out
// taken otherwise is not visible here. Runs under `lock`. The assert
// presumably covers an already-bound messenger, whose address must then equal
// bind_addr (the guarding condition is not visible).
318 int SimpleMessenger::client_bind(const entity_addr_t
&bind_addr
)
320 if (!cct
->_conf
->ms_bind_before_connect
)
322 Mutex::Locker
l(lock
);
324 assert(my_inst
.addr
== bind_addr
);
328 ldout(cct
,10) << "rank.bind already started" << dendl
;
331 ldout(cct
,10) << "rank.bind " << bind_addr
<< dendl
;
333 set_myaddr(bind_addr
);
// start():
//  - requires that an entity type has been set (asserted);
//  - stamps our nonce into my_inst.addr;
//  - refreshes the loopback connection;
//  - launches the reaper thread under the name "ms_reaper".
// NOTE(review): locking and the return statement are not visible here.
338 int SimpleMessenger::start()
341 ldout(cct
,1) << "messenger.start" << dendl
;
343 // register at least one entity, first!
344 assert(my_inst
.name
.type() >= 0);
351 my_inst
.addr
.nonce
= nonce
;
352 init_local_connection();
// Set before the thread is created; wait() and the destructor check this flag.
357 reaper_started
= true;
358 reaper_thread
.create("ms_reaper");
// Creates a Pipe in STATE_ACCEPTING with no Connection yet (NULL) and tracks
// it in accepting_pipes.
// sd is presumably the accepted socket descriptor; its use and the matching
// pipe_lock.Lock() are not visible here.
362 Pipe
*SimpleMessenger::add_accept_pipe(int sd
)
365 Pipe
*p
= new Pipe(this, Pipe::STATE_ACCEPTING
, NULL
);
369 p
->pipe_lock
.Unlock();
371 accepting_pipes
.insert(p
);
377 * NOTE: assumes messenger.lock held.
// Creates an outgoing Pipe (STATE_CONNECTING) to addr, using con as its
// Connection (callers may pass NULL). Under the pipe lock it sets the peer
// type and address, applies get_policy(type), and starts the writer; the Pipe
// is then registered. `lock` must be held and addr must not be our own
// address (both asserted).
// submit_message() passes a message as the last argument; how it is queued is
// not visible in this view.
379 Pipe
*SimpleMessenger::connect_rank(const entity_addr_t
& addr
,
384 assert(lock
.is_locked());
385 assert(addr
!= my_inst
.addr
);
387 ldout(cct
,10) << "connect_rank to " << addr
<< ", creating pipe and registering" << dendl
;
390 Pipe
*pipe
= new Pipe(this, Pipe::STATE_CONNECTING
,
391 static_cast<PipeConnection
*>(con
));
392 pipe
->pipe_lock
.Lock();
393 pipe
->set_peer_type(type
);
394 pipe
->set_peer_addr(addr
);
395 pipe
->policy
= get_policy(type
);
396 pipe
->start_writer();
399 pipe
->pipe_lock
.Unlock();
400 pipe
->register_pipe();
// Thin forwarder: returns ms_deliver_get_authorizer(peer_type, force_new).
411 AuthAuthorizer
*SimpleMessenger::get_authorizer(int peer_type
, bool force_new
)
413 return ms_deliver_get_authorizer(peer_type
, force_new
);
// Thin forwarder to ms_deliver_verify_authorizer(). con, peer_type,
// protocol, authorizer, authorizer_reply, isvalid and session_key are passed
// through unchanged.
// NOTE(review): the final argument line is not visible here; presumably it
// passes `challenge`.
416 bool SimpleMessenger::verify_authorizer(Connection
*con
, int peer_type
,
417 int protocol
, bufferlist
& authorizer
, bufferlist
& authorizer_reply
,
418 bool& isvalid
,CryptoKey
& session_key
,
419 std::unique_ptr
<AuthAuthorizerChallenge
> *challenge
)
421 return ms_deliver_verify_authorizer(con
, peer_type
, protocol
, authorizer
, authorizer_reply
,
422 isvalid
, session_key
,
// Returns a Connection to dest, creating an outgoing Pipe if needed.
// Runs under `lock`:
//  - our own address yields local_connection;
//  - otherwise an existing Pipe for dest.addr is used (logged "existing"),
//    or a new one is created with connect_rank(..., NULL, NULL) (logged "new").
// The Pipe's connection_state is read and returned under its pipe_lock.
// The FIXME at the end covers the case where connection_state is not set;
// presumably a retry loop that is not visible here.
426 ConnectionRef
SimpleMessenger::get_connection(const entity_inst_t
& dest
)
428 Mutex::Locker
l(lock
);
429 if (my_inst
.addr
== dest
.addr
) {
431 return local_connection
;
436 Pipe
*pipe
= _lookup_pipe(dest
.addr
);
438 ldout(cct
, 10) << "get_connection " << dest
<< " existing " << pipe
<< dendl
;
440 pipe
= connect_rank(dest
.addr
, dest
.name
.type(), NULL
, NULL
);
441 ldout(cct
, 10) << "get_connection " << dest
<< " new " << pipe
<< dendl
;
443 Mutex::Locker
l(pipe
->pipe_lock
);
444 if (pipe
->connection_state
)
445 return pipe
->connection_state
;
446 // we failed too quickly! retry. FIXME.
// Returns local_connection, the Connection that points back at this
// messenger itself.
450 ConnectionRef
SimpleMessenger::get_loopback_connection()
452 return local_connection
;
// Routes m toward dest_addr / dest_type. Outline of the visible paths:
//  * ms_dump_on_send: hexdump the payload, and the data if non-empty, at
//    level 0.
//  * Existing connection: take con's Pipe with try_get_pipe(). A failure is
//    presumably what is logged as "failed lossy con, dropping message".
//    If the Pipe is not STATE_CLOSED, m presumably goes to that Pipe (the
//    enqueue call is not visible). If it has closed, con is re-checked; the
//    comment below says this loops to cope with a racing reconnect.
//  * dest_addr is our own address: m is tagged with local_connection and
//    delivered through dispatch_queue.local_delivery().
//  * Remote with no existing pipe: the policy for dest_type is consulted.
//    A lossy server target with no session is dropped (per its log message).
//    Otherwise a new Pipe is created with connect_rank(), which needs `lock`.
//    When already_locked is false, the function takes `lock` and re-enters
//    itself with already_locked = true.
// NOTE(review): the already_locked parameter declaration, several conditions,
// the enqueue calls and the returns are not visible in this view.
455 void SimpleMessenger::submit_message(Message
*m
, PipeConnection
*con
,
456 const entity_addr_t
& dest_addr
, int dest_type
,
459 m
->trace
.event("simple submitting message");
460 if (cct
->_conf
->ms_dump_on_send
) {
462 ldout(cct
, 0) << "submit_message " << *m
<< "\n";
463 m
->get_payload().hexdump(*_dout
);
464 if (m
->get_data().length() > 0) {
465 *_dout
<< " data:\n";
466 m
->get_data().hexdump(*_dout
);
472 // existing connection?
475 bool ok
= static_cast<PipeConnection
*>(con
)->try_get_pipe(&pipe
);
477 ldout(cct
,0) << "submit_message " << *m
<< " remote, " << dest_addr
478 << ", failed lossy con, dropping message " << m
<< dendl
;
483 // we loop in case of a racing reconnect, either from us or them
484 pipe
->pipe_lock
.Lock(); // can't use a Locker because of the Pipe ref
485 if (pipe
->state
!= Pipe::STATE_CLOSED
) {
486 ldout(cct
,20) << "submit_message " << *m
<< " remote, " << dest_addr
<< ", have pipe." << dendl
;
488 pipe
->pipe_lock
.Unlock();
// NOTE(review): `¤t_pipe` on the next lines is an HTML-entity mangling of
// `&current_pipe` (current_pipe is compared below); restore it from upstream.
493 ok
= con
->try_get_pipe(¤t_pipe
);
494 pipe
->pipe_lock
.Unlock();
495 if (current_pipe
== pipe
) {
496 ldout(cct
,20) << "submit_message " << *m
<< " remote, " << dest_addr
497 << ", had pipe " << pipe
<< ", but it closed." << dendl
;
510 if (my_inst
.addr
== dest_addr
) {
512 ldout(cct
,20) << "submit_message " << *m
<< " local" << dendl
;
513 m
->set_connection(local_connection
.get());
514 dispatch_queue
.local_delivery(m
, m
->get_priority());
518 // remote, no existing pipe.
519 const Policy
& policy
= get_policy(dest_type
);
521 ldout(cct
,20) << "submit_message " << *m
<< " remote, " << dest_addr
<< ", lossy server for target type "
522 << ceph_entity_type_name(dest_type
) << ", no session, dropping." << dendl
;
525 ldout(cct
,20) << "submit_message " << *m
<< " remote, " << dest_addr
<< ", new pipe." << dendl
;
526 if (!already_locked
) {
527 /** We couldn't handle the Message without reference to global data, so
528 * grab the lock and do it again. If we got here, we know it's a non-lossy
529 * Connection, so we can use our existing pointer without doing another lookup. */
530 Mutex::Locker
l(lock
);
531 submit_message(m
, con
, dest_addr
, dest_type
, true);
533 connect_rank(dest_addr
, dest_type
, static_cast<PipeConnection
*>(con
), m
);
// Asks con's Pipe to send a keepalive, under the Pipe's pipe_lock. The Pipe
// must belong to this messenger (asserted). When con has no Pipe, this is
// logged at level 0.
// NOTE(review): the null check and the return values are not visible here.
538 int SimpleMessenger::send_keepalive(Connection
*con
)
541 Pipe
*pipe
= static_cast<Pipe
*>(
542 static_cast<PipeConnection
*>(con
)->get_pipe());
544 ldout(cct
,20) << "send_keepalive con " << con
<< ", have pipe." << dendl
;
545 assert(pipe
->msgr
== this);
546 pipe
->pipe_lock
.Lock();
547 pipe
->_send_keepalive();
548 pipe
->pipe_lock
.Unlock();
551 ldout(cct
,0) << "send_keepalive con " << con
<< ", no pipe." << dendl
;
// wait(): waits on stop_cond (presumably until shutdown() signals it), then
// tears down in this order:
//  1. the accepter thread;
//  2. the dispatch queue: shut down, and if it was started, waited on and its
//     local messages discarded;
//  3. the reaper thread, if started: signaled, joined, reaper_started cleared;
//  4. every Pipe left in rank_pipe is unregistered (no reset events are
//     generated); then it waits on reaper_cond until `pipes` is empty.
// NOTE(review): the locking around stop_cond.Wait(lock), the accepter stop
// call, setting reaper_stop and stopping each Pipe are not visible here.
559 void SimpleMessenger::wait()
567 stop_cond
.Wait(lock
);
573 ldout(cct
,20) << "wait: stopping accepter thread" << dendl
;
576 ldout(cct
,20) << "wait: stopped accepter thread" << dendl
;
579 dispatch_queue
.shutdown();
580 if (dispatch_queue
.is_started()) {
581 ldout(cct
,10) << "wait: waiting for dispatch queue" << dendl
;
582 dispatch_queue
.wait();
583 dispatch_queue
.discard_local();
584 ldout(cct
,10) << "wait: dispatch queue is stopped" << dendl
;
587 if (reaper_started
) {
588 ldout(cct
,20) << "wait: stopping reaper thread" << dendl
;
590 reaper_cond
.Signal();
593 reaper_thread
.join();
594 reaper_started
= false;
595 ldout(cct
,20) << "wait: stopped reaper thread" << dendl
;
598 // close+reap all pipes
601 ldout(cct
,10) << "wait: closing pipes" << dendl
;
603 while (!rank_pipe
.empty()) {
604 Pipe
*p
= rank_pipe
.begin()->second
;
605 p
->unregister_pipe();
608 // don't generate an event here; we're shutting down anyway.
609 PipeConnectionRef con
= p
->connection_state
;
612 p
->pipe_lock
.Unlock();
616 ldout(cct
,10) << "wait: waiting for pipes " << pipes
<< " to close" << dendl
;
617 while (!pipes
.empty()) {
618 reaper_cond
.Wait(lock
);
624 ldout(cct
,10) << "wait: done." << dendl
;
625 ldout(cct
,1) << "shutdown complete." << dendl
;
// Marks every Pipe down.
//  1. All accepting_pipes are processed, then that set is cleared.
//  2. Every Pipe still registered in rank_pipe is unregistered.
// For each Pipe, if clear_pipe() reports that it was still attached to its
// Connection, a reset event for that Connection is queued on dispatch_queue.
// NOTE(review): how `p` is obtained in the first loop, the matching
// pipe_lock.Lock() calls and any stop() calls are not visible here.
630 void SimpleMessenger::mark_down_all()
632 ldout(cct
,1) << "mark_down_all" << dendl
;
634 for (set
<Pipe
*>::iterator q
= accepting_pipes
.begin(); q
!= accepting_pipes
.end(); ++q
) {
636 ldout(cct
,5) << "mark_down_all accepting_pipe " << p
<< dendl
;
639 PipeConnectionRef con
= p
->connection_state
;
640 if (con
&& con
->clear_pipe(p
))
641 dispatch_queue
.queue_reset(con
.get());
642 p
->pipe_lock
.Unlock();
644 accepting_pipes
.clear();
// Always take the first entry: unregister_pipe() removes the Pipe from
// rank_pipe, so the loop ends once the map is empty.
646 while (!rank_pipe
.empty()) {
647 ceph::unordered_map
<entity_addr_t
,Pipe
*>::iterator it
= rank_pipe
.begin();
648 Pipe
*p
= it
->second
;
649 ldout(cct
,5) << "mark_down_all " << it
->first
<< " " << p
<< dendl
;
651 p
->unregister_pipe();
654 PipeConnectionRef con
= p
->connection_state
;
655 if (con
&& con
->clear_pipe(p
))
656 dispatch_queue
.queue_reset(con
.get());
657 p
->pipe_lock
.Unlock();
// Marks down the Pipe registered for addr, if any, and unregisters it.
// Unlike the Connection* overload, this one queues a reset event when the
// Pipe was still attached to its Connection (see the comment below for why).
// Logs "pipe dne" when no Pipe exists for addr.
// NOTE(review): the null check, locking and any stop() call are not visible here.
662 void SimpleMessenger::mark_down(const entity_addr_t
& addr
)
665 Pipe
*p
= _lookup_pipe(addr
);
667 ldout(cct
,1) << "mark_down " << addr
<< " -- " << p
<< dendl
;
668 p
->unregister_pipe();
671 if (p
->connection_state
) {
672 // generate a reset event for the caller in this case, even
673 // though they asked for it, since this is the addr-based (and
674 // not Connection* based) interface
675 PipeConnectionRef con
= p
->connection_state
;
676 if (con
&& con
->clear_pipe(p
))
677 dispatch_queue
.queue_reset(con
.get());
679 p
->pipe_lock
.Unlock();
681 ldout(cct
,1) << "mark_down " << addr
<< " -- pipe dne" << dendl
;
// Marks down con's Pipe, if any: unregisters it and detaches it from its
// Connection without queuing a reset event, since the caller asked for it.
// The Pipe must belong to this messenger (asserted). Logs "pipe dne" when con
// has no Pipe.
// NOTE(review): the null check, locking and any stop() call are not visible here.
686 void SimpleMessenger::mark_down(Connection
*con
)
691 Pipe
*p
= static_cast<Pipe
*>(static_cast<PipeConnection
*>(con
)->get_pipe());
693 ldout(cct
,1) << "mark_down " << con
<< " -- " << p
<< dendl
;
694 assert(p
->msgr
== this);
695 p
->unregister_pipe();
698 if (p
->connection_state
) {
699 // do not generate a reset event for the caller in this case,
700 // since they asked for it.
701 p
->connection_state
->clear_pipe(p
);
703 p
->pipe_lock
.Unlock();
706 ldout(cct
,1) << "mark_down " << con
<< " -- pipe dne" << dendl
;
// Switches con's Pipe to a lossy policy (policy.lossy = true). The Pipe must
// belong to this messenger (asserted). Logs "pipe dne" when con has no Pipe.
// NOTE(review): the null check and the matching pipe_lock.Lock() are not
// visible here.
711 void SimpleMessenger::mark_disposable(Connection
*con
)
714 Pipe
*p
= static_cast<Pipe
*>(static_cast<PipeConnection
*>(con
)->get_pipe());
716 ldout(cct
,1) << "mark_disposable " << con
<< " -- " << p
<< dendl
;
717 assert(p
->msgr
== this);
719 p
->policy
.lossy
= true;
720 p
->pipe_lock
.Unlock();
723 ldout(cct
,1) << "mark_disposable " << con
<< " -- pipe dne" << dendl
;
// Adopts the address a peer reports for us (peer_addr_for_me), keeping our
// own port and nonce. my_inst.addr is read without a lock (see the comment
// below), so it is annotated as a benign race. Logs the learned address and
// refreshes the loopback connection.
// NOTE(review): the flag/lock handling described by the comment below, and
// the assignment of t into my_inst.addr, are not visible in this view.
728 void SimpleMessenger::learned_addr(const entity_addr_t
&peer_addr_for_me
)
730 // be careful here: multiple threads may block here, and readers of
731 // my_inst.addr do NOT hold any lock.
733 // this always goes from true -> false under the protection of the
734 // mutex. if it is already false, we need not retake the mutex at
741 entity_addr_t t
= peer_addr_for_me
;
742 t
.set_port(my_inst
.addr
.get_port());
743 t
.set_nonce(my_inst
.addr
.get_nonce());
744 ANNOTATE_BENIGN_RACE_SIZED(&my_inst
.addr
, sizeof(my_inst
.addr
),
745 "SimpleMessenger learned addr");
747 ldout(cct
,1) << "learned my addr " << my_inst
.addr
<< dendl
;
749 init_local_connection();
// Syncs the loopback connection with our identity:
//  - peer_addr and peer_type are copied from my_inst;
//  - every feature bit (CEPH_FEATURES_ALL) is set;
//  - ms_deliver_handle_fast_connect() is notified.
// Called from the constructor, set_addr_unknowns(), set_addr(), start() and
// learned_addr() after our address may have changed.
754 void SimpleMessenger::init_local_connection()
756 local_connection
->peer_addr
= my_inst
.addr
;
757 local_connection
->peer_type
= my_inst
.name
.type();
758 local_connection
->set_features(CEPH_FEATURES_ALL
);
759 ms_deliver_handle_fast_connect(local_connection
.get());