]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/Messenger.h
update sources to 12.2.7
[ceph.git] / ceph / src / msg / Messenger.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16
17#ifndef CEPH_MESSENGER_H
18#define CEPH_MESSENGER_H
19
20#include <map>
21using namespace std;
22
23#include "Message.h"
24#include "Dispatcher.h"
25#include "common/Mutex.h"
26#include "common/Cond.h"
27#include "include/Context.h"
28#include "include/types.h"
29#include "include/ceph_features.h"
30#include "auth/Crypto.h"
31
32#include <errno.h>
33#include <sstream>
3efd9988 34#include <signal.h>
7c673cae
FG
35
36#define SOCKET_PRIORITY_MIN_DELAY 6
37
38class Timer;
39
40
41class Messenger {
42private:
43 list<Dispatcher*> dispatchers;
44 list <Dispatcher*> fast_dispatchers;
45 ZTracer::Endpoint trace_endpoint;
46
47 void set_endpoint_addr(const entity_addr_t& a,
48 const entity_name_t &name);
49
50protected:
51 /// the "name" of the local daemon. eg client.99
52 entity_inst_t my_inst;
53 int default_send_priority;
54 /// set to true once the Messenger has started, and set to false on shutdown
55 bool started;
56 uint32_t magic;
57 int socket_priority;
58
59public:
60 /**
61 * Various Messenger conditional config/type flags to allow
62 * different "transport" Messengers to tune themselves
63 */
64 static const int HAS_HEAVY_TRAFFIC = 0x0001;
65 static const int HAS_MANY_CONNECTIONS = 0x0002;
66 static const int HEARTBEAT = 0x0004;
67
68 /**
69 * The CephContext this Messenger uses. Many other components initialize themselves
70 * from this value.
71 */
72 CephContext *cct;
73 int crcflags;
74
75 /**
76 * A Policy describes the rules of a Connection. Is there a limit on how
77 * much data this Connection can have locally? When the underlying connection
78 * experiences an error, does the Connection disappear? Can this Messenger
79 * re-establish the underlying connection?
80 */
81 struct Policy {
82 /// If true, the Connection is tossed out on errors.
83 bool lossy;
84 /// If true, the underlying connection can't be re-established from this end.
85 bool server;
86 /// If true, we will standby when idle
87 bool standby;
88 /// If true, we will try to detect session resets
89 bool resetcheck;
90 /**
91 * The throttler is used to limit how much data is held by Messages from
92 * the associated Connection(s). When reading in a new Message, the Messenger
93 * will call throttler->throttle() for the size of the new Message.
94 */
95 Throttle *throttler_bytes;
96 Throttle *throttler_messages;
97
98 /// Specify features supported locally by the endpoint.
99 uint64_t features_supported;
100 /// Specify features any remotes must have to talk to this endpoint.
101 uint64_t features_required;
102
103 Policy()
104 : lossy(false), server(false), standby(false), resetcheck(true),
105 throttler_bytes(NULL),
106 throttler_messages(NULL),
107 features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
108 features_required(0) {}
109 private:
110 Policy(bool l, bool s, bool st, bool r, uint64_t req)
111 : lossy(l), server(s), standby(st), resetcheck(r),
112 throttler_bytes(NULL),
113 throttler_messages(NULL),
114 features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
115 features_required(req) {}
116
117 public:
118 static Policy stateful_server(uint64_t req) {
119 return Policy(false, true, true, true, req);
120 }
121 static Policy stateless_server(uint64_t req) {
122 return Policy(true, true, false, false, req);
123 }
124 static Policy lossless_peer(uint64_t req) {
125 return Policy(false, false, true, false, req);
126 }
127 static Policy lossless_peer_reuse(uint64_t req) {
128 return Policy(false, false, true, true, req);
129 }
130 static Policy lossy_client(uint64_t req) {
131 return Policy(true, false, false, false, req);
132 }
133 static Policy lossless_client(uint64_t req) {
134 return Policy(false, false, false, true, req);
135 }
136 };
137
138 /**
139 * Messenger constructor. Call this from your implementation.
140 * Messenger users should construct full implementations directly,
141 * or use the create() function.
142 */
143 Messenger(CephContext *cct_, entity_name_t w)
144 : trace_endpoint("0.0.0.0", 0, "Messenger"),
145 my_inst(),
146 default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false),
147 magic(0),
148 socket_priority(-1),
149 cct(cct_),
150 crcflags(get_default_crc_flags(cct->_conf))
151 {
152 my_inst.name = w;
153 }
154 virtual ~Messenger() {}
155
156 /**
157 * create a new messenger
158 *
159 * Create a new messenger instance, with whatever implementation is
160 * available or specified via the configuration in cct.
161 *
162 * @param cct context
163 * @param type name of messenger type
164 * @param name entity name to register
165 * @param lname logical name of the messenger in this process (e.g., "client")
166 * @param nonce nonce value to uniquely identify this instance on the current host
167 * @param features bits for the local connection
168 * @param cflags general set of flags to configure transport resources
169 */
170 static Messenger *create(CephContext *cct,
171 const string &type,
172 entity_name_t name,
173 string lname,
174 uint64_t nonce,
175 uint64_t cflags);
176
177 /**
178 * create a new messenger
179 *
180 * Create a new messenger instance.
181 * Same as the above, but a slightly simpler interface for clients:
182 * - Generate a random nonce
183 * - use the default feature bits
184 * - get the messenger type from cct
185 * - use the client entity_type
186 *
187 * @param cct context
188 * @param lname logical name of the messenger in this process (e.g., "client")
189 */
190 static Messenger *create_client_messenger(CephContext *cct, string lname);
191
192 /**
193 * @defgroup Accessors
194 * @{
195 */
196 /**
197 * Retrieve the Messenger's instance.
198 *
199 * @return A const reference to the instance this Messenger
200 * currently believes to be its own.
201 */
202 const entity_inst_t& get_myinst() { return my_inst; }
203 /**
204 * set messenger's instance
205 */
206 void set_myinst(entity_inst_t i) { my_inst = i; }
207
208 uint32_t get_magic() { return magic; }
209 void set_magic(int _magic) { magic = _magic; }
210
211 /**
212 * Retrieve the Messenger's address.
213 *
214 * @return A const reference to the address this Messenger
215 * currently believes to be its own.
216 */
217 const entity_addr_t& get_myaddr() { return my_inst.addr; }
218protected:
219 /**
220 * set messenger's address
221 */
222 virtual void set_myaddr(const entity_addr_t& a) {
223 my_inst.addr = a;
224 set_endpoint_addr(a, my_inst.name);
225 }
226public:
227 /**
228 * @return the zipkin trace endpoint
229 */
230 const ZTracer::Endpoint* get_trace_endpoint() const {
231 return &trace_endpoint;
232 }
233
234 /**
235 * Retrieve the Messenger's name.
236 *
237 * @return A const reference to the name this Messenger
238 * currently believes to be its own.
239 */
240 const entity_name_t& get_myname() { return my_inst.name; }
241 /**
242 * Set the name of the local entity. The name is reported to others and
243 * can be changed while the system is running, but doing so at incorrect
244 * times may have bad results.
245 *
246 * @param m The name to set.
247 */
248 void set_myname(const entity_name_t& m) { my_inst.name = m; }
249 /**
250 * Set the unknown address components for this Messenger.
251 * This is useful if the Messenger doesn't know its full address just by
252 * binding, but another Messenger on the same interface has already learned
253 * its full address. This function does not fill in known address elements,
254 * cause a rebind, or do anything of that sort.
255 *
256 * @param addr The address to use as a template.
257 */
258 virtual void set_addr_unknowns(const entity_addr_t &addr) = 0;
224ce89b
WB
259 /**
260 * Set the address for this Messenger. This is useful if the Messenger
261 * binds to a specific address but advertises a different address on the
262 * network.
263 *
264 * @param addr The address to use.
265 */
266 virtual void set_addr(const entity_addr_t &addr) = 0;
7c673cae
FG
267 /// Get the default send priority.
268 int get_default_send_priority() { return default_send_priority; }
269 /**
270 * Get the number of Messages which the Messenger has received
271 * but not yet dispatched.
272 */
273 virtual int get_dispatch_queue_len() = 0;
274
275 /**
276 * Get age of oldest undelivered message
277 * (0 if the queue is empty)
278 */
279 virtual double get_dispatch_queue_max_age(utime_t now) = 0;
280 /**
281 * Get the default crc flags for a Messenger, based on the
282 * given configuration.
283 */
284 static int get_default_crc_flags(md_config_t *);
285
286 /**
287 * @} // Accessors
288 */
289
290 /**
291 * @defgroup Configuration
292 * @{
293 */
294 /**
295 * Set the cluster protocol in use by this daemon.
296 * This is an init-time function and cannot be called after calling
297 * start() or bind().
298 *
299 * @param p The cluster protocol to use. Defined externally.
300 */
301 virtual void set_cluster_protocol(int p) = 0;
302 /**
303 * Set a policy which is applied to all peers who do not have a type-specific
304 * Policy.
305 * This is an init-time function and cannot be called after calling
306 * start() or bind().
307 *
308 * @param p The Policy to apply.
309 */
310 virtual void set_default_policy(Policy p) = 0;
311 /**
312 * Set a policy which is applied to all peers of the given type.
313 * This is an init-time function and cannot be called after calling
314 * start() or bind().
315 *
316 * @param type The peer type this policy applies to.
317 * @param p The policy to apply.
318 */
319 virtual void set_policy(int type, Policy p) = 0;
320 /**
321 * Get the Policy associated with a type of peer.
322 *
323 * This can be called either on initial setup, or after connections
324 * are already established. However, the policies for existing
325 * connections will not be affected; the new policy will only apply
326 * to future connections.
327 *
328 * @param t The peer type to get the default policy for.
329 * @return A copy of the Policy for the given peer type.
330 */
331 virtual Policy get_policy(int t) = 0;
332 /**
333 * Get the default Policy
334 *
335 * @return A copy of the default Policy.
336 */
337 virtual Policy get_default_policy() = 0;
338 /**
339 * Set Throttlers applied to all Messages from the given type of peer
340 *
341 * This is an init-time function and cannot be called after calling
342 * start() or bind().
343 *
344 * @param type The peer type the Throttlers will apply to.
345 * @param bytes The Throttle for the number of bytes carried by the message
346 * @param msgs The Throttle for the number of messages for this @p type
347 * @note The Messenger does not take ownership of the Throttle pointers, but
348 * you must not destroy them before you destroy the Messenger.
349 */
350 virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0;
351 /**
352 * Set the default send priority
353 *
354 * This is an init-time function and must be called *before* calling
355 * start().
356 *
357 * @param p The cluster protocol to use. Defined externally.
358 */
359 void set_default_send_priority(int p) {
360 assert(!started);
361 default_send_priority = p;
362 }
363 /**
364 * Set the priority(SO_PRIORITY) for all packets to be sent on this socket.
365 *
366 * Linux uses this value to order the networking queues: packets with a higher
367 * priority may be processed first depending on the selected device queueing
368 * discipline.
369 *
370 * @param prio The priority. Setting a priority outside the range 0 to 6
371 * requires the CAP_NET_ADMIN capability.
372 */
373 void set_socket_priority(int prio) {
374 socket_priority = prio;
375 }
376 /**
377 * Get the socket priority
378 *
379 * @return the socket priority
380 */
381 int get_socket_priority() {
382 return socket_priority;
383 }
384 /**
385 * Add a new Dispatcher to the front of the list. If you add
386 * a Dispatcher which is already included, it will get a duplicate
387 * entry. This will reduce efficiency but not break anything.
388 *
389 * @param d The Dispatcher to insert into the list.
390 */
391 void add_dispatcher_head(Dispatcher *d) {
392 bool first = dispatchers.empty();
393 dispatchers.push_front(d);
394 if (d->ms_can_fast_dispatch_any())
395 fast_dispatchers.push_front(d);
396 if (first)
397 ready();
398 }
399 /**
400 * Add a new Dispatcher to the end of the list. If you add
401 * a Dispatcher which is already included, it will get a duplicate
402 * entry. This will reduce efficiency but not break anything.
403 *
404 * @param d The Dispatcher to insert into the list.
405 */
406 void add_dispatcher_tail(Dispatcher *d) {
407 bool first = dispatchers.empty();
408 dispatchers.push_back(d);
409 if (d->ms_can_fast_dispatch_any())
410 fast_dispatchers.push_back(d);
411 if (first)
412 ready();
413 }
414 /**
415 * Bind the Messenger to a specific address. If bind_addr
416 * is not completely filled in the system will use the
417 * valid portions and cycle through the unset ones (eg, the port)
418 * in an unspecified order.
419 *
420 * @param bind_addr The address to bind to.
421 * @return 0 on success, or -1 on error, or -errno if
422 * we can be more specific about the failure.
423 */
424 virtual int bind(const entity_addr_t& bind_addr) = 0;
425 /**
426 * This function performs a full restart of the Messenger component,
427 * whatever that means. Other entities who connect to this
428 * Messenger post-rebind() should perceive it as a new entity which
429 * they have not previously contacted, and it MUST bind to a
430 * different address than it did previously.
431 *
432 * @param avoid_ports Additional port to avoid binding to.
433 */
434 virtual int rebind(const set<int>& avoid_ports) { return -EOPNOTSUPP; }
435 /**
436 * Bind the 'client' Messenger to a specific address. The Messenger will
437 * bind to the address before connecting to others when the option
438 * ms_bind_before_connect is true.
439 * @param bind_addr The address to bind to.
440 * @return 0 on success, or -1 on error, or -errno if we can be more specific about the failure.
441 */
442 virtual int client_bind(const entity_addr_t& bind_addr) = 0;
443 /**
444 * @} // Configuration
445 */
446
447 /**
448 * @defgroup Startup/Shutdown
449 * @{
450 */
451 /**
452 * Perform any resource allocation, thread startup, etc
453 * that is required before attempting to connect to other
454 * Messengers or transmit messages.
455 * Once this function completes, started shall be set to true.
456 *
457 * @return 0 on success; -errno on failure.
458 */
459 virtual int start() { started = true; return 0; }
460
461 // shutdown
462 /**
463 * Block until the Messenger has finished shutting down (according
464 * to the shutdown() function).
465 * It is valid to call this after calling shutdown(), but it must
466 * be called before deleting the Messenger.
467 */
468 virtual void wait() = 0;
469 /**
470 * Initiate a shutdown of the Messenger.
471 *
472 * @return 0 on success, -errno otherwise.
473 */
474 virtual int shutdown() { started = false; return 0; }
475 /**
476 * @} // Startup/Shutdown
477 */
478
479 /**
480 * @defgroup Messaging
481 * @{
482 */
483 /**
484 * Queue the given Message for the given entity.
485 * Success in this function does not guarantee Message delivery, only
486 * success in queueing the Message. Other guarantees may be provided based
487 * on the Connection policy associated with the dest.
488 *
489 * @param m The Message to send. The Messenger consumes a single reference
490 * when you pass it in.
491 * @param dest The entity to send the Message to.
492 *
493 * DEPRECATED: please do not use this interface for any new code;
494 * use the Connection* variant.
495 *
496 * @return 0 on success, or -errno on failure.
497 */
498 virtual int send_message(Message *m, const entity_inst_t& dest) = 0;
499
500 /**
501 * @} // Messaging
502 */
503 /**
504 * @defgroup Connection Management
505 * @{
506 */
507 /**
508 * Get the Connection object associated with a given entity. If a
509 * Connection does not exist, create one and establish a logical connection.
510 * The caller owns a reference when this returns. Call ->put() when you're
511 * done!
512 *
513 * @param dest The entity to get a connection for.
514 */
515 virtual ConnectionRef get_connection(const entity_inst_t& dest) = 0;
516 /**
517 * Get the Connection object associated with ourselves.
518 */
519 virtual ConnectionRef get_loopback_connection() = 0;
520 /**
521 * Mark down a Connection to a remote.
522 *
523 * This will cause us to discard our outgoing queue for them, and if
524 * reset detection is enabled in the policy and the endpoint tries
525 * to reconnect they will discard their queue when we inform them of
526 * the session reset.
527 *
528 * If there is no Connection to the given dest, it is a no-op.
529 *
530 * This generates a RESET notification to the Dispatcher.
531 *
532 * DEPRECATED: please do not use this interface for any new code;
533 * use the Connection* variant.
534 *
535 * @param a The address to mark down.
536 */
537 virtual void mark_down(const entity_addr_t& a) = 0;
538 /**
539 * Mark all the existing Connections down. This is equivalent
540 * to iterating over all Connections and calling mark_down()
541 * on each.
542 *
543 * This will generate a RESET event for each closed connection.
544 */
545 virtual void mark_down_all() = 0;
546 /**
547 * @} // Connection Management
548 */
549protected:
550 /**
551 * @defgroup Subclass Interfacing
552 * @{
553 */
554 /**
555 * A courtesy function for Messenger implementations which
556 * will be called when we receive our first Dispatcher.
557 */
558 virtual void ready() { }
559 /**
560 * @} // Subclass Interfacing
561 */
3efd9988
FG
562public:
563#ifdef CEPH_USE_SIGPIPE_BLOCKER
564 /**
565 * We need to disable SIGPIPE on all platforms, and if they
566 * don't give us a better mechanism (read: are on Solaris) that
567 * means blocking the signal whenever we do a send or sendmsg...
568 * That means any implementations must invoke MSGR_SIGPIPE_STOPPER in-scope
569 * whenever doing so. On most systems that's blank, but on systems where
570 * it's needed we construct an RAII object to plug and un-plug the SIGPIPE.
571 * See http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
572 */
struct sigpipe_stopper {
  /// True when this object blocked SIGPIPE itself and therefore has to
  /// drain it and restore the previous mask on destruction.
  bool blocked;
  /// Signal mask in effect before SIGPIPE was blocked; only filled in
  /// when blocked is true.
  sigset_t existing_mask;
  /// Signal set holding SIGPIPE only.
  sigset_t pipe_mask;

  /// Block SIGPIPE for the calling thread unless one is already pending.
  sigpipe_stopper() {
    sigemptyset(&pipe_mask);
    sigaddset(&pipe_mask, SIGPIPE);
    sigset_t signals;
    sigemptyset(&signals);
    sigpending(&signals);
    if (sigismember(&signals, SIGPIPE)) {
      // A SIGPIPE is already pending, so leave mask and signal untouched;
      // whoever set that up is responsible for it.
      blocked = false;
    } else {
      blocked = true;
      int r = pthread_sigmask(SIG_BLOCK, &pipe_mask, &existing_mask);
      assert(r == 0);
      (void)r;
    }
  }

  /// Consume a SIGPIPE raised while we had it blocked, then restore the
  /// signal mask saved by the constructor.
  ~sigpipe_stopper() {
    if (!blocked)
      return;
    // Callers typically look at errno from their send()/sendmsg() call;
    // do not let the polling below overwrite it.
    const int saved_errno = errno;
    struct timespec nowait = {0, 0};
    int r;
    do {
      // Zero timeout: just poll for a pending SIGPIPE and dequeue it.
      r = sigtimedwait(&pipe_mask, nullptr, &nowait);
    } while (r == -1 && errno == EINTR);
    // sigtimedwait() returns the signal number when it dequeued one and
    // -1 with errno == EAGAIN when none was pending.
    assert(r == SIGPIPE || (r == -1 && errno == EAGAIN));
    r = pthread_sigmask(SIG_SETMASK, &existing_mask, nullptr);
    assert(r == 0);
    (void)r;
    errno = saved_errno;
  }
};
// No parentheses after the variable name: "stopper()" would declare a
// function returning sigpipe_stopper instead of constructing the guard.
# define MSGR_SIGPIPE_STOPPER Messenger::sigpipe_stopper stopper;
602#else
603# define MSGR_SIGPIPE_STOPPER
604#endif
7c673cae
FG
605 /**
606 * @defgroup Dispatcher Interfacing
607 * @{
608 */
7c673cae
FG
609 /**
610 * Determine whether a message can be fast-dispatched. We will
611 * query each Dispatcher in sequence to determine if they are
612 * capable of handling a particular message via "fast dispatch".
613 *
614 * @param m The Message we are testing.
615 */
616 bool ms_can_fast_dispatch(const Message *m) {
617 for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
618 p != fast_dispatchers.end();
619 ++p) {
620 if ((*p)->ms_can_fast_dispatch(m))
621 return true;
622 }
623 return false;
624 }
625
626 /**
627 * Deliver a single Message via "fast dispatch".
628 *
629 * @param m The Message we are fast dispatching. We take ownership
630 * of one reference to it.
631 * If none of our Dispatchers can handle it, ceph_abort().
632 */
633 void ms_fast_dispatch(Message *m) {
634 m->set_dispatch_stamp(ceph_clock_now());
635 for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
636 p != fast_dispatchers.end();
637 ++p) {
638 if ((*p)->ms_can_fast_dispatch(m)) {
639 (*p)->ms_fast_dispatch(m);
640 return;
641 }
642 }
643 ceph_abort();
644 }
645 /**
646 *
647 */
648 void ms_fast_preprocess(Message *m) {
649 for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
650 p != fast_dispatchers.end();
651 ++p) {
652 (*p)->ms_fast_preprocess(m);
653 }
654 }
655 /**
656 * Deliver a single Message. Send it to each Dispatcher
657 * in sequence until one of them handles it.
658 * If none of our Dispatchers can handle it, assert(0).
659 *
660 * @param m The Message to deliver. We take ownership of
661 * one reference to it.
662 */
663 void ms_deliver_dispatch(Message *m) {
664 m->set_dispatch_stamp(ceph_clock_now());
665 for (list<Dispatcher*>::iterator p = dispatchers.begin();
666 p != dispatchers.end();
667 ++p) {
668 if ((*p)->ms_dispatch(m))
669 return;
670 }
671 lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
672 << m->get_source_inst() << dendl;
673 assert(!cct->_conf->ms_die_on_unhandled_msg);
674 m->put();
675 }
676 /**
677 * Notify each Dispatcher of a new Connection. Call
678 * this function whenever a new Connection is initiated or
679 * reconnects.
680 *
681 * @param con Pointer to the new Connection.
682 */
683 void ms_deliver_handle_connect(Connection *con) {
684 for (list<Dispatcher*>::iterator p = dispatchers.begin();
685 p != dispatchers.end();
686 ++p)
687 (*p)->ms_handle_connect(con);
688 }
689
690 /**
691 * Notify each fast Dispatcher of a new Connection. Call
692 * this function whenever a new Connection is initiated or
693 * reconnects.
694 *
695 * @param con Pointer to the new Connection.
696 */
697 void ms_deliver_handle_fast_connect(Connection *con) {
698 for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
699 p != fast_dispatchers.end();
700 ++p)
701 (*p)->ms_handle_fast_connect(con);
702 }
703
704 /**
705 * Notify each Dispatcher of a new incomming Connection. Call
706 * this function whenever a new Connection is accepted.
707 *
708 * @param con Pointer to the new Connection.
709 */
710 void ms_deliver_handle_accept(Connection *con) {
711 for (list<Dispatcher*>::iterator p = dispatchers.begin();
712 p != dispatchers.end();
713 ++p)
714 (*p)->ms_handle_accept(con);
715 }
716
717 /**
718 * Notify each fast Dispatcher of a new incoming Connection. Call
719 * this function whenever a new Connection is accepted.
720 *
721 * @param con Pointer to the new Connection.
722 */
723 void ms_deliver_handle_fast_accept(Connection *con) {
724 for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
725 p != fast_dispatchers.end();
726 ++p)
727 (*p)->ms_handle_fast_accept(con);
728 }
729
730 /**
731 * Notify each Dispatcher of a Connection which may have lost
732 * Messages. Call this function whenever you detect that a lossy Connection
733 * has been disconnected.
734 *
735 * @param con Pointer to the broken Connection.
736 */
737 void ms_deliver_handle_reset(Connection *con) {
738 for (list<Dispatcher*>::iterator p = dispatchers.begin();
739 p != dispatchers.end();
740 ++p) {
741 if ((*p)->ms_handle_reset(con))
742 return;
743 }
744 }
745 /**
746 * Notify each Dispatcher of a Connection which has been "forgotten" about
747 * by the remote end, implying that messages have probably been lost.
748 * Call this function whenever you detect a reset.
749 *
750 * @param con Pointer to the broken Connection.
751 */
752 void ms_deliver_handle_remote_reset(Connection *con) {
753 for (list<Dispatcher*>::iterator p = dispatchers.begin();
754 p != dispatchers.end();
755 ++p)
756 (*p)->ms_handle_remote_reset(con);
757 }
758
759 /**
760 * Notify each Dispatcher of a Connection for which reconnection
761 * attempts are being refused. Call this function whenever you
762 * detect that a lossy Connection has been disconnected and it's
763 * impossible to reconnect.
764 *
765 * @param con Pointer to the broken Connection.
766 */
767 void ms_deliver_handle_refused(Connection *con) {
768 for (list<Dispatcher*>::iterator p = dispatchers.begin();
769 p != dispatchers.end();
770 ++p) {
771 if ((*p)->ms_handle_refused(con))
772 return;
773 }
774 }
775
776 /**
777 * Get the AuthAuthorizer for a new outgoing Connection.
778 *
779 * @param peer_type The peer type for the new Connection
780 * @param force_new True if we want to wait for new keys, false otherwise.
781 * @return A pointer to the AuthAuthorizer, if we have one; NULL otherwise
782 */
783 AuthAuthorizer *ms_deliver_get_authorizer(int peer_type, bool force_new) {
784 AuthAuthorizer *a = 0;
785 for (list<Dispatcher*>::iterator p = dispatchers.begin();
786 p != dispatchers.end();
787 ++p) {
788 if ((*p)->ms_get_authorizer(peer_type, &a, force_new))
789 return a;
790 }
791 return NULL;
792 }
793 /**
794 * Verify that the authorizer on a new incoming Connection is correct.
795 *
796 * @param con The new incoming Connection
797 * @param peer_type The type of the endpoint on the new Connection
798 * @param protocol The ID of the protocol in use (at time of writing, cephx or none)
799 * @param authorizer The authorization string supplied by the remote
800 * @param authorizer_reply Output param: The string we should send back to
801 * the remote to authorize ourselves. Only filled in if isvalid
802 * @param isvalid Output param: True if authorizer is valid, false otherwise
803 *
804 * @return True if we were able to prove or disprove correctness of
805 * authorizer, false otherwise.
806 */
807 bool ms_deliver_verify_authorizer(Connection *con, int peer_type,
808 int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
28e407b8
AA
809 bool& isvalid, CryptoKey& session_key,
810 std::unique_ptr<AuthAuthorizerChallenge> *challenge) {
7c673cae
FG
811 for (list<Dispatcher*>::iterator p = dispatchers.begin();
812 p != dispatchers.end();
813 ++p) {
28e407b8
AA
814 if ((*p)->ms_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply,
815 isvalid, session_key, challenge))
7c673cae
FG
816 return true;
817 }
818 return false;
819 }
820
821 /**
822 * @} // Dispatcher Interfacing
823 */
824};
825
826
827
828#endif