]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include "acconfig.h" | |
18 | ||
19 | #include <iostream> | |
20 | #include <fstream> | |
21 | ||
22 | #include "AsyncMessenger.h" | |
23 | ||
24 | #include "common/config.h" | |
25 | #include "common/Timer.h" | |
26 | #include "common/errno.h" | |
27 | ||
28 | #include "messages/MOSDOp.h" | |
29 | #include "messages/MOSDOpReply.h" | |
30 | #include "common/EventTrace.h" | |
31 | ||
32 | #define dout_subsys ceph_subsys_ms | |
33 | #undef dout_prefix | |
34 | #define dout_prefix _prefix(_dout, this) | |
// Log-line prefix for AsyncMessenger-scoped dout output: "-- <addr> ".
static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) {
  return *_dout << "-- " << m->get_myaddr() << " ";
}
38 | ||
// Log-line prefix for Processor-scoped dout output.  The Processor
// argument is unused; it only selects this overload via dout_prefix.
static ostream& _prefix(std::ostream *_dout, Processor *p) {
  return *_dout << " Processor -- ";
}
42 | ||
43 | ||
44 | /******************* | |
45 | * Processor | |
46 | */ | |
47 | ||
// Event callback armed on the listen fd; fires when the socket becomes
// readable and delegates to Processor::accept() to drain pending
// connections.
class Processor::C_processor_accept : public EventCallback {
  Processor *pro;  // back-pointer to the owning Processor (not owned)

 public:
  explicit C_processor_accept(Processor *p): pro(p) {}
  void do_request(int id) override {
    pro->accept();
  }
};
57 | ||
// A Processor owns one listening socket bound to a single Worker; all
// accept handling runs on that worker's event center.
Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c)
  : msgr(r), net(c), worker(w),
    listen_handler(new C_processor_accept(this)) {}
61 | ||
// Bind this Processor's listen socket.
//
// If bind_addr carries a port, bind exactly that port; otherwise scan
// [ms_bind_port_min, ms_bind_port_max], skipping avoid_ports.  The whole
// procedure is retried ms_bind_retry_count times with a delay between
// attempts.  On success the actual address is written to *bound_addr and
// 0 is returned; otherwise the last errno-style error is returned.
int Processor::bind(const entity_addr_t &bind_addr,
                    const set<int>& avoid_ports,
                    entity_addr_t* bound_addr)
{
  const md_config_t *conf = msgr->cct->_conf;
  // bind to a socket
  ldout(msgr->cct, 10) << __func__ << dendl;

  int family;
  switch (bind_addr.get_family()) {
  case AF_INET:
  case AF_INET6:
    family = bind_addr.get_family();
    break;

  default:
    // bind_addr is empty; pick the family from configuration
    family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET;
  }

  SocketOptions opts;
  opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
  opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;

  // use whatever user specified (if anything)
  entity_addr_t listen_addr = bind_addr;
  if (listen_addr.get_type() == entity_addr_t::TYPE_NONE) {
    listen_addr.set_type(entity_addr_t::TYPE_LEGACY);
  }
  listen_addr.set_family(family);

  /* bind to port */
  int r = -1;

  for (int i = 0; i < conf->ms_bind_retry_count; i++) {
    if (i > 0) {
      lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in "
                       << conf->ms_bind_retry_delay << " seconds " << dendl;
      sleep(conf->ms_bind_retry_delay);
    }

    if (listen_addr.get_port()) {
      // explicit port requested: the listen() must run on the owning
      // worker's event-center thread (submit_to with wait=false here;
      // NOTE(review): r is read right after -- presumably submit_to to
      // our own center id executes synchronously; verify)
      worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
        r = worker->listen(listen_addr, opts, &listen_socket);
      }, false);
      if (r < 0) {
        lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
                         << ": " << cpp_strerror(r) << dendl;
        continue;
      }
    } else {
      // try a range of ports
      for (int port = msgr->cct->_conf->ms_bind_port_min; port <= msgr->cct->_conf->ms_bind_port_max; port++) {
        if (avoid_ports.count(port))
          continue;

        listen_addr.set_port(port);
        worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
          r = worker->listen(listen_addr, opts, &listen_socket);
        }, false);
        if (r == 0)
          break;
      }
      if (r < 0) {
        lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
                         << " on any port in range " << msgr->cct->_conf->ms_bind_port_min
                         << "-" << msgr->cct->_conf->ms_bind_port_max << ": "
                         << cpp_strerror(r) << dendl;
        listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again.
        continue;
      }
      ldout(msgr->cct, 10) << __func__ << " bound on random port " << listen_addr << dendl;
    }
    if (r == 0)
      break;
  }
  // It seems that binding completely failed, return with that exit status
  if (r < 0) {
    lderr(msgr->cct) << __func__ << " was unable to bind after " << conf->ms_bind_retry_count
                     << " attempts: " << cpp_strerror(r) << dendl;
    return r;
  }

  ldout(msgr->cct, 10) << __func__ << " bound to " << listen_addr << dendl;
  *bound_addr = listen_addr;
  return 0;
}
149 | ||
// Arm the accept handler: register the listen fd for EVENT_READABLE on
// the owning worker's event center.  No-op if bind() never succeeded.
void Processor::start()
{
  ldout(msgr->cct, 1) << __func__ << dendl;

  // start thread
  if (listen_socket) {
    worker->center.submit_to(worker->center.get_id(), [this]() {
      worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler); }, false);
  }
}
160 | ||
161 | void Processor::accept() | |
162 | { | |
163 | ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd() << dendl; | |
164 | SocketOptions opts; | |
165 | opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay; | |
166 | opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf; | |
167 | opts.priority = msgr->get_socket_priority(); | |
168 | while (true) { | |
169 | entity_addr_t addr; | |
170 | ConnectedSocket cli_socket; | |
171 | Worker *w = worker; | |
172 | if (!msgr->get_stack()->support_local_listen_table()) | |
173 | w = msgr->get_stack()->get_worker(); | |
174 | int r = listen_socket.accept(&cli_socket, opts, &addr, w); | |
175 | if (r == 0) { | |
176 | ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << cli_socket.fd() << dendl; | |
177 | ||
178 | msgr->add_accept(w, std::move(cli_socket), addr); | |
179 | continue; | |
180 | } else { | |
181 | if (r == -EINTR) { | |
182 | continue; | |
183 | } else if (r == -EAGAIN) { | |
184 | break; | |
185 | } else if (r == -EMFILE || r == -ENFILE) { | |
186 | lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd() | |
187 | << " errno " << r << " " << cpp_strerror(r) << dendl; | |
188 | break; | |
189 | } else if (r == -ECONNABORTED) { | |
190 | ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd() | |
191 | << " errno " << r << " " << cpp_strerror(r) << dendl; | |
192 | continue; | |
193 | } else { | |
194 | lderr(msgr->cct) << __func__ << " no incoming connection?" | |
195 | << " errno " << r << " " << cpp_strerror(r) << dendl; | |
196 | break; | |
197 | } | |
198 | } | |
199 | } | |
200 | } | |
201 | ||
// Disarm the accept handler and abort any in-flight accept.  The work
// is submitted to the worker's event center so it serializes with
// accept() running on that thread.
void Processor::stop()
{
  ldout(msgr->cct,10) << __func__ << dendl;

  if (listen_socket) {
    worker->center.submit_to(worker->center.get_id(), [this]() {
      worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE);
      listen_socket.abort_accept();
    }, false);
  }
}
213 | ||
214 | ||
215 | struct StackSingleton { | |
216 | CephContext *cct; | |
217 | std::shared_ptr<NetworkStack> stack; | |
218 | ||
219 | StackSingleton(CephContext *c): cct(c) {} | |
220 | void ready(std::string &type) { | |
221 | if (!stack) | |
222 | stack = NetworkStack::create(cct, type); | |
223 | } | |
224 | ~StackSingleton() { | |
225 | stack->stop(); | |
226 | } | |
227 | }; | |
228 | ||
229 | ||
// External-event callback that reaps connections queued for deletion
// (see AsyncMessenger::reap_dead).
class C_handle_reap : public EventCallback {
  AsyncMessenger *msgr;  // messenger to reap on (not owned)

 public:
  explicit C_handle_reap(AsyncMessenger *m): msgr(m) {}
  void do_request(int id) override {
    // judge whether is a time event
    msgr->reap_dead();
  }
};
240 | ||
241 | /******************* | |
242 | * AsyncMessenger | |
243 | */ | |
244 | ||
/**
 * Construct the messenger: pick the transport backend from the type
 * string, attach to (and start) the per-context NetworkStack singleton,
 * create the loopback connection, and create one Processor per worker
 * (or a single Processor when the backend shares a kernel listen table).
 */
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                               const std::string &type, string mname, uint64_t _nonce)
  : SimplePolicyMessenger(cct, name,mname, _nonce),
    dispatch_queue(cct, this, mname),
    lock("AsyncMessenger::lock"),
    nonce(_nonce), need_addr(true), did_bind(false),
    global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
    cluster_protocol(0), stopped(true)
{
  // transport inferred from the messenger type string; anything that
  // is neither rdma nor dpdk falls back to the posix (kernel) backend
  std::string transport_type = "posix";
  if (type.find("rdma") != std::string::npos)
    transport_type = "rdma";
  else if (type.find("dpdk") != std::string::npos)
    transport_type = "dpdk";

  ceph_spin_init(&global_seq_lock);
  // the NetworkStack is shared per-CephContext and per-transport
  StackSingleton *single;
  cct->lookup_or_create_singleton_object<StackSingleton>(single, "AsyncMessenger::NetworkStack::"+transport_type);
  single->ready(transport_type);
  stack = single->stack.get();
  stack->start();
  local_worker = stack->get_worker();
  local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
  init_local_connection();
  reap_handler = new C_handle_reap(this);
  // backends with per-worker (local) listen tables need one Processor
  // per worker; a kernel-shared listen table needs only one
  unsigned processor_num = 1;
  if (stack->support_local_listen_table())
    processor_num = stack->get_num_worker();
  for (unsigned i = 0; i < processor_num; ++i)
    processors.push_back(new Processor(this, stack->get_worker(i), cct));
}
276 | ||
277 | /** | |
278 | * Destroy the AsyncMessenger. Pretty simple since all the work is done | |
279 | * elsewhere. | |
280 | */ | |
/**
 * Destroy the AsyncMessenger. Pretty simple since all the work is done
 * elsewhere.
 */
AsyncMessenger::~AsyncMessenger()
{
  delete reap_handler;
  assert(!did_bind); // either we didn't bind or we shut down the Processor
  // break the loopback connection's self-reference cycle
  local_connection->mark_down();
  for (auto &&p : processors)
    delete p;
}
289 | ||
// Called once dispatchers are registered: complete any bind that was
// postponed because the stack wasn't ready, then start accepting and
// dispatching.
void AsyncMessenger::ready()
{
  ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;

  stack->ready();
  // NOTE(review): pending_bind is read here without holding `lock`;
  // presumably no bind() can run concurrently with ready() -- verify.
  if (pending_bind) {
    int err = bind(pending_bind_addr);
    if (err) {
      lderr(cct) << __func__ << " postponed bind failed" << dendl;
      ceph_abort();
    }
  }

  Mutex::Locker l(lock);
  for (auto &&p : processors)
    p->start();
  dispatch_queue.start();
}
308 | ||
// Stop accepting, mark every connection down, and wake wait().
int AsyncMessenger::shutdown()
{
  ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;

  // done! clean up.
  for (auto &&p : processors)
    p->stop();
  mark_down_all();
  // break ref cycles on the loopback connection
  local_connection->set_priv(NULL);
  did_bind = false;
  lock.Lock();
  stop_cond.Signal();   // releases a thread blocked in wait()
  stopped = true;
  lock.Unlock();
  stack->drain();
  return 0;
}
327 | ||
328 | ||
// Bind every Processor to bind_addr.  If the NetworkStack isn't ready
// yet the bind is postponed and retried from ready().  Returns 0 on
// success (including postponement), -1 if already started, or the
// Processor::bind error.
int AsyncMessenger::bind(const entity_addr_t &bind_addr)
{
  lock.Lock();

  if (!pending_bind && started) {
    ldout(cct,10) << __func__ << " already started" << dendl;
    lock.Unlock();
    return -1;
  }

  ldout(cct,10) << __func__ << " bind " << bind_addr << dendl;

  if (!stack->is_ready()) {
    ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl;
    pending_bind_addr = bind_addr;
    pending_bind = true;
    lock.Unlock();
    return 0;
  }

  // NOTE(review): the lock is dropped before the actual binds below;
  // presumably callers serialize bind() externally -- verify.
  lock.Unlock();

  // bind to a socket
  set<int> avoid_ports;
  entity_addr_t bound_addr;
  unsigned i = 0;
  for (auto &&p : processors) {
    int r = p->bind(bind_addr, avoid_ports, &bound_addr);
    if (r) {
      // Note: this is related to local tcp listen table problem.
      // Posix(default kernel implementation) backend shares listen table
      // in the kernel, so all threads can use the same listen table naturally
      // and only one thread need to bind. But other backends(like dpdk) uses local
      // listen table, we need to bind/listen tcp port for each worker. So if the
      // first worker failed to bind, it could be think the normal error then handle
      // it, like port is used case. But if the first worker successfully to bind
      // but the second worker failed, it's not expected and we need to assert
      // here
      assert(i == 0);
      return r;
    }
    ++i;
  }
  _finish_bind(bind_addr, bound_addr);
  return 0;
}
375 | ||
376 | int AsyncMessenger::rebind(const set<int>& avoid_ports) | |
377 | { | |
378 | ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl; | |
379 | assert(did_bind); | |
380 | ||
381 | for (auto &&p : processors) | |
382 | p->stop(); | |
383 | mark_down_all(); | |
384 | ||
385 | // adjust the nonce; we want our entity_addr_t to be truly unique. | |
386 | nonce += 1000000; | |
387 | ldout(cct, 10) << __func__ << " new nonce " << nonce | |
388 | << " and inst " << get_myinst() << dendl; | |
389 | ||
390 | entity_addr_t bound_addr; | |
391 | entity_addr_t bind_addr = get_myaddr(); | |
392 | bind_addr.set_port(0); | |
393 | set<int> new_avoid(avoid_ports); | |
394 | new_avoid.insert(bind_addr.get_port()); | |
395 | ldout(cct, 10) << __func__ << " will try " << bind_addr | |
396 | << " and avoid ports " << new_avoid << dendl; | |
397 | unsigned i = 0; | |
398 | for (auto &&p : processors) { | |
399 | int r = p->bind(bind_addr, avoid_ports, &bound_addr); | |
400 | if (r) { | |
401 | assert(i == 0); | |
402 | return r; | |
403 | } | |
404 | ++i; | |
405 | } | |
406 | _finish_bind(bind_addr, bound_addr); | |
407 | for (auto &&p : processors) { | |
408 | p->start(); | |
409 | } | |
410 | return 0; | |
411 | } | |
412 | ||
// Bind the client-side (outgoing) address.  Only takes effect when
// ms_bind_before_connect is enabled; otherwise a no-op returning 0.
int AsyncMessenger::client_bind(const entity_addr_t &bind_addr)
{
  if (!cct->_conf->ms_bind_before_connect)
    return 0;
  Mutex::Locker l(lock);
  if (did_bind) {
    // a server-side bind already fixed our address; it must agree
    assert(my_inst.addr == bind_addr);
    return 0;
  }
  if (started) {
    ldout(cct, 10) << __func__ << " already started" << dendl;
    return -1;
  }
  ldout(cct, 10) << __func__ << " " << bind_addr << dendl;

  set_myaddr(bind_addr);
  return 0;
}
431 | ||
// Record the outcome of a successful bind: publish the bound address
// (falling back to the actual listen address when an ephemeral port was
// requested), stamp it with our nonce, and refresh the loopback
// connection so it sees the new address.
void AsyncMessenger::_finish_bind(const entity_addr_t& bind_addr,
                                  const entity_addr_t& listen_addr)
{
  set_myaddr(bind_addr);
  if (bind_addr != entity_addr_t())
    learned_addr(bind_addr);

  if (get_myaddr().get_port() == 0) {
    // caller asked for port 0 (ephemeral); adopt the port we got
    set_myaddr(listen_addr);
  }
  entity_addr_t addr = get_myaddr();
  addr.set_nonce(nonce);
  set_myaddr(addr);

  init_local_connection();

  ldout(cct,1) << __func__ << " bind my_inst.addr is " << get_myaddr() << dendl;
  did_bind = true;
}
451 | ||
452 | int AsyncMessenger::start() | |
453 | { | |
454 | lock.Lock(); | |
455 | ldout(cct,1) << __func__ << " start" << dendl; | |
456 | ||
457 | // register at least one entity, first! | |
458 | assert(my_inst.name.type() >= 0); | |
459 | ||
460 | assert(!started); | |
461 | started = true; | |
462 | stopped = false; | |
463 | ||
464 | if (!did_bind) { | |
465 | my_inst.addr.nonce = nonce; | |
466 | _init_local_connection(); | |
467 | } | |
468 | ||
469 | lock.Unlock(); | |
470 | return 0; | |
471 | } | |
472 | ||
// Block until shutdown() runs, then drain the dispatch queue and close
// every remaining connection before returning.
void AsyncMessenger::wait()
{
  lock.Lock();
  if (!started) {
    lock.Unlock();
    return;
  }
  if (!stopped)
    stop_cond.Wait(lock);  // signalled by shutdown()

  lock.Unlock();

  dispatch_queue.shutdown();
  if (dispatch_queue.is_started()) {
    ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl;
    dispatch_queue.wait();
    dispatch_queue.discard_local();
    ldout(cct, 10) << __func__ << ": dispatch queue is stopped" << dendl;
  }

  // close all connections
  shutdown_connections(false);
  stack->drain();

  ldout(cct, 10) << __func__ << ": done." << dendl;
  ldout(cct, 1) << __func__ << " complete." << dendl;
  started = false;
}
501 | ||
502 | void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr) | |
503 | { | |
504 | lock.Lock(); | |
505 | AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w); | |
506 | conn->accept(std::move(cli_socket), addr); | |
507 | accepting_conns.insert(conn); | |
508 | lock.Unlock(); | |
509 | } | |
510 | ||
// Create and register an outgoing connection to addr.  Caller must hold
// `lock` and must have verified no connection to addr already exists.
AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
{
  assert(lock.is_locked());
  assert(addr != my_inst.addr);  // loopback is handled separately

  ldout(cct, 10) << __func__ << " " << addr
                 << ", creating connection and registering" << dendl;

  // create connection
  Worker *w = stack->get_worker();
  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
  conn->connect(addr, type);
  assert(!conns.count(addr));
  conns[addr] = conn;
  w->get_perf_counter()->inc(l_msgr_active_connections);

  return conn;
}
529 | ||
// Return a connection to dest, creating one if needed.  The loopback
// connection is returned for our own address.
ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest)
{
  Mutex::Locker l(lock);
  if (my_inst.addr == dest.addr) {
    // local
    return local_connection;
  }

  AsyncConnectionRef conn = _lookup_conn(dest.addr);
  if (conn) {
    ldout(cct, 10) << __func__ << " " << dest << " existing " << conn << dendl;
  } else {
    conn = create_connect(dest.addr, dest.name.type());
    ldout(cct, 10) << __func__ << " " << dest << " new " << conn << dendl;
  }

  return conn;
}
548 | ||
// Return the messenger's loopback (self) connection; always valid.
ConnectionRef AsyncMessenger::get_loopback_connection()
{
  return local_connection;
}
553 | ||
// Queue message m for delivery to dest.  Consumes the caller's ref to m
// on the error path (m->put()).  Returns 0 or -EINVAL for an empty
// destination address.
int AsyncMessenger::_send_message(Message *m, const entity_inst_t& dest)
{
  FUNCTRACE();
  assert(m);

  // OID event tracing for the OSD op round trip
  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE(((MOSDOp *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP");
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE(((MOSDOpReply *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP_REPLY");

  ldout(cct, 1) << __func__ << "--> " << dest.name << " "
                << dest.addr << " -- " << *m << " -- ?+"
                << m->get_data().length() << " " << m << dendl;

  if (dest.addr == entity_addr_t()) {
    ldout(cct,0) << __func__ << " message " << *m
                 << " with empty dest " << dest.addr << dendl;
    m->put();
    return -EINVAL;
  }

  // NOTE(review): _lookup_conn requires `lock`; presumably our caller
  // (the public send_message wrapper) holds it -- verify at call sites.
  AsyncConnectionRef conn = _lookup_conn(dest.addr);
  submit_message(m, conn, dest.addr, dest.name.type());
  return 0;
}
579 | ||
// Route message m: to an existing connection, to the loopback
// connection, or via a newly created connection.  Drops the message
// (m->put()) when the target type is a lossy server with no session.
void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
                                    const entity_addr_t& dest_addr, int dest_type)
{
  if (cct->_conf->ms_dump_on_send) {
    // debugging aid: hex-dump the encoded payload before sending
    m->encode(-1, MSG_CRC_ALL);
    ldout(cct, 0) << __func__ << "submit_message " << *m << "\n";
    m->get_payload().hexdump(*_dout);
    if (m->get_data().length() > 0) {
      *_dout << " data:\n";
      m->get_data().hexdump(*_dout);
    }
    *_dout << dendl;
    m->clear_payload();
  }

  // existing connection?
  if (con) {
    con->send_message(m);
    return ;
  }

  // local?
  if (my_inst.addr == dest_addr) {
    // local
    local_connection->send_message(m);
    return ;
  }

  // remote, no existing connection.
  const Policy& policy = get_policy(dest_type);
  if (policy.server) {
    // lossy servers never initiate sessions; drop instead of connecting
    ldout(cct, 20) << __func__ << " " << *m << " remote, " << dest_addr
                   << ", lossy server for target type "
                   << ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl;
    m->put();
  } else {
    ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addr << ", new connection." << dendl;
    con = create_connect(dest_addr, dest_type);
    con->send_message(m);
  }
}
621 | ||
622 | /** | |
623 | * If my_inst.addr doesn't have an IP set, this function | |
624 | * will fill it in from the passed addr. Otherwise it does nothing and returns. | |
625 | */ | |
/**
 * If my_inst.addr doesn't have an IP set, this function
 * will fill it in from the passed addr. Otherwise it does nothing and returns.
 */
void AsyncMessenger::set_addr_unknowns(const entity_addr_t &addr)
{
  Mutex::Locker l(lock);
  if (my_inst.addr.is_blank_ip()) {
    // keep our port; only adopt the IP portion of addr
    int port = my_inst.addr.get_port();
    my_inst.addr.u = addr.u;
    my_inst.addr.set_port(port);
    _init_local_connection();
  }
}
636 | ||
224ce89b WB |
// Unconditionally replace our address with addr (nonce re-stamped) and
// refresh the loopback connection.
void AsyncMessenger::set_addr(const entity_addr_t &addr)
{
  Mutex::Locker l(lock);
  entity_addr_t t = addr;
  t.set_nonce(nonce);
  set_myaddr(t);
  _init_local_connection();
}
645 | ||
7c673cae FG |
// Stop every connection: half-accepted ones, registered ones (with
// perf-counter bookkeeping), and finally drop refs to already-deleted
// ones.  queue_reset propagates to AsyncConnection::stop.
void AsyncMessenger::shutdown_connections(bool queue_reset)
{
  ldout(cct,1) << __func__ << " " << dendl;
  lock.Lock();
  for (set<AsyncConnectionRef>::iterator q = accepting_conns.begin();
       q != accepting_conns.end(); ++q) {
    AsyncConnectionRef p = *q;
    ldout(cct, 5) << __func__ << " accepting_conn " << p.get() << dendl;
    p->stop(queue_reset);
  }
  accepting_conns.clear();

  while (!conns.empty()) {
    ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator it = conns.begin();
    AsyncConnectionRef p = it->second;
    ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
    conns.erase(it);
    p->get_perf_counter()->dec(l_msgr_active_connections);
    p->stop(queue_reset);
  }

  {
    Mutex::Locker l(deleted_lock);
    // just drop our refs; erasing releases the connection objects
    while (!deleted_conns.empty()) {
      set<AsyncConnectionRef>::iterator it = deleted_conns.begin();
      AsyncConnectionRef p = *it;
      ldout(cct, 5) << __func__ << " delete " << p << dendl;
      deleted_conns.erase(it);
    }
  }
  lock.Unlock();
}
678 | ||
679 | void AsyncMessenger::mark_down(const entity_addr_t& addr) | |
680 | { | |
681 | lock.Lock(); | |
682 | AsyncConnectionRef p = _lookup_conn(addr); | |
683 | if (p) { | |
684 | ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl; | |
685 | p->stop(true); | |
686 | } else { | |
687 | ldout(cct, 1) << __func__ << " " << addr << " -- connection dne" << dendl; | |
688 | } | |
689 | lock.Unlock(); | |
690 | } | |
691 | ||
692 | int AsyncMessenger::get_proto_version(int peer_type, bool connect) const | |
693 | { | |
694 | int my_type = my_inst.name.type(); | |
695 | ||
696 | // set reply protocol version | |
697 | if (peer_type == my_type) { | |
698 | // internal | |
699 | return cluster_protocol; | |
700 | } else { | |
701 | // public | |
702 | switch (connect ? peer_type : my_type) { | |
703 | case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; | |
704 | case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; | |
705 | case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; | |
706 | } | |
707 | } | |
708 | return 0; | |
709 | } | |
710 | ||
// Adopt the IP our peer sees for us, keeping our own port and nonce.
// Runs at most once: need_addr goes true->false under the lock, and the
// unlocked fast-path check only skips work after that transition.
void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
{
  // be careful here: multiple threads may block here, and readers of
  // my_inst.addr do NOT hold any lock.

  // this always goes from true -> false under the protection of the
  // mutex.  if it is already false, we need not retake the mutex at
  // all.
  if (!need_addr)
    return ;
  lock.Lock();
  if (need_addr) {
    need_addr = false;
    entity_addr_t t = peer_addr_for_me;
    t.set_port(my_inst.addr.get_port());
    t.set_nonce(my_inst.addr.get_nonce());
    my_inst.addr = t;
    ldout(cct, 1) << __func__ << " learned my addr " << my_inst.addr << dendl;
    _init_local_connection();
  }
  lock.Unlock();
}
733 | ||
// Release connections queued in deleted_conns: unregister each from the
// conns map (only if still the registered entry for its peer address)
// and from accepting_conns, then drop the ref.  Returns the number
// reaped.  Takes `lock` then `deleted_lock` (keep this order elsewhere).
int AsyncMessenger::reap_dead()
{
  ldout(cct, 1) << __func__ << " start" << dendl;
  int num = 0;

  Mutex::Locker l1(lock);
  Mutex::Locker l2(deleted_lock);

  while (!deleted_conns.empty()) {
    auto it = deleted_conns.begin();
    AsyncConnectionRef p = *it;
    ldout(cct, 5) << __func__ << " delete " << p << dendl;
    // a new connection to the same peer may already have replaced p in
    // the map; only erase when the entry is still this connection
    auto conns_it = conns.find(p->peer_addr);
    if (conns_it != conns.end() && conns_it->second == p)
      conns.erase(conns_it);
    accepting_conns.erase(p);
    deleted_conns.erase(it);
    ++num;
  }

  return num;
}