]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/Messenger.h
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / msg / Messenger.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16
17 #ifndef CEPH_MESSENGER_H
18 #define CEPH_MESSENGER_H
19
20 #include <map>
21 #include <deque>
22
23 #include <errno.h>
24 #include <sstream>
25 #include <memory>
26
27 #include "Message.h"
28 #include "Dispatcher.h"
29 #include "Policy.h"
30 #include "common/Throttle.h"
31 #include "include/Context.h"
32 #include "include/types.h"
33 #include "include/ceph_features.h"
34 #include "auth/Crypto.h"
35 #include "common/item_history.h"
36 #include "auth/AuthRegistry.h"
37 #include "include/ceph_assert.h"
38
39 #include <errno.h>
40 #include <sstream>
41 #include <signal.h>
42
43 #define SOCKET_PRIORITY_MIN_DELAY 6
44
45 class Timer;
46
47 class AuthClient;
48 class AuthServer;
49
50 #ifdef UNIT_TESTS_BUILT
51
52 struct Interceptor {
53 std::mutex lock;
54 std::condition_variable cond_var;
55
56 enum ACTION : uint32_t {
57 CONTINUE = 0,
58 FAIL,
59 STOP
60 };
61
62 enum STEP {
63 START_CLIENT_BANNER_EXCHANGE = 1,
64 START_SERVER_BANNER_EXCHANGE,
65 BANNER_EXCHANGE_BANNER_CONNECTING,
66 BANNER_EXCHANGE,
67 HANDLE_PEER_BANNER_BANNER_CONNECTING,
68 HANDLE_PEER_BANNER,
69 HANDLE_PEER_BANNER_PAYLOAD_HELLO_CONNECTING,
70 HANDLE_PEER_BANNER_PAYLOAD,
71 SEND_AUTH_REQUEST,
72 HANDLE_AUTH_REQUEST_ACCEPTING_SIGN,
73 SEND_CLIENT_IDENTITY,
74 SEND_SERVER_IDENTITY,
75 SEND_RECONNECT,
76 SEND_RECONNECT_OK,
77 READY,
78 HANDLE_MESSAGE,
79 READ_MESSAGE_COMPLETE,
80 SESSION_RETRY
81 };
82
83 virtual ~Interceptor() {}
84 virtual ACTION intercept(Connection *conn, uint32_t step) = 0;
85 };
86
87 #endif
88
89 class Messenger {
90 private:
91 std::deque<Dispatcher*> dispatchers;
92 std::deque<Dispatcher*> fast_dispatchers;
93 ZTracer::Endpoint trace_endpoint;
94
95 protected:
96 void set_endpoint_addr(const entity_addr_t& a,
97 const entity_name_t &name);
98
99 protected:
100 /// the "name" of the local daemon. eg client.99
101 entity_name_t my_name;
102
103 /// my addr
104 safe_item_history<entity_addrvec_t> my_addrs;
105
106 int default_send_priority;
107 /// std::set to true once the Messenger has started, and std::set to false on shutdown
108 bool started;
109 uint32_t magic;
110 int socket_priority;
111
112 public:
113 AuthClient *auth_client = 0;
114 AuthServer *auth_server = 0;
115
116 #ifdef UNIT_TESTS_BUILT
117 Interceptor *interceptor = nullptr;
118 #endif
119
120 /**
121 * The CephContext this Messenger uses. Many other components initialize themselves
122 * from this value.
123 */
124 CephContext *cct;
125 int crcflags;
126
127 using Policy = ceph::net::Policy<Throttle>;
128
129 public:
130 // allow unauthenticated connections. This is needed for
131 // compatibility with pre-nautilus OSDs, which do not authenticate
132 // the heartbeat sessions.
133 bool require_authorizer = true;
134
135 protected:
136 // for authentication
137 AuthRegistry auth_registry;
138
139 public:
140 /**
141 * Messenger constructor. Call this from your implementation.
142 * Messenger users should construct full implementations directly,
143 * or use the create() function.
144 */
145 Messenger(CephContext *cct_, entity_name_t w);
146 virtual ~Messenger() {}
147
148 /**
149 * create a new messenger
150 *
151 * Create a new messenger instance, with whatever implementation is
152 * available or specified via the configuration in cct.
153 *
154 * @param cct context
155 * @param type name of messenger type
156 * @param name entity name to register
157 * @param lname logical name of the messenger in this process (e.g., "client")
158 * @param nonce nonce value to uniquely identify this instance on the current host
159 */
160 static Messenger *create(CephContext *cct,
161 const std::string &type,
162 entity_name_t name,
163 std::string lname,
164 uint64_t nonce);
165
166 static uint64_t get_random_nonce();
167 static uint64_t get_pid_nonce();
168
169 /**
170 * create a new messenger
171 *
172 * Create a new messenger instance.
173 * Same as the above, but a slightly simpler interface for clients:
174 * - Generate a random nonce
175 * - get the messenger type from cct
176 * - use the client entity_type
177 *
178 * @param cct context
179 * @param lname logical name of the messenger in this process (e.g., "client")
180 */
181 static Messenger *create_client_messenger(CephContext *cct, std::string lname);
182
183 /**
184 * @defgroup Accessors
185 * @{
186 */
187 int get_mytype() const { return my_name.type(); }
188
189 /**
190 * Retrieve the Messenger's name
191 *
192 * @return A const reference to the name this Messenger
193 * currently believes to be its own.
194 */
195 const entity_name_t& get_myname() { return my_name; }
196
197 /**
198 * Retrieve the Messenger's address.
199 *
200 * @return A const reference to the address this Messenger
201 * currently believes to be its own.
202 */
203 const entity_addrvec_t& get_myaddrs() {
204 return *my_addrs;
205 }
206
207 /**
208 * get legacy addr for myself, suitable for protocol v1
209 *
210 * Note that myaddrs might be a proper addrvec with v1 in it, or it might be an
211 * ANY addr (if i am a pure client).
212 */
213 entity_addr_t get_myaddr_legacy() {
214 return my_addrs->as_legacy_addr();
215 }
216
217
218 /**
219 * std::set messenger's instance
220 */
221 uint32_t get_magic() { return magic; }
222 void set_magic(int _magic) { magic = _magic; }
223
224 void set_auth_client(AuthClient *ac) {
225 auth_client = ac;
226 }
227 void set_auth_server(AuthServer *as) {
228 auth_server = as;
229 }
230
231 protected:
232 /**
233 * std::set messenger's address
234 */
235 virtual void set_myaddrs(const entity_addrvec_t& a) {
236 my_addrs = a;
237 set_endpoint_addr(a.front(), my_name);
238 }
239 public:
240 /**
241 * @return the zipkin trace endpoint
242 */
243 const ZTracer::Endpoint* get_trace_endpoint() const {
244 return &trace_endpoint;
245 }
246
247 /**
248 * set the name of the local entity. The name is reported to others and
249 * can be changed while the system is running, but doing so at incorrect
250 * times may have bad results.
251 *
252 * @param m The name to std::set.
253 */
254 void set_myname(const entity_name_t& m) { my_name = m; }
255
256 /**
257 * set the unknown address components for this Messenger.
258 * This is useful if the Messenger doesn't know its full address just by
259 * binding, but another Messenger on the same interface has already learned
260 * its full address. This function does not fill in known address elements,
261 * cause a rebind, or do anything of that sort.
262 *
263 * @param addr The address to use as a template.
264 */
265 virtual bool set_addr_unknowns(const entity_addrvec_t &addrs) = 0;
266 /**
267 * set the address for this Messenger. This is useful if the Messenger
268 * binds to a specific address but advertises a different address on the
269 * the network.
270 *
271 * @param addr The address to use.
272 */
273 virtual void set_addrs(const entity_addrvec_t &addr) = 0;
274 /// Get the default send priority.
275 int get_default_send_priority() { return default_send_priority; }
276 /**
277 * Get the number of Messages which the Messenger has received
278 * but not yet dispatched.
279 */
280 virtual int get_dispatch_queue_len() = 0;
281
282 /**
283 * Get age of oldest undelivered message
284 * (0 if the queue is empty)
285 */
286 virtual double get_dispatch_queue_max_age(utime_t now) = 0;
287
288 /**
289 * @} // Accessors
290 */
291
292 /**
293 * @defgroup Configuration
294 * @{
295 */
296 /**
297 * set the cluster protocol in use by this daemon.
298 * This is an init-time function and cannot be called after calling
299 * start() or bind().
300 *
301 * @param p The cluster protocol to use. Defined externally.
302 */
303 virtual void set_cluster_protocol(int p) = 0;
304 /**
305 * set a policy which is applied to all peers who do not have a type-specific
306 * Policy.
307 * This is an init-time function and cannot be called after calling
308 * start() or bind().
309 *
310 * @param p The Policy to apply.
311 */
312 virtual void set_default_policy(Policy p) = 0;
313 /**
314 * set a policy which is applied to all peers of the given type.
315 * This is an init-time function and cannot be called after calling
316 * start() or bind().
317 *
318 * @param type The peer type this policy applies to.
319 * @param p The policy to apply.
320 */
321 virtual void set_policy(int type, Policy p) = 0;
322 /**
323 * set the Policy associated with a type of peer.
324 *
325 * This can be called either on initial setup, or after connections
326 * are already established. However, the policies for existing
327 * connections will not be affected; the new policy will only apply
328 * to future connections.
329 *
330 * @param t The peer type to get the default policy for.
331 * @return A const Policy reference.
332 */
333 virtual Policy get_policy(int t) = 0;
334 /**
335 * Get the default Policy
336 *
337 * @return A const Policy reference.
338 */
339 virtual Policy get_default_policy() = 0;
340 /**
341 * set Throttlers applied to all Messages from the given type of peer
342 *
343 * This is an init-time function and cannot be called after calling
344 * start() or bind().
345 *
346 * @param type The peer type the Throttlers will apply to.
347 * @param bytes The Throttle for the number of bytes carried by the message
348 * @param msgs The Throttle for the number of messages for this @p type
349 * @note The Messenger does not take ownership of the Throttle pointers, but
350 * you must not destroy them before you destroy the Messenger.
351 */
352 virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0;
353 /**
354 * set the default send priority
355 *
356 * This is an init-time function and must be called *before* calling
357 * start().
358 *
359 * @param p The cluster protocol to use. Defined externally.
360 */
361 void set_default_send_priority(int p) {
362 ceph_assert(!started);
363 default_send_priority = p;
364 }
365 /**
366 * set the priority(SO_PRIORITY) for all packets to be sent on this socket.
367 *
368 * Linux uses this value to order the networking queues: packets with a higher
369 * priority may be processed first depending on the selected device queueing
370 * discipline.
371 *
372 * @param prio The priority. Setting a priority outside the range 0 to 6
373 * requires the CAP_NET_ADMIN capability.
374 */
375 void set_socket_priority(int prio) {
376 socket_priority = prio;
377 }
378 /**
379 * Get the socket priority
380 *
381 * @return the socket priority
382 */
383 int get_socket_priority() {
384 return socket_priority;
385 }
386 /**
387 * Add a new Dispatcher to the front of the list. If you add
388 * a Dispatcher which is already included, it will get a duplicate
389 * entry. This will reduce efficiency but not break anything.
390 *
391 * @param d The Dispatcher to insert into the list.
392 */
393 void add_dispatcher_head(Dispatcher *d) {
394 bool first = dispatchers.empty();
395 dispatchers.push_front(d);
396 if (d->ms_can_fast_dispatch_any())
397 fast_dispatchers.push_front(d);
398 if (first)
399 ready();
400 }
401 /**
402 * Add a new Dispatcher to the end of the list. If you add
403 * a Dispatcher which is already included, it will get a duplicate
404 * entry. This will reduce efficiency but not break anything.
405 *
406 * @param d The Dispatcher to insert into the list.
407 */
408 void add_dispatcher_tail(Dispatcher *d) {
409 bool first = dispatchers.empty();
410 dispatchers.push_back(d);
411 if (d->ms_can_fast_dispatch_any())
412 fast_dispatchers.push_back(d);
413 if (first)
414 ready();
415 }
416 /**
417 * Bind the Messenger to a specific address. If bind_addr
418 * is not completely filled in the system will use the
419 * valid portions and cycle through the unset ones (eg, the port)
420 * in an unspecified order.
421 *
422 * @param bind_addr The address to bind to.
423 * @return 0 on success, or -1 on error, or -errno if
424 * we can be more specific about the failure.
425 */
426 virtual int bind(const entity_addr_t& bind_addr) = 0;
427
428 virtual int bindv(const entity_addrvec_t& addrs);
429
430 /**
431 * This function performs a full restart of the Messenger component,
432 * whatever that means. Other entities who connect to this
433 * Messenger post-rebind() should perceive it as a new entity which
434 * they have not previously contacted, and it MUST bind to a
435 * different address than it did previously.
436 *
437 * @param avoid_ports Additional port to avoid binding to.
438 */
439 virtual int rebind(const std::set<int>& avoid_ports) { return -EOPNOTSUPP; }
440 /**
441 * Bind the 'client' Messenger to a specific address.Messenger will bind
442 * the address before connect to others when option ms_bind_before_connect
443 * is true.
444 * @param bind_addr The address to bind to.
445 * @return 0 on success, or -1 on error, or -errno if
446 * we can be more specific about the failure.
447 */
448 virtual int client_bind(const entity_addr_t& bind_addr) = 0;
449
450 /**
451 * reset the 'client' Messenger. Mark all the existing Connections down
452 * and update 'nonce'.
453 */
454 virtual int client_reset() = 0;
455
456
457 virtual bool should_use_msgr2() {
458 return false;
459 }
460
461 /**
462 * @} // Configuration
463 */
464
465 /**
466 * @defgroup Startup/Shutdown
467 * @{
468 */
469 /**
470 * Perform any resource allocation, thread startup, etc
471 * that is required before attempting to connect to other
472 * Messengers or transmit messages.
473 * Once this function completes, started shall be set to true.
474 *
475 * @return 0 on success; -errno on failure.
476 */
477 virtual int start() { started = true; return 0; }
478
479 // shutdown
480 /**
481 * Block until the Messenger has finished shutting down (according
482 * to the shutdown() function).
483 * It is valid to call this after calling shutdown(), but it must
484 * be called before deleting the Messenger.
485 */
486 virtual void wait() = 0;
487 /**
488 * Initiate a shutdown of the Messenger.
489 *
490 * @return 0 on success, -errno otherwise.
491 */
492 virtual int shutdown() { started = false; return 0; }
493 /**
494 * @} // Startup/Shutdown
495 */
496
497 /**
498 * @defgroup Messaging
499 * @{
500 */
501 /**
502 * Queue the given Message for the given entity.
503 * Success in this function does not guarantee Message delivery, only
504 * success in queueing the Message. Other guarantees may be provided based
505 * on the Connection policy associated with the dest.
506 *
507 * @param m The Message to send. The Messenger consumes a single reference
508 * when you pass it in.
509 * @param dest The entity to send the Message to.
510 *
511 * DEPRECATED: please do not use this interface for any new code;
512 * use the Connection* variant.
513 *
514 * @return 0 on success, or -errno on failure.
515 */
516 virtual int send_to(
517 Message *m,
518 int type,
519 const entity_addrvec_t& addr) = 0;
520 int send_to_mon(
521 Message *m, const entity_addrvec_t& addrs) {
522 return send_to(m, CEPH_ENTITY_TYPE_MON, addrs);
523 }
524 int send_to_mds(
525 Message *m, const entity_addrvec_t& addrs) {
526 return send_to(m, CEPH_ENTITY_TYPE_MDS, addrs);
527 }
528
529 /**
530 * @} // Messaging
531 */
532 /**
533 * @defgroup Connection Management
534 * @{
535 */
536 /**
537 * Get the Connection object associated with a given entity. If a
538 * Connection does not exist, create one and establish a logical connection.
539 * The caller owns a reference when this returns. Call ->put() when you're
540 * done!
541 *
542 * @param dest The entity to get a connection for.
543 */
544 virtual ConnectionRef connect_to(
545 int type, const entity_addrvec_t& dest,
546 bool anon=false, bool not_local_dest=false) = 0;
547 ConnectionRef connect_to_mon(const entity_addrvec_t& dest,
548 bool anon=false, bool not_local_dest=false) {
549 return connect_to(CEPH_ENTITY_TYPE_MON, dest, anon, not_local_dest);
550 }
551 ConnectionRef connect_to_mds(const entity_addrvec_t& dest,
552 bool anon=false, bool not_local_dest=false) {
553 return connect_to(CEPH_ENTITY_TYPE_MDS, dest, anon, not_local_dest);
554 }
555 ConnectionRef connect_to_osd(const entity_addrvec_t& dest,
556 bool anon=false, bool not_local_dest=false) {
557 return connect_to(CEPH_ENTITY_TYPE_OSD, dest, anon, not_local_dest);
558 }
559 ConnectionRef connect_to_mgr(const entity_addrvec_t& dest,
560 bool anon=false, bool not_local_dest=false) {
561 return connect_to(CEPH_ENTITY_TYPE_MGR, dest, anon, not_local_dest);
562 }
563
564 /**
565 * Get the Connection object associated with ourselves.
566 */
567 virtual ConnectionRef get_loopback_connection() = 0;
568 /**
569 * Mark down a Connection to a remote.
570 *
571 * This will cause us to discard our outgoing queue for them, and if
572 * reset detection is enabled in the policy and the endpoint tries
573 * to reconnect they will discard their queue when we inform them of
574 * the session reset.
575 *
576 * If there is no Connection to the given dest, it is a no-op.
577 *
578 * This generates a RESET notification to the Dispatcher.
579 *
580 * DEPRECATED: please do not use this interface for any new code;
581 * use the Connection* variant.
582 *
583 * @param a The address to mark down.
584 */
585 virtual void mark_down(const entity_addr_t& a) = 0;
586 virtual void mark_down_addrs(const entity_addrvec_t& a) {
587 mark_down(a.legacy_addr());
588 }
589 /**
590 * Mark all the existing Connections down. This is equivalent
591 * to iterating over all Connections and calling mark_down()
592 * on each.
593 *
594 * This will generate a RESET event for each closed connections.
595 */
596 virtual void mark_down_all() = 0;
597 /**
598 * @} // Connection Management
599 */
600 protected:
601 /**
602 * @defgroup Subclass Interfacing
603 * @{
604 */
605 /**
606 * A courtesy function for Messenger implementations which
607 * will be called when we receive our first Dispatcher.
608 */
609 virtual void ready() { }
610 /**
611 * @} // Subclass Interfacing
612 */
613 public:
614 #ifdef CEPH_USE_SIGPIPE_BLOCKER
615 /**
616 * We need to disable SIGPIPE on all platforms, and if they
617 * don't give us a better mechanism (read: are on Solaris) that
618 * means blocking the signal whenever we do a send or sendmsg...
619 * That means any implementations must invoke MSGR_SIGPIPE_STOPPER in-scope
620 * whenever doing so. On most systems that's blank, but on systems where
621 * it's needed we construct an RAII object to plug and un-plug the SIGPIPE.
622 * See http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
623 */
624 struct sigpipe_stopper {
625 bool blocked;
626 sigset_t existing_mask;
627 sigset_t pipe_mask;
628 sigpipe_stopper() {
629 sigemptyset(&pipe_mask);
630 sigaddset(&pipe_mask, SIGPIPE);
631 sigset_t signals;
632 sigemptyset(&signals);
633 sigpending(&signals);
634 if (sigismember(&signals, SIGPIPE)) {
635 blocked = false;
636 } else {
637 blocked = true;
638 int r = pthread_sigmask(SIG_BLOCK, &pipe_mask, &existing_mask);
639 ceph_assert(r == 0);
640 }
641 }
642 ~sigpipe_stopper() {
643 if (blocked) {
644 struct timespec nowait{0};
645 int r = sigtimedwait(&pipe_mask, 0, &nowait);
646 ceph_assert(r == EAGAIN || r == 0);
647 r = pthread_sigmask(SIG_SETMASK, &existing_mask, 0);
648 ceph_assert(r == 0);
649 }
650 }
651 };
652 # define MSGR_SIGPIPE_STOPPER Messenger::sigpipe_stopper stopper();
653 #else
654 # define MSGR_SIGPIPE_STOPPER
655 #endif
656 /**
657 * @defgroup Dispatcher Interfacing
658 * @{
659 */
660 /**
661 * Determine whether a message can be fast-dispatched. We will
662 * query each Dispatcher in sequence to determine if they are
663 * capable of handling a particular message via "fast dispatch".
664 *
665 * @param m The Message we are testing.
666 */
667 bool ms_can_fast_dispatch(const ceph::cref_t<Message>& m) {
668 for (const auto &dispatcher : fast_dispatchers) {
669 if (dispatcher->ms_can_fast_dispatch2(m))
670 return true;
671 }
672 return false;
673 }
674
675 /**
676 * Deliver a single Message via "fast dispatch".
677 *
678 * @param m The Message we are fast dispatching.
679 * If none of our Dispatchers can handle it, ceph_abort().
680 */
681 void ms_fast_dispatch(const ceph::ref_t<Message> &m) {
682 m->set_dispatch_stamp(ceph_clock_now());
683 for (const auto &dispatcher : fast_dispatchers) {
684 if (dispatcher->ms_can_fast_dispatch2(m)) {
685 dispatcher->ms_fast_dispatch2(m);
686 return;
687 }
688 }
689 ceph_abort();
690 }
691 void ms_fast_dispatch(Message *m) {
692 return ms_fast_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */
693 }
694 /**
695 *
696 */
697 void ms_fast_preprocess(const ceph::ref_t<Message> &m) {
698 for (const auto &dispatcher : fast_dispatchers) {
699 dispatcher->ms_fast_preprocess2(m);
700 }
701 }
702 /**
703 * Deliver a single Message. Send it to each Dispatcher
704 * in sequence until one of them handles it.
705 * If none of our Dispatchers can handle it, ceph_abort().
706 *
707 * @param m The Message to deliver.
708 */
709 void ms_deliver_dispatch(const ceph::ref_t<Message> &m) {
710 m->set_dispatch_stamp(ceph_clock_now());
711 for (const auto &dispatcher : dispatchers) {
712 if (dispatcher->ms_dispatch2(m))
713 return;
714 }
715 lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
716 << m->get_source_inst() << dendl;
717 ceph_assert(!cct->_conf->ms_die_on_unhandled_msg);
718 }
719 void ms_deliver_dispatch(Message *m) {
720 return ms_deliver_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */
721 }
722 /**
723 * Notify each Dispatcher of a new Connection. Call
724 * this function whenever a new Connection is initiated or
725 * reconnects.
726 *
727 * @param con Pointer to the new Connection.
728 */
729 void ms_deliver_handle_connect(Connection *con) {
730 for (const auto& dispatcher : dispatchers) {
731 dispatcher->ms_handle_connect(con);
732 }
733 }
734
735 /**
736 * Notify each fast Dispatcher of a new Connection. Call
737 * this function whenever a new Connection is initiated or
738 * reconnects.
739 *
740 * @param con Pointer to the new Connection.
741 */
742 void ms_deliver_handle_fast_connect(Connection *con) {
743 for (const auto& dispatcher : fast_dispatchers) {
744 dispatcher->ms_handle_fast_connect(con);
745 }
746 }
747
748 /**
749 * Notify each Dispatcher of a new incoming Connection. Call
750 * this function whenever a new Connection is accepted.
751 *
752 * @param con Pointer to the new Connection.
753 */
754 void ms_deliver_handle_accept(Connection *con) {
755 for (const auto& dispatcher : dispatchers) {
756 dispatcher->ms_handle_accept(con);
757 }
758 }
759
760 /**
761 * Notify each fast Dispatcher of a new incoming Connection. Call
762 * this function whenever a new Connection is accepted.
763 *
764 * @param con Pointer to the new Connection.
765 */
766 void ms_deliver_handle_fast_accept(Connection *con) {
767 for (const auto& dispatcher : fast_dispatchers) {
768 dispatcher->ms_handle_fast_accept(con);
769 }
770 }
771
772 /**
773 * Notify each Dispatcher of a Connection which may have lost
774 * Messages. Call this function whenever you detect that a lossy Connection
775 * has been disconnected.
776 *
777 * @param con Pointer to the broken Connection.
778 */
779 void ms_deliver_handle_reset(Connection *con) {
780 for (const auto& dispatcher : dispatchers) {
781 if (dispatcher->ms_handle_reset(con))
782 return;
783 }
784 }
785 /**
786 * Notify each Dispatcher of a Connection which has been "forgotten" about
787 * by the remote end, implying that messages have probably been lost.
788 * Call this function whenever you detect a reset.
789 *
790 * @param con Pointer to the broken Connection.
791 */
792 void ms_deliver_handle_remote_reset(Connection *con) {
793 for (const auto& dispatcher : dispatchers) {
794 dispatcher->ms_handle_remote_reset(con);
795 }
796 }
797
798 /**
799 * Notify each Dispatcher of a Connection for which reconnection
800 * attempts are being refused. Call this function whenever you
801 * detect that a lossy Connection has been disconnected and it's
802 * impossible to reconnect.
803 *
804 * @param con Pointer to the broken Connection.
805 */
806 void ms_deliver_handle_refused(Connection *con) {
807 for (const auto& dispatcher : dispatchers) {
808 if (dispatcher->ms_handle_refused(con))
809 return;
810 }
811 }
812
813 void set_require_authorizer(bool b) {
814 require_authorizer = b;
815 }
816
817 /**
818 * @} // Dispatcher Interfacing
819 */
820 };
821
822
823
824 #endif