]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/simple/SimpleMessenger.cc
84b6a253ec6e6ee72499e312d4d6737d09841f85
[ceph.git] / ceph / src / msg / simple / SimpleMessenger.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <errno.h>
16 #include <iostream>
17 #include <fstream>
18
19
20 #include "SimpleMessenger.h"
21
22 #include "common/config.h"
23 #include "common/Timer.h"
24 #include "common/errno.h"
25 #include "common/valgrind.h"
26 #include "auth/Crypto.h"
27 #include "include/Spinlock.h"
28
29 #define dout_subsys ceph_subsys_ms
30 #undef dout_prefix
31 #define dout_prefix _prefix(_dout, this)
32 static ostream& _prefix(std::ostream *_dout, SimpleMessenger *msgr) {
33 return *_dout << "-- " << msgr->get_myaddr() << " ";
34 }
35
36
37 /*******************
38 * SimpleMessenger
39 */
40
41 SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
42 string mname, uint64_t _nonce)
43 : SimplePolicyMessenger(cct, name,mname, _nonce),
44 accepter(this, _nonce),
45 dispatch_queue(cct, this, mname),
46 reaper_thread(this),
47 nonce(_nonce),
48 lock("SimpleMessenger::lock"), need_addr(true), did_bind(false),
49 global_seq(0),
50 cluster_protocol(0),
51 reaper_started(false), reaper_stop(false),
52 timeout(0),
53 local_connection(new PipeConnection(cct, this))
54 {
55 ANNOTATE_BENIGN_RACE_SIZED(&timeout, sizeof(timeout),
56 "SimpleMessenger read timeout");
57 ceph_spin_init(&global_seq_lock);
58 init_local_connection();
59 }
60
61 /**
62 * Destroy the SimpleMessenger. Pretty simple since all the work is done
63 * elsewhere.
64 */
65 SimpleMessenger::~SimpleMessenger()
66 {
67 assert(!did_bind); // either we didn't bind or we shut down the Accepter
68 assert(rank_pipe.empty()); // we don't have any running Pipes.
69 assert(!reaper_started); // the reaper thread is stopped
70 ceph_spin_destroy(&global_seq_lock);
71 }
72
73 void SimpleMessenger::ready()
74 {
75 ldout(cct,10) << "ready " << get_myaddr() << dendl;
76 dispatch_queue.start();
77
78 lock.Lock();
79 if (did_bind)
80 accepter.start();
81 lock.Unlock();
82 }
83
84
85 int SimpleMessenger::shutdown()
86 {
87 ldout(cct,10) << "shutdown " << get_myaddr() << dendl;
88 mark_down_all();
89
90 // break ref cycles on the loopback connection
91 local_connection->set_priv(NULL);
92
93 lock.Lock();
94 stop_cond.Signal();
95 stopped = true;
96 lock.Unlock();
97
98 return 0;
99 }
100
101 int SimpleMessenger::_send_message(Message *m, const entity_inst_t& dest)
102 {
103 // set envelope
104 m->get_header().src = get_myname();
105 m->set_cct(cct);
106
107 if (!m->get_priority()) m->set_priority(get_default_send_priority());
108
109 ldout(cct,1) <<"--> " << dest.name << " "
110 << dest.addr << " -- " << *m
111 << " -- ?+" << m->get_data().length()
112 << " " << m
113 << dendl;
114
115 if (dest.addr == entity_addr_t()) {
116 ldout(cct,0) << "send_message message " << *m
117 << " with empty dest " << dest.addr << dendl;
118 m->put();
119 return -EINVAL;
120 }
121
122 lock.Lock();
123 Pipe *pipe = _lookup_pipe(dest.addr);
124 submit_message(m, (pipe ? pipe->connection_state.get() : NULL),
125 dest.addr, dest.name.type(), true);
126 lock.Unlock();
127 return 0;
128 }
129
130 int SimpleMessenger::_send_message(Message *m, Connection *con)
131 {
132 //set envelope
133 m->get_header().src = get_myname();
134
135 if (!m->get_priority()) m->set_priority(get_default_send_priority());
136
137 ldout(cct,1) << "--> " << con->get_peer_addr()
138 << " -- " << *m
139 << " -- ?+" << m->get_data().length()
140 << " " << m << " con " << con
141 << dendl;
142
143 submit_message(m, static_cast<PipeConnection*>(con),
144 con->get_peer_addr(), con->get_peer_type(), false);
145 return 0;
146 }
147
148 /**
149 * If my_inst.addr doesn't have an IP set, this function
150 * will fill it in from the passed addr. Otherwise it does nothing and returns.
151 */
152 void SimpleMessenger::set_addr_unknowns(const entity_addr_t &addr)
153 {
154 if (my_inst.addr.is_blank_ip()) {
155 int port = my_inst.addr.get_port();
156 my_inst.addr.u = addr.u;
157 my_inst.addr.set_port(port);
158 init_local_connection();
159 }
160 }
161
162 int SimpleMessenger::get_proto_version(int peer_type, bool connect)
163 {
164 int my_type = my_inst.name.type();
165
166 // set reply protocol version
167 if (peer_type == my_type) {
168 // internal
169 return cluster_protocol;
170 } else {
171 // public
172 if (connect) {
173 switch (peer_type) {
174 case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
175 case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
176 case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
177 }
178 } else {
179 switch (my_type) {
180 case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
181 case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
182 case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
183 }
184 }
185 }
186 return 0;
187 }
188
189
190
191
192
193
194
195 /********************************************
196 * SimpleMessenger
197 */
198 #undef dout_prefix
199 #define dout_prefix _prefix(_dout, this)
200
201 void SimpleMessenger::reaper_entry()
202 {
203 ldout(cct,10) << "reaper_entry start" << dendl;
204 lock.Lock();
205 while (!reaper_stop) {
206 reaper(); // may drop and retake the lock
207 if (reaper_stop)
208 break;
209 reaper_cond.Wait(lock);
210 }
211 lock.Unlock();
212 ldout(cct,10) << "reaper_entry done" << dendl;
213 }
214
215 /*
216 * note: assumes lock is held
217 */
218 void SimpleMessenger::reaper()
219 {
220 ldout(cct,10) << "reaper" << dendl;
221 assert(lock.is_locked());
222
223 while (!pipe_reap_queue.empty()) {
224 Pipe *p = pipe_reap_queue.front();
225 pipe_reap_queue.pop_front();
226 ldout(cct,10) << "reaper reaping pipe " << p << " " <<
227 p->get_peer_addr() << dendl;
228 p->pipe_lock.Lock();
229 p->discard_out_queue();
230 if (p->connection_state) {
231 // mark_down, mark_down_all, or fault() should have done this,
232 // or accept() may have switch the Connection to a different
233 // Pipe... but make sure!
234 bool cleared = p->connection_state->clear_pipe(p);
235 assert(!cleared);
236 }
237 p->pipe_lock.Unlock();
238 p->unregister_pipe();
239 assert(pipes.count(p));
240 pipes.erase(p);
241
242 // drop msgr lock while joining thread; the delay through could be
243 // trying to fast dispatch, preventing it from joining without
244 // blocking and deadlocking.
245 lock.Unlock();
246 p->join();
247 lock.Lock();
248
249 if (p->sd >= 0)
250 ::close(p->sd);
251 ldout(cct,10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
252 p->put();
253 ldout(cct,10) << "reaper deleted pipe " << p << dendl;
254 }
255 ldout(cct,10) << "reaper done" << dendl;
256 }
257
258 void SimpleMessenger::queue_reap(Pipe *pipe)
259 {
260 ldout(cct,10) << "queue_reap " << pipe << dendl;
261 lock.Lock();
262 pipe_reap_queue.push_back(pipe);
263 reaper_cond.Signal();
264 lock.Unlock();
265 }
266
267 bool SimpleMessenger::is_connected(Connection *con)
268 {
269 bool r = false;
270 if (con) {
271 Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
272 if (p) {
273 assert(p->msgr == this);
274 r = p->is_connected();
275 p->put();
276 }
277 }
278 return r;
279 }
280
281 int SimpleMessenger::bind(const entity_addr_t &bind_addr)
282 {
283 lock.Lock();
284 if (started) {
285 ldout(cct,10) << "rank.bind already started" << dendl;
286 lock.Unlock();
287 return -1;
288 }
289 ldout(cct,10) << "rank.bind " << bind_addr << dendl;
290 lock.Unlock();
291
292 // bind to a socket
293 set<int> avoid_ports;
294 int r = accepter.bind(bind_addr, avoid_ports);
295 if (r >= 0)
296 did_bind = true;
297 return r;
298 }
299
300 int SimpleMessenger::rebind(const set<int>& avoid_ports)
301 {
302 ldout(cct,1) << "rebind avoid " << avoid_ports << dendl;
303 assert(did_bind);
304 accepter.stop();
305 mark_down_all();
306 return accepter.rebind(avoid_ports);
307 }
308
309
310 int SimpleMessenger::client_bind(const entity_addr_t &bind_addr)
311 {
312 if (!cct->_conf->ms_bind_before_connect)
313 return 0;
314 Mutex::Locker l(lock);
315 if (did_bind) {
316 assert(my_inst.addr == bind_addr);
317 return 0;
318 }
319 if (started) {
320 ldout(cct,10) << "rank.bind already started" << dendl;
321 return -1;
322 }
323 ldout(cct,10) << "rank.bind " << bind_addr << dendl;
324
325 set_myaddr(bind_addr);
326 return 0;
327 }
328
329
330 int SimpleMessenger::start()
331 {
332 lock.Lock();
333 ldout(cct,1) << "messenger.start" << dendl;
334
335 // register at least one entity, first!
336 assert(my_inst.name.type() >= 0);
337
338 assert(!started);
339 started = true;
340 stopped = false;
341
342 if (!did_bind) {
343 my_inst.addr.nonce = nonce;
344 init_local_connection();
345 }
346
347 lock.Unlock();
348
349 reaper_started = true;
350 reaper_thread.create("ms_reaper");
351 return 0;
352 }
353
354 Pipe *SimpleMessenger::add_accept_pipe(int sd)
355 {
356 lock.Lock();
357 Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);
358 p->sd = sd;
359 p->pipe_lock.Lock();
360 p->start_reader();
361 p->pipe_lock.Unlock();
362 pipes.insert(p);
363 accepting_pipes.insert(p);
364 lock.Unlock();
365 return p;
366 }
367
368 /* connect_rank
369 * NOTE: assumes messenger.lock held.
370 */
371 Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
372 int type,
373 PipeConnection *con,
374 Message *first)
375 {
376 assert(lock.is_locked());
377 assert(addr != my_inst.addr);
378
379 ldout(cct,10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
380
381 // create pipe
382 Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING,
383 static_cast<PipeConnection*>(con));
384 pipe->pipe_lock.Lock();
385 pipe->set_peer_type(type);
386 pipe->set_peer_addr(addr);
387 pipe->policy = get_policy(type);
388 pipe->start_writer();
389 if (first)
390 pipe->_send(first);
391 pipe->pipe_lock.Unlock();
392 pipe->register_pipe();
393 pipes.insert(pipe);
394
395 return pipe;
396 }
397
398
399
400
401
402
403 AuthAuthorizer *SimpleMessenger::get_authorizer(int peer_type, bool force_new)
404 {
405 return ms_deliver_get_authorizer(peer_type, force_new);
406 }
407
408 bool SimpleMessenger::verify_authorizer(Connection *con, int peer_type,
409 int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
410 bool& isvalid,CryptoKey& session_key)
411 {
412 return ms_deliver_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid,session_key);
413 }
414
415 ConnectionRef SimpleMessenger::get_connection(const entity_inst_t& dest)
416 {
417 Mutex::Locker l(lock);
418 if (my_inst.addr == dest.addr) {
419 // local
420 return local_connection;
421 }
422
423 // remote
424 while (true) {
425 Pipe *pipe = _lookup_pipe(dest.addr);
426 if (pipe) {
427 ldout(cct, 10) << "get_connection " << dest << " existing " << pipe << dendl;
428 } else {
429 pipe = connect_rank(dest.addr, dest.name.type(), NULL, NULL);
430 ldout(cct, 10) << "get_connection " << dest << " new " << pipe << dendl;
431 }
432 Mutex::Locker l(pipe->pipe_lock);
433 if (pipe->connection_state)
434 return pipe->connection_state;
435 // we failed too quickly! retry. FIXME.
436 }
437 }
438
439 ConnectionRef SimpleMessenger::get_loopback_connection()
440 {
441 return local_connection;
442 }
443
444 void SimpleMessenger::submit_message(Message *m, PipeConnection *con,
445 const entity_addr_t& dest_addr, int dest_type,
446 bool already_locked)
447 {
448 m->trace.event("simple submitting message");
449 if (cct->_conf->ms_dump_on_send) {
450 m->encode(-1, true);
451 ldout(cct, 0) << "submit_message " << *m << "\n";
452 m->get_payload().hexdump(*_dout);
453 if (m->get_data().length() > 0) {
454 *_dout << " data:\n";
455 m->get_data().hexdump(*_dout);
456 }
457 *_dout << dendl;
458 m->clear_payload();
459 }
460
461 // existing connection?
462 if (con) {
463 Pipe *pipe = NULL;
464 bool ok = static_cast<PipeConnection*>(con)->try_get_pipe(&pipe);
465 if (!ok) {
466 ldout(cct,0) << "submit_message " << *m << " remote, " << dest_addr
467 << ", failed lossy con, dropping message " << m << dendl;
468 m->put();
469 return;
470 }
471 while (pipe && ok) {
472 // we loop in case of a racing reconnect, either from us or them
473 pipe->pipe_lock.Lock(); // can't use a Locker because of the Pipe ref
474 if (pipe->state != Pipe::STATE_CLOSED) {
475 ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl;
476 pipe->_send(m);
477 pipe->pipe_lock.Unlock();
478 pipe->put();
479 return;
480 }
481 Pipe *current_pipe;
482 ok = con->try_get_pipe(&current_pipe);
483 pipe->pipe_lock.Unlock();
484 if (current_pipe == pipe) {
485 ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr
486 << ", had pipe " << pipe << ", but it closed." << dendl;
487 pipe->put();
488 current_pipe->put();
489 m->put();
490 return;
491 } else {
492 pipe->put();
493 pipe = current_pipe;
494 }
495 }
496 }
497
498 // local?
499 if (my_inst.addr == dest_addr) {
500 // local
501 ldout(cct,20) << "submit_message " << *m << " local" << dendl;
502 m->set_connection(local_connection.get());
503 dispatch_queue.local_delivery(m, m->get_priority());
504 return;
505 }
506
507 // remote, no existing pipe.
508 const Policy& policy = get_policy(dest_type);
509 if (policy.server) {
510 ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", lossy server for target type "
511 << ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl;
512 m->put();
513 } else {
514 ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl;
515 if (!already_locked) {
516 /** We couldn't handle the Message without reference to global data, so
517 * grab the lock and do it again. If we got here, we know it's a non-lossy
518 * Connection, so we can use our existing pointer without doing another lookup. */
519 Mutex::Locker l(lock);
520 submit_message(m, con, dest_addr, dest_type, true);
521 } else {
522 connect_rank(dest_addr, dest_type, static_cast<PipeConnection*>(con), m);
523 }
524 }
525 }
526
527 int SimpleMessenger::send_keepalive(Connection *con)
528 {
529 int ret = 0;
530 Pipe *pipe = static_cast<Pipe *>(
531 static_cast<PipeConnection*>(con)->get_pipe());
532 if (pipe) {
533 ldout(cct,20) << "send_keepalive con " << con << ", have pipe." << dendl;
534 assert(pipe->msgr == this);
535 pipe->pipe_lock.Lock();
536 pipe->_send_keepalive();
537 pipe->pipe_lock.Unlock();
538 pipe->put();
539 } else {
540 ldout(cct,0) << "send_keepalive con " << con << ", no pipe." << dendl;
541 ret = -EPIPE;
542 }
543 return ret;
544 }
545
546
547
548 void SimpleMessenger::wait()
549 {
550 lock.Lock();
551 if (!started) {
552 lock.Unlock();
553 return;
554 }
555 if (!stopped)
556 stop_cond.Wait(lock);
557
558 lock.Unlock();
559
560 // done! clean up.
561 if (did_bind) {
562 ldout(cct,20) << "wait: stopping accepter thread" << dendl;
563 accepter.stop();
564 did_bind = false;
565 ldout(cct,20) << "wait: stopped accepter thread" << dendl;
566 }
567
568 dispatch_queue.shutdown();
569 if (dispatch_queue.is_started()) {
570 ldout(cct,10) << "wait: waiting for dispatch queue" << dendl;
571 dispatch_queue.wait();
572 dispatch_queue.discard_local();
573 ldout(cct,10) << "wait: dispatch queue is stopped" << dendl;
574 }
575
576 if (reaper_started) {
577 ldout(cct,20) << "wait: stopping reaper thread" << dendl;
578 lock.Lock();
579 reaper_cond.Signal();
580 reaper_stop = true;
581 lock.Unlock();
582 reaper_thread.join();
583 reaper_started = false;
584 ldout(cct,20) << "wait: stopped reaper thread" << dendl;
585 }
586
587 // close+reap all pipes
588 lock.Lock();
589 {
590 ldout(cct,10) << "wait: closing pipes" << dendl;
591
592 while (!rank_pipe.empty()) {
593 Pipe *p = rank_pipe.begin()->second;
594 p->unregister_pipe();
595 p->pipe_lock.Lock();
596 p->stop_and_wait();
597 // don't generate an event here; we're shutting down anyway.
598 PipeConnectionRef con = p->connection_state;
599 if (con)
600 con->clear_pipe(p);
601 p->pipe_lock.Unlock();
602 }
603
604 reaper();
605 ldout(cct,10) << "wait: waiting for pipes " << pipes << " to close" << dendl;
606 while (!pipes.empty()) {
607 reaper_cond.Wait(lock);
608 reaper();
609 }
610 }
611 lock.Unlock();
612
613 ldout(cct,10) << "wait: done." << dendl;
614 ldout(cct,1) << "shutdown complete." << dendl;
615 started = false;
616 }
617
618
619 void SimpleMessenger::mark_down_all()
620 {
621 ldout(cct,1) << "mark_down_all" << dendl;
622 lock.Lock();
623 for (set<Pipe*>::iterator q = accepting_pipes.begin(); q != accepting_pipes.end(); ++q) {
624 Pipe *p = *q;
625 ldout(cct,5) << "mark_down_all accepting_pipe " << p << dendl;
626 p->pipe_lock.Lock();
627 p->stop();
628 PipeConnectionRef con = p->connection_state;
629 if (con && con->clear_pipe(p))
630 dispatch_queue.queue_reset(con.get());
631 p->pipe_lock.Unlock();
632 }
633 accepting_pipes.clear();
634
635 while (!rank_pipe.empty()) {
636 ceph::unordered_map<entity_addr_t,Pipe*>::iterator it = rank_pipe.begin();
637 Pipe *p = it->second;
638 ldout(cct,5) << "mark_down_all " << it->first << " " << p << dendl;
639 rank_pipe.erase(it);
640 p->unregister_pipe();
641 p->pipe_lock.Lock();
642 p->stop();
643 PipeConnectionRef con = p->connection_state;
644 if (con && con->clear_pipe(p))
645 dispatch_queue.queue_reset(con.get());
646 p->pipe_lock.Unlock();
647 }
648 lock.Unlock();
649 }
650
651 void SimpleMessenger::mark_down(const entity_addr_t& addr)
652 {
653 lock.Lock();
654 Pipe *p = _lookup_pipe(addr);
655 if (p) {
656 ldout(cct,1) << "mark_down " << addr << " -- " << p << dendl;
657 p->unregister_pipe();
658 p->pipe_lock.Lock();
659 p->stop();
660 if (p->connection_state) {
661 // generate a reset event for the caller in this case, even
662 // though they asked for it, since this is the addr-based (and
663 // not Connection* based) interface
664 PipeConnectionRef con = p->connection_state;
665 if (con && con->clear_pipe(p))
666 dispatch_queue.queue_reset(con.get());
667 }
668 p->pipe_lock.Unlock();
669 } else {
670 ldout(cct,1) << "mark_down " << addr << " -- pipe dne" << dendl;
671 }
672 lock.Unlock();
673 }
674
675 void SimpleMessenger::mark_down(Connection *con)
676 {
677 if (con == NULL)
678 return;
679 lock.Lock();
680 Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
681 if (p) {
682 ldout(cct,1) << "mark_down " << con << " -- " << p << dendl;
683 assert(p->msgr == this);
684 p->unregister_pipe();
685 p->pipe_lock.Lock();
686 p->stop();
687 if (p->connection_state) {
688 // do not generate a reset event for the caller in this case,
689 // since they asked for it.
690 p->connection_state->clear_pipe(p);
691 }
692 p->pipe_lock.Unlock();
693 p->put();
694 } else {
695 ldout(cct,1) << "mark_down " << con << " -- pipe dne" << dendl;
696 }
697 lock.Unlock();
698 }
699
700 void SimpleMessenger::mark_disposable(Connection *con)
701 {
702 lock.Lock();
703 Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
704 if (p) {
705 ldout(cct,1) << "mark_disposable " << con << " -- " << p << dendl;
706 assert(p->msgr == this);
707 p->pipe_lock.Lock();
708 p->policy.lossy = true;
709 p->pipe_lock.Unlock();
710 p->put();
711 } else {
712 ldout(cct,1) << "mark_disposable " << con << " -- pipe dne" << dendl;
713 }
714 lock.Unlock();
715 }
716
717 void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
718 {
719 // be careful here: multiple threads may block here, and readers of
720 // my_inst.addr do NOT hold any lock.
721
722 // this always goes from true -> false under the protection of the
723 // mutex. if it is already false, we need not retake the mutex at
724 // all.
725 if (!need_addr)
726 return;
727
728 lock.Lock();
729 if (need_addr) {
730 entity_addr_t t = peer_addr_for_me;
731 t.set_port(my_inst.addr.get_port());
732 t.set_nonce(my_inst.addr.get_nonce());
733 ANNOTATE_BENIGN_RACE_SIZED(&my_inst.addr, sizeof(my_inst.addr),
734 "SimpleMessenger learned addr");
735 my_inst.addr = t;
736 ldout(cct,1) << "learned my addr " << my_inst.addr << dendl;
737 need_addr = false;
738 init_local_connection();
739 }
740 lock.Unlock();
741 }
742
743 void SimpleMessenger::init_local_connection()
744 {
745 local_connection->peer_addr = my_inst.addr;
746 local_connection->peer_type = my_inst.name.type();
747 local_connection->set_features(CEPH_FEATURES_ALL);
748 ms_deliver_handle_fast_connect(local_connection.get());
749 }