1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #ifndef CEPH_MESSENGER_H
18 #define CEPH_MESSENGER_H
24 #include "Dispatcher.h"
25 #include "common/Mutex.h"
26 #include "common/Cond.h"
27 #include "include/Context.h"
28 #include "include/types.h"
29 #include "include/ceph_features.h"
30 #include "auth/Crypto.h"
35 #define SOCKET_PRIORITY_MIN_DELAY 6
/// Every Dispatcher registered through add_dispatcher_head() or
/// add_dispatcher_tail(); the ms_deliver_* helpers walk this list in order.
42 list
<Dispatcher
*> dispatchers
;
/// The registered Dispatchers for which ms_can_fast_dispatch_any() returned
/// true when they were added; walked by the fast-dispatch helpers.
43 list
<Dispatcher
*> fast_dispatchers
;
/// Zipkin trace endpoint; get_trace_endpoint() hands out a pointer to it.
44 ZTracer::Endpoint trace_endpoint
;
46 void set_endpoint_addr(const entity_addr_t
& a
,
47 const entity_name_t
&name
);
50 /// the "name" of the local daemon. eg client.99
51 entity_inst_t my_inst
;
52 int default_send_priority
;
53 /// set to true once the Messenger has started, and set to false on shutdown
60 * Various Messenger conditional config/type flags to allow
61 * different "transport" Messengers to tune themselves
// Messenger configuration/type flags. Each constant occupies its own bit,
// so the values are pairwise disjoint.
static const int HAS_HEAVY_TRAFFIC    = 1 << 0;  // 0x0001
static const int HAS_MANY_CONNECTIONS = 1 << 1;  // 0x0002
static const int HEARTBEAT            = 1 << 2;  // 0x0004
68 * The CephContext this Messenger uses. Many other components initialize themselves
75 * A Policy describes the rules of a Connection. Is there a limit on how
76 * much data this Connection can have locally? When the underlying connection
77 * experiences an error, does the Connection disappear? Can this Messenger
78 * re-establish the underlying connection?
81 /// If true, the Connection is tossed out on errors.
83 /// If true, the underlying connection can't be re-established from this end.
85 /// If true, we will standby when idle
87 /// If true, we will try to detect session resets
90 * The throttler is used to limit how much data is held by Messages from
91 * the associated Connection(s). When reading in a new Message, the Messenger
92 * will call throttler->throttle() for the size of the new Message.
94 Throttle
*throttler_bytes
;
95 Throttle
*throttler_messages
;
97 /// Specify features supported locally by the endpoint.
98 uint64_t features_supported
;
99 /// Specify features any remotes must have to talk to this endpoint.
100 uint64_t features_required
;
103 : lossy(false), server(false), standby(false), resetcheck(true),
104 throttler_bytes(NULL
),
105 throttler_messages(NULL
),
106 features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT
),
107 features_required(0) {}
109 Policy(bool l
, bool s
, bool st
, bool r
, uint64_t req
)
110 : lossy(l
), server(s
), standby(st
), resetcheck(r
),
111 throttler_bytes(NULL
),
112 throttler_messages(NULL
),
113 features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT
),
114 features_required(req
) {}
117 static Policy
stateful_server(uint64_t req
) {
118 return Policy(false, true, true, true, req
);
120 static Policy
stateless_server(uint64_t req
) {
121 return Policy(true, true, false, false, req
);
123 static Policy
lossless_peer(uint64_t req
) {
124 return Policy(false, false, true, false, req
);
126 static Policy
lossless_peer_reuse(uint64_t req
) {
127 return Policy(false, false, true, true, req
);
129 static Policy
lossy_client(uint64_t req
) {
130 return Policy(true, false, false, false, req
);
132 static Policy
lossless_client(uint64_t req
) {
133 return Policy(false, false, false, true, req
);
138 * Messenger constructor. Call this from your implementation.
139 * Messenger users should construct full implementations directly,
140 * or use the create() function.
142 Messenger(CephContext
*cct_
, entity_name_t w
)
143 : trace_endpoint("0.0.0.0", 0, "Messenger"),
145 default_send_priority(CEPH_MSG_PRIO_DEFAULT
), started(false),
149 crcflags(get_default_crc_flags(cct
->_conf
))
153 virtual ~Messenger() {}
156 * create a new messenger
158 * Create a new messenger instance, with whatever implementation is
159 * available or specified via the configuration in cct.
162 * @param type name of messenger type
163 * @param name entity name to register
164 * @param lname logical name of the messenger in this process (e.g., "client")
165 * @param nonce nonce value to uniquely identify this instance on the current host
166 * @param features bits for the local connection
167 * @param cflags general set of flags to configure transport resources
169 static Messenger
*create(CephContext
*cct
,
177 * create a new messenger
179 * Create a new messenger instance.
180 * Same as the above, but a slightly simpler interface for clients:
181 * - Generate a random nonce
182 * - use the default feature bits
183 * - get the messenger type from cct
184 * - use the client entity_type
187 * @param lname logical name of the messenger in this process (e.g., "client")
189 static Messenger
*create_client_messenger(CephContext
*cct
, string lname
);
192 * @defgroup Accessors
196 * Retrieve the Messenger's instance.
198 * @return A const reference to the instance this Messenger
199 * currently believes to be its own.
201 const entity_inst_t
& get_myinst() { return my_inst
; }
203 * set messenger's instance
205 void set_myinst(entity_inst_t i
) { my_inst
= i
; }
207 uint32_t get_magic() { return magic
; }
208 void set_magic(int _magic
) { magic
= _magic
; }
211 * Retrieve the Messenger's address.
213 * @return A const reference to the address this Messenger
214 * currently believes to be its own.
216 const entity_addr_t
& get_myaddr() { return my_inst
.addr
; }
219 * set messenger's address
221 virtual void set_myaddr(const entity_addr_t
& a
) {
223 set_endpoint_addr(a
, my_inst
.name
);
227 * @return the zipkin trace endpoint
229 const ZTracer::Endpoint
* get_trace_endpoint() const {
230 return &trace_endpoint
;
234 * Retrieve the Messenger's name.
236 * @return A const reference to the name this Messenger
237 * currently believes to be its own.
239 const entity_name_t
& get_myname() { return my_inst
.name
; }
241 * Set the name of the local entity. The name is reported to others and
242 * can be changed while the system is running, but doing so at incorrect
243 * times may have bad results.
245 * @param m The name to set.
247 void set_myname(const entity_name_t
& m
) { my_inst
.name
= m
; }
249 * Set the unknown address components for this Messenger.
250 * This is useful if the Messenger doesn't know its full address just by
251 * binding, but another Messenger on the same interface has already learned
252 * its full address. This function does not fill in known address elements,
253 * cause a rebind, or do anything of that sort.
255 * @param addr The address to use as a template.
257 virtual void set_addr_unknowns(const entity_addr_t
&addr
) = 0;
259 * Set the address for this Messenger. This is useful if the Messenger
260 * binds to a specific address but advertises a different address on the
263 * @param addr The address to use.
265 virtual void set_addr(const entity_addr_t
&addr
) = 0;
266 /// Get the default send priority.
267 int get_default_send_priority() { return default_send_priority
; }
269 * Get the number of Messages which the Messenger has received
270 * but not yet dispatched.
272 virtual int get_dispatch_queue_len() = 0;
275 * Get age of oldest undelivered message
276 * (0 if the queue is empty)
278 virtual double get_dispatch_queue_max_age(utime_t now
) = 0;
280 * Get the default crc flags for this messenger.
281 * The flags are chosen based on the configuration passed in.
283 static int get_default_crc_flags(md_config_t
*);
290 * @defgroup Configuration
294 * Set the cluster protocol in use by this daemon.
295 * This is an init-time function and cannot be called after calling
298 * @param p The cluster protocol to use. Defined externally.
300 virtual void set_cluster_protocol(int p
) = 0;
302 * Set a policy which is applied to all peers who do not have a type-specific
304 * This is an init-time function and cannot be called after calling
307 * @param p The Policy to apply.
309 virtual void set_default_policy(Policy p
) = 0;
311 * Set a policy which is applied to all peers of the given type.
312 * This is an init-time function and cannot be called after calling
315 * @param type The peer type this policy applies to.
316 * @param p The policy to apply.
318 virtual void set_policy(int type
, Policy p
) = 0;
320 * Get the Policy associated with a type of peer.
322 * This can be called either on initial setup, or after connections
323 * are already established. However, the policies for existing
324 * connections will not be affected; the new policy will only apply
325 * to future connections.
327 * @param t The peer type to get the default policy for.
328 * @return The Policy, returned by value.
330 virtual Policy
get_policy(int t
) = 0;
332 * Get the default Policy
334 * @return The default Policy, returned by value.
336 virtual Policy
get_default_policy() = 0;
338 * Set Throttlers applied to all Messages from the given type of peer
340 * This is an init-time function and cannot be called after calling
343 * @param type The peer type the Throttlers will apply to.
344 * @param bytes The Throttle for the number of bytes carried by the message
345 * @param msgs The Throttle for the number of messages for this @p type
346 * @note The Messenger does not take ownership of the Throttle pointers, but
347 * you must not destroy them before you destroy the Messenger.
349 virtual void set_policy_throttlers(int type
, Throttle
*bytes
, Throttle
*msgs
=NULL
) = 0;
351 * Set the default send priority
353 * This is an init-time function and must be called *before* calling
356 * @param p The default send priority to use.
358 void set_default_send_priority(int p
) {
360 default_send_priority
= p
;
363 * Set the priority(SO_PRIORITY) for all packets to be sent on this socket.
365 * Linux uses this value to order the networking queues: packets with a higher
366 * priority may be processed first depending on the selected device queueing
369 * @param prio The priority. Setting a priority outside the range 0 to 6
370 * requires the CAP_NET_ADMIN capability.
372 void set_socket_priority(int prio
) {
373 socket_priority
= prio
;
376 * Get the socket priority
378 * @return the socket priority
380 int get_socket_priority() {
381 return socket_priority
;
384 * Add a new Dispatcher to the front of the list. If you add
385 * a Dispatcher which is already included, it will get a duplicate
386 * entry. This will reduce efficiency but not break anything.
388 * @param d The Dispatcher to insert into the list.
390 void add_dispatcher_head(Dispatcher
*d
) {
391 bool first
= dispatchers
.empty();
392 dispatchers
.push_front(d
);
393 if (d
->ms_can_fast_dispatch_any())
394 fast_dispatchers
.push_front(d
);
399 * Add a new Dispatcher to the end of the list. If you add
400 * a Dispatcher which is already included, it will get a duplicate
401 * entry. This will reduce efficiency but not break anything.
403 * @param d The Dispatcher to insert into the list.
405 void add_dispatcher_tail(Dispatcher
*d
) {
406 bool first
= dispatchers
.empty();
407 dispatchers
.push_back(d
);
408 if (d
->ms_can_fast_dispatch_any())
409 fast_dispatchers
.push_back(d
);
414 * Bind the Messenger to a specific address. If bind_addr
415 * is not completely filled in the system will use the
416 * valid portions and cycle through the unset ones (eg, the port)
417 * in an unspecified order.
419 * @param bind_addr The address to bind to.
420 * @return 0 on success, or -1 on error, or -errno if
421 * we can be more specific about the failure.
423 virtual int bind(const entity_addr_t
& bind_addr
) = 0;
425 * This function performs a full restart of the Messenger component,
426 * whatever that means. Other entities who connect to this
427 * Messenger post-rebind() should perceive it as a new entity which
428 * they have not previously contacted, and it MUST bind to a
429 * different address than it did previously.
431 * @param avoid_ports Additional ports to avoid binding to.
433 virtual int rebind(const set
<int>& avoid_ports
) { return -EOPNOTSUPP
; }
435 * Bind the 'client' Messenger to a specific address. The Messenger will bind
436 * to the address before connecting to others when option ms_bind_before_connect
438 * @param bind_addr The address to bind to.
439 * @return 0 on success, or -1 on error, or -errno if
441 virtual int client_bind(const entity_addr_t
& bind_addr
) = 0;
443 * @} // Configuration
447 * @defgroup Startup/Shutdown
451 * Perform any resource allocation, thread startup, etc
452 * that is required before attempting to connect to other
453 * Messengers or transmit messages.
454 * Once this function completes, started shall be set to true.
456 * @return 0 on success; -errno on failure.
458 virtual int start() { started
= true; return 0; }
462 * Block until the Messenger has finished shutting down (according
463 * to the shutdown() function).
464 * It is valid to call this after calling shutdown(), but it must
465 * be called before deleting the Messenger.
467 virtual void wait() = 0;
469 * Initiate a shutdown of the Messenger.
471 * @return 0 on success, -errno otherwise.
473 virtual int shutdown() { started
= false; return 0; }
475 * @} // Startup/Shutdown
479 * @defgroup Messaging
483 * Queue the given Message for the given entity.
484 * Success in this function does not guarantee Message delivery, only
485 * success in queueing the Message. Other guarantees may be provided based
486 * on the Connection policy associated with the dest.
488 * @param m The Message to send. The Messenger consumes a single reference
489 * when you pass it in.
490 * @param dest The entity to send the Message to.
492 * DEPRECATED: please do not use this interface for any new code;
493 * use the Connection* variant.
495 * @return 0 on success, or -errno on failure.
497 virtual int send_message(Message
*m
, const entity_inst_t
& dest
) = 0;
503 * @defgroup Connection Management
507 * Get the Connection object associated with a given entity. If a
508 * Connection does not exist, create one and establish a logical connection.
509 * The caller owns a reference when this returns. Call ->put() when you're
512 * @param dest The entity to get a connection for.
514 virtual ConnectionRef
get_connection(const entity_inst_t
& dest
) = 0;
516 * Get the Connection object associated with ourselves.
518 virtual ConnectionRef
get_loopback_connection() = 0;
520 * Mark down a Connection to a remote.
522 * This will cause us to discard our outgoing queue for them, and if
523 * reset detection is enabled in the policy and the endpoint tries
524 * to reconnect they will discard their queue when we inform them of
527 * If there is no Connection to the given dest, it is a no-op.
529 * This generates a RESET notification to the Dispatcher.
531 * DEPRECATED: please do not use this interface for any new code;
532 * use the Connection* variant.
534 * @param a The address to mark down.
536 virtual void mark_down(const entity_addr_t
& a
) = 0;
538 * Mark all the existing Connections down. This is equivalent
539 * to iterating over all Connections and calling mark_down()
542 * This will generate a RESET event for each closed connection.
544 virtual void mark_down_all() = 0;
546 * @} // Connection Management
550 * @defgroup Subclass Interfacing
554 * A courtesy function for Messenger implementations which
555 * will be called when we receive our first Dispatcher.
557 virtual void ready() { }
559 * @} // Subclass Interfacing
562 * @defgroup Dispatcher Interfacing
567 * Determine whether a message can be fast-dispatched. We will
568 * query each Dispatcher in sequence to determine if they are
569 * capable of handling a particular message via "fast dispatch".
571 * @param m The Message we are testing.
573 bool ms_can_fast_dispatch(const Message
*m
) {
574 for (list
<Dispatcher
*>::iterator p
= fast_dispatchers
.begin();
575 p
!= fast_dispatchers
.end();
577 if ((*p
)->ms_can_fast_dispatch(m
))
584 * Deliver a single Message via "fast dispatch".
586 * @param m The Message we are fast dispatching. We take ownership
587 * of one reference to it.
588 * If none of our Dispatchers can handle it, ceph_abort().
590 void ms_fast_dispatch(Message
*m
) {
591 m
->set_dispatch_stamp(ceph_clock_now());
592 for (list
<Dispatcher
*>::iterator p
= fast_dispatchers
.begin();
593 p
!= fast_dispatchers
.end();
595 if ((*p
)->ms_can_fast_dispatch(m
)) {
596 (*p
)->ms_fast_dispatch(m
);
605 void ms_fast_preprocess(Message
*m
) {
606 for (list
<Dispatcher
*>::iterator p
= fast_dispatchers
.begin();
607 p
!= fast_dispatchers
.end();
609 (*p
)->ms_fast_preprocess(m
);
613 * Deliver a single Message. Send it to each Dispatcher
614 * in sequence until one of them handles it.
615 * If none of our Dispatchers can handle it, log it and assert if ms_die_on_unhandled_msg is set.
617 * @param m The Message to deliver. We take ownership of
618 * one reference to it.
620 void ms_deliver_dispatch(Message
*m
) {
621 m
->set_dispatch_stamp(ceph_clock_now());
622 for (list
<Dispatcher
*>::iterator p
= dispatchers
.begin();
623 p
!= dispatchers
.end();
625 if ((*p
)->ms_dispatch(m
))
628 lsubdout(cct
, ms
, 0) << "ms_deliver_dispatch: unhandled message " << m
<< " " << *m
<< " from "
629 << m
->get_source_inst() << dendl
;
630 assert(!cct
->_conf
->ms_die_on_unhandled_msg
);
634 * Notify each Dispatcher of a new Connection. Call
635 * this function whenever a new Connection is initiated or
638 * @param con Pointer to the new Connection.
640 void ms_deliver_handle_connect(Connection
*con
) {
641 for (list
<Dispatcher
*>::iterator p
= dispatchers
.begin();
642 p
!= dispatchers
.end();
644 (*p
)->ms_handle_connect(con
);
648 * Notify each fast Dispatcher of a new Connection. Call
649 * this function whenever a new Connection is initiated or
652 * @param con Pointer to the new Connection.
654 void ms_deliver_handle_fast_connect(Connection
*con
) {
655 for (list
<Dispatcher
*>::iterator p
= fast_dispatchers
.begin();
656 p
!= fast_dispatchers
.end();
658 (*p
)->ms_handle_fast_connect(con
);
662 * Notify each Dispatcher of a new incoming Connection. Call
663 * this function whenever a new Connection is accepted.
665 * @param con Pointer to the new Connection.
667 void ms_deliver_handle_accept(Connection
*con
) {
668 for (list
<Dispatcher
*>::iterator p
= dispatchers
.begin();
669 p
!= dispatchers
.end();
671 (*p
)->ms_handle_accept(con
);
675 * Notify each fast Dispatcher of a new incoming Connection. Call
676 * this function whenever a new Connection is accepted.
678 * @param con Pointer to the new Connection.
680 void ms_deliver_handle_fast_accept(Connection
*con
) {
681 for (list
<Dispatcher
*>::iterator p
= fast_dispatchers
.begin();
682 p
!= fast_dispatchers
.end();
684 (*p
)->ms_handle_fast_accept(con
);
688 * Notify each Dispatcher of a Connection which may have lost
689 * Messages. Call this function whenever you detect that a lossy Connection
690 * has been disconnected.
692 * @param con Pointer to the broken Connection.
694 void ms_deliver_handle_reset(Connection
*con
) {
695 for (list
<Dispatcher
*>::iterator p
= dispatchers
.begin();
696 p
!= dispatchers
.end();
698 if ((*p
)->ms_handle_reset(con
))
703 * Notify each Dispatcher of a Connection which has been "forgotten" about
704 * by the remote end, implying that messages have probably been lost.
705 * Call this function whenever you detect a reset.
707 * @param con Pointer to the broken Connection.
709 void ms_deliver_handle_remote_reset(Connection
*con
) {
710 for (list
<Dispatcher
*>::iterator p
= dispatchers
.begin();
711 p
!= dispatchers
.end();
713 (*p
)->ms_handle_remote_reset(con
);
717 * Notify each Dispatcher of a Connection for which reconnection
718 * attempts are being refused. Call this function whenever you
719 * detect that a lossy Connection has been disconnected and it's
720 * impossible to reconnect.
722 * @param con Pointer to the broken Connection.
724 void ms_deliver_handle_refused(Connection
*con
) {
725 for (list
<Dispatcher
*>::iterator p
= dispatchers
.begin();
726 p
!= dispatchers
.end();
728 if ((*p
)->ms_handle_refused(con
))
734 * Get the AuthAuthorizer for a new outgoing Connection.
736 * @param peer_type The peer type for the new Connection
737 * @param force_new True if we want to wait for new keys, false otherwise.
738 * @return A pointer to the AuthAuthorizer, if we have one; NULL otherwise
740 AuthAuthorizer
*ms_deliver_get_authorizer(int peer_type
, bool force_new
) {
741 AuthAuthorizer
*a
= 0;
742 for (list
<Dispatcher
*>::iterator p
= dispatchers
.begin();
743 p
!= dispatchers
.end();
745 if ((*p
)->ms_get_authorizer(peer_type
, &a
, force_new
))
751 * Verify that the authorizer on a new incoming Connection is correct.
753 * @param con The new incoming Connection
754 * @param peer_type The type of the endpoint on the new Connection
755 * @param protocol The ID of the protocol in use (at time of writing, cephx or none)
756 * @param authorizer The authorization string supplied by the remote
757 * @param authorizer_reply Output param: The string we should send back to
758 * the remote to authorize ourselves. Only filled in if isvalid
759 * @param isvalid Output param: True if authorizer is valid, false otherwise
761 * @return True if we were able to prove or disprove correctness of
762 * authorizer, false otherwise.
764 bool ms_deliver_verify_authorizer(Connection
*con
, int peer_type
,
765 int protocol
, bufferlist
& authorizer
, bufferlist
& authorizer_reply
,
766 bool& isvalid
, CryptoKey
& session_key
) {
767 for (list
<Dispatcher
*>::iterator p
= dispatchers
.begin();
768 p
!= dispatchers
.end();
770 if ((*p
)->ms_verify_authorizer(con
, peer_type
, protocol
, authorizer
, authorizer_reply
, isvalid
, session_key
))
777 * @} // Dispatcher Interfacing