]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/Messenger.h
import ceph quincy 17.2.6
[ceph.git] / ceph / src / msg / Messenger.h
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
7c673cae
FG
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
11fdf7f2 10 * License version 2.1, as published by the Free Software
7c673cae 11 * Foundation. See file COPYING.
11fdf7f2 12 *
7c673cae
FG
13 */
14
15
16
17#ifndef CEPH_MESSENGER_H
18#define CEPH_MESSENGER_H
19
11fdf7f2 20#include <deque>
39ae355f
TL
21#include <map>
22#include <optional>
11fdf7f2
TL
23
24#include <errno.h>
25#include <sstream>
26#include <memory>
7c673cae
FG
27
28#include "Message.h"
29#include "Dispatcher.h"
11fdf7f2 30#include "Policy.h"
11fdf7f2 31#include "common/Throttle.h"
7c673cae
FG
32#include "include/Context.h"
33#include "include/types.h"
34#include "include/ceph_features.h"
35#include "auth/Crypto.h"
11fdf7f2
TL
36#include "common/item_history.h"
37#include "auth/AuthRegistry.h"
20effc67 38#include "compressor_registry.h"
11fdf7f2 39#include "include/ceph_assert.h"
7c673cae
FG
40
41#include <errno.h>
42#include <sstream>
3efd9988 43#include <signal.h>
7c673cae
FG
44
45#define SOCKET_PRIORITY_MIN_DELAY 6
46
47class Timer;
48
11fdf7f2
TL
49class AuthClient;
50class AuthServer;
51
52#ifdef UNIT_TESTS_BUILT
53
54struct Interceptor {
55 std::mutex lock;
56 std::condition_variable cond_var;
57
58 enum ACTION : uint32_t {
59 CONTINUE = 0,
60 FAIL,
61 STOP
62 };
63
f67539c2
TL
64 enum STEP {
65 START_CLIENT_BANNER_EXCHANGE = 1,
66 START_SERVER_BANNER_EXCHANGE,
67 BANNER_EXCHANGE_BANNER_CONNECTING,
68 BANNER_EXCHANGE,
69 HANDLE_PEER_BANNER_BANNER_CONNECTING,
70 HANDLE_PEER_BANNER,
71 HANDLE_PEER_BANNER_PAYLOAD_HELLO_CONNECTING,
72 HANDLE_PEER_BANNER_PAYLOAD,
73 SEND_AUTH_REQUEST,
74 HANDLE_AUTH_REQUEST_ACCEPTING_SIGN,
75 SEND_CLIENT_IDENTITY,
76 SEND_SERVER_IDENTITY,
77 SEND_RECONNECT,
78 SEND_RECONNECT_OK,
79 READY,
80 HANDLE_MESSAGE,
81 READ_MESSAGE_COMPLETE,
20effc67
TL
82 SESSION_RETRY,
83 SEND_COMPRESSION_REQUEST,
84 HANDLE_COMPRESSION_REQUEST
f67539c2
TL
85 };
86
11fdf7f2
TL
87 virtual ~Interceptor() {}
88 virtual ACTION intercept(Connection *conn, uint32_t step) = 0;
89};
90
91#endif
7c673cae
FG
92
93class Messenger {
94private:
11fdf7f2
TL
95 std::deque<Dispatcher*> dispatchers;
96 std::deque<Dispatcher*> fast_dispatchers;
7c673cae
FG
97 ZTracer::Endpoint trace_endpoint;
98
11fdf7f2 99protected:
7c673cae
FG
100 void set_endpoint_addr(const entity_addr_t& a,
101 const entity_name_t &name);
102
103protected:
104 /// the "name" of the local daemon. eg client.99
11fdf7f2
TL
105 entity_name_t my_name;
106
107 /// my addr
108 safe_item_history<entity_addrvec_t> my_addrs;
109
7c673cae 110 int default_send_priority;
9f95a23c 111 /// std::set to true once the Messenger has started, and std::set to false on shutdown
7c673cae
FG
112 bool started;
113 uint32_t magic;
114 int socket_priority;
115
116public:
11fdf7f2
TL
117 AuthClient *auth_client = 0;
118 AuthServer *auth_server = 0;
119
120#ifdef UNIT_TESTS_BUILT
121 Interceptor *interceptor = nullptr;
122#endif
123
7c673cae
FG
124 /**
125 * The CephContext this Messenger uses. Many other components initialize themselves
126 * from this value.
127 */
128 CephContext *cct;
129 int crcflags;
130
11fdf7f2 131 using Policy = ceph::net::Policy<Throttle>;
7c673cae 132
9f95a23c
TL
133public:
134 // allow unauthenticated connections. This is needed for
135 // compatibility with pre-nautilus OSDs, which do not authenticate
136 // the heartbeat sessions.
137 bool require_authorizer = true;
138
11fdf7f2
TL
139protected:
140 // for authentication
141 AuthRegistry auth_registry;
142
143public:
7c673cae
FG
144 /**
145 * Messenger constructor. Call this from your implementation.
146 * Messenger users should construct full implementations directly,
147 * or use the create() function.
148 */
11fdf7f2 149 Messenger(CephContext *cct_, entity_name_t w);
7c673cae
FG
150 virtual ~Messenger() {}
151
152 /**
153 * create a new messenger
154 *
155 * Create a new messenger instance, with whatever implementation is
156 * available or specified via the configuration in cct.
157 *
158 * @param cct context
159 * @param type name of messenger type
160 * @param name entity name to register
161 * @param lname logical name of the messenger in this process (e.g., "client")
162 * @param nonce nonce value to uniquely identify this instance on the current host
7c673cae
FG
163 */
164 static Messenger *create(CephContext *cct,
9f95a23c 165 const std::string &type,
7c673cae 166 entity_name_t name,
9f95a23c 167 std::string lname,
f67539c2 168 uint64_t nonce);
7c673cae 169
9f95a23c
TL
170 static uint64_t get_random_nonce();
171 static uint64_t get_pid_nonce();
172
7c673cae
FG
173 /**
174 * create a new messenger
175 *
176 * Create a new messenger instance.
177 * Same as the above, but a slightly simpler interface for clients:
178 * - Generate a random nonce
7c673cae
FG
179 * - get the messenger type from cct
180 * - use the client entity_type
181 *
182 * @param cct context
183 * @param lname logical name of the messenger in this process (e.g., "client")
184 */
9f95a23c 185 static Messenger *create_client_messenger(CephContext *cct, std::string lname);
7c673cae
FG
186
187 /**
188 * @defgroup Accessors
189 * @{
190 */
11fdf7f2
TL
191 int get_mytype() const { return my_name.type(); }
192
7c673cae 193 /**
11fdf7f2 194 * Retrieve the Messenger's name
7c673cae 195 *
11fdf7f2 196 * @return A const reference to the name this Messenger
7c673cae
FG
197 * currently believes to be its own.
198 */
11fdf7f2 199 const entity_name_t& get_myname() { return my_name; }
7c673cae
FG
200
201 /**
202 * Retrieve the Messenger's address.
203 *
204 * @return A const reference to the address this Messenger
205 * currently believes to be its own.
206 */
11fdf7f2
TL
207 const entity_addrvec_t& get_myaddrs() {
208 return *my_addrs;
209 }
210
211 /**
212 * get legacy addr for myself, suitable for protocol v1
213 *
214 * Note that myaddrs might be a proper addrvec with v1 in it, or it might be an
215 * ANY addr (if i am a pure client).
216 */
217 entity_addr_t get_myaddr_legacy() {
218 return my_addrs->as_legacy_addr();
219 }
220
221
222 /**
9f95a23c 223 * std::set messenger's instance
11fdf7f2
TL
224 */
225 uint32_t get_magic() { return magic; }
226 void set_magic(int _magic) { magic = _magic; }
227
228 void set_auth_client(AuthClient *ac) {
229 auth_client = ac;
230 }
231 void set_auth_server(AuthServer *as) {
232 auth_server = as;
233 }
234
20effc67
TL
235 // for compression
236 CompressorRegistry comp_registry;
237
7c673cae
FG
238protected:
239 /**
9f95a23c 240 * std::set messenger's address
7c673cae 241 */
11fdf7f2
TL
242 virtual void set_myaddrs(const entity_addrvec_t& a) {
243 my_addrs = a;
244 set_endpoint_addr(a.front(), my_name);
7c673cae
FG
245 }
246public:
247 /**
248 * @return the zipkin trace endpoint
249 */
250 const ZTracer::Endpoint* get_trace_endpoint() const {
251 return &trace_endpoint;
252 }
253
7c673cae 254 /**
9f95a23c 255 * set the name of the local entity. The name is reported to others and
7c673cae
FG
256 * can be changed while the system is running, but doing so at incorrect
257 * times may have bad results.
258 *
9f95a23c 259 * @param m The name to std::set.
7c673cae 260 */
11fdf7f2
TL
261 void set_myname(const entity_name_t& m) { my_name = m; }
262
7c673cae 263 /**
9f95a23c 264 * set the unknown address components for this Messenger.
7c673cae
FG
265 * This is useful if the Messenger doesn't know its full address just by
266 * binding, but another Messenger on the same interface has already learned
267 * its full address. This function does not fill in known address elements,
268 * cause a rebind, or do anything of that sort.
269 *
270 * @param addr The address to use as a template.
271 */
11fdf7f2 272 virtual bool set_addr_unknowns(const entity_addrvec_t &addrs) = 0;
39ae355f 273
7c673cae
FG
274 /// Get the default send priority.
275 int get_default_send_priority() { return default_send_priority; }
276 /**
277 * Get the number of Messages which the Messenger has received
278 * but not yet dispatched.
279 */
280 virtual int get_dispatch_queue_len() = 0;
281
282 /**
283 * Get age of oldest undelivered message
284 * (0 if the queue is empty)
285 */
286 virtual double get_dispatch_queue_max_age(utime_t now) = 0;
7c673cae
FG
287
288 /**
289 * @} // Accessors
290 */
291
292 /**
293 * @defgroup Configuration
294 * @{
295 */
296 /**
9f95a23c 297 * set the cluster protocol in use by this daemon.
7c673cae
FG
298 * This is an init-time function and cannot be called after calling
299 * start() or bind().
300 *
301 * @param p The cluster protocol to use. Defined externally.
302 */
303 virtual void set_cluster_protocol(int p) = 0;
304 /**
9f95a23c 305 * set a policy which is applied to all peers who do not have a type-specific
7c673cae
FG
306 * Policy.
307 * This is an init-time function and cannot be called after calling
308 * start() or bind().
309 *
310 * @param p The Policy to apply.
311 */
312 virtual void set_default_policy(Policy p) = 0;
313 /**
9f95a23c 314 * set a policy which is applied to all peers of the given type.
7c673cae
FG
315 * This is an init-time function and cannot be called after calling
316 * start() or bind().
317 *
318 * @param type The peer type this policy applies to.
319 * @param p The policy to apply.
320 */
321 virtual void set_policy(int type, Policy p) = 0;
322 /**
9f95a23c 323 * set the Policy associated with a type of peer.
7c673cae
FG
324 *
325 * This can be called either on initial setup, or after connections
326 * are already established. However, the policies for existing
327 * connections will not be affected; the new policy will only apply
328 * to future connections.
329 *
330 * @param t The peer type to get the default policy for.
331 * @return A const Policy reference.
332 */
333 virtual Policy get_policy(int t) = 0;
334 /**
335 * Get the default Policy
336 *
337 * @return A const Policy reference.
338 */
339 virtual Policy get_default_policy() = 0;
340 /**
9f95a23c 341 * set Throttlers applied to all Messages from the given type of peer
7c673cae
FG
342 *
343 * This is an init-time function and cannot be called after calling
344 * start() or bind().
345 *
346 * @param type The peer type the Throttlers will apply to.
347 * @param bytes The Throttle for the number of bytes carried by the message
348 * @param msgs The Throttle for the number of messages for this @p type
349 * @note The Messenger does not take ownership of the Throttle pointers, but
350 * you must not destroy them before you destroy the Messenger.
351 */
352 virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0;
353 /**
9f95a23c 354 * set the default send priority
7c673cae
FG
355 *
356 * This is an init-time function and must be called *before* calling
357 * start().
358 *
359 * @param p The cluster protocol to use. Defined externally.
360 */
361 void set_default_send_priority(int p) {
11fdf7f2 362 ceph_assert(!started);
7c673cae
FG
363 default_send_priority = p;
364 }
365 /**
9f95a23c 366 * set the priority(SO_PRIORITY) for all packets to be sent on this socket.
7c673cae
FG
367 *
368 * Linux uses this value to order the networking queues: packets with a higher
369 * priority may be processed first depending on the selected device queueing
370 * discipline.
371 *
372 * @param prio The priority. Setting a priority outside the range 0 to 6
373 * requires the CAP_NET_ADMIN capability.
374 */
375 void set_socket_priority(int prio) {
376 socket_priority = prio;
377 }
378 /**
379 * Get the socket priority
380 *
381 * @return the socket priority
382 */
383 int get_socket_priority() {
384 return socket_priority;
385 }
386 /**
387 * Add a new Dispatcher to the front of the list. If you add
388 * a Dispatcher which is already included, it will get a duplicate
389 * entry. This will reduce efficiency but not break anything.
390 *
391 * @param d The Dispatcher to insert into the list.
392 */
11fdf7f2 393 void add_dispatcher_head(Dispatcher *d) {
7c673cae
FG
394 bool first = dispatchers.empty();
395 dispatchers.push_front(d);
396 if (d->ms_can_fast_dispatch_any())
397 fast_dispatchers.push_front(d);
398 if (first)
399 ready();
400 }
401 /**
402 * Add a new Dispatcher to the end of the list. If you add
403 * a Dispatcher which is already included, it will get a duplicate
404 * entry. This will reduce efficiency but not break anything.
405 *
406 * @param d The Dispatcher to insert into the list.
407 */
11fdf7f2 408 void add_dispatcher_tail(Dispatcher *d) {
7c673cae
FG
409 bool first = dispatchers.empty();
410 dispatchers.push_back(d);
411 if (d->ms_can_fast_dispatch_any())
412 fast_dispatchers.push_back(d);
413 if (first)
414 ready();
415 }
416 /**
417 * Bind the Messenger to a specific address. If bind_addr
418 * is not completely filled in the system will use the
419 * valid portions and cycle through the unset ones (eg, the port)
420 * in an unspecified order.
421 *
422 * @param bind_addr The address to bind to.
39ae355f 423 * @patam public_addrs The addresses to announce over the network
7c673cae
FG
424 * @return 0 on success, or -1 on error, or -errno if
425 * we can be more specific about the failure.
426 */
39ae355f
TL
427 virtual int bind(const entity_addr_t& bind_addr,
428 std::optional<entity_addrvec_t> public_addrs=std::nullopt) = 0;
11fdf7f2 429
39ae355f
TL
430 virtual int bindv(const entity_addrvec_t& bind_addrs,
431 std::optional<entity_addrvec_t> public_addrs=std::nullopt);
f6b5b4d7 432
7c673cae
FG
433 /**
434 * This function performs a full restart of the Messenger component,
435 * whatever that means. Other entities who connect to this
436 * Messenger post-rebind() should perceive it as a new entity which
437 * they have not previously contacted, and it MUST bind to a
438 * different address than it did previously.
439 *
440 * @param avoid_ports Additional port to avoid binding to.
441 */
9f95a23c 442 virtual int rebind(const std::set<int>& avoid_ports) { return -EOPNOTSUPP; }
7c673cae
FG
443 /**
444 * Bind the 'client' Messenger to a specific address.Messenger will bind
445 * the address before connect to others when option ms_bind_before_connect
446 * is true.
447 * @param bind_addr The address to bind to.
448 * @return 0 on success, or -1 on error, or -errno if
f6b5b4d7 449 * we can be more specific about the failure.
7c673cae
FG
450 */
451 virtual int client_bind(const entity_addr_t& bind_addr) = 0;
11fdf7f2 452
f6b5b4d7
TL
453 /**
454 * reset the 'client' Messenger. Mark all the existing Connections down
455 * and update 'nonce'.
456 */
457 virtual int client_reset() = 0;
11fdf7f2
TL
458
459
460 virtual bool should_use_msgr2() {
461 return false;
462 }
463
7c673cae
FG
464 /**
465 * @} // Configuration
466 */
467
468 /**
469 * @defgroup Startup/Shutdown
470 * @{
471 */
472 /**
473 * Perform any resource allocation, thread startup, etc
474 * that is required before attempting to connect to other
475 * Messengers or transmit messages.
476 * Once this function completes, started shall be set to true.
477 *
478 * @return 0 on success; -errno on failure.
479 */
480 virtual int start() { started = true; return 0; }
481
482 // shutdown
483 /**
484 * Block until the Messenger has finished shutting down (according
485 * to the shutdown() function).
486 * It is valid to call this after calling shutdown(), but it must
487 * be called before deleting the Messenger.
488 */
489 virtual void wait() = 0;
490 /**
491 * Initiate a shutdown of the Messenger.
492 *
493 * @return 0 on success, -errno otherwise.
494 */
495 virtual int shutdown() { started = false; return 0; }
496 /**
497 * @} // Startup/Shutdown
498 */
499
500 /**
501 * @defgroup Messaging
502 * @{
503 */
504 /**
505 * Queue the given Message for the given entity.
506 * Success in this function does not guarantee Message delivery, only
507 * success in queueing the Message. Other guarantees may be provided based
508 * on the Connection policy associated with the dest.
509 *
510 * @param m The Message to send. The Messenger consumes a single reference
511 * when you pass it in.
512 * @param dest The entity to send the Message to.
513 *
514 * DEPRECATED: please do not use this interface for any new code;
515 * use the Connection* variant.
516 *
517 * @return 0 on success, or -errno on failure.
518 */
11fdf7f2
TL
519 virtual int send_to(
520 Message *m,
521 int type,
522 const entity_addrvec_t& addr) = 0;
523 int send_to_mon(
524 Message *m, const entity_addrvec_t& addrs) {
525 return send_to(m, CEPH_ENTITY_TYPE_MON, addrs);
526 }
527 int send_to_mds(
528 Message *m, const entity_addrvec_t& addrs) {
529 return send_to(m, CEPH_ENTITY_TYPE_MDS, addrs);
530 }
7c673cae
FG
531
532 /**
533 * @} // Messaging
534 */
535 /**
536 * @defgroup Connection Management
537 * @{
538 */
539 /**
540 * Get the Connection object associated with a given entity. If a
541 * Connection does not exist, create one and establish a logical connection.
542 * The caller owns a reference when this returns. Call ->put() when you're
543 * done!
544 *
545 * @param dest The entity to get a connection for.
546 */
11fdf7f2 547 virtual ConnectionRef connect_to(
9f95a23c
TL
548 int type, const entity_addrvec_t& dest,
549 bool anon=false, bool not_local_dest=false) = 0;
550 ConnectionRef connect_to_mon(const entity_addrvec_t& dest,
551 bool anon=false, bool not_local_dest=false) {
552 return connect_to(CEPH_ENTITY_TYPE_MON, dest, anon, not_local_dest);
11fdf7f2 553 }
9f95a23c
TL
554 ConnectionRef connect_to_mds(const entity_addrvec_t& dest,
555 bool anon=false, bool not_local_dest=false) {
556 return connect_to(CEPH_ENTITY_TYPE_MDS, dest, anon, not_local_dest);
11fdf7f2 557 }
9f95a23c
TL
558 ConnectionRef connect_to_osd(const entity_addrvec_t& dest,
559 bool anon=false, bool not_local_dest=false) {
560 return connect_to(CEPH_ENTITY_TYPE_OSD, dest, anon, not_local_dest);
11fdf7f2 561 }
9f95a23c
TL
562 ConnectionRef connect_to_mgr(const entity_addrvec_t& dest,
563 bool anon=false, bool not_local_dest=false) {
564 return connect_to(CEPH_ENTITY_TYPE_MGR, dest, anon, not_local_dest);
11fdf7f2
TL
565 }
566
7c673cae
FG
567 /**
568 * Get the Connection object associated with ourselves.
569 */
570 virtual ConnectionRef get_loopback_connection() = 0;
571 /**
572 * Mark down a Connection to a remote.
573 *
574 * This will cause us to discard our outgoing queue for them, and if
575 * reset detection is enabled in the policy and the endpoint tries
576 * to reconnect they will discard their queue when we inform them of
577 * the session reset.
578 *
579 * If there is no Connection to the given dest, it is a no-op.
580 *
581 * This generates a RESET notification to the Dispatcher.
582 *
583 * DEPRECATED: please do not use this interface for any new code;
584 * use the Connection* variant.
585 *
586 * @param a The address to mark down.
587 */
588 virtual void mark_down(const entity_addr_t& a) = 0;
11fdf7f2
TL
589 virtual void mark_down_addrs(const entity_addrvec_t& a) {
590 mark_down(a.legacy_addr());
591 }
7c673cae
FG
592 /**
593 * Mark all the existing Connections down. This is equivalent
594 * to iterating over all Connections and calling mark_down()
595 * on each.
596 *
597 * This will generate a RESET event for each closed connections.
598 */
599 virtual void mark_down_all() = 0;
600 /**
601 * @} // Connection Management
602 */
603protected:
604 /**
605 * @defgroup Subclass Interfacing
606 * @{
607 */
608 /**
609 * A courtesy function for Messenger implementations which
610 * will be called when we receive our first Dispatcher.
611 */
612 virtual void ready() { }
613 /**
614 * @} // Subclass Interfacing
615 */
3efd9988
FG
616public:
617#ifdef CEPH_USE_SIGPIPE_BLOCKER
618 /**
619 * We need to disable SIGPIPE on all platforms, and if they
620 * don't give us a better mechanism (read: are on Solaris) that
621 * means blocking the signal whenever we do a send or sendmsg...
622 * That means any implementations must invoke MSGR_SIGPIPE_STOPPER in-scope
623 * whenever doing so. On most systems that's blank, but on systems where
624 * it's needed we construct an RAII object to plug and un-plug the SIGPIPE.
625 * See http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
626 */
627 struct sigpipe_stopper {
628 bool blocked;
629 sigset_t existing_mask;
630 sigset_t pipe_mask;
631 sigpipe_stopper() {
632 sigemptyset(&pipe_mask);
633 sigaddset(&pipe_mask, SIGPIPE);
634 sigset_t signals;
635 sigemptyset(&signals);
636 sigpending(&signals);
637 if (sigismember(&signals, SIGPIPE)) {
638 blocked = false;
639 } else {
640 blocked = true;
641 int r = pthread_sigmask(SIG_BLOCK, &pipe_mask, &existing_mask);
11fdf7f2 642 ceph_assert(r == 0);
3efd9988
FG
643 }
644 }
645 ~sigpipe_stopper() {
646 if (blocked) {
647 struct timespec nowait{0};
648 int r = sigtimedwait(&pipe_mask, 0, &nowait);
11fdf7f2 649 ceph_assert(r == EAGAIN || r == 0);
3efd9988 650 r = pthread_sigmask(SIG_SETMASK, &existing_mask, 0);
11fdf7f2 651 ceph_assert(r == 0);
3efd9988
FG
652 }
653 }
654 };
655# define MSGR_SIGPIPE_STOPPER Messenger::sigpipe_stopper stopper();
656#else
657# define MSGR_SIGPIPE_STOPPER
658#endif
7c673cae
FG
659 /**
660 * @defgroup Dispatcher Interfacing
661 * @{
662 */
7c673cae
FG
663 /**
664 * Determine whether a message can be fast-dispatched. We will
665 * query each Dispatcher in sequence to determine if they are
666 * capable of handling a particular message via "fast dispatch".
667 *
668 * @param m The Message we are testing.
669 */
f67539c2 670 bool ms_can_fast_dispatch(const ceph::cref_t<Message>& m) {
11fdf7f2
TL
671 for (const auto &dispatcher : fast_dispatchers) {
672 if (dispatcher->ms_can_fast_dispatch2(m))
7c673cae
FG
673 return true;
674 }
675 return false;
676 }
677
678 /**
679 * Deliver a single Message via "fast dispatch".
680 *
11fdf7f2 681 * @param m The Message we are fast dispatching.
7c673cae
FG
682 * If none of our Dispatchers can handle it, ceph_abort().
683 */
f67539c2 684 void ms_fast_dispatch(const ceph::ref_t<Message> &m) {
7c673cae 685 m->set_dispatch_stamp(ceph_clock_now());
11fdf7f2
TL
686 for (const auto &dispatcher : fast_dispatchers) {
687 if (dispatcher->ms_can_fast_dispatch2(m)) {
688 dispatcher->ms_fast_dispatch2(m);
7c673cae
FG
689 return;
690 }
691 }
692 ceph_abort();
693 }
11fdf7f2 694 void ms_fast_dispatch(Message *m) {
f67539c2 695 return ms_fast_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */
11fdf7f2 696 }
7c673cae
FG
697 /**
698 *
699 */
f67539c2 700 void ms_fast_preprocess(const ceph::ref_t<Message> &m) {
11fdf7f2
TL
701 for (const auto &dispatcher : fast_dispatchers) {
702 dispatcher->ms_fast_preprocess2(m);
7c673cae
FG
703 }
704 }
705 /**
706 * Deliver a single Message. Send it to each Dispatcher
707 * in sequence until one of them handles it.
11fdf7f2 708 * If none of our Dispatchers can handle it, ceph_abort().
7c673cae 709 *
11fdf7f2 710 * @param m The Message to deliver.
7c673cae 711 */
f67539c2 712 void ms_deliver_dispatch(const ceph::ref_t<Message> &m) {
7c673cae 713 m->set_dispatch_stamp(ceph_clock_now());
11fdf7f2
TL
714 for (const auto &dispatcher : dispatchers) {
715 if (dispatcher->ms_dispatch2(m))
7c673cae
FG
716 return;
717 }
718 lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
719 << m->get_source_inst() << dendl;
11fdf7f2
TL
720 ceph_assert(!cct->_conf->ms_die_on_unhandled_msg);
721 }
722 void ms_deliver_dispatch(Message *m) {
f67539c2 723 return ms_deliver_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */
7c673cae
FG
724 }
725 /**
726 * Notify each Dispatcher of a new Connection. Call
727 * this function whenever a new Connection is initiated or
728 * reconnects.
729 *
730 * @param con Pointer to the new Connection.
731 */
732 void ms_deliver_handle_connect(Connection *con) {
11fdf7f2
TL
733 for (const auto& dispatcher : dispatchers) {
734 dispatcher->ms_handle_connect(con);
735 }
7c673cae
FG
736 }
737
738 /**
739 * Notify each fast Dispatcher of a new Connection. Call
740 * this function whenever a new Connection is initiated or
741 * reconnects.
742 *
743 * @param con Pointer to the new Connection.
744 */
745 void ms_deliver_handle_fast_connect(Connection *con) {
11fdf7f2
TL
746 for (const auto& dispatcher : fast_dispatchers) {
747 dispatcher->ms_handle_fast_connect(con);
748 }
7c673cae
FG
749 }
750
751 /**
11fdf7f2 752 * Notify each Dispatcher of a new incoming Connection. Call
7c673cae
FG
753 * this function whenever a new Connection is accepted.
754 *
755 * @param con Pointer to the new Connection.
756 */
757 void ms_deliver_handle_accept(Connection *con) {
11fdf7f2
TL
758 for (const auto& dispatcher : dispatchers) {
759 dispatcher->ms_handle_accept(con);
760 }
7c673cae
FG
761 }
762
763 /**
764 * Notify each fast Dispatcher of a new incoming Connection. Call
765 * this function whenever a new Connection is accepted.
766 *
767 * @param con Pointer to the new Connection.
768 */
769 void ms_deliver_handle_fast_accept(Connection *con) {
11fdf7f2
TL
770 for (const auto& dispatcher : fast_dispatchers) {
771 dispatcher->ms_handle_fast_accept(con);
772 }
7c673cae
FG
773 }
774
775 /**
776 * Notify each Dispatcher of a Connection which may have lost
777 * Messages. Call this function whenever you detect that a lossy Connection
778 * has been disconnected.
779 *
780 * @param con Pointer to the broken Connection.
781 */
782 void ms_deliver_handle_reset(Connection *con) {
11fdf7f2
TL
783 for (const auto& dispatcher : dispatchers) {
784 if (dispatcher->ms_handle_reset(con))
7c673cae
FG
785 return;
786 }
787 }
788 /**
789 * Notify each Dispatcher of a Connection which has been "forgotten" about
790 * by the remote end, implying that messages have probably been lost.
791 * Call this function whenever you detect a reset.
792 *
793 * @param con Pointer to the broken Connection.
794 */
795 void ms_deliver_handle_remote_reset(Connection *con) {
11fdf7f2
TL
796 for (const auto& dispatcher : dispatchers) {
797 dispatcher->ms_handle_remote_reset(con);
798 }
7c673cae
FG
799 }
800
801 /**
802 * Notify each Dispatcher of a Connection for which reconnection
803 * attempts are being refused. Call this function whenever you
804 * detect that a lossy Connection has been disconnected and it's
805 * impossible to reconnect.
806 *
807 * @param con Pointer to the broken Connection.
808 */
809 void ms_deliver_handle_refused(Connection *con) {
11fdf7f2
TL
810 for (const auto& dispatcher : dispatchers) {
811 if (dispatcher->ms_handle_refused(con))
7c673cae
FG
812 return;
813 }
814 }
815
9f95a23c
TL
816 void set_require_authorizer(bool b) {
817 require_authorizer = b;
7c673cae 818 }
7c673cae
FG
819
820 /**
821 * @} // Dispatcher Interfacing
822 */
823};
824
825
826
827#endif