]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/simple/SimpleMessenger.h
update download target update for octopus release
[ceph.git] / ceph / src / msg / simple / SimpleMessenger.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_SIMPLEMESSENGER_H
16 #define CEPH_SIMPLEMESSENGER_H
17
18 #include <list>
19 #include <map>
20
21 #include "include/types.h"
22 #include "include/xlist.h"
23
24 #include "include/unordered_map.h"
25 #include "include/unordered_set.h"
26
27 #include "common/Mutex.h"
28 #include "common/Cond.h"
29 #include "common/Thread.h"
30 #include "common/Throttle.h"
31
32 #include "include/spinlock.h"
33
34 #include "msg/SimplePolicyMessenger.h"
35 #include "msg/Message.h"
36 #include "include/ceph_assert.h"
37
38 #include "msg/DispatchQueue.h"
39 #include "Pipe.h"
40 #include "Accepter.h"
41
42 /*
43 * This class handles transmission and reception of messages. Generally
44 * speaking, there are several major components:
45 *
46 * - Connection
47 * Each logical session is associated with a Connection.
48 * - Pipe
49 * Each network connection is handled through a pipe, which handles
50 * the input and output of each message. There is normally a 1:1
51 * relationship between Pipe and Connection, but logical sessions may
52 * get handed off between Pipes when sockets reconnect or during
53 * connection races.
54 * - IncomingQueue
55 * Incoming messages are associated with an IncomingQueue, and there
56 * is one such queue associated with each Pipe.
57 * - DispatchQueue
58 * IncomingQueues get queued in the DispatchQueue, which is responsible
59 * for doing a round-robin sweep and processing them via a worker thread.
60 * - SimpleMessenger
61 * It's the exterior class passed to the external message handler and
62 * most of the API details.
63 *
64 * Lock ordering:
65 *
66 * SimpleMessenger::lock
67 * Pipe::pipe_lock
68 * DispatchQueue::lock
69 * IncomingQueue::lock
70 */
71
72 class SimpleMessenger : public SimplePolicyMessenger {
73 // First we have the public Messenger interface implementation...
74 public:
75 /**
76 * Initialize the SimpleMessenger!
77 *
78 * @param cct The CephContext to use
79 * @param name The name to assign ourselves
80 * _nonce A unique ID to use for this SimpleMessenger. It should not
81 * be a value that will be repeated if the daemon restarts.
82 * features The local features bits for the local_connection
83 */
84 SimpleMessenger(CephContext *cct, entity_name_t name,
85 string mname, uint64_t _nonce);
86
87 /**
88 * Destroy the SimpleMessenger. Pretty simple since all the work is done
89 * elsewhere.
90 */
91 ~SimpleMessenger() override;
92
93 /** @defgroup Accessors
94 * @{
95 */
96 bool set_addr_unknowns(const entity_addrvec_t& addr) override;
97 void set_addrs(const entity_addrvec_t &addr) override;
98 void set_myaddrs(const entity_addrvec_t& a) override;
99
100 int get_dispatch_queue_len() override {
101 return dispatch_queue.get_queue_len();
102 }
103
104 double get_dispatch_queue_max_age(utime_t now) override {
105 return dispatch_queue.get_max_age(now);
106 }
107 /** @} Accessors */
108
109 /**
110 * @defgroup Configuration functions
111 * @{
112 */
113 void set_cluster_protocol(int p) override {
114 ceph_assert(!started && !did_bind);
115 cluster_protocol = p;
116 }
117
118 int bind(const entity_addr_t& bind_addr) override;
119 int rebind(const set<int>& avoid_ports) override;
120 int client_bind(const entity_addr_t& bind_addr) override;
121
122 /** @} Configuration functions */
123
124 /**
125 * @defgroup Startup/Shutdown
126 * @{
127 */
128 int start() override;
129 void wait() override;
130 int shutdown() override;
131
132 /** @} // Startup/Shutdown */
133
134 /**
135 * @defgroup Messaging
136 * @{
137 */
138 int send_to(
139 Message *m,
140 int type,
141 const entity_addrvec_t& addr) override {
142 // temporary
143 return _send_message(m, entity_inst_t(entity_name_t(type, -1),
144 addr.legacy_addr()));
145 }
146
147 int send_message(Message *m, Connection *con) {
148 return _send_message(m, con);
149 }
150
151 /** @} // Messaging */
152
153 /**
154 * @defgroup Connection Management
155 * @{
156 */
157 ConnectionRef connect_to(int type, const entity_addrvec_t& addrs) override;
158 ConnectionRef get_loopback_connection() override;
159 int send_keepalive(Connection *con);
160 void mark_down(const entity_addr_t& addr) override;
161 void mark_down(Connection *con);
162 void mark_disposable(Connection *con);
163 void mark_down_all() override;
164 /** @} // Connection Management */
165 protected:
166 /**
167 * @defgroup Messenger Interfaces
168 * @{
169 */
170 /**
171 * Start up the DispatchQueue thread once we have somebody to dispatch to.
172 */
173 void ready() override;
174 /** @} // Messenger Interfaces */
175 private:
176 /**
177 * @defgroup Inner classes
178 * @{
179 */
180
181 public:
182 Accepter accepter;
183 DispatchQueue dispatch_queue;
184
185 friend class Accepter;
186
187 /**
188 * Register a new pipe for accept
189 *
190 * @param sd socket
191 */
192 Pipe *add_accept_pipe(int sd);
193
194 private:
195
196 /**
197 * A thread used to tear down Pipes when they're complete.
198 */
199 class ReaperThread : public Thread {
200 SimpleMessenger *msgr;
201 public:
202 explicit ReaperThread(SimpleMessenger *m) : msgr(m) {}
203 void *entry() override {
204 msgr->reaper_entry();
205 return 0;
206 }
207 } reaper_thread;
208
209 /**
210 * @} // Inner classes
211 */
212
213 /**
214 * @defgroup Utility functions
215 * @{
216 */
217
218 /**
219 * Create a Pipe associated with the given entity (of the given type).
220 * Initiate the connection. (This function returning does not guarantee
221 * connection success.)
222 *
223 * @param addr The address of the entity to connect to.
224 * @param type The peer type of the entity at the address.
225 * @param con An existing Connection to associate with the new Pipe. If
226 * NULL, it creates a new Connection.
227 * @param first an initial message to queue on the new pipe
228 *
229 * @return a pointer to the newly-created Pipe. Caller does not own a
230 * reference; take one if you need it.
231 */
232 Pipe *connect_rank(const entity_addr_t& addr, int type, PipeConnection *con,
233 Message *first);
234 /**
235 * Send a message, lazily or not.
236 * This just glues send_message together and passes
237 * the input on to submit_message.
238 */
239 int _send_message(Message *m, const entity_inst_t& dest);
240 /**
241 * Same as above, but for the Connection-based variants.
242 */
243 int _send_message(Message *m, Connection *con);
244 /**
245 * Queue up a Message for delivery to the entity specified
246 * by addr and dest_type.
247 * submit_message() is responsible for creating
248 * new Pipes (and closing old ones) as necessary.
249 *
250 * @param m The Message to queue up. This function eats a reference.
251 * @param con The existing Connection to use, or NULL if you don't know of one.
252 * @param addr The address to send the Message to.
253 * @param dest_type The peer type of the address we're sending to
254 * just drop silently under failure.
255 * @param already_locked If false, submit_message() will acquire the
256 * SimpleMessenger lock before accessing shared data structures; otherwise
257 * it will assume the lock is held. NOTE: if you are making a request
258 * without locking, you MUST have filled in the con with a valid pointer.
259 */
260 void submit_message(Message *m, PipeConnection *con,
261 const entity_addr_t& addr, int dest_type,
262 bool already_locked);
263 /**
264 * Look through the pipes in the pipe_reap_queue and tear them down.
265 */
266 void reaper();
267 /**
268 * @} // Utility functions
269 */
270
271 // SimpleMessenger stuff
272 /// approximately unique ID set by the Constructor for use in entity_addr_t
273 uint64_t nonce;
274 /// overall lock used for SimpleMessenger data structures
275 Mutex lock;
276 /// true, specifying we haven't learned our addr; set false when we find it.
277 // maybe this should be protected by the lock?
278 bool need_addr;
279
280 public:
281 bool get_need_addr() const { return need_addr; }
282
283 private:
284 /**
285 * false; set to true if the SimpleMessenger bound to a specific address;
286 * and set false again by Accepter::stop(). This isn't lock-protected
287 * since you shouldn't be able to race the only writers.
288 */
289 bool did_bind;
290 /// counter for the global seq our connection protocol uses
291 __u32 global_seq;
292 /// lock to protect the global_seq
293 ceph::spinlock global_seq_lock;
294
295 entity_addr_t my_addr;
296
297 /**
298 * hash map of addresses to Pipes
299 *
300 * NOTE: a Pipe* with state CLOSED may still be in the map but is considered
301 * invalid and can be replaced by anyone holding the msgr lock
302 */
303 ceph::unordered_map<entity_addr_t, Pipe*> rank_pipe;
304 /**
305 * list of pipes are in the process of accepting
306 *
307 * These are not yet in the rank_pipe map.
308 */
309 set<Pipe*> accepting_pipes;
310 /// a set of all the Pipes we have which are somehow active
311 set<Pipe*> pipes;
312 /// a list of Pipes we want to tear down
313 list<Pipe*> pipe_reap_queue;
314
315 /// internal cluster protocol version, if any, for talking to entities of the same type.
316 int cluster_protocol;
317
318 Cond stop_cond;
319 bool stopped = true;
320
321 bool reaper_started, reaper_stop;
322 Cond reaper_cond;
323
324 /// This Cond is slept on by wait() and signaled by dispatch_entry()
325 Cond wait_cond;
326
327 friend class Pipe;
328
329 Pipe *_lookup_pipe(const entity_addr_t& k) {
330 ceph::unordered_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.find(k);
331 if (p == rank_pipe.end())
332 return NULL;
333 // see lock cribbing in Pipe::fault()
334 if (p->second->state_closed)
335 return NULL;
336 return p->second;
337 }
338
339 public:
340
341 int timeout;
342
343 /// con used for sending messages to ourselves
344 ConnectionRef local_connection;
345
346 /**
347 * @defgroup SimpleMessenger internals
348 * @{
349 */
350
351 /**
352 * Increment the global sequence for this SimpleMessenger and return it.
353 * This is for the connect protocol, although it doesn't hurt if somebody
354 * else calls it.
355 *
356 * @return a global sequence ID that nobody else has seen.
357 */
358 __u32 get_global_seq(__u32 old=0) {
359 std::lock_guard<decltype(global_seq_lock)> lg(global_seq_lock);
360
361 if (old > global_seq)
362 global_seq = old;
363 __u32 ret = ++global_seq;
364
365 return ret;
366 }
367 /**
368 * Get the protocol version we support for the given peer type: either
369 * a peer protocol (if it matches our own), the protocol version for the
370 * peer (if we're connecting), or our protocol version (if we're accepting).
371 */
372 int get_proto_version(int peer_type, bool connect);
373
374 /**
375 * Fill in the features, address and peer type for the local connection, which
376 * is used for delivering messages back to ourself.
377 */
378 void init_local_connection();
379 /**
380 * Tell the SimpleMessenger its full IP address.
381 *
382 * This is used by Pipes when connecting to other endpoints, and
383 * probably shouldn't be called by anybody else.
384 */
385 void learned_addr(const entity_addr_t& peer_addr_for_me);
386
387 /**
388 * This function is used by the reaper thread. As long as nobody
389 * has set reaper_stop, it calls the reaper function, then
390 * waits to be signaled when it needs to reap again (or when it needs
391 * to stop).
392 */
393 void reaper_entry();
394 /**
395 * Add a pipe to the pipe_reap_queue, to be torn down on
396 * the next call to reaper().
397 * It should really only be the Pipe calling this, in our current
398 * implementation.
399 *
400 * @param pipe A Pipe which has stopped its threads and is
401 * ready to be torn down.
402 */
403 void queue_reap(Pipe *pipe);
404
405 /**
406 * Used to get whether this connection ready to send
407 */
408 bool is_connected(Connection *con);
409 /**
410 * @} // SimpleMessenger Internals
411 */
412 } ;
413
414 #endif /* CEPH_SIMPLEMESSENGER_H */