]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/AsyncMessenger.h
import ceph quincy 17.2.6
[ceph.git] / ceph / src / msg / async / AsyncMessenger.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_ASYNCMESSENGER_H
18 #define CEPH_ASYNCMESSENGER_H
19
20 #include <map>
21 #include <optional>
22
23 #include "include/types.h"
24 #include "include/xlist.h"
25 #include "include/spinlock.h"
26 #include "include/unordered_map.h"
27 #include "include/unordered_set.h"
28
29 #include "common/ceph_mutex.h"
30 #include "common/Cond.h"
31 #include "common/Thread.h"
32
33 #include "msg/SimplePolicyMessenger.h"
34 #include "msg/DispatchQueue.h"
35 #include "AsyncConnection.h"
36 #include "Event.h"
37
38 #include "include/ceph_assert.h"
39
40 class AsyncMessenger;
41
42 /**
43 * If the Messenger binds to a specific address, the Processor runs
44 * and listens for incoming connections.
45 */
46 class Processor {
47 AsyncMessenger *msgr;
48 ceph::NetHandler net;
49 Worker *worker;
50 std::vector<ServerSocket> listen_sockets;
51 EventCallbackRef listen_handler;
52
53 class C_processor_accept;
54
55 public:
56 Processor(AsyncMessenger *r, Worker *w, CephContext *c);
57 ~Processor() { delete listen_handler; };
58
59 void stop();
60 int bind(const entity_addrvec_t &bind_addrs,
61 const std::set<int>& avoid_ports,
62 entity_addrvec_t* bound_addrs);
63 void start();
64 void accept();
65 };
66
67 /*
68 * AsyncMessenger is represented for maintaining a set of asynchronous connections,
69 * it may own a bind address and the accepted connections will be managed by
70 * AsyncMessenger.
71 *
72 */
73
74 class AsyncMessenger : public SimplePolicyMessenger {
75 // First we have the public Messenger interface implementation...
76 public:
77 /**
78 * Initialize the AsyncMessenger!
79 *
80 * @param cct The CephContext to use
81 * @param name The name to assign ourselves
82 * _nonce A unique ID to use for this AsyncMessenger. It should not
83 * be a value that will be repeated if the daemon restarts.
84 */
85 AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type,
86 std::string mname, uint64_t _nonce);
87
88 /**
89 * Destroy the AsyncMessenger. Pretty simple since all the work is done
90 * elsewhere.
91 */
92 ~AsyncMessenger() override;
93
94 /** @defgroup Accessors
95 * @{
96 */
97 bool set_addr_unknowns(const entity_addrvec_t &addr) override;
98
99 int get_dispatch_queue_len() override {
100 return dispatch_queue.get_queue_len();
101 }
102
103 double get_dispatch_queue_max_age(utime_t now) override {
104 return dispatch_queue.get_max_age(now);
105 }
106 /** @} Accessors */
107
108 /**
109 * @defgroup Configuration functions
110 * @{
111 */
112 void set_cluster_protocol(int p) override {
113 ceph_assert(!started && !did_bind);
114 cluster_protocol = p;
115 }
116
117 int bind(const entity_addr_t& bind_addr,
118 std::optional<entity_addrvec_t> public_addrs=std::nullopt) override;
119 int rebind(const std::set<int>& avoid_ports) override;
120 int bindv(const entity_addrvec_t& bind_addrs,
121 std::optional<entity_addrvec_t> public_addrs=std::nullopt) override;
122
123 int client_bind(const entity_addr_t& bind_addr) override;
124
125 int client_reset() override;
126
127 bool should_use_msgr2() override;
128
129 /** @} Configuration functions */
130
131 /**
132 * @defgroup Startup/Shutdown
133 * @{
134 */
135 int start() override;
136 void wait() override;
137 int shutdown() override;
138
139 /** @} // Startup/Shutdown */
140
141 /**
142 * @defgroup Messaging
143 * @{
144 */
145 int send_to(Message *m, int type, const entity_addrvec_t& addrs) override;
146
147 /** @} // Messaging */
148
149 /**
150 * @defgroup Connection Management
151 * @{
152 */
153 ConnectionRef connect_to(int type,
154 const entity_addrvec_t& addrs,
155 bool anon, bool not_local_dest=false) override;
156 ConnectionRef get_loopback_connection() override;
157 void mark_down(const entity_addr_t& addr) override {
158 mark_down_addrs(entity_addrvec_t(addr));
159 }
160 void mark_down_addrs(const entity_addrvec_t& addrs) override;
161 void mark_down_all() override {
162 shutdown_connections(true);
163 }
164 /** @} // Connection Management */
165
166 /**
167 * @defgroup Inner classes
168 * @{
169 */
170
171 /**
172 * @} // Inner classes
173 */
174
175 protected:
176 /**
177 * @defgroup Messenger Interfaces
178 * @{
179 */
180 /**
181 * Start up the DispatchQueue thread once we have somebody to dispatch to.
182 */
183 void ready() override;
184 /** @} // Messenger Interfaces */
185
186 private:
187
188 /**
189 * @defgroup Utility functions
190 * @{
191 */
192
193 /**
194 * Create a connection associated with the given entity (of the given type).
195 * Initiate the connection. (This function returning does not guarantee
196 * connection success.)
197 *
198 * @param addrs The address(es) of the entity to connect to.
199 * @param type The peer type of the entity at the address.
200 *
201 * @return a pointer to the newly-created connection. Caller does not own a
202 * reference; take one if you need it.
203 */
204 AsyncConnectionRef create_connect(const entity_addrvec_t& addrs, int type,
205 bool anon);
206
207
208 void _finish_bind(const entity_addrvec_t& bind_addrs,
209 const entity_addrvec_t& listen_addrs);
210
211 entity_addrvec_t _filter_addrs(const entity_addrvec_t& addrs);
212
213 private:
214 NetworkStack *stack;
215 std::vector<Processor*> processors;
216 friend class Processor;
217 DispatchQueue dispatch_queue;
218
219 // the worker run messenger's cron jobs
220 Worker *local_worker;
221
222 std::string ms_type;
223
224 /// overall lock used for AsyncMessenger data structures
225 ceph::mutex lock = ceph::make_mutex("AsyncMessenger::lock");
226 // AsyncMessenger stuff
227 /// approximately unique ID set by the Constructor for use in entity_addr_t
228 uint64_t nonce;
229
230 /// true, specifying we haven't learned our addr; set false when we find it.
231 // maybe this should be protected by the lock?
232 bool need_addr = true;
233
234 /**
235 * set to bind addresses if bind or bindv were called before NetworkStack
236 * was ready to bind
237 */
238 entity_addrvec_t pending_bind_addrs;
239
240 /**
241 * set to public addresses (those announced by the msgr's protocols).
242 * they are stored to handle the cases when either:
243 * a) bind or bindv were called before NetworkStack was ready to bind,
244 * b) rebind is called down the road.
245 */
246 std::optional<entity_addrvec_t> saved_public_addrs;
247
248 /**
249 * false; set to true if a pending bind exists
250 */
251 bool pending_bind = false;
252
253 /**
254 * The following aren't lock-protected since you shouldn't be able to race
255 * the only writers.
256 */
257
258 /**
259 * false; set to true if the AsyncMessenger bound to a specific address;
260 * and set false again by Accepter::stop().
261 */
262 bool did_bind = false;
263 /// counter for the global seq our connection protocol uses
264 __u32 global_seq = 0;
265 /// lock to protect the global_seq
266 ceph::spinlock global_seq_lock;
267
268 /**
269 * hash map of addresses to Asyncconnection
270 *
271 * NOTE: a Asyncconnection* with state CLOSED may still be in the map but is considered
272 * invalid and can be replaced by anyone holding the msgr lock
273 */
274 ceph::unordered_map<entity_addrvec_t, AsyncConnectionRef> conns;
275
276 /**
277 * list of connection are in the process of accepting
278 *
279 * These are not yet in the conns map.
280 */
281 std::set<AsyncConnectionRef> accepting_conns;
282
283 /// anonymous outgoing connections
284 std::set<AsyncConnectionRef> anon_conns;
285
286 /**
287 * list of connection are closed which need to be clean up
288 *
289 * Because AsyncMessenger and AsyncConnection follow a lock rule that
290 * we can lock AsyncMesenger::lock firstly then lock AsyncConnection::lock
291 * but can't reversed. This rule is aimed to avoid dead lock.
292 * So if AsyncConnection want to unregister itself from AsyncMessenger,
293 * we pick up this idea that just queue itself to this set and do lazy
294 * deleted for AsyncConnection. "_lookup_conn" must ensure not return a
295 * AsyncConnection in this set.
296 */
297 ceph::mutex deleted_lock = ceph::make_mutex("AsyncMessenger::deleted_lock");
298 std::set<AsyncConnectionRef> deleted_conns;
299
300 EventCallbackRef reap_handler;
301
302 /// internal cluster protocol version, if any, for talking to entities of the same type.
303 int cluster_protocol = 0;
304
305 ceph::condition_variable stop_cond;
306 bool stopped = true;
307
308 /* You must hold this->lock for the duration of use! */
309 const auto& _lookup_conn(const entity_addrvec_t& k) {
310 static const AsyncConnectionRef nullref;
311 ceph_assert(ceph_mutex_is_locked(lock));
312 auto p = conns.find(k);
313 if (p == conns.end()) {
314 return nullref;
315 }
316
317 // lazy delete, see "deleted_conns"
318 // don't worry omit, Connection::send_message can handle this case.
319 if (p->second->is_unregistered()) {
320 std::lock_guard l{deleted_lock};
321 if (deleted_conns.erase(p->second)) {
322 p->second->get_perf_counter()->dec(l_msgr_active_connections);
323 conns.erase(p);
324 return nullref;
325 }
326 }
327
328 return p->second;
329 }
330
331 void _init_local_connection() {
332 ceph_assert(ceph_mutex_is_locked(lock));
333 local_connection->peer_addrs = *my_addrs;
334 local_connection->peer_type = my_name.type();
335 local_connection->set_features(CEPH_FEATURES_ALL);
336 ms_deliver_handle_fast_connect(local_connection.get());
337 }
338
339 void shutdown_connections(bool queue_reset);
340
341 public:
342
343 /// con used for sending messages to ourselves
344 AsyncConnectionRef local_connection;
345
346 /**
347 * @defgroup AsyncMessenger internals
348 * @{
349 */
350 /**
351 * This wraps _lookup_conn.
352 */
353 AsyncConnectionRef lookup_conn(const entity_addrvec_t& k) {
354 std::lock_guard l{lock};
355 return _lookup_conn(k); /* make new ref! */
356 }
357
358 int accept_conn(const AsyncConnectionRef& conn);
359 bool learned_addr(const entity_addr_t &peer_addr_for_me);
360 void add_accept(Worker *w, ConnectedSocket cli_socket,
361 const entity_addr_t &listen_addr,
362 const entity_addr_t &peer_addr);
363 NetworkStack *get_stack() {
364 return stack;
365 }
366
367 uint64_t get_nonce() const {
368 return nonce;
369 }
370
371 /**
372 * Increment the global sequence for this AsyncMessenger and return it.
373 * This is for the connect protocol, although it doesn't hurt if somebody
374 * else calls it.
375 *
376 * @return a global sequence ID that nobody else has seen.
377 */
378 __u32 get_global_seq(__u32 old=0) {
379 std::lock_guard<ceph::spinlock> lg(global_seq_lock);
380
381 if (old > global_seq)
382 global_seq = old;
383 __u32 ret = ++global_seq;
384
385 return ret;
386 }
387 /**
388 * Get the protocol version we support for the given peer type: either
389 * a peer protocol (if it matches our own), the protocol version for the
390 * peer (if we're connecting), or our protocol version (if we're accepting).
391 */
392 int get_proto_version(int peer_type, bool connect) const;
393
394 /**
395 * Fill in the address and peer type for the local connection, which
396 * is used for delivering messages back to ourself.
397 */
398 void init_local_connection() {
399 std::lock_guard l{lock};
400 local_connection->is_loopback = true;
401 _init_local_connection();
402 }
403
404 /**
405 * Unregister connection from `conns`
406 *
407 * See "deleted_conns"
408 */
409 void unregister_conn(const AsyncConnectionRef& conn) {
410 std::lock_guard l{deleted_lock};
411 deleted_conns.emplace(std::move(conn));
412 conn->unregister();
413
414 if (deleted_conns.size() >= cct->_conf->ms_async_reap_threshold) {
415 local_worker->center.dispatch_event_external(reap_handler);
416 }
417 }
418
419 /**
420 * Reap dead connection from `deleted_conns`
421 *
422 * @return the number of dead connections
423 *
424 * See "deleted_conns"
425 */
426 void reap_dead();
427
428 /**
429 * @} // AsyncMessenger Internals
430 */
431 } ;
432
433 #endif /* CEPH_ASYNCMESSENGER_H */