// Source: ceph/src/msg/async/AsyncMessenger.h (ceph.git, sources updated to v12.2.7)
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
16
17 #ifndef CEPH_ASYNCMESSENGER_H
18 #define CEPH_ASYNCMESSENGER_H
19
20 #include "include/types.h"
21 #include "include/xlist.h"
22
23 #include <map>
24 using namespace std;
25 #include "include/unordered_map.h"
26 #include "include/unordered_set.h"
27
28 #include "common/Mutex.h"
29 #include "common/Cond.h"
30 #include "common/Thread.h"
31
32 #include "include/Spinlock.h"
33
34 #include "msg/SimplePolicyMessenger.h"
35 #include "msg/DispatchQueue.h"
36 #include "include/assert.h"
37 #include "AsyncConnection.h"
38 #include "Event.h"
39
40
41 class AsyncMessenger;
42
43 /**
44 * If the Messenger binds to a specific address, the Processor runs
45 * and listens for incoming connections.
46 */
47 class Processor {
48 AsyncMessenger *msgr;
49 NetHandler net;
50 Worker *worker;
51 ServerSocket listen_socket;
52 EventCallbackRef listen_handler;
53
54 class C_processor_accept;
55
56 public:
57 Processor(AsyncMessenger *r, Worker *w, CephContext *c);
58 ~Processor() { delete listen_handler; };
59
60 void stop();
61 int bind(const entity_addr_t &bind_addr,
62 const set<int>& avoid_ports,
63 entity_addr_t* bound_addr);
64 void start();
65 void accept();
66 };
67
68 /*
69 * AsyncMessenger is represented for maintaining a set of asynchronous connections,
70 * it may own a bind address and the accepted connections will be managed by
71 * AsyncMessenger.
72 *
73 */
74
75 class AsyncMessenger : public SimplePolicyMessenger {
76 // First we have the public Messenger interface implementation...
77 public:
78 /**
79 * Initialize the AsyncMessenger!
80 *
81 * @param cct The CephContext to use
82 * @param name The name to assign ourselves
83 * _nonce A unique ID to use for this AsyncMessenger. It should not
84 * be a value that will be repeated if the daemon restarts.
85 */
86 AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type,
87 string mname, uint64_t _nonce);
88
89 /**
90 * Destroy the AsyncMessenger. Pretty simple since all the work is done
91 * elsewhere.
92 */
93 ~AsyncMessenger() override;
94
95 /** @defgroup Accessors
96 * @{
97 */
98 void set_addr_unknowns(const entity_addr_t &addr) override;
99 void set_addr(const entity_addr_t &addr) override;
100
101 int get_dispatch_queue_len() override {
102 return dispatch_queue.get_queue_len();
103 }
104
105 double get_dispatch_queue_max_age(utime_t now) override {
106 return dispatch_queue.get_max_age(now);
107 }
108 /** @} Accessors */
109
110 /**
111 * @defgroup Configuration functions
112 * @{
113 */
114 void set_cluster_protocol(int p) override {
115 assert(!started && !did_bind);
116 cluster_protocol = p;
117 }
118
119 int bind(const entity_addr_t& bind_addr) override;
120 int rebind(const set<int>& avoid_ports) override;
121 int client_bind(const entity_addr_t& bind_addr) override;
122
123 /** @} Configuration functions */
124
125 /**
126 * @defgroup Startup/Shutdown
127 * @{
128 */
129 int start() override;
130 void wait() override;
131 int shutdown() override;
132
133 /** @} // Startup/Shutdown */
134
135 /**
136 * @defgroup Messaging
137 * @{
138 */
139 int send_message(Message *m, const entity_inst_t& dest) override {
140 Mutex::Locker l(lock);
141
142 return _send_message(m, dest);
143 }
144
145 /** @} // Messaging */
146
147 /**
148 * @defgroup Connection Management
149 * @{
150 */
151 ConnectionRef get_connection(const entity_inst_t& dest) override;
152 ConnectionRef get_loopback_connection() override;
153 void mark_down(const entity_addr_t& addr) override;
154 void mark_down_all() override {
155 shutdown_connections(true);
156 }
157 /** @} // Connection Management */
158
159 /**
160 * @defgroup Inner classes
161 * @{
162 */
163
164 /**
165 * @} // Inner classes
166 */
167
168 protected:
169 /**
170 * @defgroup Messenger Interfaces
171 * @{
172 */
173 /**
174 * Start up the DispatchQueue thread once we have somebody to dispatch to.
175 */
176 void ready() override;
177 /** @} // Messenger Interfaces */
178
179 private:
180
181 /**
182 * @defgroup Utility functions
183 * @{
184 */
185
186 /**
187 * Create a connection associated with the given entity (of the given type).
188 * Initiate the connection. (This function returning does not guarantee
189 * connection success.)
190 *
191 * @param addr The address of the entity to connect to.
192 * @param type The peer type of the entity at the address.
193 *
194 * @return a pointer to the newly-created connection. Caller does not own a
195 * reference; take one if you need it.
196 */
197 AsyncConnectionRef create_connect(const entity_addr_t& addr, int type);
198
199 /**
200 * Queue up a Message for delivery to the entity specified
201 * by addr and dest_type.
202 * submit_message() is responsible for creating
203 * new AsyncConnection (and closing old ones) as necessary.
204 *
205 * @param m The Message to queue up. This function eats a reference.
206 * @param con The existing Connection to use, or NULL if you don't know of one.
207 * @param dest_addr The address to send the Message to.
208 * @param dest_type The peer type of the address we're sending to
209 * just drop silently under failure.
210 */
211 void submit_message(Message *m, AsyncConnectionRef con,
212 const entity_addr_t& dest_addr, int dest_type);
213
214 int _send_message(Message *m, const entity_inst_t& dest);
215 void _finish_bind(const entity_addr_t& bind_addr,
216 const entity_addr_t& listen_addr);
217
218 private:
219 static const uint64_t ReapDeadConnectionThreshold = 5;
220
221 NetworkStack *stack;
222 std::vector<Processor*> processors;
223 friend class Processor;
224 DispatchQueue dispatch_queue;
225
226 // the worker run messenger's cron jobs
227 Worker *local_worker;
228
229 std::string ms_type;
230
231 /// overall lock used for AsyncMessenger data structures
232 Mutex lock;
233 // AsyncMessenger stuff
234 /// approximately unique ID set by the Constructor for use in entity_addr_t
235 uint64_t nonce;
236
237 /// true, specifying we haven't learned our addr; set false when we find it.
238 // maybe this should be protected by the lock?
239 bool need_addr;
240
241 /**
242 * set to bind address if bind was called before NetworkStack was ready to
243 * bind
244 */
245 entity_addr_t pending_bind_addr;
246
247 /**
248 * false; set to true if a pending bind exists
249 */
250 bool pending_bind = false;
251
252 /**
253 * The following aren't lock-protected since you shouldn't be able to race
254 * the only writers.
255 */
256
257 /**
258 * false; set to true if the AsyncMessenger bound to a specific address;
259 * and set false again by Accepter::stop().
260 */
261 bool did_bind;
262 /// counter for the global seq our connection protocol uses
263 __u32 global_seq;
264 /// lock to protect the global_seq
265 ceph_spinlock_t global_seq_lock;
266
267 /**
268 * hash map of addresses to Asyncconnection
269 *
270 * NOTE: a Asyncconnection* with state CLOSED may still be in the map but is considered
271 * invalid and can be replaced by anyone holding the msgr lock
272 */
273 ceph::unordered_map<entity_addr_t, AsyncConnectionRef> conns;
274
275 /**
276 * list of connection are in teh process of accepting
277 *
278 * These are not yet in the conns map.
279 */
280 set<AsyncConnectionRef> accepting_conns;
281
282 /**
283 * list of connection are closed which need to be clean up
284 *
285 * Because AsyncMessenger and AsyncConnection follow a lock rule that
286 * we can lock AsyncMesenger::lock firstly then lock AsyncConnection::lock
287 * but can't reversed. This rule is aimed to avoid dead lock.
288 * So if AsyncConnection want to unregister itself from AsyncMessenger,
289 * we pick up this idea that just queue itself to this set and do lazy
290 * deleted for AsyncConnection. "_lookup_conn" must ensure not return a
291 * AsyncConnection in this set.
292 */
293 Mutex deleted_lock;
294 set<AsyncConnectionRef> deleted_conns;
295
296 EventCallbackRef reap_handler;
297
298 /// internal cluster protocol version, if any, for talking to entities of the same type.
299 int cluster_protocol;
300
301 Cond stop_cond;
302 bool stopped;
303
304 AsyncConnectionRef _lookup_conn(const entity_addr_t& k) {
305 assert(lock.is_locked());
306 ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator p = conns.find(k);
307 if (p == conns.end())
308 return NULL;
309
310 // lazy delete, see "deleted_conns"
311 Mutex::Locker l(deleted_lock);
312 if (deleted_conns.erase(p->second)) {
313 p->second->get_perf_counter()->dec(l_msgr_active_connections);
314 conns.erase(p);
315 return NULL;
316 }
317
318 return p->second;
319 }
320
321 void _init_local_connection() {
322 assert(lock.is_locked());
323 local_connection->peer_addr = my_inst.addr;
324 local_connection->peer_type = my_inst.name.type();
325 local_connection->set_features(CEPH_FEATURES_ALL);
326 ms_deliver_handle_fast_connect(local_connection.get());
327 }
328
329 void shutdown_connections(bool queue_reset);
330
331 public:
332
333 /// con used for sending messages to ourselves
334 ConnectionRef local_connection;
335
336 /**
337 * @defgroup AsyncMessenger internals
338 * @{
339 */
340 /**
341 * This wraps _lookup_conn.
342 */
343 AsyncConnectionRef lookup_conn(const entity_addr_t& k) {
344 Mutex::Locker l(lock);
345 return _lookup_conn(k);
346 }
347
348 int accept_conn(AsyncConnectionRef conn) {
349 Mutex::Locker l(lock);
350 auto it = conns.find(conn->peer_addr);
351 if (it != conns.end()) {
352 AsyncConnectionRef existing = it->second;
353
354 // lazy delete, see "deleted_conns"
355 // If conn already in, we will return 0
356 Mutex::Locker l(deleted_lock);
357 if (deleted_conns.erase(existing)) {
358 existing->get_perf_counter()->dec(l_msgr_active_connections);
359 conns.erase(it);
360 } else if (conn != existing) {
361 return -1;
362 }
363 }
364 conns[conn->peer_addr] = conn;
365 conn->get_perf_counter()->inc(l_msgr_active_connections);
366 accepting_conns.erase(conn);
367 return 0;
368 }
369
370 void learned_addr(const entity_addr_t &peer_addr_for_me);
371 void add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr);
372 NetworkStack *get_stack() {
373 return stack;
374 }
375
376 /**
377 * This wraps ms_deliver_get_authorizer. We use it for AsyncConnection.
378 */
379 AuthAuthorizer *get_authorizer(int peer_type, bool force_new) {
380 return ms_deliver_get_authorizer(peer_type, force_new);
381 }
382
383 /**
384 * This wraps ms_deliver_verify_authorizer; we use it for AsyncConnection.
385 */
386 bool verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& auth, bufferlist& auth_reply,
387 bool& isvalid, CryptoKey& session_key,
388 std::unique_ptr<AuthAuthorizerChallenge> *challenge) {
389 return ms_deliver_verify_authorizer(con, peer_type, protocol, auth,
390 auth_reply, isvalid, session_key, challenge);
391 }
392 /**
393 * Increment the global sequence for this AsyncMessenger and return it.
394 * This is for the connect protocol, although it doesn't hurt if somebody
395 * else calls it.
396 *
397 * @return a global sequence ID that nobody else has seen.
398 */
399 __u32 get_global_seq(__u32 old=0) {
400 ceph_spin_lock(&global_seq_lock);
401 if (old > global_seq)
402 global_seq = old;
403 __u32 ret = ++global_seq;
404 ceph_spin_unlock(&global_seq_lock);
405 return ret;
406 }
407 /**
408 * Get the protocol version we support for the given peer type: either
409 * a peer protocol (if it matches our own), the protocol version for the
410 * peer (if we're connecting), or our protocol version (if we're accepting).
411 */
412 int get_proto_version(int peer_type, bool connect) const;
413
414 /**
415 * Fill in the address and peer type for the local connection, which
416 * is used for delivering messages back to ourself.
417 */
418 void init_local_connection() {
419 Mutex::Locker l(lock);
420 _init_local_connection();
421 }
422
423 /**
424 * Unregister connection from `conns`
425 *
426 * See "deleted_conns"
427 */
428 void unregister_conn(AsyncConnectionRef conn) {
429 Mutex::Locker l(deleted_lock);
430 deleted_conns.insert(conn);
431
432 if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
433 local_worker->center.dispatch_event_external(reap_handler);
434 }
435 }
436
437 /**
438 * Reap dead connection from `deleted_conns`
439 *
440 * @return the number of dead connections
441 *
442 * See "deleted_conns"
443 */
444 int reap_dead();
445
446 /**
447 * @} // AsyncMessenger Internals
448 */
449 } ;
450
451 #endif /* CEPH_ASYNCMESSENGER_H */