]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/AsyncMessenger.h
bump version to 15.2.4-pve1
[ceph.git] / ceph / src / msg / async / AsyncMessenger.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_ASYNCMESSENGER_H
18 #define CEPH_ASYNCMESSENGER_H
19
20 #include <map>
21
22 #include "include/types.h"
23 #include "include/xlist.h"
24 #include "include/spinlock.h"
25 #include "include/unordered_map.h"
26 #include "include/unordered_set.h"
27
28 #include "common/ceph_mutex.h"
29 #include "common/Cond.h"
30 #include "common/Thread.h"
31
32 #include "msg/SimplePolicyMessenger.h"
33 #include "msg/DispatchQueue.h"
34 #include "AsyncConnection.h"
35 #include "Event.h"
36
37 #include "include/ceph_assert.h"
38
39 class AsyncMessenger;
40
41 /**
42 * If the Messenger binds to a specific address, the Processor runs
43 * and listens for incoming connections.
44 */
45 class Processor {
46 AsyncMessenger *msgr;
47 NetHandler net;
48 Worker *worker;
49 vector<ServerSocket> listen_sockets;
50 EventCallbackRef listen_handler;
51
52 class C_processor_accept;
53
54 public:
55 Processor(AsyncMessenger *r, Worker *w, CephContext *c);
56 ~Processor() { delete listen_handler; };
57
58 void stop();
59 int bind(const entity_addrvec_t &bind_addrs,
60 const set<int>& avoid_ports,
61 entity_addrvec_t* bound_addrs);
62 void start();
63 void accept();
64 };
65
/*
 * AsyncMessenger maintains a set of asynchronous connections. It may own a
 * bind address, in which case the connections it accepts are also managed by
 * the AsyncMessenger.
 */
72
73 class AsyncMessenger : public SimplePolicyMessenger {
74 // First we have the public Messenger interface implementation...
75 public:
76 /**
77 * Initialize the AsyncMessenger!
78 *
79 * @param cct The CephContext to use
80 * @param name The name to assign ourselves
81 * _nonce A unique ID to use for this AsyncMessenger. It should not
82 * be a value that will be repeated if the daemon restarts.
83 */
84 AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type,
85 string mname, uint64_t _nonce);
86
87 /**
88 * Destroy the AsyncMessenger. Pretty simple since all the work is done
89 * elsewhere.
90 */
91 ~AsyncMessenger() override;
92
93 /** @defgroup Accessors
94 * @{
95 */
96 bool set_addr_unknowns(const entity_addrvec_t &addr) override;
97 void set_addrs(const entity_addrvec_t &addrs) override;
98
99 int get_dispatch_queue_len() override {
100 return dispatch_queue.get_queue_len();
101 }
102
103 double get_dispatch_queue_max_age(utime_t now) override {
104 return dispatch_queue.get_max_age(now);
105 }
106 /** @} Accessors */
107
108 /**
109 * @defgroup Configuration functions
110 * @{
111 */
112 void set_cluster_protocol(int p) override {
113 ceph_assert(!started && !did_bind);
114 cluster_protocol = p;
115 }
116
117 int bind(const entity_addr_t& bind_addr) override;
118 int rebind(const set<int>& avoid_ports) override;
119 int client_bind(const entity_addr_t& bind_addr) override;
120
121 int bindv(const entity_addrvec_t& bind_addrs) override;
122
123 bool should_use_msgr2() override;
124
125 /** @} Configuration functions */
126
127 /**
128 * @defgroup Startup/Shutdown
129 * @{
130 */
131 int start() override;
132 void wait() override;
133 int shutdown() override;
134
135 /** @} // Startup/Shutdown */
136
137 /**
138 * @defgroup Messaging
139 * @{
140 */
141 int send_to(Message *m, int type, const entity_addrvec_t& addrs) override;
142
143 /** @} // Messaging */
144
145 /**
146 * @defgroup Connection Management
147 * @{
148 */
149 ConnectionRef connect_to(int type,
150 const entity_addrvec_t& addrs,
151 bool anon, bool not_local_dest=false) override;
152 ConnectionRef get_loopback_connection() override;
153 void mark_down(const entity_addr_t& addr) override {
154 mark_down_addrs(entity_addrvec_t(addr));
155 }
156 void mark_down_addrs(const entity_addrvec_t& addrs) override;
157 void mark_down_all() override {
158 shutdown_connections(true);
159 }
160 /** @} // Connection Management */
161
162 /**
163 * @defgroup Inner classes
164 * @{
165 */
166
167 /**
168 * @} // Inner classes
169 */
170
171 protected:
172 /**
173 * @defgroup Messenger Interfaces
174 * @{
175 */
176 /**
177 * Start up the DispatchQueue thread once we have somebody to dispatch to.
178 */
179 void ready() override;
180 /** @} // Messenger Interfaces */
181
182 private:
183
184 /**
185 * @defgroup Utility functions
186 * @{
187 */
188
189 /**
190 * Create a connection associated with the given entity (of the given type).
191 * Initiate the connection. (This function returning does not guarantee
192 * connection success.)
193 *
194 * @param addrs The address(es) of the entity to connect to.
195 * @param type The peer type of the entity at the address.
196 *
197 * @return a pointer to the newly-created connection. Caller does not own a
198 * reference; take one if you need it.
199 */
200 AsyncConnectionRef create_connect(const entity_addrvec_t& addrs, int type,
201 bool anon);
202
203
204 void _finish_bind(const entity_addrvec_t& bind_addrs,
205 const entity_addrvec_t& listen_addrs);
206
207 entity_addrvec_t _filter_addrs(const entity_addrvec_t& addrs);
208
209 private:
210 static const uint64_t ReapDeadConnectionThreshold = 5;
211
212 NetworkStack *stack;
213 std::vector<Processor*> processors;
214 friend class Processor;
215 DispatchQueue dispatch_queue;
216
217 // the worker run messenger's cron jobs
218 Worker *local_worker;
219
220 std::string ms_type;
221
222 /// overall lock used for AsyncMessenger data structures
223 ceph::mutex lock = ceph::make_mutex("AsyncMessenger::lock");
224 // AsyncMessenger stuff
225 /// approximately unique ID set by the Constructor for use in entity_addr_t
226 uint64_t nonce;
227
228 /// true, specifying we haven't learned our addr; set false when we find it.
229 // maybe this should be protected by the lock?
230 bool need_addr = true;
231
232 /**
233 * set to bind addresses if bind was called before NetworkStack was ready to
234 * bind
235 */
236 entity_addrvec_t pending_bind_addrs;
237
238 /**
239 * false; set to true if a pending bind exists
240 */
241 bool pending_bind = false;
242
243 /**
244 * The following aren't lock-protected since you shouldn't be able to race
245 * the only writers.
246 */
247
248 /**
249 * false; set to true if the AsyncMessenger bound to a specific address;
250 * and set false again by Accepter::stop().
251 */
252 bool did_bind = false;
253 /// counter for the global seq our connection protocol uses
254 __u32 global_seq = 0;
255 /// lock to protect the global_seq
256 ceph::spinlock global_seq_lock;
257
258 /**
259 * hash map of addresses to Asyncconnection
260 *
261 * NOTE: a Asyncconnection* with state CLOSED may still be in the map but is considered
262 * invalid and can be replaced by anyone holding the msgr lock
263 */
264 ceph::unordered_map<entity_addrvec_t, AsyncConnectionRef> conns;
265
266 /**
267 * list of connection are in the process of accepting
268 *
269 * These are not yet in the conns map.
270 */
271 set<AsyncConnectionRef> accepting_conns;
272
273 /// anonymous outgoing connections
274 set<AsyncConnectionRef> anon_conns;
275
276 /**
277 * list of connection are closed which need to be clean up
278 *
279 * Because AsyncMessenger and AsyncConnection follow a lock rule that
280 * we can lock AsyncMesenger::lock firstly then lock AsyncConnection::lock
281 * but can't reversed. This rule is aimed to avoid dead lock.
282 * So if AsyncConnection want to unregister itself from AsyncMessenger,
283 * we pick up this idea that just queue itself to this set and do lazy
284 * deleted for AsyncConnection. "_lookup_conn" must ensure not return a
285 * AsyncConnection in this set.
286 */
287 ceph::mutex deleted_lock = ceph::make_mutex("AsyncMessenger::deleted_lock");
288 set<AsyncConnectionRef> deleted_conns;
289
290 EventCallbackRef reap_handler;
291
292 /// internal cluster protocol version, if any, for talking to entities of the same type.
293 int cluster_protocol = 0;
294
295 ceph::condition_variable stop_cond;
296 bool stopped = true;
297
298 /* You must hold this->lock for the duration of use! */
299 const auto& _lookup_conn(const entity_addrvec_t& k) {
300 static const AsyncConnectionRef nullref;
301 ceph_assert(ceph_mutex_is_locked(lock));
302 auto p = conns.find(k);
303 if (p == conns.end()) {
304 return nullref;
305 }
306
307 // lazy delete, see "deleted_conns"
308 // don't worry omit, Connection::send_message can handle this case.
309 if (p->second->is_unregistered()) {
310 std::lock_guard l{deleted_lock};
311 if (deleted_conns.erase(p->second)) {
312 conns.erase(p);
313 return nullref;
314 }
315 }
316
317 return p->second;
318 }
319
320 void _init_local_connection() {
321 ceph_assert(ceph_mutex_is_locked(lock));
322 local_connection->peer_addrs = *my_addrs;
323 local_connection->peer_type = my_name.type();
324 local_connection->set_features(CEPH_FEATURES_ALL);
325 ms_deliver_handle_fast_connect(local_connection.get());
326 }
327
328 void shutdown_connections(bool queue_reset);
329
330 public:
331
332 /// con used for sending messages to ourselves
333 AsyncConnectionRef local_connection;
334
335 /**
336 * @defgroup AsyncMessenger internals
337 * @{
338 */
339 /**
340 * This wraps _lookup_conn.
341 */
342 AsyncConnectionRef lookup_conn(const entity_addrvec_t& k) {
343 std::lock_guard l{lock};
344 return _lookup_conn(k); /* make new ref! */
345 }
346
347 int accept_conn(const AsyncConnectionRef& conn);
348 bool learned_addr(const entity_addr_t &peer_addr_for_me);
349 void add_accept(Worker *w, ConnectedSocket cli_socket,
350 const entity_addr_t &listen_addr,
351 const entity_addr_t &peer_addr);
352 NetworkStack *get_stack() {
353 return stack;
354 }
355
356 uint64_t get_nonce() const {
357 return nonce;
358 }
359
360 /**
361 * Increment the global sequence for this AsyncMessenger and return it.
362 * This is for the connect protocol, although it doesn't hurt if somebody
363 * else calls it.
364 *
365 * @return a global sequence ID that nobody else has seen.
366 */
367 __u32 get_global_seq(__u32 old=0) {
368 std::lock_guard<ceph::spinlock> lg(global_seq_lock);
369
370 if (old > global_seq)
371 global_seq = old;
372 __u32 ret = ++global_seq;
373
374 return ret;
375 }
376 /**
377 * Get the protocol version we support for the given peer type: either
378 * a peer protocol (if it matches our own), the protocol version for the
379 * peer (if we're connecting), or our protocol version (if we're accepting).
380 */
381 int get_proto_version(int peer_type, bool connect) const;
382
383 /**
384 * Fill in the address and peer type for the local connection, which
385 * is used for delivering messages back to ourself.
386 */
387 void init_local_connection() {
388 std::lock_guard l{lock};
389 local_connection->is_loopback = true;
390 _init_local_connection();
391 }
392
393 /**
394 * Unregister connection from `conns`
395 *
396 * See "deleted_conns"
397 */
398 void unregister_conn(const AsyncConnectionRef& conn) {
399 std::lock_guard l{deleted_lock};
400 if (!accepting_conns.count(conn) || anon_conns.count(conn))
401 conn->get_perf_counter()->dec(l_msgr_active_connections);
402 deleted_conns.emplace(std::move(conn));
403 conn->unregister();
404
405 if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
406 local_worker->center.dispatch_event_external(reap_handler);
407 }
408 }
409
410 /**
411 * Reap dead connection from `deleted_conns`
412 *
413 * @return the number of dead connections
414 *
415 * See "deleted_conns"
416 */
417 void reap_dead();
418
419 /**
420 * @} // AsyncMessenger Internals
421 */
422 } ;
423
424 #endif /* CEPH_ASYNCMESSENGER_H */