1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_SIMPLEMESSENGER_H
16 #define CEPH_SIMPLEMESSENGER_H
18 #include "include/types.h"
19 #include "include/xlist.h"
24 #include "include/unordered_map.h"
25 #include "include/unordered_set.h"
27 #include "common/Mutex.h"
28 #include "include/Spinlock.h"
29 #include "common/Cond.h"
30 #include "common/Thread.h"
31 #include "common/Throttle.h"
33 #include "msg/SimplePolicyMessenger.h"
34 #include "msg/Message.h"
35 #include "include/assert.h"
37 #include "msg/DispatchQueue.h"
42 * This class handles transmission and reception of messages. Generally
43 * speaking, there are several major components:
46 * Each logical session is associated with a Connection.
48 * Each network connection is handled through a pipe, which handles
49 * the input and output of each message. There is normally a 1:1
50 * relationship between Pipe and Connection, but logical sessions may
51 * get handed off between Pipes when sockets reconnect or during
54 * Incoming messages are associated with an IncomingQueue, and there
55 * is one such queue associated with each Pipe.
 * IncomingQueues get queued in the DispatchQueue, which is responsible
58 * for doing a round-robin sweep and processing them via a worker thread.
60 * It's the exterior class passed to the external message handler and
61 * most of the API details.
65 * SimpleMessenger::lock
71 class SimpleMessenger
: public SimplePolicyMessenger
{
72 // First we have the public Messenger interface implementation...
75 * Initialize the SimpleMessenger!
77 * @param cct The CephContext to use
78 * @param name The name to assign ourselves
79 * _nonce A unique ID to use for this SimpleMessenger. It should not
80 * be a value that will be repeated if the daemon restarts.
81 * features The local features bits for the local_connection
83 SimpleMessenger(CephContext
*cct
, entity_name_t name
,
84 string mname
, uint64_t _nonce
);
87 * Destroy the SimpleMessenger. Pretty simple since all the work is done
90 ~SimpleMessenger() override
;
92 /** @defgroup Accessors
95 void set_addr_unknowns(const entity_addr_t
& addr
) override
;
97 int get_dispatch_queue_len() override
{
98 return dispatch_queue
.get_queue_len();
101 double get_dispatch_queue_max_age(utime_t now
) override
{
102 return dispatch_queue
.get_max_age(now
);
107 * @defgroup Configuration functions
110 void set_cluster_protocol(int p
) override
{
111 assert(!started
&& !did_bind
);
112 cluster_protocol
= p
;
115 int bind(const entity_addr_t
& bind_addr
) override
;
116 int rebind(const set
<int>& avoid_ports
) override
;
117 int client_bind(const entity_addr_t
& bind_addr
) override
;
119 /** @} Configuration functions */
122 * @defgroup Startup/Shutdown
125 int start() override
;
126 void wait() override
;
127 int shutdown() override
;
129 /** @} // Startup/Shutdown */
132 * @defgroup Messaging
135 int send_message(Message
*m
, const entity_inst_t
& dest
) override
{
136 return _send_message(m
, dest
);
139 int send_message(Message
*m
, Connection
*con
) {
140 return _send_message(m
, con
);
143 /** @} // Messaging */
146 * @defgroup Connection Management
149 ConnectionRef
get_connection(const entity_inst_t
& dest
) override
;
150 ConnectionRef
get_loopback_connection() override
;
151 int send_keepalive(Connection
*con
);
152 void mark_down(const entity_addr_t
& addr
) override
;
153 void mark_down(Connection
*con
);
154 void mark_disposable(Connection
*con
);
155 void mark_down_all() override
;
156 /** @} // Connection Management */
159 * @defgroup Messenger Interfaces
163 * Start up the DispatchQueue thread once we have somebody to dispatch to.
165 void ready() override
;
166 /** @} // Messenger Interfaces */
169 * @defgroup Inner classes
175 DispatchQueue dispatch_queue
;
177 friend class Accepter
;
180 * Register a new pipe for accept
184 Pipe
*add_accept_pipe(int sd
);
189 * A thread used to tear down Pipes when they're complete.
191 class ReaperThread
: public Thread
{
192 SimpleMessenger
*msgr
;
194 explicit ReaperThread(SimpleMessenger
*m
) : msgr(m
) {}
195 void *entry() override
{
196 msgr
->reaper_entry();
202 * @} // Inner classes
206 * @defgroup Utility functions
211 * Create a Pipe associated with the given entity (of the given type).
212 * Initiate the connection. (This function returning does not guarantee
213 * connection success.)
215 * @param addr The address of the entity to connect to.
216 * @param type The peer type of the entity at the address.
217 * @param con An existing Connection to associate with the new Pipe. If
218 * NULL, it creates a new Connection.
219 * @param first an initial message to queue on the new pipe
221 * @return a pointer to the newly-created Pipe. Caller does not own a
222 * reference; take one if you need it.
224 Pipe
*connect_rank(const entity_addr_t
& addr
, int type
, PipeConnection
*con
,
227 * Send a message, lazily or not.
228 * This just glues send_message together and passes
229 * the input on to submit_message.
231 int _send_message(Message
*m
, const entity_inst_t
& dest
);
233 * Same as above, but for the Connection-based variants.
235 int _send_message(Message
*m
, Connection
*con
);
237 * Queue up a Message for delivery to the entity specified
238 * by addr and dest_type.
239 * submit_message() is responsible for creating
240 * new Pipes (and closing old ones) as necessary.
242 * @param m The Message to queue up. This function eats a reference.
243 * @param con The existing Connection to use, or NULL if you don't know of one.
244 * @param addr The address to send the Message to.
245 * @param dest_type The peer type of the address we're sending to
246 * just drop silently under failure.
247 * @param already_locked If false, submit_message() will acquire the
248 * SimpleMessenger lock before accessing shared data structures; otherwise
249 * it will assume the lock is held. NOTE: if you are making a request
250 * without locking, you MUST have filled in the con with a valid pointer.
252 void submit_message(Message
*m
, PipeConnection
*con
,
253 const entity_addr_t
& addr
, int dest_type
,
254 bool already_locked
);
256 * Look through the pipes in the pipe_reap_queue and tear them down.
260 * @} // Utility functions
263 // SimpleMessenger stuff
264 /// approximately unique ID set by the Constructor for use in entity_addr_t
266 /// overall lock used for SimpleMessenger data structures
268 /// true, specifying we haven't learned our addr; set false when we find it.
269 // maybe this should be protected by the lock?
273 bool get_need_addr() const { return need_addr
; }
277 * false; set to true if the SimpleMessenger bound to a specific address;
278 * and set false again by Accepter::stop(). This isn't lock-protected
279 * since you shouldn't be able to race the only writers.
282 /// counter for the global seq our connection protocol uses
284 /// lock to protect the global_seq
285 ceph_spinlock_t global_seq_lock
;
288 * hash map of addresses to Pipes
290 * NOTE: a Pipe* with state CLOSED may still be in the map but is considered
291 * invalid and can be replaced by anyone holding the msgr lock
293 ceph::unordered_map
<entity_addr_t
, Pipe
*> rank_pipe
;
295 * list of pipes are in teh process of accepting
297 * These are not yet in the rank_pipe map.
299 set
<Pipe
*> accepting_pipes
;
300 /// a set of all the Pipes we have which are somehow active
302 /// a list of Pipes we want to tear down
303 list
<Pipe
*> pipe_reap_queue
;
/// Internal cluster protocol version, if any, for talking to entities of
/// the same type. Written by set_cluster_protocol().
int cluster_protocol;
// Reaper-thread state flags, declared one per line (declaration order is
// unchanged).
bool reaper_started;
bool reaper_stop;
314 /// This Cond is slept on by wait() and signaled by dispatch_entry()
319 Pipe
*_lookup_pipe(const entity_addr_t
& k
) {
320 ceph::unordered_map
<entity_addr_t
, Pipe
*>::iterator p
= rank_pipe
.find(k
);
321 if (p
== rank_pipe
.end())
323 // see lock cribbing in Pipe::fault()
324 if (p
->second
->state_closed
)
333 /// con used for sending messages to ourselves
334 ConnectionRef local_connection
;
337 * @defgroup SimpleMessenger internals
342 * This wraps ms_deliver_get_authorizer. We use it for Pipe.
344 AuthAuthorizer
*get_authorizer(int peer_type
, bool force_new
);
346 * This wraps ms_deliver_verify_authorizer; we use it for Pipe.
348 bool verify_authorizer(Connection
*con
, int peer_type
, int protocol
, bufferlist
& auth
, bufferlist
& auth_reply
,
349 bool& isvalid
,CryptoKey
& session_key
);
351 * Increment the global sequence for this SimpleMessenger and return it.
352 * This is for the connect protocol, although it doesn't hurt if somebody
355 * @return a global sequence ID that nobody else has seen.
357 __u32
get_global_seq(__u32 old
=0) {
358 ceph_spin_lock(&global_seq_lock
);
359 if (old
> global_seq
)
361 __u32 ret
= ++global_seq
;
362 ceph_spin_unlock(&global_seq_lock
);
/**
 * Get the protocol version we support for the given peer type: either
 * a peer protocol (if it matches our own), the protocol version for the
 * peer (if we're connecting), or our protocol version (if we're
 * accepting).
 *
 * @param peer_type The type of the peer in question.
 * @param connect Selects between the connecting and accepting cases
 * described above.
 */
int get_proto_version(int peer_type, bool connect);
/**
 * Fill in the features, address and peer type for the local connection,
 * which is used for delivering messages back to ourself.
 */
void init_local_connection();
378 * Tell the SimpleMessenger its full IP address.
380 * This is used by Pipes when connecting to other endpoints, and
381 * probably shouldn't be called by anybody else.
383 void learned_addr(const entity_addr_t
& peer_addr_for_me
);
386 * This function is used by the reaper thread. As long as nobody
387 * has set reaper_stop, it calls the reaper function, then
388 * waits to be signaled when it needs to reap again (or when it needs
393 * Add a pipe to the pipe_reap_queue, to be torn down on
394 * the next call to reaper().
395 * It should really only be the Pipe calling this, in our current
398 * @param pipe A Pipe which has stopped its threads and is
399 * ready to be torn down.
401 void queue_reap(Pipe
*pipe
);
404 * Used to get whether this connection ready to send
406 bool is_connected(Connection
*con
);
408 * @} // SimpleMessenger Internals
412 #endif /* CEPH_SIMPLEMESSENGER_H */