]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/simple/SimpleMessenger.h
update sources to 12.2.7
[ceph.git] / ceph / src / msg / simple / SimpleMessenger.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_SIMPLEMESSENGER_H
16 #define CEPH_SIMPLEMESSENGER_H
17
18 #include "include/types.h"
19 #include "include/xlist.h"
20
21 #include <list>
22 #include <map>
23 using namespace std;
24 #include "include/unordered_map.h"
25 #include "include/unordered_set.h"
26
27 #include "common/Mutex.h"
28 #include "include/Spinlock.h"
29 #include "common/Cond.h"
30 #include "common/Thread.h"
31 #include "common/Throttle.h"
32
33 #include "msg/SimplePolicyMessenger.h"
34 #include "msg/Message.h"
35 #include "include/assert.h"
36
37 #include "msg/DispatchQueue.h"
38 #include "Pipe.h"
39 #include "Accepter.h"
40
41 /*
42 * This class handles transmission and reception of messages. Generally
43 * speaking, there are several major components:
44 *
45 * - Connection
46 * Each logical session is associated with a Connection.
47 * - Pipe
48 * Each network connection is handled through a pipe, which handles
49 * the input and output of each message. There is normally a 1:1
50 * relationship between Pipe and Connection, but logical sessions may
51 * get handed off between Pipes when sockets reconnect or during
52 * connection races.
53 * - IncomingQueue
54 * Incoming messages are associated with an IncomingQueue, and there
55 * is one such queue associated with each Pipe.
56 * - DispatchQueue
57 * IncomingQueues get queued in the DispatchQueue, which is responsible
58 * for doing a round-robin sweep and processing them via a worker thread.
59 * - SimpleMessenger
60 * It's the exterior class passed to the external message handler and
61 * most of the API details.
62 *
63 * Lock ordering:
64 *
65 * SimpleMessenger::lock
66 * Pipe::pipe_lock
67 * DispatchQueue::lock
68 * IncomingQueue::lock
69 */
70
71 class SimpleMessenger : public SimplePolicyMessenger {
72 // First we have the public Messenger interface implementation...
73 public:
74 /**
75 * Initialize the SimpleMessenger!
76 *
77 * @param cct The CephContext to use
78 * @param name The name to assign ourselves
79 * _nonce A unique ID to use for this SimpleMessenger. It should not
80 * be a value that will be repeated if the daemon restarts.
81 * features The local features bits for the local_connection
82 */
83 SimpleMessenger(CephContext *cct, entity_name_t name,
84 string mname, uint64_t _nonce);
85
86 /**
87 * Destroy the SimpleMessenger. Pretty simple since all the work is done
88 * elsewhere.
89 */
90 ~SimpleMessenger() override;
91
92 /** @defgroup Accessors
93 * @{
94 */
95 void set_addr_unknowns(const entity_addr_t& addr) override;
96 void set_addr(const entity_addr_t &addr) override;
97
98 int get_dispatch_queue_len() override {
99 return dispatch_queue.get_queue_len();
100 }
101
102 double get_dispatch_queue_max_age(utime_t now) override {
103 return dispatch_queue.get_max_age(now);
104 }
105 /** @} Accessors */
106
107 /**
108 * @defgroup Configuration functions
109 * @{
110 */
111 void set_cluster_protocol(int p) override {
112 assert(!started && !did_bind);
113 cluster_protocol = p;
114 }
115
116 int bind(const entity_addr_t& bind_addr) override;
117 int rebind(const set<int>& avoid_ports) override;
118 int client_bind(const entity_addr_t& bind_addr) override;
119
120 /** @} Configuration functions */
121
122 /**
123 * @defgroup Startup/Shutdown
124 * @{
125 */
126 int start() override;
127 void wait() override;
128 int shutdown() override;
129
130 /** @} // Startup/Shutdown */
131
132 /**
133 * @defgroup Messaging
134 * @{
135 */
136 int send_message(Message *m, const entity_inst_t& dest) override {
137 return _send_message(m, dest);
138 }
139
140 int send_message(Message *m, Connection *con) {
141 return _send_message(m, con);
142 }
143
144 /** @} // Messaging */
145
146 /**
147 * @defgroup Connection Management
148 * @{
149 */
150 ConnectionRef get_connection(const entity_inst_t& dest) override;
151 ConnectionRef get_loopback_connection() override;
152 int send_keepalive(Connection *con);
153 void mark_down(const entity_addr_t& addr) override;
154 void mark_down(Connection *con);
155 void mark_disposable(Connection *con);
156 void mark_down_all() override;
157 /** @} // Connection Management */
158 protected:
159 /**
160 * @defgroup Messenger Interfaces
161 * @{
162 */
163 /**
164 * Start up the DispatchQueue thread once we have somebody to dispatch to.
165 */
166 void ready() override;
167 /** @} // Messenger Interfaces */
168 private:
169 /**
170 * @defgroup Inner classes
171 * @{
172 */
173
174 public:
175 Accepter accepter;
176 DispatchQueue dispatch_queue;
177
178 friend class Accepter;
179
180 /**
181 * Register a new pipe for accept
182 *
183 * @param sd socket
184 */
185 Pipe *add_accept_pipe(int sd);
186
187 private:
188
189 /**
190 * A thread used to tear down Pipes when they're complete.
191 */
192 class ReaperThread : public Thread {
193 SimpleMessenger *msgr;
194 public:
195 explicit ReaperThread(SimpleMessenger *m) : msgr(m) {}
196 void *entry() override {
197 msgr->reaper_entry();
198 return 0;
199 }
200 } reaper_thread;
201
202 /**
203 * @} // Inner classes
204 */
205
206 /**
207 * @defgroup Utility functions
208 * @{
209 */
210
211 /**
212 * Create a Pipe associated with the given entity (of the given type).
213 * Initiate the connection. (This function returning does not guarantee
214 * connection success.)
215 *
216 * @param addr The address of the entity to connect to.
217 * @param type The peer type of the entity at the address.
218 * @param con An existing Connection to associate with the new Pipe. If
219 * NULL, it creates a new Connection.
220 * @param first an initial message to queue on the new pipe
221 *
222 * @return a pointer to the newly-created Pipe. Caller does not own a
223 * reference; take one if you need it.
224 */
225 Pipe *connect_rank(const entity_addr_t& addr, int type, PipeConnection *con,
226 Message *first);
227 /**
228 * Send a message, lazily or not.
229 * This just glues send_message together and passes
230 * the input on to submit_message.
231 */
232 int _send_message(Message *m, const entity_inst_t& dest);
233 /**
234 * Same as above, but for the Connection-based variants.
235 */
236 int _send_message(Message *m, Connection *con);
237 /**
238 * Queue up a Message for delivery to the entity specified
239 * by addr and dest_type.
240 * submit_message() is responsible for creating
241 * new Pipes (and closing old ones) as necessary.
242 *
243 * @param m The Message to queue up. This function eats a reference.
244 * @param con The existing Connection to use, or NULL if you don't know of one.
245 * @param addr The address to send the Message to.
246 * @param dest_type The peer type of the address we're sending to
247 * just drop silently under failure.
248 * @param already_locked If false, submit_message() will acquire the
249 * SimpleMessenger lock before accessing shared data structures; otherwise
250 * it will assume the lock is held. NOTE: if you are making a request
251 * without locking, you MUST have filled in the con with a valid pointer.
252 */
253 void submit_message(Message *m, PipeConnection *con,
254 const entity_addr_t& addr, int dest_type,
255 bool already_locked);
256 /**
257 * Look through the pipes in the pipe_reap_queue and tear them down.
258 */
259 void reaper();
260 /**
261 * @} // Utility functions
262 */
263
264 // SimpleMessenger stuff
265 /// approximately unique ID set by the Constructor for use in entity_addr_t
266 uint64_t nonce;
267 /// overall lock used for SimpleMessenger data structures
268 Mutex lock;
269 /// true, specifying we haven't learned our addr; set false when we find it.
270 // maybe this should be protected by the lock?
271 bool need_addr;
272
273 public:
274 bool get_need_addr() const { return need_addr; }
275
276 private:
277 /**
278 * false; set to true if the SimpleMessenger bound to a specific address;
279 * and set false again by Accepter::stop(). This isn't lock-protected
280 * since you shouldn't be able to race the only writers.
281 */
282 bool did_bind;
283 /// counter for the global seq our connection protocol uses
284 __u32 global_seq;
285 /// lock to protect the global_seq
286 ceph_spinlock_t global_seq_lock;
287
288 /**
289 * hash map of addresses to Pipes
290 *
291 * NOTE: a Pipe* with state CLOSED may still be in the map but is considered
292 * invalid and can be replaced by anyone holding the msgr lock
293 */
294 ceph::unordered_map<entity_addr_t, Pipe*> rank_pipe;
295 /**
296 * list of pipes are in teh process of accepting
297 *
298 * These are not yet in the rank_pipe map.
299 */
300 set<Pipe*> accepting_pipes;
301 /// a set of all the Pipes we have which are somehow active
302 set<Pipe*> pipes;
303 /// a list of Pipes we want to tear down
304 list<Pipe*> pipe_reap_queue;
305
306 /// internal cluster protocol version, if any, for talking to entities of the same type.
307 int cluster_protocol;
308
309 Cond stop_cond;
310 bool stopped = true;
311
312 bool reaper_started, reaper_stop;
313 Cond reaper_cond;
314
315 /// This Cond is slept on by wait() and signaled by dispatch_entry()
316 Cond wait_cond;
317
318 friend class Pipe;
319
320 Pipe *_lookup_pipe(const entity_addr_t& k) {
321 ceph::unordered_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.find(k);
322 if (p == rank_pipe.end())
323 return NULL;
324 // see lock cribbing in Pipe::fault()
325 if (p->second->state_closed)
326 return NULL;
327 return p->second;
328 }
329
330 public:
331
332 int timeout;
333
334 /// con used for sending messages to ourselves
335 ConnectionRef local_connection;
336
337 /**
338 * @defgroup SimpleMessenger internals
339 * @{
340 */
341
342 /**
343 * This wraps ms_deliver_get_authorizer. We use it for Pipe.
344 */
345 AuthAuthorizer *get_authorizer(int peer_type, bool force_new);
346 /**
347 * This wraps ms_deliver_verify_authorizer; we use it for Pipe.
348 */
349 bool verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& auth,
350 bufferlist& auth_reply,
351 bool& isvalid,CryptoKey& session_key,
352 std::unique_ptr<AuthAuthorizerChallenge> *challenge);
353 /**
354 * Increment the global sequence for this SimpleMessenger and return it.
355 * This is for the connect protocol, although it doesn't hurt if somebody
356 * else calls it.
357 *
358 * @return a global sequence ID that nobody else has seen.
359 */
360 __u32 get_global_seq(__u32 old=0) {
361 ceph_spin_lock(&global_seq_lock);
362 if (old > global_seq)
363 global_seq = old;
364 __u32 ret = ++global_seq;
365 ceph_spin_unlock(&global_seq_lock);
366 return ret;
367 }
368 /**
369 * Get the protocol version we support for the given peer type: either
370 * a peer protocol (if it matches our own), the protocol version for the
371 * peer (if we're connecting), or our protocol version (if we're accepting).
372 */
373 int get_proto_version(int peer_type, bool connect);
374
375 /**
376 * Fill in the features, address and peer type for the local connection, which
377 * is used for delivering messages back to ourself.
378 */
379 void init_local_connection();
380 /**
381 * Tell the SimpleMessenger its full IP address.
382 *
383 * This is used by Pipes when connecting to other endpoints, and
384 * probably shouldn't be called by anybody else.
385 */
386 void learned_addr(const entity_addr_t& peer_addr_for_me);
387
388 /**
389 * This function is used by the reaper thread. As long as nobody
390 * has set reaper_stop, it calls the reaper function, then
391 * waits to be signaled when it needs to reap again (or when it needs
392 * to stop).
393 */
394 void reaper_entry();
395 /**
396 * Add a pipe to the pipe_reap_queue, to be torn down on
397 * the next call to reaper().
398 * It should really only be the Pipe calling this, in our current
399 * implementation.
400 *
401 * @param pipe A Pipe which has stopped its threads and is
402 * ready to be torn down.
403 */
404 void queue_reap(Pipe *pipe);
405
406 /**
407 * Used to get whether this connection ready to send
408 */
409 bool is_connected(Connection *con);
410 /**
411 * @} // SimpleMessenger Internals
412 */
413 } ;
414
415 #endif /* CEPH_SIMPLEMESSENGER_H */