]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/Messenger.h
bump version to 15.2.4-pve1
[ceph.git] / ceph / src / msg / Messenger.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16
17 #ifndef CEPH_MESSENGER_H
18 #define CEPH_MESSENGER_H
19
#include <errno.h>
#include <signal.h>

#include <condition_variable>
#include <cstdint>
#include <deque>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <sstream>
#include <string>

#include "Message.h"
#include "Dispatcher.h"
#include "Policy.h"
#include "common/Throttle.h"
#include "include/Context.h"
#include "include/types.h"
#include "include/ceph_features.h"
#include "auth/Crypto.h"
#include "common/item_history.h"
#include "auth/AuthRegistry.h"
#include "include/ceph_assert.h"
42
// Socket priority constant used with the messenger's socket_priority setting.
// NOTE(review): presumably the SO_PRIORITY "minimize delay" level (6 is the
// highest value usable without CAP_NET_ADMIN) -- confirm against the users.
#define SOCKET_PRIORITY_MIN_DELAY 6

// Forward declarations: only pointers to these types are needed in this header.
class Timer;

class AuthClient;
class AuthServer;
49
#ifdef UNIT_TESTS_BUILT

/**
 * Hook interface that is only compiled when UNIT_TESTS_BUILT is defined.
 *
 * A Messenger holds an optional pointer to one of these (see
 * Messenger::interceptor).  Implementations decide, per connection and
 * per step, what the caller should do next.
 */
struct Interceptor {
  /// Synchronization primitives available to implementations and callers.
  std::mutex lock;
  std::condition_variable cond_var;

  /// Verdict returned by intercept().
  enum ACTION : uint32_t {
    CONTINUE = 0,
    FAIL = 1,
    STOP = 2
  };

  virtual ~Interceptor() = default;

  /**
   * Decide what to do at a given step of a connection.
   *
   * @param conn The connection being intercepted.
   * @param step Identifier of the step being executed.
   *             NOTE(review): the numbering of steps is defined by the
   *             caller and is not visible in this header.
   * @return The ACTION the caller should take.
   */
  virtual ACTION intercept(Connection *conn, uint32_t step) = 0;
};

#endif
67
68 class Messenger {
69 private:
70 std::deque<Dispatcher*> dispatchers;
71 std::deque<Dispatcher*> fast_dispatchers;
72 ZTracer::Endpoint trace_endpoint;
73
74 protected:
75 void set_endpoint_addr(const entity_addr_t& a,
76 const entity_name_t &name);
77
78 protected:
79 /// the "name" of the local daemon. eg client.99
80 entity_name_t my_name;
81
82 /// my addr
83 safe_item_history<entity_addrvec_t> my_addrs;
84
85 int default_send_priority;
86 /// std::set to true once the Messenger has started, and std::set to false on shutdown
87 bool started;
88 uint32_t magic;
89 int socket_priority;
90
91 public:
92 AuthClient *auth_client = 0;
93 AuthServer *auth_server = 0;
94
95 #ifdef UNIT_TESTS_BUILT
96 Interceptor *interceptor = nullptr;
97 #endif
98
99 /**
100 * Various Messenger conditional config/type flags to allow
101 * different "transport" Messengers to tune themselves
102 */
103 static const int HAS_HEAVY_TRAFFIC = 0x0001;
104 static const int HAS_MANY_CONNECTIONS = 0x0002;
105 static const int HEARTBEAT = 0x0004;
106
107 /**
108 * The CephContext this Messenger uses. Many other components initialize themselves
109 * from this value.
110 */
111 CephContext *cct;
112 int crcflags;
113
114 using Policy = ceph::net::Policy<Throttle>;
115
116 public:
117 // allow unauthenticated connections. This is needed for
118 // compatibility with pre-nautilus OSDs, which do not authenticate
119 // the heartbeat sessions.
120 bool require_authorizer = true;
121
122 protected:
123 // for authentication
124 AuthRegistry auth_registry;
125
126 public:
127 /**
128 * Messenger constructor. Call this from your implementation.
129 * Messenger users should construct full implementations directly,
130 * or use the create() function.
131 */
132 Messenger(CephContext *cct_, entity_name_t w);
133 virtual ~Messenger() {}
134
135 /**
136 * create a new messenger
137 *
138 * Create a new messenger instance, with whatever implementation is
139 * available or specified via the configuration in cct.
140 *
141 * @param cct context
142 * @param type name of messenger type
143 * @param name entity name to register
144 * @param lname logical name of the messenger in this process (e.g., "client")
145 * @param nonce nonce value to uniquely identify this instance on the current host
146 * @param features bits for the local connection
147 * @param cflags general std::set of flags to configure transport resources
148 */
149 static Messenger *create(CephContext *cct,
150 const std::string &type,
151 entity_name_t name,
152 std::string lname,
153 uint64_t nonce,
154 uint64_t cflags);
155
156 static uint64_t get_random_nonce();
157 static uint64_t get_pid_nonce();
158
159 /**
160 * create a new messenger
161 *
162 * Create a new messenger instance.
163 * Same as the above, but a slightly simpler interface for clients:
164 * - Generate a random nonce
165 * - use the default feature bits
166 * - get the messenger type from cct
167 * - use the client entity_type
168 *
169 * @param cct context
170 * @param lname logical name of the messenger in this process (e.g., "client")
171 */
172 static Messenger *create_client_messenger(CephContext *cct, std::string lname);
173
174 /**
175 * @defgroup Accessors
176 * @{
177 */
178 int get_mytype() const { return my_name.type(); }
179
180 /**
181 * Retrieve the Messenger's name
182 *
183 * @return A const reference to the name this Messenger
184 * currently believes to be its own.
185 */
186 const entity_name_t& get_myname() { return my_name; }
187
188 /**
189 * Retrieve the Messenger's address.
190 *
191 * @return A const reference to the address this Messenger
192 * currently believes to be its own.
193 */
194 const entity_addrvec_t& get_myaddrs() {
195 return *my_addrs;
196 }
197
198 /**
199 * get legacy addr for myself, suitable for protocol v1
200 *
201 * Note that myaddrs might be a proper addrvec with v1 in it, or it might be an
202 * ANY addr (if i am a pure client).
203 */
204 entity_addr_t get_myaddr_legacy() {
205 return my_addrs->as_legacy_addr();
206 }
207
208
209 /**
210 * std::set messenger's instance
211 */
212 uint32_t get_magic() { return magic; }
213 void set_magic(int _magic) { magic = _magic; }
214
215 void set_auth_client(AuthClient *ac) {
216 auth_client = ac;
217 }
218 void set_auth_server(AuthServer *as) {
219 auth_server = as;
220 }
221
222 protected:
223 /**
224 * std::set messenger's address
225 */
226 virtual void set_myaddrs(const entity_addrvec_t& a) {
227 my_addrs = a;
228 set_endpoint_addr(a.front(), my_name);
229 }
230 public:
231 /**
232 * @return the zipkin trace endpoint
233 */
234 const ZTracer::Endpoint* get_trace_endpoint() const {
235 return &trace_endpoint;
236 }
237
238 /**
239 * set the name of the local entity. The name is reported to others and
240 * can be changed while the system is running, but doing so at incorrect
241 * times may have bad results.
242 *
243 * @param m The name to std::set.
244 */
245 void set_myname(const entity_name_t& m) { my_name = m; }
246
247 /**
248 * set the unknown address components for this Messenger.
249 * This is useful if the Messenger doesn't know its full address just by
250 * binding, but another Messenger on the same interface has already learned
251 * its full address. This function does not fill in known address elements,
252 * cause a rebind, or do anything of that sort.
253 *
254 * @param addr The address to use as a template.
255 */
256 virtual bool set_addr_unknowns(const entity_addrvec_t &addrs) = 0;
257 /**
258 * set the address for this Messenger. This is useful if the Messenger
259 * binds to a specific address but advertises a different address on the
260 * the network.
261 *
262 * @param addr The address to use.
263 */
264 virtual void set_addrs(const entity_addrvec_t &addr) = 0;
265 /// Get the default send priority.
266 int get_default_send_priority() { return default_send_priority; }
267 /**
268 * Get the number of Messages which the Messenger has received
269 * but not yet dispatched.
270 */
271 virtual int get_dispatch_queue_len() = 0;
272
273 /**
274 * Get age of oldest undelivered message
275 * (0 if the queue is empty)
276 */
277 virtual double get_dispatch_queue_max_age(utime_t now) = 0;
278
279 /**
280 * @} // Accessors
281 */
282
283 /**
284 * @defgroup Configuration
285 * @{
286 */
287 /**
288 * set the cluster protocol in use by this daemon.
289 * This is an init-time function and cannot be called after calling
290 * start() or bind().
291 *
292 * @param p The cluster protocol to use. Defined externally.
293 */
294 virtual void set_cluster_protocol(int p) = 0;
295 /**
296 * set a policy which is applied to all peers who do not have a type-specific
297 * Policy.
298 * This is an init-time function and cannot be called after calling
299 * start() or bind().
300 *
301 * @param p The Policy to apply.
302 */
303 virtual void set_default_policy(Policy p) = 0;
304 /**
305 * set a policy which is applied to all peers of the given type.
306 * This is an init-time function and cannot be called after calling
307 * start() or bind().
308 *
309 * @param type The peer type this policy applies to.
310 * @param p The policy to apply.
311 */
312 virtual void set_policy(int type, Policy p) = 0;
313 /**
314 * set the Policy associated with a type of peer.
315 *
316 * This can be called either on initial setup, or after connections
317 * are already established. However, the policies for existing
318 * connections will not be affected; the new policy will only apply
319 * to future connections.
320 *
321 * @param t The peer type to get the default policy for.
322 * @return A const Policy reference.
323 */
324 virtual Policy get_policy(int t) = 0;
325 /**
326 * Get the default Policy
327 *
328 * @return A const Policy reference.
329 */
330 virtual Policy get_default_policy() = 0;
331 /**
332 * set Throttlers applied to all Messages from the given type of peer
333 *
334 * This is an init-time function and cannot be called after calling
335 * start() or bind().
336 *
337 * @param type The peer type the Throttlers will apply to.
338 * @param bytes The Throttle for the number of bytes carried by the message
339 * @param msgs The Throttle for the number of messages for this @p type
340 * @note The Messenger does not take ownership of the Throttle pointers, but
341 * you must not destroy them before you destroy the Messenger.
342 */
343 virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0;
344 /**
345 * set the default send priority
346 *
347 * This is an init-time function and must be called *before* calling
348 * start().
349 *
350 * @param p The cluster protocol to use. Defined externally.
351 */
352 void set_default_send_priority(int p) {
353 ceph_assert(!started);
354 default_send_priority = p;
355 }
356 /**
357 * set the priority(SO_PRIORITY) for all packets to be sent on this socket.
358 *
359 * Linux uses this value to order the networking queues: packets with a higher
360 * priority may be processed first depending on the selected device queueing
361 * discipline.
362 *
363 * @param prio The priority. Setting a priority outside the range 0 to 6
364 * requires the CAP_NET_ADMIN capability.
365 */
366 void set_socket_priority(int prio) {
367 socket_priority = prio;
368 }
369 /**
370 * Get the socket priority
371 *
372 * @return the socket priority
373 */
374 int get_socket_priority() {
375 return socket_priority;
376 }
377 /**
378 * Add a new Dispatcher to the front of the list. If you add
379 * a Dispatcher which is already included, it will get a duplicate
380 * entry. This will reduce efficiency but not break anything.
381 *
382 * @param d The Dispatcher to insert into the list.
383 */
384 void add_dispatcher_head(Dispatcher *d) {
385 bool first = dispatchers.empty();
386 dispatchers.push_front(d);
387 if (d->ms_can_fast_dispatch_any())
388 fast_dispatchers.push_front(d);
389 if (first)
390 ready();
391 }
392 /**
393 * Add a new Dispatcher to the end of the list. If you add
394 * a Dispatcher which is already included, it will get a duplicate
395 * entry. This will reduce efficiency but not break anything.
396 *
397 * @param d The Dispatcher to insert into the list.
398 */
399 void add_dispatcher_tail(Dispatcher *d) {
400 bool first = dispatchers.empty();
401 dispatchers.push_back(d);
402 if (d->ms_can_fast_dispatch_any())
403 fast_dispatchers.push_back(d);
404 if (first)
405 ready();
406 }
407 /**
408 * Bind the Messenger to a specific address. If bind_addr
409 * is not completely filled in the system will use the
410 * valid portions and cycle through the unset ones (eg, the port)
411 * in an unspecified order.
412 *
413 * @param bind_addr The address to bind to.
414 * @return 0 on success, or -1 on error, or -errno if
415 * we can be more specific about the failure.
416 */
417 virtual int bind(const entity_addr_t& bind_addr) = 0;
418
419 /**
420 * This function performs a full restart of the Messenger component,
421 * whatever that means. Other entities who connect to this
422 * Messenger post-rebind() should perceive it as a new entity which
423 * they have not previously contacted, and it MUST bind to a
424 * different address than it did previously.
425 *
426 * @param avoid_ports Additional port to avoid binding to.
427 */
428 virtual int rebind(const std::set<int>& avoid_ports) { return -EOPNOTSUPP; }
429 /**
430 * Bind the 'client' Messenger to a specific address.Messenger will bind
431 * the address before connect to others when option ms_bind_before_connect
432 * is true.
433 * @param bind_addr The address to bind to.
434 * @return 0 on success, or -1 on error, or -errno if
435 */
436 virtual int client_bind(const entity_addr_t& bind_addr) = 0;
437
438 virtual int bindv(const entity_addrvec_t& addrs);
439
440
441 virtual bool should_use_msgr2() {
442 return false;
443 }
444
445 /**
446 * @} // Configuration
447 */
448
449 /**
450 * @defgroup Startup/Shutdown
451 * @{
452 */
453 /**
454 * Perform any resource allocation, thread startup, etc
455 * that is required before attempting to connect to other
456 * Messengers or transmit messages.
457 * Once this function completes, started shall be set to true.
458 *
459 * @return 0 on success; -errno on failure.
460 */
461 virtual int start() { started = true; return 0; }
462
463 // shutdown
464 /**
465 * Block until the Messenger has finished shutting down (according
466 * to the shutdown() function).
467 * It is valid to call this after calling shutdown(), but it must
468 * be called before deleting the Messenger.
469 */
470 virtual void wait() = 0;
471 /**
472 * Initiate a shutdown of the Messenger.
473 *
474 * @return 0 on success, -errno otherwise.
475 */
476 virtual int shutdown() { started = false; return 0; }
477 /**
478 * @} // Startup/Shutdown
479 */
480
481 /**
482 * @defgroup Messaging
483 * @{
484 */
485 /**
486 * Queue the given Message for the given entity.
487 * Success in this function does not guarantee Message delivery, only
488 * success in queueing the Message. Other guarantees may be provided based
489 * on the Connection policy associated with the dest.
490 *
491 * @param m The Message to send. The Messenger consumes a single reference
492 * when you pass it in.
493 * @param dest The entity to send the Message to.
494 *
495 * DEPRECATED: please do not use this interface for any new code;
496 * use the Connection* variant.
497 *
498 * @return 0 on success, or -errno on failure.
499 */
500 virtual int send_to(
501 Message *m,
502 int type,
503 const entity_addrvec_t& addr) = 0;
504 int send_to_mon(
505 Message *m, const entity_addrvec_t& addrs) {
506 return send_to(m, CEPH_ENTITY_TYPE_MON, addrs);
507 }
508 int send_to_mds(
509 Message *m, const entity_addrvec_t& addrs) {
510 return send_to(m, CEPH_ENTITY_TYPE_MDS, addrs);
511 }
512
513 /**
514 * @} // Messaging
515 */
516 /**
517 * @defgroup Connection Management
518 * @{
519 */
520 /**
521 * Get the Connection object associated with a given entity. If a
522 * Connection does not exist, create one and establish a logical connection.
523 * The caller owns a reference when this returns. Call ->put() when you're
524 * done!
525 *
526 * @param dest The entity to get a connection for.
527 */
528 virtual ConnectionRef connect_to(
529 int type, const entity_addrvec_t& dest,
530 bool anon=false, bool not_local_dest=false) = 0;
531 ConnectionRef connect_to_mon(const entity_addrvec_t& dest,
532 bool anon=false, bool not_local_dest=false) {
533 return connect_to(CEPH_ENTITY_TYPE_MON, dest, anon, not_local_dest);
534 }
535 ConnectionRef connect_to_mds(const entity_addrvec_t& dest,
536 bool anon=false, bool not_local_dest=false) {
537 return connect_to(CEPH_ENTITY_TYPE_MDS, dest, anon, not_local_dest);
538 }
539 ConnectionRef connect_to_osd(const entity_addrvec_t& dest,
540 bool anon=false, bool not_local_dest=false) {
541 return connect_to(CEPH_ENTITY_TYPE_OSD, dest, anon, not_local_dest);
542 }
543 ConnectionRef connect_to_mgr(const entity_addrvec_t& dest,
544 bool anon=false, bool not_local_dest=false) {
545 return connect_to(CEPH_ENTITY_TYPE_MGR, dest, anon, not_local_dest);
546 }
547
548 /**
549 * Get the Connection object associated with ourselves.
550 */
551 virtual ConnectionRef get_loopback_connection() = 0;
552 /**
553 * Mark down a Connection to a remote.
554 *
555 * This will cause us to discard our outgoing queue for them, and if
556 * reset detection is enabled in the policy and the endpoint tries
557 * to reconnect they will discard their queue when we inform them of
558 * the session reset.
559 *
560 * If there is no Connection to the given dest, it is a no-op.
561 *
562 * This generates a RESET notification to the Dispatcher.
563 *
564 * DEPRECATED: please do not use this interface for any new code;
565 * use the Connection* variant.
566 *
567 * @param a The address to mark down.
568 */
569 virtual void mark_down(const entity_addr_t& a) = 0;
570 virtual void mark_down_addrs(const entity_addrvec_t& a) {
571 mark_down(a.legacy_addr());
572 }
573 /**
574 * Mark all the existing Connections down. This is equivalent
575 * to iterating over all Connections and calling mark_down()
576 * on each.
577 *
578 * This will generate a RESET event for each closed connections.
579 */
580 virtual void mark_down_all() = 0;
581 /**
582 * @} // Connection Management
583 */
584 protected:
585 /**
586 * @defgroup Subclass Interfacing
587 * @{
588 */
589 /**
590 * A courtesy function for Messenger implementations which
591 * will be called when we receive our first Dispatcher.
592 */
593 virtual void ready() { }
594 /**
595 * @} // Subclass Interfacing
596 */
597 public:
598 #ifdef CEPH_USE_SIGPIPE_BLOCKER
599 /**
600 * We need to disable SIGPIPE on all platforms, and if they
601 * don't give us a better mechanism (read: are on Solaris) that
602 * means blocking the signal whenever we do a send or sendmsg...
603 * That means any implementations must invoke MSGR_SIGPIPE_STOPPER in-scope
604 * whenever doing so. On most systems that's blank, but on systems where
605 * it's needed we construct an RAII object to plug and un-plug the SIGPIPE.
606 * See http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
607 */
608 struct sigpipe_stopper {
609 bool blocked;
610 sigset_t existing_mask;
611 sigset_t pipe_mask;
612 sigpipe_stopper() {
613 sigemptyset(&pipe_mask);
614 sigaddset(&pipe_mask, SIGPIPE);
615 sigset_t signals;
616 sigemptyset(&signals);
617 sigpending(&signals);
618 if (sigismember(&signals, SIGPIPE)) {
619 blocked = false;
620 } else {
621 blocked = true;
622 int r = pthread_sigmask(SIG_BLOCK, &pipe_mask, &existing_mask);
623 ceph_assert(r == 0);
624 }
625 }
626 ~sigpipe_stopper() {
627 if (blocked) {
628 struct timespec nowait{0};
629 int r = sigtimedwait(&pipe_mask, 0, &nowait);
630 ceph_assert(r == EAGAIN || r == 0);
631 r = pthread_sigmask(SIG_SETMASK, &existing_mask, 0);
632 ceph_assert(r == 0);
633 }
634 }
635 };
636 # define MSGR_SIGPIPE_STOPPER Messenger::sigpipe_stopper stopper();
637 #else
638 # define MSGR_SIGPIPE_STOPPER
639 #endif
640 /**
641 * @defgroup Dispatcher Interfacing
642 * @{
643 */
644 /**
645 * Determine whether a message can be fast-dispatched. We will
646 * query each Dispatcher in sequence to determine if they are
647 * capable of handling a particular message via "fast dispatch".
648 *
649 * @param m The Message we are testing.
650 */
651 bool ms_can_fast_dispatch(const cref_t<Message>& m) {
652 for (const auto &dispatcher : fast_dispatchers) {
653 if (dispatcher->ms_can_fast_dispatch2(m))
654 return true;
655 }
656 return false;
657 }
658
659 /**
660 * Deliver a single Message via "fast dispatch".
661 *
662 * @param m The Message we are fast dispatching.
663 * If none of our Dispatchers can handle it, ceph_abort().
664 */
665 void ms_fast_dispatch(const ref_t<Message> &m) {
666 m->set_dispatch_stamp(ceph_clock_now());
667 for (const auto &dispatcher : fast_dispatchers) {
668 if (dispatcher->ms_can_fast_dispatch2(m)) {
669 dispatcher->ms_fast_dispatch2(m);
670 return;
671 }
672 }
673 ceph_abort();
674 }
675 void ms_fast_dispatch(Message *m) {
676 return ms_fast_dispatch(ref_t<Message>(m, false)); /* consume ref */
677 }
678 /**
679 *
680 */
681 void ms_fast_preprocess(const ref_t<Message> &m) {
682 for (const auto &dispatcher : fast_dispatchers) {
683 dispatcher->ms_fast_preprocess2(m);
684 }
685 }
686 /**
687 * Deliver a single Message. Send it to each Dispatcher
688 * in sequence until one of them handles it.
689 * If none of our Dispatchers can handle it, ceph_abort().
690 *
691 * @param m The Message to deliver.
692 */
693 void ms_deliver_dispatch(const ref_t<Message> &m) {
694 m->set_dispatch_stamp(ceph_clock_now());
695 for (const auto &dispatcher : dispatchers) {
696 if (dispatcher->ms_dispatch2(m))
697 return;
698 }
699 lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
700 << m->get_source_inst() << dendl;
701 ceph_assert(!cct->_conf->ms_die_on_unhandled_msg);
702 }
703 void ms_deliver_dispatch(Message *m) {
704 return ms_deliver_dispatch(ref_t<Message>(m, false)); /* consume ref */
705 }
706 /**
707 * Notify each Dispatcher of a new Connection. Call
708 * this function whenever a new Connection is initiated or
709 * reconnects.
710 *
711 * @param con Pointer to the new Connection.
712 */
713 void ms_deliver_handle_connect(Connection *con) {
714 for (const auto& dispatcher : dispatchers) {
715 dispatcher->ms_handle_connect(con);
716 }
717 }
718
719 /**
720 * Notify each fast Dispatcher of a new Connection. Call
721 * this function whenever a new Connection is initiated or
722 * reconnects.
723 *
724 * @param con Pointer to the new Connection.
725 */
726 void ms_deliver_handle_fast_connect(Connection *con) {
727 for (const auto& dispatcher : fast_dispatchers) {
728 dispatcher->ms_handle_fast_connect(con);
729 }
730 }
731
732 /**
733 * Notify each Dispatcher of a new incoming Connection. Call
734 * this function whenever a new Connection is accepted.
735 *
736 * @param con Pointer to the new Connection.
737 */
738 void ms_deliver_handle_accept(Connection *con) {
739 for (const auto& dispatcher : dispatchers) {
740 dispatcher->ms_handle_accept(con);
741 }
742 }
743
744 /**
745 * Notify each fast Dispatcher of a new incoming Connection. Call
746 * this function whenever a new Connection is accepted.
747 *
748 * @param con Pointer to the new Connection.
749 */
750 void ms_deliver_handle_fast_accept(Connection *con) {
751 for (const auto& dispatcher : fast_dispatchers) {
752 dispatcher->ms_handle_fast_accept(con);
753 }
754 }
755
756 /**
757 * Notify each Dispatcher of a Connection which may have lost
758 * Messages. Call this function whenever you detect that a lossy Connection
759 * has been disconnected.
760 *
761 * @param con Pointer to the broken Connection.
762 */
763 void ms_deliver_handle_reset(Connection *con) {
764 for (const auto& dispatcher : dispatchers) {
765 if (dispatcher->ms_handle_reset(con))
766 return;
767 }
768 }
769 /**
770 * Notify each Dispatcher of a Connection which has been "forgotten" about
771 * by the remote end, implying that messages have probably been lost.
772 * Call this function whenever you detect a reset.
773 *
774 * @param con Pointer to the broken Connection.
775 */
776 void ms_deliver_handle_remote_reset(Connection *con) {
777 for (const auto& dispatcher : dispatchers) {
778 dispatcher->ms_handle_remote_reset(con);
779 }
780 }
781
782 /**
783 * Notify each Dispatcher of a Connection for which reconnection
784 * attempts are being refused. Call this function whenever you
785 * detect that a lossy Connection has been disconnected and it's
786 * impossible to reconnect.
787 *
788 * @param con Pointer to the broken Connection.
789 */
790 void ms_deliver_handle_refused(Connection *con) {
791 for (const auto& dispatcher : dispatchers) {
792 if (dispatcher->ms_handle_refused(con))
793 return;
794 }
795 }
796
797 void set_require_authorizer(bool b) {
798 require_authorizer = b;
799 }
800
801 /**
802 * @} // Dispatcher Interfacing
803 */
804 };
805
806
807
808 #endif