]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef CEPH_SIMPLEMESSENGER_H | |
16 | #define CEPH_SIMPLEMESSENGER_H | |
17 | ||
18 | #include "include/types.h" | |
19 | #include "include/xlist.h" | |
20 | ||
21 | #include <list> | |
22 | #include <map> | |
23 | using namespace std; | |
24 | #include "include/unordered_map.h" | |
25 | #include "include/unordered_set.h" | |
26 | ||
27 | #include "common/Mutex.h" | |
7c673cae FG |
28 | #include "include/Spinlock.h" |
29 | #include "common/Cond.h" | |
30 | #include "common/Thread.h" | |
31 | #include "common/Throttle.h" | |
32 | ||
33 | #include "msg/SimplePolicyMessenger.h" | |
34 | #include "msg/Message.h" | |
35 | #include "include/assert.h" | |
36 | ||
37 | #include "msg/DispatchQueue.h" | |
38 | #include "Pipe.h" | |
39 | #include "Accepter.h" | |
40 | ||
41 | /* | |
42 | * This class handles transmission and reception of messages. Generally | |
43 | * speaking, there are several major components: | |
44 | * | |
45 | * - Connection | |
46 | * Each logical session is associated with a Connection. | |
47 | * - Pipe | |
48 | * Each network connection is handled through a pipe, which handles | |
49 | * the input and output of each message. There is normally a 1:1 | |
50 | * relationship between Pipe and Connection, but logical sessions may | |
51 | * get handed off between Pipes when sockets reconnect or during | |
52 | * connection races. | |
53 | * - IncomingQueue | |
54 | * Incoming messages are associated with an IncomingQueue, and there | |
55 | * is one such queue associated with each Pipe. | |
56 | * - DispatchQueue | |
57 | * IncomingQueues get queued in the DispatchQueue, which is responsible | |
58 | * for doing a round-robin sweep and processing them via a worker thread. | |
59 | * - SimpleMessenger | |
60 | * It's the exterior class passed to the external message handler and | |
61 | * most of the API details. | |
62 | * | |
63 | * Lock ordering: | |
64 | * | |
65 | * SimpleMessenger::lock | |
66 | * Pipe::pipe_lock | |
67 | * DispatchQueue::lock | |
68 | * IncomingQueue::lock | |
69 | */ | |
70 | ||
71 | class SimpleMessenger : public SimplePolicyMessenger { | |
72 | // First we have the public Messenger interface implementation... | |
73 | public: | |
74 | /** | |
75 | * Initialize the SimpleMessenger! | |
76 | * | |
77 | * @param cct The CephContext to use | |
78 | * @param name The name to assign ourselves | |
79 | * _nonce A unique ID to use for this SimpleMessenger. It should not | |
80 | * be a value that will be repeated if the daemon restarts. | |
81 | * features The local features bits for the local_connection | |
82 | */ | |
83 | SimpleMessenger(CephContext *cct, entity_name_t name, | |
84 | string mname, uint64_t _nonce); | |
85 | ||
86 | /** | |
87 | * Destroy the SimpleMessenger. Pretty simple since all the work is done | |
88 | * elsewhere. | |
89 | */ | |
90 | ~SimpleMessenger() override; | |
91 | ||
92 | /** @defgroup Accessors | |
93 | * @{ | |
94 | */ | |
95 | void set_addr_unknowns(const entity_addr_t& addr) override; | |
224ce89b | 96 | void set_addr(const entity_addr_t &addr) override; |
7c673cae FG |
97 | |
98 | int get_dispatch_queue_len() override { | |
99 | return dispatch_queue.get_queue_len(); | |
100 | } | |
101 | ||
102 | double get_dispatch_queue_max_age(utime_t now) override { | |
103 | return dispatch_queue.get_max_age(now); | |
104 | } | |
105 | /** @} Accessors */ | |
106 | ||
107 | /** | |
108 | * @defgroup Configuration functions | |
109 | * @{ | |
110 | */ | |
111 | void set_cluster_protocol(int p) override { | |
112 | assert(!started && !did_bind); | |
113 | cluster_protocol = p; | |
114 | } | |
115 | ||
116 | int bind(const entity_addr_t& bind_addr) override; | |
117 | int rebind(const set<int>& avoid_ports) override; | |
118 | int client_bind(const entity_addr_t& bind_addr) override; | |
119 | ||
120 | /** @} Configuration functions */ | |
121 | ||
122 | /** | |
123 | * @defgroup Startup/Shutdown | |
124 | * @{ | |
125 | */ | |
126 | int start() override; | |
127 | void wait() override; | |
128 | int shutdown() override; | |
129 | ||
130 | /** @} // Startup/Shutdown */ | |
131 | ||
132 | /** | |
133 | * @defgroup Messaging | |
134 | * @{ | |
135 | */ | |
136 | int send_message(Message *m, const entity_inst_t& dest) override { | |
137 | return _send_message(m, dest); | |
138 | } | |
139 | ||
140 | int send_message(Message *m, Connection *con) { | |
141 | return _send_message(m, con); | |
142 | } | |
143 | ||
144 | /** @} // Messaging */ | |
145 | ||
146 | /** | |
147 | * @defgroup Connection Management | |
148 | * @{ | |
149 | */ | |
150 | ConnectionRef get_connection(const entity_inst_t& dest) override; | |
151 | ConnectionRef get_loopback_connection() override; | |
152 | int send_keepalive(Connection *con); | |
153 | void mark_down(const entity_addr_t& addr) override; | |
154 | void mark_down(Connection *con); | |
155 | void mark_disposable(Connection *con); | |
156 | void mark_down_all() override; | |
157 | /** @} // Connection Management */ | |
158 | protected: | |
159 | /** | |
160 | * @defgroup Messenger Interfaces | |
161 | * @{ | |
162 | */ | |
163 | /** | |
164 | * Start up the DispatchQueue thread once we have somebody to dispatch to. | |
165 | */ | |
166 | void ready() override; | |
167 | /** @} // Messenger Interfaces */ | |
168 | private: | |
169 | /** | |
170 | * @defgroup Inner classes | |
171 | * @{ | |
172 | */ | |
173 | ||
174 | public: | |
175 | Accepter accepter; | |
176 | DispatchQueue dispatch_queue; | |
177 | ||
178 | friend class Accepter; | |
179 | ||
180 | /** | |
181 | * Register a new pipe for accept | |
182 | * | |
183 | * @param sd socket | |
184 | */ | |
185 | Pipe *add_accept_pipe(int sd); | |
186 | ||
187 | private: | |
188 | ||
189 | /** | |
190 | * A thread used to tear down Pipes when they're complete. | |
191 | */ | |
192 | class ReaperThread : public Thread { | |
193 | SimpleMessenger *msgr; | |
194 | public: | |
195 | explicit ReaperThread(SimpleMessenger *m) : msgr(m) {} | |
196 | void *entry() override { | |
197 | msgr->reaper_entry(); | |
198 | return 0; | |
199 | } | |
200 | } reaper_thread; | |
201 | ||
202 | /** | |
203 | * @} // Inner classes | |
204 | */ | |
205 | ||
206 | /** | |
207 | * @defgroup Utility functions | |
208 | * @{ | |
209 | */ | |
210 | ||
211 | /** | |
212 | * Create a Pipe associated with the given entity (of the given type). | |
213 | * Initiate the connection. (This function returning does not guarantee | |
214 | * connection success.) | |
215 | * | |
216 | * @param addr The address of the entity to connect to. | |
217 | * @param type The peer type of the entity at the address. | |
218 | * @param con An existing Connection to associate with the new Pipe. If | |
219 | * NULL, it creates a new Connection. | |
220 | * @param first an initial message to queue on the new pipe | |
221 | * | |
222 | * @return a pointer to the newly-created Pipe. Caller does not own a | |
223 | * reference; take one if you need it. | |
224 | */ | |
225 | Pipe *connect_rank(const entity_addr_t& addr, int type, PipeConnection *con, | |
226 | Message *first); | |
227 | /** | |
228 | * Send a message, lazily or not. | |
229 | * This just glues send_message together and passes | |
230 | * the input on to submit_message. | |
231 | */ | |
232 | int _send_message(Message *m, const entity_inst_t& dest); | |
233 | /** | |
234 | * Same as above, but for the Connection-based variants. | |
235 | */ | |
236 | int _send_message(Message *m, Connection *con); | |
237 | /** | |
238 | * Queue up a Message for delivery to the entity specified | |
239 | * by addr and dest_type. | |
240 | * submit_message() is responsible for creating | |
241 | * new Pipes (and closing old ones) as necessary. | |
242 | * | |
243 | * @param m The Message to queue up. This function eats a reference. | |
244 | * @param con The existing Connection to use, or NULL if you don't know of one. | |
245 | * @param addr The address to send the Message to. | |
246 | * @param dest_type The peer type of the address we're sending to | |
247 | * just drop silently under failure. | |
248 | * @param already_locked If false, submit_message() will acquire the | |
249 | * SimpleMessenger lock before accessing shared data structures; otherwise | |
250 | * it will assume the lock is held. NOTE: if you are making a request | |
251 | * without locking, you MUST have filled in the con with a valid pointer. | |
252 | */ | |
253 | void submit_message(Message *m, PipeConnection *con, | |
254 | const entity_addr_t& addr, int dest_type, | |
255 | bool already_locked); | |
256 | /** | |
257 | * Look through the pipes in the pipe_reap_queue and tear them down. | |
258 | */ | |
259 | void reaper(); | |
260 | /** | |
261 | * @} // Utility functions | |
262 | */ | |
263 | ||
264 | // SimpleMessenger stuff | |
265 | /// approximately unique ID set by the Constructor for use in entity_addr_t | |
266 | uint64_t nonce; | |
267 | /// overall lock used for SimpleMessenger data structures | |
268 | Mutex lock; | |
269 | /// true, specifying we haven't learned our addr; set false when we find it. | |
270 | // maybe this should be protected by the lock? | |
271 | bool need_addr; | |
272 | ||
273 | public: | |
274 | bool get_need_addr() const { return need_addr; } | |
275 | ||
276 | private: | |
277 | /** | |
278 | * false; set to true if the SimpleMessenger bound to a specific address; | |
279 | * and set false again by Accepter::stop(). This isn't lock-protected | |
280 | * since you shouldn't be able to race the only writers. | |
281 | */ | |
282 | bool did_bind; | |
283 | /// counter for the global seq our connection protocol uses | |
284 | __u32 global_seq; | |
285 | /// lock to protect the global_seq | |
286 | ceph_spinlock_t global_seq_lock; | |
287 | ||
288 | /** | |
289 | * hash map of addresses to Pipes | |
290 | * | |
291 | * NOTE: a Pipe* with state CLOSED may still be in the map but is considered | |
292 | * invalid and can be replaced by anyone holding the msgr lock | |
293 | */ | |
294 | ceph::unordered_map<entity_addr_t, Pipe*> rank_pipe; | |
295 | /** | |
296 | * list of pipes are in teh process of accepting | |
297 | * | |
298 | * These are not yet in the rank_pipe map. | |
299 | */ | |
300 | set<Pipe*> accepting_pipes; | |
301 | /// a set of all the Pipes we have which are somehow active | |
302 | set<Pipe*> pipes; | |
303 | /// a list of Pipes we want to tear down | |
304 | list<Pipe*> pipe_reap_queue; | |
305 | ||
306 | /// internal cluster protocol version, if any, for talking to entities of the same type. | |
307 | int cluster_protocol; | |
308 | ||
309 | Cond stop_cond; | |
310 | bool stopped = true; | |
311 | ||
312 | bool reaper_started, reaper_stop; | |
313 | Cond reaper_cond; | |
314 | ||
315 | /// This Cond is slept on by wait() and signaled by dispatch_entry() | |
316 | Cond wait_cond; | |
317 | ||
318 | friend class Pipe; | |
319 | ||
320 | Pipe *_lookup_pipe(const entity_addr_t& k) { | |
321 | ceph::unordered_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.find(k); | |
322 | if (p == rank_pipe.end()) | |
323 | return NULL; | |
324 | // see lock cribbing in Pipe::fault() | |
31f18b77 | 325 | if (p->second->state_closed) |
7c673cae FG |
326 | return NULL; |
327 | return p->second; | |
328 | } | |
329 | ||
330 | public: | |
331 | ||
332 | int timeout; | |
333 | ||
334 | /// con used for sending messages to ourselves | |
335 | ConnectionRef local_connection; | |
336 | ||
337 | /** | |
338 | * @defgroup SimpleMessenger internals | |
339 | * @{ | |
340 | */ | |
341 | ||
342 | /** | |
343 | * This wraps ms_deliver_get_authorizer. We use it for Pipe. | |
344 | */ | |
345 | AuthAuthorizer *get_authorizer(int peer_type, bool force_new); | |
346 | /** | |
347 | * This wraps ms_deliver_verify_authorizer; we use it for Pipe. | |
348 | */ | |
28e407b8 AA |
349 | bool verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& auth, |
350 | bufferlist& auth_reply, | |
351 | bool& isvalid,CryptoKey& session_key, | |
352 | std::unique_ptr<AuthAuthorizerChallenge> *challenge); | |
7c673cae FG |
353 | /** |
354 | * Increment the global sequence for this SimpleMessenger and return it. | |
355 | * This is for the connect protocol, although it doesn't hurt if somebody | |
356 | * else calls it. | |
357 | * | |
358 | * @return a global sequence ID that nobody else has seen. | |
359 | */ | |
360 | __u32 get_global_seq(__u32 old=0) { | |
361 | ceph_spin_lock(&global_seq_lock); | |
362 | if (old > global_seq) | |
363 | global_seq = old; | |
364 | __u32 ret = ++global_seq; | |
365 | ceph_spin_unlock(&global_seq_lock); | |
366 | return ret; | |
367 | } | |
368 | /** | |
369 | * Get the protocol version we support for the given peer type: either | |
370 | * a peer protocol (if it matches our own), the protocol version for the | |
371 | * peer (if we're connecting), or our protocol version (if we're accepting). | |
372 | */ | |
373 | int get_proto_version(int peer_type, bool connect); | |
374 | ||
375 | /** | |
376 | * Fill in the features, address and peer type for the local connection, which | |
377 | * is used for delivering messages back to ourself. | |
378 | */ | |
379 | void init_local_connection(); | |
380 | /** | |
381 | * Tell the SimpleMessenger its full IP address. | |
382 | * | |
383 | * This is used by Pipes when connecting to other endpoints, and | |
384 | * probably shouldn't be called by anybody else. | |
385 | */ | |
386 | void learned_addr(const entity_addr_t& peer_addr_for_me); | |
387 | ||
388 | /** | |
389 | * This function is used by the reaper thread. As long as nobody | |
390 | * has set reaper_stop, it calls the reaper function, then | |
391 | * waits to be signaled when it needs to reap again (or when it needs | |
392 | * to stop). | |
393 | */ | |
394 | void reaper_entry(); | |
395 | /** | |
396 | * Add a pipe to the pipe_reap_queue, to be torn down on | |
397 | * the next call to reaper(). | |
398 | * It should really only be the Pipe calling this, in our current | |
399 | * implementation. | |
400 | * | |
401 | * @param pipe A Pipe which has stopped its threads and is | |
402 | * ready to be torn down. | |
403 | */ | |
404 | void queue_reap(Pipe *pipe); | |
405 | ||
406 | /** | |
407 | * Used to get whether this connection ready to send | |
408 | */ | |
409 | bool is_connected(Connection *con); | |
410 | /** | |
411 | * @} // SimpleMessenger Internals | |
412 | */ | |
413 | } ; | |
414 | ||
415 | #endif /* CEPH_SIMPLEMESSENGER_H */ |