]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/AsyncMessenger.h
import 15.2.5
[ceph.git] / ceph / src / msg / async / AsyncMessenger.h
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
7c673cae
FG
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17#ifndef CEPH_ASYNCMESSENGER_H
18#define CEPH_ASYNCMESSENGER_H
19
11fdf7f2 20#include <map>
11fdf7f2 21
7c673cae
FG
22#include "include/types.h"
23#include "include/xlist.h"
11fdf7f2 24#include "include/spinlock.h"
7c673cae
FG
25#include "include/unordered_map.h"
26#include "include/unordered_set.h"
27
9f95a23c 28#include "common/ceph_mutex.h"
7c673cae
FG
29#include "common/Cond.h"
30#include "common/Thread.h"
31
32#include "msg/SimplePolicyMessenger.h"
33#include "msg/DispatchQueue.h"
7c673cae
FG
34#include "AsyncConnection.h"
35#include "Event.h"
36
11fdf7f2 37#include "include/ceph_assert.h"
7c673cae
FG
38
39class AsyncMessenger;
40
41/**
42 * If the Messenger binds to a specific address, the Processor runs
43 * and listens for incoming connections.
44 */
45class Processor {
46 AsyncMessenger *msgr;
47 NetHandler net;
48 Worker *worker;
11fdf7f2 49 vector<ServerSocket> listen_sockets;
7c673cae
FG
50 EventCallbackRef listen_handler;
51
52 class C_processor_accept;
53
54 public:
55 Processor(AsyncMessenger *r, Worker *w, CephContext *c);
56 ~Processor() { delete listen_handler; };
57
58 void stop();
11fdf7f2 59 int bind(const entity_addrvec_t &bind_addrs,
7c673cae 60 const set<int>& avoid_ports,
11fdf7f2 61 entity_addrvec_t* bound_addrs);
7c673cae
FG
62 void start();
63 void accept();
64};
65
66/*
67 * AsyncMessenger is represented for maintaining a set of asynchronous connections,
68 * it may own a bind address and the accepted connections will be managed by
69 * AsyncMessenger.
70 *
71 */
72
73class AsyncMessenger : public SimplePolicyMessenger {
74 // First we have the public Messenger interface implementation...
75public:
76 /**
77 * Initialize the AsyncMessenger!
78 *
79 * @param cct The CephContext to use
80 * @param name The name to assign ourselves
81 * _nonce A unique ID to use for this AsyncMessenger. It should not
82 * be a value that will be repeated if the daemon restarts.
83 */
84 AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type,
85 string mname, uint64_t _nonce);
86
87 /**
88 * Destroy the AsyncMessenger. Pretty simple since all the work is done
89 * elsewhere.
90 */
91 ~AsyncMessenger() override;
92
93 /** @defgroup Accessors
94 * @{
95 */
11fdf7f2
TL
96 bool set_addr_unknowns(const entity_addrvec_t &addr) override;
97 void set_addrs(const entity_addrvec_t &addrs) override;
7c673cae
FG
98
99 int get_dispatch_queue_len() override {
100 return dispatch_queue.get_queue_len();
101 }
102
103 double get_dispatch_queue_max_age(utime_t now) override {
104 return dispatch_queue.get_max_age(now);
105 }
106 /** @} Accessors */
107
108 /**
109 * @defgroup Configuration functions
110 * @{
111 */
112 void set_cluster_protocol(int p) override {
11fdf7f2 113 ceph_assert(!started && !did_bind);
7c673cae
FG
114 cluster_protocol = p;
115 }
116
117 int bind(const entity_addr_t& bind_addr) override;
118 int rebind(const set<int>& avoid_ports) override;
f6b5b4d7
TL
119 int bindv(const entity_addrvec_t& bind_addrs) override;
120
7c673cae
FG
121 int client_bind(const entity_addr_t& bind_addr) override;
122
f6b5b4d7 123 int client_reset() override;
11fdf7f2
TL
124
125 bool should_use_msgr2() override;
126
7c673cae
FG
127 /** @} Configuration functions */
128
129 /**
130 * @defgroup Startup/Shutdown
131 * @{
132 */
133 int start() override;
134 void wait() override;
135 int shutdown() override;
136
137 /** @} // Startup/Shutdown */
138
139 /**
140 * @defgroup Messaging
141 * @{
142 */
11fdf7f2 143 int send_to(Message *m, int type, const entity_addrvec_t& addrs) override;
7c673cae
FG
144
145 /** @} // Messaging */
146
147 /**
148 * @defgroup Connection Management
149 * @{
150 */
11fdf7f2 151 ConnectionRef connect_to(int type,
9f95a23c
TL
152 const entity_addrvec_t& addrs,
153 bool anon, bool not_local_dest=false) override;
7c673cae 154 ConnectionRef get_loopback_connection() override;
11fdf7f2
TL
155 void mark_down(const entity_addr_t& addr) override {
156 mark_down_addrs(entity_addrvec_t(addr));
157 }
158 void mark_down_addrs(const entity_addrvec_t& addrs) override;
7c673cae
FG
159 void mark_down_all() override {
160 shutdown_connections(true);
161 }
162 /** @} // Connection Management */
163
164 /**
165 * @defgroup Inner classes
166 * @{
167 */
168
169 /**
170 * @} // Inner classes
171 */
172
173protected:
174 /**
175 * @defgroup Messenger Interfaces
176 * @{
177 */
178 /**
179 * Start up the DispatchQueue thread once we have somebody to dispatch to.
180 */
181 void ready() override;
182 /** @} // Messenger Interfaces */
183
184private:
185
186 /**
187 * @defgroup Utility functions
188 * @{
189 */
190
191 /**
192 * Create a connection associated with the given entity (of the given type).
193 * Initiate the connection. (This function returning does not guarantee
194 * connection success.)
195 *
11fdf7f2 196 * @param addrs The address(es) of the entity to connect to.
7c673cae
FG
197 * @param type The peer type of the entity at the address.
198 *
199 * @return a pointer to the newly-created connection. Caller does not own a
200 * reference; take one if you need it.
201 */
9f95a23c
TL
202 AsyncConnectionRef create_connect(const entity_addrvec_t& addrs, int type,
203 bool anon);
7c673cae 204
11fdf7f2
TL
205
206 void _finish_bind(const entity_addrvec_t& bind_addrs,
207 const entity_addrvec_t& listen_addrs);
7c673cae 208
9f95a23c 209 entity_addrvec_t _filter_addrs(const entity_addrvec_t& addrs);
7c673cae
FG
210
211 private:
212 static const uint64_t ReapDeadConnectionThreshold = 5;
213
214 NetworkStack *stack;
215 std::vector<Processor*> processors;
216 friend class Processor;
217 DispatchQueue dispatch_queue;
218
219 // the worker run messenger's cron jobs
220 Worker *local_worker;
221
222 std::string ms_type;
223
224 /// overall lock used for AsyncMessenger data structures
9f95a23c 225 ceph::mutex lock = ceph::make_mutex("AsyncMessenger::lock");
7c673cae
FG
226 // AsyncMessenger stuff
227 /// approximately unique ID set by the Constructor for use in entity_addr_t
228 uint64_t nonce;
229
230 /// true, specifying we haven't learned our addr; set false when we find it.
231 // maybe this should be protected by the lock?
9f95a23c 232 bool need_addr = true;
7c673cae
FG
233
234 /**
11fdf7f2 235 * set to bind addresses if bind was called before NetworkStack was ready to
7c673cae
FG
236 * bind
237 */
11fdf7f2 238 entity_addrvec_t pending_bind_addrs;
7c673cae
FG
239
240 /**
241 * false; set to true if a pending bind exists
242 */
243 bool pending_bind = false;
244
245 /**
246 * The following aren't lock-protected since you shouldn't be able to race
247 * the only writers.
248 */
249
250 /**
251 * false; set to true if the AsyncMessenger bound to a specific address;
252 * and set false again by Accepter::stop().
253 */
9f95a23c 254 bool did_bind = false;
7c673cae 255 /// counter for the global seq our connection protocol uses
9f95a23c 256 __u32 global_seq = 0;
7c673cae 257 /// lock to protect the global_seq
11fdf7f2 258 ceph::spinlock global_seq_lock;
7c673cae
FG
259
260 /**
261 * hash map of addresses to Asyncconnection
262 *
263 * NOTE: a Asyncconnection* with state CLOSED may still be in the map but is considered
264 * invalid and can be replaced by anyone holding the msgr lock
265 */
11fdf7f2 266 ceph::unordered_map<entity_addrvec_t, AsyncConnectionRef> conns;
7c673cae
FG
267
268 /**
11fdf7f2 269 * list of connection are in the process of accepting
7c673cae
FG
270 *
271 * These are not yet in the conns map.
272 */
273 set<AsyncConnectionRef> accepting_conns;
274
9f95a23c
TL
275 /// anonymous outgoing connections
276 set<AsyncConnectionRef> anon_conns;
277
7c673cae
FG
278 /**
279 * list of connection are closed which need to be clean up
280 *
281 * Because AsyncMessenger and AsyncConnection follow a lock rule that
282 * we can lock AsyncMesenger::lock firstly then lock AsyncConnection::lock
283 * but can't reversed. This rule is aimed to avoid dead lock.
284 * So if AsyncConnection want to unregister itself from AsyncMessenger,
285 * we pick up this idea that just queue itself to this set and do lazy
286 * deleted for AsyncConnection. "_lookup_conn" must ensure not return a
287 * AsyncConnection in this set.
288 */
9f95a23c 289 ceph::mutex deleted_lock = ceph::make_mutex("AsyncMessenger::deleted_lock");
7c673cae
FG
290 set<AsyncConnectionRef> deleted_conns;
291
292 EventCallbackRef reap_handler;
293
294 /// internal cluster protocol version, if any, for talking to entities of the same type.
9f95a23c 295 int cluster_protocol = 0;
7c673cae 296
9f95a23c
TL
297 ceph::condition_variable stop_cond;
298 bool stopped = true;
7c673cae 299
9f95a23c
TL
300 /* You must hold this->lock for the duration of use! */
301 const auto& _lookup_conn(const entity_addrvec_t& k) {
302 static const AsyncConnectionRef nullref;
303 ceph_assert(ceph_mutex_is_locked(lock));
11fdf7f2 304 auto p = conns.find(k);
9f95a23c
TL
305 if (p == conns.end()) {
306 return nullref;
307 }
7c673cae
FG
308
309 // lazy delete, see "deleted_conns"
9f95a23c
TL
310 // don't worry omit, Connection::send_message can handle this case.
311 if (p->second->is_unregistered()) {
312 std::lock_guard l{deleted_lock};
313 if (deleted_conns.erase(p->second)) {
314 conns.erase(p);
315 return nullref;
316 }
7c673cae
FG
317 }
318
319 return p->second;
320 }
321
322 void _init_local_connection() {
9f95a23c 323 ceph_assert(ceph_mutex_is_locked(lock));
11fdf7f2
TL
324 local_connection->peer_addrs = *my_addrs;
325 local_connection->peer_type = my_name.type();
7c673cae
FG
326 local_connection->set_features(CEPH_FEATURES_ALL);
327 ms_deliver_handle_fast_connect(local_connection.get());
328 }
329
330 void shutdown_connections(bool queue_reset);
331
332public:
333
334 /// con used for sending messages to ourselves
11fdf7f2 335 AsyncConnectionRef local_connection;
7c673cae
FG
336
337 /**
338 * @defgroup AsyncMessenger internals
339 * @{
340 */
341 /**
342 * This wraps _lookup_conn.
343 */
11fdf7f2 344 AsyncConnectionRef lookup_conn(const entity_addrvec_t& k) {
9f95a23c
TL
345 std::lock_guard l{lock};
346 return _lookup_conn(k); /* make new ref! */
7c673cae
FG
347 }
348
9f95a23c 349 int accept_conn(const AsyncConnectionRef& conn);
11fdf7f2
TL
350 bool learned_addr(const entity_addr_t &peer_addr_for_me);
351 void add_accept(Worker *w, ConnectedSocket cli_socket,
352 const entity_addr_t &listen_addr,
353 const entity_addr_t &peer_addr);
7c673cae
FG
354 NetworkStack *get_stack() {
355 return stack;
356 }
357
11fdf7f2
TL
358 uint64_t get_nonce() const {
359 return nonce;
7c673cae
FG
360 }
361
7c673cae
FG
362 /**
363 * Increment the global sequence for this AsyncMessenger and return it.
364 * This is for the connect protocol, although it doesn't hurt if somebody
365 * else calls it.
366 *
367 * @return a global sequence ID that nobody else has seen.
368 */
369 __u32 get_global_seq(__u32 old=0) {
11fdf7f2
TL
370 std::lock_guard<ceph::spinlock> lg(global_seq_lock);
371
7c673cae
FG
372 if (old > global_seq)
373 global_seq = old;
374 __u32 ret = ++global_seq;
11fdf7f2 375
7c673cae
FG
376 return ret;
377 }
378 /**
379 * Get the protocol version we support for the given peer type: either
380 * a peer protocol (if it matches our own), the protocol version for the
381 * peer (if we're connecting), or our protocol version (if we're accepting).
382 */
383 int get_proto_version(int peer_type, bool connect) const;
384
385 /**
386 * Fill in the address and peer type for the local connection, which
387 * is used for delivering messages back to ourself.
388 */
389 void init_local_connection() {
9f95a23c
TL
390 std::lock_guard l{lock};
391 local_connection->is_loopback = true;
7c673cae
FG
392 _init_local_connection();
393 }
394
395 /**
396 * Unregister connection from `conns`
397 *
398 * See "deleted_conns"
399 */
9f95a23c
TL
400 void unregister_conn(const AsyncConnectionRef& conn) {
401 std::lock_guard l{deleted_lock};
402 if (!accepting_conns.count(conn) || anon_conns.count(conn))
403 conn->get_perf_counter()->dec(l_msgr_active_connections);
81eedcae 404 deleted_conns.emplace(std::move(conn));
9f95a23c 405 conn->unregister();
7c673cae
FG
406
407 if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
408 local_worker->center.dispatch_event_external(reap_handler);
409 }
410 }
411
412 /**
413 * Reap dead connection from `deleted_conns`
414 *
415 * @return the number of dead connections
416 *
417 * See "deleted_conns"
418 */
9f95a23c 419 void reap_dead();
7c673cae
FG
420
421 /**
422 * @} // AsyncMessenger Internals
423 */
424} ;
425
426#endif /* CEPH_ASYNCMESSENGER_H */