]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/simple/SimpleMessenger.cc
update sources to 12.2.7
[ceph.git] / ceph / src / msg / simple / SimpleMessenger.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <errno.h>
16 #include <iostream>
17 #include <fstream>
18
19
20 #include "SimpleMessenger.h"
21
22 #include "common/config.h"
23 #include "common/Timer.h"
24 #include "common/errno.h"
25 #include "common/valgrind.h"
26 #include "auth/Crypto.h"
27 #include "include/Spinlock.h"
28
29 #define dout_subsys ceph_subsys_ms
30 #undef dout_prefix
31 #define dout_prefix _prefix(_dout, this)
32 static ostream& _prefix(std::ostream *_dout, SimpleMessenger *msgr) {
33 return *_dout << "-- " << msgr->get_myaddr() << " ";
34 }
35
36
37 /*******************
38 * SimpleMessenger
39 */
40
41 SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
42 string mname, uint64_t _nonce)
43 : SimplePolicyMessenger(cct, name,mname, _nonce),
44 accepter(this, _nonce),
45 dispatch_queue(cct, this, mname),
46 reaper_thread(this),
47 nonce(_nonce),
48 lock("SimpleMessenger::lock"), need_addr(true), did_bind(false),
49 global_seq(0),
50 cluster_protocol(0),
51 reaper_started(false), reaper_stop(false),
52 timeout(0),
53 local_connection(new PipeConnection(cct, this))
54 {
55 ANNOTATE_BENIGN_RACE_SIZED(&timeout, sizeof(timeout),
56 "SimpleMessenger read timeout");
57 ceph_spin_init(&global_seq_lock);
58 init_local_connection();
59 }
60
61 /**
62 * Destroy the SimpleMessenger. Pretty simple since all the work is done
63 * elsewhere.
64 */
65 SimpleMessenger::~SimpleMessenger()
66 {
67 assert(!did_bind); // either we didn't bind or we shut down the Accepter
68 assert(rank_pipe.empty()); // we don't have any running Pipes.
69 assert(!reaper_started); // the reaper thread is stopped
70 ceph_spin_destroy(&global_seq_lock);
71 }
72
73 void SimpleMessenger::ready()
74 {
75 ldout(cct,10) << "ready " << get_myaddr() << dendl;
76 dispatch_queue.start();
77
78 lock.Lock();
79 if (did_bind)
80 accepter.start();
81 lock.Unlock();
82 }
83
84
85 int SimpleMessenger::shutdown()
86 {
87 ldout(cct,10) << "shutdown " << get_myaddr() << dendl;
88 mark_down_all();
89
90 // break ref cycles on the loopback connection
91 local_connection->set_priv(NULL);
92
93 lock.Lock();
94 stop_cond.Signal();
95 stopped = true;
96 lock.Unlock();
97
98 return 0;
99 }
100
101 int SimpleMessenger::_send_message(Message *m, const entity_inst_t& dest)
102 {
103 // set envelope
104 m->get_header().src = get_myname();
105 m->set_cct(cct);
106
107 if (!m->get_priority()) m->set_priority(get_default_send_priority());
108
109 ldout(cct,1) <<"--> " << dest.name << " "
110 << dest.addr << " -- " << *m
111 << " -- ?+" << m->get_data().length()
112 << " " << m
113 << dendl;
114
115 if (dest.addr == entity_addr_t()) {
116 ldout(cct,0) << "send_message message " << *m
117 << " with empty dest " << dest.addr << dendl;
118 m->put();
119 return -EINVAL;
120 }
121
122 lock.Lock();
123 Pipe *pipe = _lookup_pipe(dest.addr);
124 submit_message(m, (pipe ? pipe->connection_state.get() : NULL),
125 dest.addr, dest.name.type(), true);
126 lock.Unlock();
127 return 0;
128 }
129
130 int SimpleMessenger::_send_message(Message *m, Connection *con)
131 {
132 //set envelope
133 m->get_header().src = get_myname();
134
135 if (!m->get_priority()) m->set_priority(get_default_send_priority());
136
137 ldout(cct,1) << "--> " << con->get_peer_addr()
138 << " -- " << *m
139 << " -- ?+" << m->get_data().length()
140 << " " << m << " con " << con
141 << dendl;
142
143 submit_message(m, static_cast<PipeConnection*>(con),
144 con->get_peer_addr(), con->get_peer_type(), false);
145 return 0;
146 }
147
148 /**
149 * If my_inst.addr doesn't have an IP set, this function
150 * will fill it in from the passed addr. Otherwise it does nothing and returns.
151 */
152 void SimpleMessenger::set_addr_unknowns(const entity_addr_t &addr)
153 {
154 if (my_inst.addr.is_blank_ip()) {
155 int port = my_inst.addr.get_port();
156 my_inst.addr.u = addr.u;
157 my_inst.addr.set_port(port);
158 init_local_connection();
159 }
160 }
161
162 void SimpleMessenger::set_addr(const entity_addr_t &addr)
163 {
164 entity_addr_t t = addr;
165 t.set_nonce(nonce);
166 set_myaddr(t);
167 init_local_connection();
168 }
169
170 int SimpleMessenger::get_proto_version(int peer_type, bool connect)
171 {
172 int my_type = my_inst.name.type();
173
174 // set reply protocol version
175 if (peer_type == my_type) {
176 // internal
177 return cluster_protocol;
178 } else {
179 // public
180 if (connect) {
181 switch (peer_type) {
182 case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
183 case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
184 case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
185 }
186 } else {
187 switch (my_type) {
188 case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
189 case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
190 case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
191 }
192 }
193 }
194 return 0;
195 }
196
197
198
199
200
201
202
203 /********************************************
204 * SimpleMessenger
205 */
206 #undef dout_prefix
207 #define dout_prefix _prefix(_dout, this)
208
209 void SimpleMessenger::reaper_entry()
210 {
211 ldout(cct,10) << "reaper_entry start" << dendl;
212 lock.Lock();
213 while (!reaper_stop) {
214 reaper(); // may drop and retake the lock
215 if (reaper_stop)
216 break;
217 reaper_cond.Wait(lock);
218 }
219 lock.Unlock();
220 ldout(cct,10) << "reaper_entry done" << dendl;
221 }
222
223 /*
224 * note: assumes lock is held
225 */
226 void SimpleMessenger::reaper()
227 {
228 ldout(cct,10) << "reaper" << dendl;
229 assert(lock.is_locked());
230
231 while (!pipe_reap_queue.empty()) {
232 Pipe *p = pipe_reap_queue.front();
233 pipe_reap_queue.pop_front();
234 ldout(cct,10) << "reaper reaping pipe " << p << " " <<
235 p->get_peer_addr() << dendl;
236 p->pipe_lock.Lock();
237 p->discard_out_queue();
238 if (p->connection_state) {
239 // mark_down, mark_down_all, or fault() should have done this,
240 // or accept() may have switch the Connection to a different
241 // Pipe... but make sure!
242 bool cleared = p->connection_state->clear_pipe(p);
243 assert(!cleared);
244 }
245 p->pipe_lock.Unlock();
246 p->unregister_pipe();
247 assert(pipes.count(p));
248 pipes.erase(p);
249
250 // drop msgr lock while joining thread; the delay through could be
251 // trying to fast dispatch, preventing it from joining without
252 // blocking and deadlocking.
253 lock.Unlock();
254 p->join();
255 lock.Lock();
256
257 if (p->sd >= 0)
258 ::close(p->sd);
259 ldout(cct,10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
260 p->put();
261 ldout(cct,10) << "reaper deleted pipe " << p << dendl;
262 }
263 ldout(cct,10) << "reaper done" << dendl;
264 }
265
266 void SimpleMessenger::queue_reap(Pipe *pipe)
267 {
268 ldout(cct,10) << "queue_reap " << pipe << dendl;
269 lock.Lock();
270 pipe_reap_queue.push_back(pipe);
271 reaper_cond.Signal();
272 lock.Unlock();
273 }
274
275 bool SimpleMessenger::is_connected(Connection *con)
276 {
277 bool r = false;
278 if (con) {
279 Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
280 if (p) {
281 assert(p->msgr == this);
282 r = p->is_connected();
283 p->put();
284 }
285 }
286 return r;
287 }
288
289 int SimpleMessenger::bind(const entity_addr_t &bind_addr)
290 {
291 lock.Lock();
292 if (started) {
293 ldout(cct,10) << "rank.bind already started" << dendl;
294 lock.Unlock();
295 return -1;
296 }
297 ldout(cct,10) << "rank.bind " << bind_addr << dendl;
298 lock.Unlock();
299
300 // bind to a socket
301 set<int> avoid_ports;
302 int r = accepter.bind(bind_addr, avoid_ports);
303 if (r >= 0)
304 did_bind = true;
305 return r;
306 }
307
308 int SimpleMessenger::rebind(const set<int>& avoid_ports)
309 {
310 ldout(cct,1) << "rebind avoid " << avoid_ports << dendl;
311 assert(did_bind);
312 accepter.stop();
313 mark_down_all();
314 return accepter.rebind(avoid_ports);
315 }
316
317
318 int SimpleMessenger::client_bind(const entity_addr_t &bind_addr)
319 {
320 if (!cct->_conf->ms_bind_before_connect)
321 return 0;
322 Mutex::Locker l(lock);
323 if (did_bind) {
324 assert(my_inst.addr == bind_addr);
325 return 0;
326 }
327 if (started) {
328 ldout(cct,10) << "rank.bind already started" << dendl;
329 return -1;
330 }
331 ldout(cct,10) << "rank.bind " << bind_addr << dendl;
332
333 set_myaddr(bind_addr);
334 return 0;
335 }
336
337
338 int SimpleMessenger::start()
339 {
340 lock.Lock();
341 ldout(cct,1) << "messenger.start" << dendl;
342
343 // register at least one entity, first!
344 assert(my_inst.name.type() >= 0);
345
346 assert(!started);
347 started = true;
348 stopped = false;
349
350 if (!did_bind) {
351 my_inst.addr.nonce = nonce;
352 init_local_connection();
353 }
354
355 lock.Unlock();
356
357 reaper_started = true;
358 reaper_thread.create("ms_reaper");
359 return 0;
360 }
361
362 Pipe *SimpleMessenger::add_accept_pipe(int sd)
363 {
364 lock.Lock();
365 Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);
366 p->sd = sd;
367 p->pipe_lock.Lock();
368 p->start_reader();
369 p->pipe_lock.Unlock();
370 pipes.insert(p);
371 accepting_pipes.insert(p);
372 lock.Unlock();
373 return p;
374 }
375
376 /* connect_rank
377 * NOTE: assumes messenger.lock held.
378 */
379 Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
380 int type,
381 PipeConnection *con,
382 Message *first)
383 {
384 assert(lock.is_locked());
385 assert(addr != my_inst.addr);
386
387 ldout(cct,10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
388
389 // create pipe
390 Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING,
391 static_cast<PipeConnection*>(con));
392 pipe->pipe_lock.Lock();
393 pipe->set_peer_type(type);
394 pipe->set_peer_addr(addr);
395 pipe->policy = get_policy(type);
396 pipe->start_writer();
397 if (first)
398 pipe->_send(first);
399 pipe->pipe_lock.Unlock();
400 pipe->register_pipe();
401 pipes.insert(pipe);
402
403 return pipe;
404 }
405
406
407
408
409
410
411 AuthAuthorizer *SimpleMessenger::get_authorizer(int peer_type, bool force_new)
412 {
413 return ms_deliver_get_authorizer(peer_type, force_new);
414 }
415
416 bool SimpleMessenger::verify_authorizer(Connection *con, int peer_type,
417 int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
418 bool& isvalid,CryptoKey& session_key,
419 std::unique_ptr<AuthAuthorizerChallenge> *challenge)
420 {
421 return ms_deliver_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply,
422 isvalid, session_key,
423 challenge);
424 }
425
426 ConnectionRef SimpleMessenger::get_connection(const entity_inst_t& dest)
427 {
428 Mutex::Locker l(lock);
429 if (my_inst.addr == dest.addr) {
430 // local
431 return local_connection;
432 }
433
434 // remote
435 while (true) {
436 Pipe *pipe = _lookup_pipe(dest.addr);
437 if (pipe) {
438 ldout(cct, 10) << "get_connection " << dest << " existing " << pipe << dendl;
439 } else {
440 pipe = connect_rank(dest.addr, dest.name.type(), NULL, NULL);
441 ldout(cct, 10) << "get_connection " << dest << " new " << pipe << dendl;
442 }
443 Mutex::Locker l(pipe->pipe_lock);
444 if (pipe->connection_state)
445 return pipe->connection_state;
446 // we failed too quickly! retry. FIXME.
447 }
448 }
449
450 ConnectionRef SimpleMessenger::get_loopback_connection()
451 {
452 return local_connection;
453 }
454
455 void SimpleMessenger::submit_message(Message *m, PipeConnection *con,
456 const entity_addr_t& dest_addr, int dest_type,
457 bool already_locked)
458 {
459 m->trace.event("simple submitting message");
460 if (cct->_conf->ms_dump_on_send) {
461 m->encode(-1, true);
462 ldout(cct, 0) << "submit_message " << *m << "\n";
463 m->get_payload().hexdump(*_dout);
464 if (m->get_data().length() > 0) {
465 *_dout << " data:\n";
466 m->get_data().hexdump(*_dout);
467 }
468 *_dout << dendl;
469 m->clear_payload();
470 }
471
472 // existing connection?
473 if (con) {
474 Pipe *pipe = NULL;
475 bool ok = static_cast<PipeConnection*>(con)->try_get_pipe(&pipe);
476 if (!ok) {
477 ldout(cct,0) << "submit_message " << *m << " remote, " << dest_addr
478 << ", failed lossy con, dropping message " << m << dendl;
479 m->put();
480 return;
481 }
482 while (pipe && ok) {
483 // we loop in case of a racing reconnect, either from us or them
484 pipe->pipe_lock.Lock(); // can't use a Locker because of the Pipe ref
485 if (pipe->state != Pipe::STATE_CLOSED) {
486 ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl;
487 pipe->_send(m);
488 pipe->pipe_lock.Unlock();
489 pipe->put();
490 return;
491 }
492 Pipe *current_pipe;
493 ok = con->try_get_pipe(&current_pipe);
494 pipe->pipe_lock.Unlock();
495 if (current_pipe == pipe) {
496 ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr
497 << ", had pipe " << pipe << ", but it closed." << dendl;
498 pipe->put();
499 current_pipe->put();
500 m->put();
501 return;
502 } else {
503 pipe->put();
504 pipe = current_pipe;
505 }
506 }
507 }
508
509 // local?
510 if (my_inst.addr == dest_addr) {
511 // local
512 ldout(cct,20) << "submit_message " << *m << " local" << dendl;
513 m->set_connection(local_connection.get());
514 dispatch_queue.local_delivery(m, m->get_priority());
515 return;
516 }
517
518 // remote, no existing pipe.
519 const Policy& policy = get_policy(dest_type);
520 if (policy.server) {
521 ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", lossy server for target type "
522 << ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl;
523 m->put();
524 } else {
525 ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl;
526 if (!already_locked) {
527 /** We couldn't handle the Message without reference to global data, so
528 * grab the lock and do it again. If we got here, we know it's a non-lossy
529 * Connection, so we can use our existing pointer without doing another lookup. */
530 Mutex::Locker l(lock);
531 submit_message(m, con, dest_addr, dest_type, true);
532 } else {
533 connect_rank(dest_addr, dest_type, static_cast<PipeConnection*>(con), m);
534 }
535 }
536 }
537
538 int SimpleMessenger::send_keepalive(Connection *con)
539 {
540 int ret = 0;
541 Pipe *pipe = static_cast<Pipe *>(
542 static_cast<PipeConnection*>(con)->get_pipe());
543 if (pipe) {
544 ldout(cct,20) << "send_keepalive con " << con << ", have pipe." << dendl;
545 assert(pipe->msgr == this);
546 pipe->pipe_lock.Lock();
547 pipe->_send_keepalive();
548 pipe->pipe_lock.Unlock();
549 pipe->put();
550 } else {
551 ldout(cct,0) << "send_keepalive con " << con << ", no pipe." << dendl;
552 ret = -EPIPE;
553 }
554 return ret;
555 }
556
557
558
559 void SimpleMessenger::wait()
560 {
561 lock.Lock();
562 if (!started) {
563 lock.Unlock();
564 return;
565 }
566 if (!stopped)
567 stop_cond.Wait(lock);
568
569 lock.Unlock();
570
571 // done! clean up.
572 if (did_bind) {
573 ldout(cct,20) << "wait: stopping accepter thread" << dendl;
574 accepter.stop();
575 did_bind = false;
576 ldout(cct,20) << "wait: stopped accepter thread" << dendl;
577 }
578
579 dispatch_queue.shutdown();
580 if (dispatch_queue.is_started()) {
581 ldout(cct,10) << "wait: waiting for dispatch queue" << dendl;
582 dispatch_queue.wait();
583 dispatch_queue.discard_local();
584 ldout(cct,10) << "wait: dispatch queue is stopped" << dendl;
585 }
586
587 if (reaper_started) {
588 ldout(cct,20) << "wait: stopping reaper thread" << dendl;
589 lock.Lock();
590 reaper_cond.Signal();
591 reaper_stop = true;
592 lock.Unlock();
593 reaper_thread.join();
594 reaper_started = false;
595 ldout(cct,20) << "wait: stopped reaper thread" << dendl;
596 }
597
598 // close+reap all pipes
599 lock.Lock();
600 {
601 ldout(cct,10) << "wait: closing pipes" << dendl;
602
603 while (!rank_pipe.empty()) {
604 Pipe *p = rank_pipe.begin()->second;
605 p->unregister_pipe();
606 p->pipe_lock.Lock();
607 p->stop_and_wait();
608 // don't generate an event here; we're shutting down anyway.
609 PipeConnectionRef con = p->connection_state;
610 if (con)
611 con->clear_pipe(p);
612 p->pipe_lock.Unlock();
613 }
614
615 reaper();
616 ldout(cct,10) << "wait: waiting for pipes " << pipes << " to close" << dendl;
617 while (!pipes.empty()) {
618 reaper_cond.Wait(lock);
619 reaper();
620 }
621 }
622 lock.Unlock();
623
624 ldout(cct,10) << "wait: done." << dendl;
625 ldout(cct,1) << "shutdown complete." << dendl;
626 started = false;
627 }
628
629
630 void SimpleMessenger::mark_down_all()
631 {
632 ldout(cct,1) << "mark_down_all" << dendl;
633 lock.Lock();
634 for (set<Pipe*>::iterator q = accepting_pipes.begin(); q != accepting_pipes.end(); ++q) {
635 Pipe *p = *q;
636 ldout(cct,5) << "mark_down_all accepting_pipe " << p << dendl;
637 p->pipe_lock.Lock();
638 p->stop();
639 PipeConnectionRef con = p->connection_state;
640 if (con && con->clear_pipe(p))
641 dispatch_queue.queue_reset(con.get());
642 p->pipe_lock.Unlock();
643 }
644 accepting_pipes.clear();
645
646 while (!rank_pipe.empty()) {
647 ceph::unordered_map<entity_addr_t,Pipe*>::iterator it = rank_pipe.begin();
648 Pipe *p = it->second;
649 ldout(cct,5) << "mark_down_all " << it->first << " " << p << dendl;
650 rank_pipe.erase(it);
651 p->unregister_pipe();
652 p->pipe_lock.Lock();
653 p->stop();
654 PipeConnectionRef con = p->connection_state;
655 if (con && con->clear_pipe(p))
656 dispatch_queue.queue_reset(con.get());
657 p->pipe_lock.Unlock();
658 }
659 lock.Unlock();
660 }
661
662 void SimpleMessenger::mark_down(const entity_addr_t& addr)
663 {
664 lock.Lock();
665 Pipe *p = _lookup_pipe(addr);
666 if (p) {
667 ldout(cct,1) << "mark_down " << addr << " -- " << p << dendl;
668 p->unregister_pipe();
669 p->pipe_lock.Lock();
670 p->stop();
671 if (p->connection_state) {
672 // generate a reset event for the caller in this case, even
673 // though they asked for it, since this is the addr-based (and
674 // not Connection* based) interface
675 PipeConnectionRef con = p->connection_state;
676 if (con && con->clear_pipe(p))
677 dispatch_queue.queue_reset(con.get());
678 }
679 p->pipe_lock.Unlock();
680 } else {
681 ldout(cct,1) << "mark_down " << addr << " -- pipe dne" << dendl;
682 }
683 lock.Unlock();
684 }
685
686 void SimpleMessenger::mark_down(Connection *con)
687 {
688 if (con == NULL)
689 return;
690 lock.Lock();
691 Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
692 if (p) {
693 ldout(cct,1) << "mark_down " << con << " -- " << p << dendl;
694 assert(p->msgr == this);
695 p->unregister_pipe();
696 p->pipe_lock.Lock();
697 p->stop();
698 if (p->connection_state) {
699 // do not generate a reset event for the caller in this case,
700 // since they asked for it.
701 p->connection_state->clear_pipe(p);
702 }
703 p->pipe_lock.Unlock();
704 p->put();
705 } else {
706 ldout(cct,1) << "mark_down " << con << " -- pipe dne" << dendl;
707 }
708 lock.Unlock();
709 }
710
711 void SimpleMessenger::mark_disposable(Connection *con)
712 {
713 lock.Lock();
714 Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
715 if (p) {
716 ldout(cct,1) << "mark_disposable " << con << " -- " << p << dendl;
717 assert(p->msgr == this);
718 p->pipe_lock.Lock();
719 p->policy.lossy = true;
720 p->pipe_lock.Unlock();
721 p->put();
722 } else {
723 ldout(cct,1) << "mark_disposable " << con << " -- pipe dne" << dendl;
724 }
725 lock.Unlock();
726 }
727
728 void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
729 {
730 // be careful here: multiple threads may block here, and readers of
731 // my_inst.addr do NOT hold any lock.
732
733 // this always goes from true -> false under the protection of the
734 // mutex. if it is already false, we need not retake the mutex at
735 // all.
736 if (!need_addr)
737 return;
738
739 lock.Lock();
740 if (need_addr) {
741 entity_addr_t t = peer_addr_for_me;
742 t.set_port(my_inst.addr.get_port());
743 t.set_nonce(my_inst.addr.get_nonce());
744 ANNOTATE_BENIGN_RACE_SIZED(&my_inst.addr, sizeof(my_inst.addr),
745 "SimpleMessenger learned addr");
746 my_inst.addr = t;
747 ldout(cct,1) << "learned my addr " << my_inst.addr << dendl;
748 need_addr = false;
749 init_local_connection();
750 }
751 lock.Unlock();
752 }
753
754 void SimpleMessenger::init_local_connection()
755 {
756 local_connection->peer_addr = my_inst.addr;
757 local_connection->peer_type = my_inst.name.type();
758 local_connection->set_features(CEPH_FEATURES_ALL);
759 ms_deliver_handle_fast_connect(local_connection.get());
760 }