]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/Messenger.h
c832589e88ea5ba46530704d9ec70c532b8736ae
[ceph.git] / ceph / src / msg / Messenger.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16
17 #ifndef CEPH_MESSENGER_H
18 #define CEPH_MESSENGER_H
19
20 #include <map>
21 #include <deque>
22
23 #include <errno.h>
24 #include <sstream>
25 #include <memory>
26
27 #include "Message.h"
28 #include "Dispatcher.h"
29 #include "Policy.h"
30 #include "common/Throttle.h"
31 #include "include/Context.h"
32 #include "include/types.h"
33 #include "include/ceph_features.h"
34 #include "auth/Crypto.h"
35 #include "common/item_history.h"
36 #include "auth/AuthRegistry.h"
37 #include "compressor_registry.h"
38 #include "include/ceph_assert.h"
39
40 #include <errno.h>
41 #include <sstream>
42 #include <signal.h>
43
44 #define SOCKET_PRIORITY_MIN_DELAY 6
45
46 class Timer;
47
48 class AuthClient;
49 class AuthServer;
50
51 #ifdef UNIT_TESTS_BUILT
52
53 struct Interceptor {
54 std::mutex lock;
55 std::condition_variable cond_var;
56
57 enum ACTION : uint32_t {
58 CONTINUE = 0,
59 FAIL,
60 STOP
61 };
62
63 enum STEP {
64 START_CLIENT_BANNER_EXCHANGE = 1,
65 START_SERVER_BANNER_EXCHANGE,
66 BANNER_EXCHANGE_BANNER_CONNECTING,
67 BANNER_EXCHANGE,
68 HANDLE_PEER_BANNER_BANNER_CONNECTING,
69 HANDLE_PEER_BANNER,
70 HANDLE_PEER_BANNER_PAYLOAD_HELLO_CONNECTING,
71 HANDLE_PEER_BANNER_PAYLOAD,
72 SEND_AUTH_REQUEST,
73 HANDLE_AUTH_REQUEST_ACCEPTING_SIGN,
74 SEND_CLIENT_IDENTITY,
75 SEND_SERVER_IDENTITY,
76 SEND_RECONNECT,
77 SEND_RECONNECT_OK,
78 READY,
79 HANDLE_MESSAGE,
80 READ_MESSAGE_COMPLETE,
81 SESSION_RETRY,
82 SEND_COMPRESSION_REQUEST,
83 HANDLE_COMPRESSION_REQUEST
84 };
85
86 virtual ~Interceptor() {}
87 virtual ACTION intercept(Connection *conn, uint32_t step) = 0;
88 };
89
90 #endif
91
92 class Messenger {
93 private:
94 std::deque<Dispatcher*> dispatchers;
95 std::deque<Dispatcher*> fast_dispatchers;
96 ZTracer::Endpoint trace_endpoint;
97
98 protected:
99 void set_endpoint_addr(const entity_addr_t& a,
100 const entity_name_t &name);
101
102 protected:
103 /// the "name" of the local daemon. eg client.99
104 entity_name_t my_name;
105
106 /// my addr
107 safe_item_history<entity_addrvec_t> my_addrs;
108
109 int default_send_priority;
110 /// std::set to true once the Messenger has started, and std::set to false on shutdown
111 bool started;
112 uint32_t magic;
113 int socket_priority;
114
115 public:
116 AuthClient *auth_client = 0;
117 AuthServer *auth_server = 0;
118
119 #ifdef UNIT_TESTS_BUILT
120 Interceptor *interceptor = nullptr;
121 #endif
122
123 /**
124 * The CephContext this Messenger uses. Many other components initialize themselves
125 * from this value.
126 */
127 CephContext *cct;
128 int crcflags;
129
130 using Policy = ceph::net::Policy<Throttle>;
131
132 public:
133 // allow unauthenticated connections. This is needed for
134 // compatibility with pre-nautilus OSDs, which do not authenticate
135 // the heartbeat sessions.
136 bool require_authorizer = true;
137
138 protected:
139 // for authentication
140 AuthRegistry auth_registry;
141
142 public:
143 /**
144 * Messenger constructor. Call this from your implementation.
145 * Messenger users should construct full implementations directly,
146 * or use the create() function.
147 */
148 Messenger(CephContext *cct_, entity_name_t w);
149 virtual ~Messenger() {}
150
151 /**
152 * create a new messenger
153 *
154 * Create a new messenger instance, with whatever implementation is
155 * available or specified via the configuration in cct.
156 *
157 * @param cct context
158 * @param type name of messenger type
159 * @param name entity name to register
160 * @param lname logical name of the messenger in this process (e.g., "client")
161 * @param nonce nonce value to uniquely identify this instance on the current host
162 */
163 static Messenger *create(CephContext *cct,
164 const std::string &type,
165 entity_name_t name,
166 std::string lname,
167 uint64_t nonce);
168
169 static uint64_t get_random_nonce();
170 static uint64_t get_pid_nonce();
171
172 /**
173 * create a new messenger
174 *
175 * Create a new messenger instance.
176 * Same as the above, but a slightly simpler interface for clients:
177 * - Generate a random nonce
178 * - get the messenger type from cct
179 * - use the client entity_type
180 *
181 * @param cct context
182 * @param lname logical name of the messenger in this process (e.g., "client")
183 */
184 static Messenger *create_client_messenger(CephContext *cct, std::string lname);
185
186 /**
187 * @defgroup Accessors
188 * @{
189 */
190 int get_mytype() const { return my_name.type(); }
191
192 /**
193 * Retrieve the Messenger's name
194 *
195 * @return A const reference to the name this Messenger
196 * currently believes to be its own.
197 */
198 const entity_name_t& get_myname() { return my_name; }
199
200 /**
201 * Retrieve the Messenger's address.
202 *
203 * @return A const reference to the address this Messenger
204 * currently believes to be its own.
205 */
206 const entity_addrvec_t& get_myaddrs() {
207 return *my_addrs;
208 }
209
210 /**
211 * get legacy addr for myself, suitable for protocol v1
212 *
213 * Note that myaddrs might be a proper addrvec with v1 in it, or it might be an
214 * ANY addr (if i am a pure client).
215 */
216 entity_addr_t get_myaddr_legacy() {
217 return my_addrs->as_legacy_addr();
218 }
219
220
221 /**
222 * std::set messenger's instance
223 */
224 uint32_t get_magic() { return magic; }
225 void set_magic(int _magic) { magic = _magic; }
226
227 void set_auth_client(AuthClient *ac) {
228 auth_client = ac;
229 }
230 void set_auth_server(AuthServer *as) {
231 auth_server = as;
232 }
233
234 // for compression
235 CompressorRegistry comp_registry;
236
237 protected:
238 /**
239 * std::set messenger's address
240 */
241 virtual void set_myaddrs(const entity_addrvec_t& a) {
242 my_addrs = a;
243 set_endpoint_addr(a.front(), my_name);
244 }
245 public:
246 /**
247 * @return the zipkin trace endpoint
248 */
249 const ZTracer::Endpoint* get_trace_endpoint() const {
250 return &trace_endpoint;
251 }
252
253 /**
254 * set the name of the local entity. The name is reported to others and
255 * can be changed while the system is running, but doing so at incorrect
256 * times may have bad results.
257 *
258 * @param m The name to std::set.
259 */
260 void set_myname(const entity_name_t& m) { my_name = m; }
261
262 /**
263 * set the unknown address components for this Messenger.
264 * This is useful if the Messenger doesn't know its full address just by
265 * binding, but another Messenger on the same interface has already learned
266 * its full address. This function does not fill in known address elements,
267 * cause a rebind, or do anything of that sort.
268 *
269 * @param addr The address to use as a template.
270 */
271 virtual bool set_addr_unknowns(const entity_addrvec_t &addrs) = 0;
272 /**
273 * set the address for this Messenger. This is useful if the Messenger
274 * binds to a specific address but advertises a different address on the
275 * the network.
276 *
277 * @param addr The address to use.
278 */
279 virtual void set_addrs(const entity_addrvec_t &addr) = 0;
280 /// Get the default send priority.
281 int get_default_send_priority() { return default_send_priority; }
282 /**
283 * Get the number of Messages which the Messenger has received
284 * but not yet dispatched.
285 */
286 virtual int get_dispatch_queue_len() = 0;
287
288 /**
289 * Get age of oldest undelivered message
290 * (0 if the queue is empty)
291 */
292 virtual double get_dispatch_queue_max_age(utime_t now) = 0;
293
294 /**
295 * @} // Accessors
296 */
297
298 /**
299 * @defgroup Configuration
300 * @{
301 */
302 /**
303 * set the cluster protocol in use by this daemon.
304 * This is an init-time function and cannot be called after calling
305 * start() or bind().
306 *
307 * @param p The cluster protocol to use. Defined externally.
308 */
309 virtual void set_cluster_protocol(int p) = 0;
310 /**
311 * set a policy which is applied to all peers who do not have a type-specific
312 * Policy.
313 * This is an init-time function and cannot be called after calling
314 * start() or bind().
315 *
316 * @param p The Policy to apply.
317 */
318 virtual void set_default_policy(Policy p) = 0;
319 /**
320 * set a policy which is applied to all peers of the given type.
321 * This is an init-time function and cannot be called after calling
322 * start() or bind().
323 *
324 * @param type The peer type this policy applies to.
325 * @param p The policy to apply.
326 */
327 virtual void set_policy(int type, Policy p) = 0;
328 /**
329 * set the Policy associated with a type of peer.
330 *
331 * This can be called either on initial setup, or after connections
332 * are already established. However, the policies for existing
333 * connections will not be affected; the new policy will only apply
334 * to future connections.
335 *
336 * @param t The peer type to get the default policy for.
337 * @return A const Policy reference.
338 */
339 virtual Policy get_policy(int t) = 0;
340 /**
341 * Get the default Policy
342 *
343 * @return A const Policy reference.
344 */
345 virtual Policy get_default_policy() = 0;
346 /**
347 * set Throttlers applied to all Messages from the given type of peer
348 *
349 * This is an init-time function and cannot be called after calling
350 * start() or bind().
351 *
352 * @param type The peer type the Throttlers will apply to.
353 * @param bytes The Throttle for the number of bytes carried by the message
354 * @param msgs The Throttle for the number of messages for this @p type
355 * @note The Messenger does not take ownership of the Throttle pointers, but
356 * you must not destroy them before you destroy the Messenger.
357 */
358 virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0;
359 /**
360 * set the default send priority
361 *
362 * This is an init-time function and must be called *before* calling
363 * start().
364 *
365 * @param p The cluster protocol to use. Defined externally.
366 */
367 void set_default_send_priority(int p) {
368 ceph_assert(!started);
369 default_send_priority = p;
370 }
371 /**
372 * set the priority(SO_PRIORITY) for all packets to be sent on this socket.
373 *
374 * Linux uses this value to order the networking queues: packets with a higher
375 * priority may be processed first depending on the selected device queueing
376 * discipline.
377 *
378 * @param prio The priority. Setting a priority outside the range 0 to 6
379 * requires the CAP_NET_ADMIN capability.
380 */
381 void set_socket_priority(int prio) {
382 socket_priority = prio;
383 }
384 /**
385 * Get the socket priority
386 *
387 * @return the socket priority
388 */
389 int get_socket_priority() {
390 return socket_priority;
391 }
392 /**
393 * Add a new Dispatcher to the front of the list. If you add
394 * a Dispatcher which is already included, it will get a duplicate
395 * entry. This will reduce efficiency but not break anything.
396 *
397 * @param d The Dispatcher to insert into the list.
398 */
399 void add_dispatcher_head(Dispatcher *d) {
400 bool first = dispatchers.empty();
401 dispatchers.push_front(d);
402 if (d->ms_can_fast_dispatch_any())
403 fast_dispatchers.push_front(d);
404 if (first)
405 ready();
406 }
407 /**
408 * Add a new Dispatcher to the end of the list. If you add
409 * a Dispatcher which is already included, it will get a duplicate
410 * entry. This will reduce efficiency but not break anything.
411 *
412 * @param d The Dispatcher to insert into the list.
413 */
414 void add_dispatcher_tail(Dispatcher *d) {
415 bool first = dispatchers.empty();
416 dispatchers.push_back(d);
417 if (d->ms_can_fast_dispatch_any())
418 fast_dispatchers.push_back(d);
419 if (first)
420 ready();
421 }
422 /**
423 * Bind the Messenger to a specific address. If bind_addr
424 * is not completely filled in the system will use the
425 * valid portions and cycle through the unset ones (eg, the port)
426 * in an unspecified order.
427 *
428 * @param bind_addr The address to bind to.
429 * @return 0 on success, or -1 on error, or -errno if
430 * we can be more specific about the failure.
431 */
432 virtual int bind(const entity_addr_t& bind_addr) = 0;
433
434 virtual int bindv(const entity_addrvec_t& addrs);
435
436 /**
437 * This function performs a full restart of the Messenger component,
438 * whatever that means. Other entities who connect to this
439 * Messenger post-rebind() should perceive it as a new entity which
440 * they have not previously contacted, and it MUST bind to a
441 * different address than it did previously.
442 *
443 * @param avoid_ports Additional port to avoid binding to.
444 */
445 virtual int rebind(const std::set<int>& avoid_ports) { return -EOPNOTSUPP; }
446 /**
447 * Bind the 'client' Messenger to a specific address.Messenger will bind
448 * the address before connect to others when option ms_bind_before_connect
449 * is true.
450 * @param bind_addr The address to bind to.
451 * @return 0 on success, or -1 on error, or -errno if
452 * we can be more specific about the failure.
453 */
454 virtual int client_bind(const entity_addr_t& bind_addr) = 0;
455
456 /**
457 * reset the 'client' Messenger. Mark all the existing Connections down
458 * and update 'nonce'.
459 */
460 virtual int client_reset() = 0;
461
462
463 virtual bool should_use_msgr2() {
464 return false;
465 }
466
467 /**
468 * @} // Configuration
469 */
470
471 /**
472 * @defgroup Startup/Shutdown
473 * @{
474 */
475 /**
476 * Perform any resource allocation, thread startup, etc
477 * that is required before attempting to connect to other
478 * Messengers or transmit messages.
479 * Once this function completes, started shall be set to true.
480 *
481 * @return 0 on success; -errno on failure.
482 */
483 virtual int start() { started = true; return 0; }
484
485 // shutdown
486 /**
487 * Block until the Messenger has finished shutting down (according
488 * to the shutdown() function).
489 * It is valid to call this after calling shutdown(), but it must
490 * be called before deleting the Messenger.
491 */
492 virtual void wait() = 0;
493 /**
494 * Initiate a shutdown of the Messenger.
495 *
496 * @return 0 on success, -errno otherwise.
497 */
498 virtual int shutdown() { started = false; return 0; }
499 /**
500 * @} // Startup/Shutdown
501 */
502
503 /**
504 * @defgroup Messaging
505 * @{
506 */
507 /**
508 * Queue the given Message for the given entity.
509 * Success in this function does not guarantee Message delivery, only
510 * success in queueing the Message. Other guarantees may be provided based
511 * on the Connection policy associated with the dest.
512 *
513 * @param m The Message to send. The Messenger consumes a single reference
514 * when you pass it in.
515 * @param dest The entity to send the Message to.
516 *
517 * DEPRECATED: please do not use this interface for any new code;
518 * use the Connection* variant.
519 *
520 * @return 0 on success, or -errno on failure.
521 */
522 virtual int send_to(
523 Message *m,
524 int type,
525 const entity_addrvec_t& addr) = 0;
526 int send_to_mon(
527 Message *m, const entity_addrvec_t& addrs) {
528 return send_to(m, CEPH_ENTITY_TYPE_MON, addrs);
529 }
530 int send_to_mds(
531 Message *m, const entity_addrvec_t& addrs) {
532 return send_to(m, CEPH_ENTITY_TYPE_MDS, addrs);
533 }
534
535 /**
536 * @} // Messaging
537 */
538 /**
539 * @defgroup Connection Management
540 * @{
541 */
542 /**
543 * Get the Connection object associated with a given entity. If a
544 * Connection does not exist, create one and establish a logical connection.
545 * The caller owns a reference when this returns. Call ->put() when you're
546 * done!
547 *
548 * @param dest The entity to get a connection for.
549 */
550 virtual ConnectionRef connect_to(
551 int type, const entity_addrvec_t& dest,
552 bool anon=false, bool not_local_dest=false) = 0;
553 ConnectionRef connect_to_mon(const entity_addrvec_t& dest,
554 bool anon=false, bool not_local_dest=false) {
555 return connect_to(CEPH_ENTITY_TYPE_MON, dest, anon, not_local_dest);
556 }
557 ConnectionRef connect_to_mds(const entity_addrvec_t& dest,
558 bool anon=false, bool not_local_dest=false) {
559 return connect_to(CEPH_ENTITY_TYPE_MDS, dest, anon, not_local_dest);
560 }
561 ConnectionRef connect_to_osd(const entity_addrvec_t& dest,
562 bool anon=false, bool not_local_dest=false) {
563 return connect_to(CEPH_ENTITY_TYPE_OSD, dest, anon, not_local_dest);
564 }
565 ConnectionRef connect_to_mgr(const entity_addrvec_t& dest,
566 bool anon=false, bool not_local_dest=false) {
567 return connect_to(CEPH_ENTITY_TYPE_MGR, dest, anon, not_local_dest);
568 }
569
570 /**
571 * Get the Connection object associated with ourselves.
572 */
573 virtual ConnectionRef get_loopback_connection() = 0;
574 /**
575 * Mark down a Connection to a remote.
576 *
577 * This will cause us to discard our outgoing queue for them, and if
578 * reset detection is enabled in the policy and the endpoint tries
579 * to reconnect they will discard their queue when we inform them of
580 * the session reset.
581 *
582 * If there is no Connection to the given dest, it is a no-op.
583 *
584 * This generates a RESET notification to the Dispatcher.
585 *
586 * DEPRECATED: please do not use this interface for any new code;
587 * use the Connection* variant.
588 *
589 * @param a The address to mark down.
590 */
591 virtual void mark_down(const entity_addr_t& a) = 0;
592 virtual void mark_down_addrs(const entity_addrvec_t& a) {
593 mark_down(a.legacy_addr());
594 }
595 /**
596 * Mark all the existing Connections down. This is equivalent
597 * to iterating over all Connections and calling mark_down()
598 * on each.
599 *
600 * This will generate a RESET event for each closed connections.
601 */
602 virtual void mark_down_all() = 0;
603 /**
604 * @} // Connection Management
605 */
606 protected:
607 /**
608 * @defgroup Subclass Interfacing
609 * @{
610 */
611 /**
612 * A courtesy function for Messenger implementations which
613 * will be called when we receive our first Dispatcher.
614 */
615 virtual void ready() { }
616 /**
617 * @} // Subclass Interfacing
618 */
619 public:
620 #ifdef CEPH_USE_SIGPIPE_BLOCKER
621 /**
622 * We need to disable SIGPIPE on all platforms, and if they
623 * don't give us a better mechanism (read: are on Solaris) that
624 * means blocking the signal whenever we do a send or sendmsg...
625 * That means any implementations must invoke MSGR_SIGPIPE_STOPPER in-scope
626 * whenever doing so. On most systems that's blank, but on systems where
627 * it's needed we construct an RAII object to plug and un-plug the SIGPIPE.
628 * See http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
629 */
630 struct sigpipe_stopper {
631 bool blocked;
632 sigset_t existing_mask;
633 sigset_t pipe_mask;
634 sigpipe_stopper() {
635 sigemptyset(&pipe_mask);
636 sigaddset(&pipe_mask, SIGPIPE);
637 sigset_t signals;
638 sigemptyset(&signals);
639 sigpending(&signals);
640 if (sigismember(&signals, SIGPIPE)) {
641 blocked = false;
642 } else {
643 blocked = true;
644 int r = pthread_sigmask(SIG_BLOCK, &pipe_mask, &existing_mask);
645 ceph_assert(r == 0);
646 }
647 }
648 ~sigpipe_stopper() {
649 if (blocked) {
650 struct timespec nowait{0};
651 int r = sigtimedwait(&pipe_mask, 0, &nowait);
652 ceph_assert(r == EAGAIN || r == 0);
653 r = pthread_sigmask(SIG_SETMASK, &existing_mask, 0);
654 ceph_assert(r == 0);
655 }
656 }
657 };
658 # define MSGR_SIGPIPE_STOPPER Messenger::sigpipe_stopper stopper();
659 #else
660 # define MSGR_SIGPIPE_STOPPER
661 #endif
662 /**
663 * @defgroup Dispatcher Interfacing
664 * @{
665 */
666 /**
667 * Determine whether a message can be fast-dispatched. We will
668 * query each Dispatcher in sequence to determine if they are
669 * capable of handling a particular message via "fast dispatch".
670 *
671 * @param m The Message we are testing.
672 */
673 bool ms_can_fast_dispatch(const ceph::cref_t<Message>& m) {
674 for (const auto &dispatcher : fast_dispatchers) {
675 if (dispatcher->ms_can_fast_dispatch2(m))
676 return true;
677 }
678 return false;
679 }
680
681 /**
682 * Deliver a single Message via "fast dispatch".
683 *
684 * @param m The Message we are fast dispatching.
685 * If none of our Dispatchers can handle it, ceph_abort().
686 */
687 void ms_fast_dispatch(const ceph::ref_t<Message> &m) {
688 m->set_dispatch_stamp(ceph_clock_now());
689 for (const auto &dispatcher : fast_dispatchers) {
690 if (dispatcher->ms_can_fast_dispatch2(m)) {
691 dispatcher->ms_fast_dispatch2(m);
692 return;
693 }
694 }
695 ceph_abort();
696 }
697 void ms_fast_dispatch(Message *m) {
698 return ms_fast_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */
699 }
700 /**
701 *
702 */
703 void ms_fast_preprocess(const ceph::ref_t<Message> &m) {
704 for (const auto &dispatcher : fast_dispatchers) {
705 dispatcher->ms_fast_preprocess2(m);
706 }
707 }
708 /**
709 * Deliver a single Message. Send it to each Dispatcher
710 * in sequence until one of them handles it.
711 * If none of our Dispatchers can handle it, ceph_abort().
712 *
713 * @param m The Message to deliver.
714 */
715 void ms_deliver_dispatch(const ceph::ref_t<Message> &m) {
716 m->set_dispatch_stamp(ceph_clock_now());
717 for (const auto &dispatcher : dispatchers) {
718 if (dispatcher->ms_dispatch2(m))
719 return;
720 }
721 lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
722 << m->get_source_inst() << dendl;
723 ceph_assert(!cct->_conf->ms_die_on_unhandled_msg);
724 }
725 void ms_deliver_dispatch(Message *m) {
726 return ms_deliver_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */
727 }
728 /**
729 * Notify each Dispatcher of a new Connection. Call
730 * this function whenever a new Connection is initiated or
731 * reconnects.
732 *
733 * @param con Pointer to the new Connection.
734 */
735 void ms_deliver_handle_connect(Connection *con) {
736 for (const auto& dispatcher : dispatchers) {
737 dispatcher->ms_handle_connect(con);
738 }
739 }
740
741 /**
742 * Notify each fast Dispatcher of a new Connection. Call
743 * this function whenever a new Connection is initiated or
744 * reconnects.
745 *
746 * @param con Pointer to the new Connection.
747 */
748 void ms_deliver_handle_fast_connect(Connection *con) {
749 for (const auto& dispatcher : fast_dispatchers) {
750 dispatcher->ms_handle_fast_connect(con);
751 }
752 }
753
754 /**
755 * Notify each Dispatcher of a new incoming Connection. Call
756 * this function whenever a new Connection is accepted.
757 *
758 * @param con Pointer to the new Connection.
759 */
760 void ms_deliver_handle_accept(Connection *con) {
761 for (const auto& dispatcher : dispatchers) {
762 dispatcher->ms_handle_accept(con);
763 }
764 }
765
766 /**
767 * Notify each fast Dispatcher of a new incoming Connection. Call
768 * this function whenever a new Connection is accepted.
769 *
770 * @param con Pointer to the new Connection.
771 */
772 void ms_deliver_handle_fast_accept(Connection *con) {
773 for (const auto& dispatcher : fast_dispatchers) {
774 dispatcher->ms_handle_fast_accept(con);
775 }
776 }
777
778 /**
779 * Notify each Dispatcher of a Connection which may have lost
780 * Messages. Call this function whenever you detect that a lossy Connection
781 * has been disconnected.
782 *
783 * @param con Pointer to the broken Connection.
784 */
785 void ms_deliver_handle_reset(Connection *con) {
786 for (const auto& dispatcher : dispatchers) {
787 if (dispatcher->ms_handle_reset(con))
788 return;
789 }
790 }
791 /**
792 * Notify each Dispatcher of a Connection which has been "forgotten" about
793 * by the remote end, implying that messages have probably been lost.
794 * Call this function whenever you detect a reset.
795 *
796 * @param con Pointer to the broken Connection.
797 */
798 void ms_deliver_handle_remote_reset(Connection *con) {
799 for (const auto& dispatcher : dispatchers) {
800 dispatcher->ms_handle_remote_reset(con);
801 }
802 }
803
804 /**
805 * Notify each Dispatcher of a Connection for which reconnection
806 * attempts are being refused. Call this function whenever you
807 * detect that a lossy Connection has been disconnected and it's
808 * impossible to reconnect.
809 *
810 * @param con Pointer to the broken Connection.
811 */
812 void ms_deliver_handle_refused(Connection *con) {
813 for (const auto& dispatcher : dispatchers) {
814 if (dispatcher->ms_handle_refused(con))
815 return;
816 }
817 }
818
819 void set_require_authorizer(bool b) {
820 require_authorizer = b;
821 }
822
823 /**
824 * @} // Dispatcher Interfacing
825 */
826 };
827
828
829
830 #endif