]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/AsyncMessenger.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / msg / async / AsyncMessenger.h
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
7c673cae
FG
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17#ifndef CEPH_ASYNCMESSENGER_H
18#define CEPH_ASYNCMESSENGER_H
19
11fdf7f2
TL
20#include <map>
21#include <mutex>
22
7c673cae
FG
23#include "include/types.h"
24#include "include/xlist.h"
11fdf7f2 25#include "include/spinlock.h"
7c673cae
FG
26#include "include/unordered_map.h"
27#include "include/unordered_set.h"
28
29#include "common/Mutex.h"
7c673cae
FG
30#include "common/Cond.h"
31#include "common/Thread.h"
32
33#include "msg/SimplePolicyMessenger.h"
34#include "msg/DispatchQueue.h"
7c673cae
FG
35#include "AsyncConnection.h"
36#include "Event.h"
37
11fdf7f2 38#include "include/ceph_assert.h"
7c673cae
FG
39
40class AsyncMessenger;
41
42/**
43 * If the Messenger binds to a specific address, the Processor runs
44 * and listens for incoming connections.
45 */
46class Processor {
47 AsyncMessenger *msgr;
48 NetHandler net;
49 Worker *worker;
11fdf7f2 50 vector<ServerSocket> listen_sockets;
7c673cae
FG
51 EventCallbackRef listen_handler;
52
53 class C_processor_accept;
54
55 public:
56 Processor(AsyncMessenger *r, Worker *w, CephContext *c);
57 ~Processor() { delete listen_handler; };
58
59 void stop();
11fdf7f2 60 int bind(const entity_addrvec_t &bind_addrs,
7c673cae 61 const set<int>& avoid_ports,
11fdf7f2 62 entity_addrvec_t* bound_addrs);
7c673cae
FG
63 void start();
64 void accept();
65};
66
67/*
68 * AsyncMessenger is represented for maintaining a set of asynchronous connections,
69 * it may own a bind address and the accepted connections will be managed by
70 * AsyncMessenger.
71 *
72 */
73
74class AsyncMessenger : public SimplePolicyMessenger {
75 // First we have the public Messenger interface implementation...
76public:
77 /**
78 * Initialize the AsyncMessenger!
79 *
80 * @param cct The CephContext to use
81 * @param name The name to assign ourselves
82 * _nonce A unique ID to use for this AsyncMessenger. It should not
83 * be a value that will be repeated if the daemon restarts.
84 */
85 AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type,
86 string mname, uint64_t _nonce);
87
88 /**
89 * Destroy the AsyncMessenger. Pretty simple since all the work is done
90 * elsewhere.
91 */
92 ~AsyncMessenger() override;
93
94 /** @defgroup Accessors
95 * @{
96 */
11fdf7f2
TL
97 bool set_addr_unknowns(const entity_addrvec_t &addr) override;
98 void set_addrs(const entity_addrvec_t &addrs) override;
7c673cae
FG
99
100 int get_dispatch_queue_len() override {
101 return dispatch_queue.get_queue_len();
102 }
103
104 double get_dispatch_queue_max_age(utime_t now) override {
105 return dispatch_queue.get_max_age(now);
106 }
107 /** @} Accessors */
108
109 /**
110 * @defgroup Configuration functions
111 * @{
112 */
113 void set_cluster_protocol(int p) override {
11fdf7f2 114 ceph_assert(!started && !did_bind);
7c673cae
FG
115 cluster_protocol = p;
116 }
117
118 int bind(const entity_addr_t& bind_addr) override;
119 int rebind(const set<int>& avoid_ports) override;
120 int client_bind(const entity_addr_t& bind_addr) override;
121
11fdf7f2
TL
122 int bindv(const entity_addrvec_t& bind_addrs) override;
123
124 bool should_use_msgr2() override;
125
7c673cae
FG
126 /** @} Configuration functions */
127
128 /**
129 * @defgroup Startup/Shutdown
130 * @{
131 */
132 int start() override;
133 void wait() override;
134 int shutdown() override;
135
136 /** @} // Startup/Shutdown */
137
138 /**
139 * @defgroup Messaging
140 * @{
141 */
11fdf7f2 142 int send_to(Message *m, int type, const entity_addrvec_t& addrs) override;
7c673cae
FG
143
144 /** @} // Messaging */
145
146 /**
147 * @defgroup Connection Management
148 * @{
149 */
11fdf7f2
TL
150 ConnectionRef connect_to(int type,
151 const entity_addrvec_t& addrs) override;
7c673cae 152 ConnectionRef get_loopback_connection() override;
11fdf7f2
TL
153 void mark_down(const entity_addr_t& addr) override {
154 mark_down_addrs(entity_addrvec_t(addr));
155 }
156 void mark_down_addrs(const entity_addrvec_t& addrs) override;
7c673cae
FG
157 void mark_down_all() override {
158 shutdown_connections(true);
159 }
160 /** @} // Connection Management */
161
162 /**
163 * @defgroup Inner classes
164 * @{
165 */
166
167 /**
168 * @} // Inner classes
169 */
170
171protected:
172 /**
173 * @defgroup Messenger Interfaces
174 * @{
175 */
176 /**
177 * Start up the DispatchQueue thread once we have somebody to dispatch to.
178 */
179 void ready() override;
180 /** @} // Messenger Interfaces */
181
182private:
183
184 /**
185 * @defgroup Utility functions
186 * @{
187 */
188
189 /**
190 * Create a connection associated with the given entity (of the given type).
191 * Initiate the connection. (This function returning does not guarantee
192 * connection success.)
193 *
11fdf7f2 194 * @param addrs The address(es) of the entity to connect to.
7c673cae
FG
195 * @param type The peer type of the entity at the address.
196 *
197 * @return a pointer to the newly-created connection. Caller does not own a
198 * reference; take one if you need it.
199 */
11fdf7f2 200 AsyncConnectionRef create_connect(const entity_addrvec_t& addrs, int type);
7c673cae
FG
201
202 /**
203 * Queue up a Message for delivery to the entity specified
204 * by addr and dest_type.
205 * submit_message() is responsible for creating
206 * new AsyncConnection (and closing old ones) as necessary.
207 *
208 * @param m The Message to queue up. This function eats a reference.
209 * @param con The existing Connection to use, or NULL if you don't know of one.
210 * @param dest_addr The address to send the Message to.
211 * @param dest_type The peer type of the address we're sending to
212 * just drop silently under failure.
213 */
214 void submit_message(Message *m, AsyncConnectionRef con,
11fdf7f2
TL
215 const entity_addrvec_t& dest_addrs, int dest_type);
216
217 void _finish_bind(const entity_addrvec_t& bind_addrs,
218 const entity_addrvec_t& listen_addrs);
7c673cae 219
11fdf7f2
TL
220 entity_addrvec_t _filter_addrs(int type,
221 const entity_addrvec_t& addrs);
7c673cae
FG
222
223 private:
224 static const uint64_t ReapDeadConnectionThreshold = 5;
225
226 NetworkStack *stack;
227 std::vector<Processor*> processors;
228 friend class Processor;
229 DispatchQueue dispatch_queue;
230
231 // the worker run messenger's cron jobs
232 Worker *local_worker;
233
234 std::string ms_type;
235
236 /// overall lock used for AsyncMessenger data structures
237 Mutex lock;
238 // AsyncMessenger stuff
239 /// approximately unique ID set by the Constructor for use in entity_addr_t
240 uint64_t nonce;
241
242 /// true, specifying we haven't learned our addr; set false when we find it.
243 // maybe this should be protected by the lock?
244 bool need_addr;
245
246 /**
11fdf7f2 247 * set to bind addresses if bind was called before NetworkStack was ready to
7c673cae
FG
248 * bind
249 */
11fdf7f2 250 entity_addrvec_t pending_bind_addrs;
7c673cae
FG
251
252 /**
253 * false; set to true if a pending bind exists
254 */
255 bool pending_bind = false;
256
257 /**
258 * The following aren't lock-protected since you shouldn't be able to race
259 * the only writers.
260 */
261
262 /**
263 * false; set to true if the AsyncMessenger bound to a specific address;
264 * and set false again by Accepter::stop().
265 */
266 bool did_bind;
267 /// counter for the global seq our connection protocol uses
268 __u32 global_seq;
269 /// lock to protect the global_seq
11fdf7f2 270 ceph::spinlock global_seq_lock;
7c673cae
FG
271
272 /**
273 * hash map of addresses to Asyncconnection
274 *
275 * NOTE: a Asyncconnection* with state CLOSED may still be in the map but is considered
276 * invalid and can be replaced by anyone holding the msgr lock
277 */
11fdf7f2 278 ceph::unordered_map<entity_addrvec_t, AsyncConnectionRef> conns;
7c673cae
FG
279
280 /**
11fdf7f2 281 * list of connection are in the process of accepting
7c673cae
FG
282 *
283 * These are not yet in the conns map.
284 */
285 set<AsyncConnectionRef> accepting_conns;
286
287 /**
288 * list of connection are closed which need to be clean up
289 *
290 * Because AsyncMessenger and AsyncConnection follow a lock rule that
291 * we can lock AsyncMesenger::lock firstly then lock AsyncConnection::lock
292 * but can't reversed. This rule is aimed to avoid dead lock.
293 * So if AsyncConnection want to unregister itself from AsyncMessenger,
294 * we pick up this idea that just queue itself to this set and do lazy
295 * deleted for AsyncConnection. "_lookup_conn" must ensure not return a
296 * AsyncConnection in this set.
297 */
298 Mutex deleted_lock;
299 set<AsyncConnectionRef> deleted_conns;
300
301 EventCallbackRef reap_handler;
302
303 /// internal cluster protocol version, if any, for talking to entities of the same type.
304 int cluster_protocol;
305
306 Cond stop_cond;
307 bool stopped;
308
11fdf7f2
TL
309 AsyncConnectionRef _lookup_conn(const entity_addrvec_t& k) {
310 ceph_assert(lock.is_locked());
311 auto p = conns.find(k);
7c673cae
FG
312 if (p == conns.end())
313 return NULL;
314
315 // lazy delete, see "deleted_conns"
316 Mutex::Locker l(deleted_lock);
317 if (deleted_conns.erase(p->second)) {
318 p->second->get_perf_counter()->dec(l_msgr_active_connections);
319 conns.erase(p);
320 return NULL;
321 }
322
323 return p->second;
324 }
325
326 void _init_local_connection() {
11fdf7f2
TL
327 ceph_assert(lock.is_locked());
328 local_connection->peer_addrs = *my_addrs;
329 local_connection->peer_type = my_name.type();
7c673cae
FG
330 local_connection->set_features(CEPH_FEATURES_ALL);
331 ms_deliver_handle_fast_connect(local_connection.get());
332 }
333
334 void shutdown_connections(bool queue_reset);
335
336public:
337
338 /// con used for sending messages to ourselves
11fdf7f2 339 AsyncConnectionRef local_connection;
7c673cae
FG
340
341 /**
342 * @defgroup AsyncMessenger internals
343 * @{
344 */
345 /**
346 * This wraps _lookup_conn.
347 */
11fdf7f2 348 AsyncConnectionRef lookup_conn(const entity_addrvec_t& k) {
7c673cae
FG
349 Mutex::Locker l(lock);
350 return _lookup_conn(k);
351 }
352
11fdf7f2
TL
353 int accept_conn(AsyncConnectionRef conn);
354 bool learned_addr(const entity_addr_t &peer_addr_for_me);
355 void add_accept(Worker *w, ConnectedSocket cli_socket,
356 const entity_addr_t &listen_addr,
357 const entity_addr_t &peer_addr);
7c673cae
FG
358 NetworkStack *get_stack() {
359 return stack;
360 }
361
11fdf7f2
TL
362 uint64_t get_nonce() const {
363 return nonce;
7c673cae
FG
364 }
365
7c673cae
FG
366 /**
367 * Increment the global sequence for this AsyncMessenger and return it.
368 * This is for the connect protocol, although it doesn't hurt if somebody
369 * else calls it.
370 *
371 * @return a global sequence ID that nobody else has seen.
372 */
373 __u32 get_global_seq(__u32 old=0) {
11fdf7f2
TL
374 std::lock_guard<ceph::spinlock> lg(global_seq_lock);
375
7c673cae
FG
376 if (old > global_seq)
377 global_seq = old;
378 __u32 ret = ++global_seq;
11fdf7f2 379
7c673cae
FG
380 return ret;
381 }
382 /**
383 * Get the protocol version we support for the given peer type: either
384 * a peer protocol (if it matches our own), the protocol version for the
385 * peer (if we're connecting), or our protocol version (if we're accepting).
386 */
387 int get_proto_version(int peer_type, bool connect) const;
388
389 /**
390 * Fill in the address and peer type for the local connection, which
391 * is used for delivering messages back to ourself.
392 */
393 void init_local_connection() {
394 Mutex::Locker l(lock);
395 _init_local_connection();
396 }
397
398 /**
399 * Unregister connection from `conns`
400 *
401 * See "deleted_conns"
402 */
403 void unregister_conn(AsyncConnectionRef conn) {
404 Mutex::Locker l(deleted_lock);
405 deleted_conns.insert(conn);
406
407 if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
408 local_worker->center.dispatch_event_external(reap_handler);
409 }
410 }
411
412 /**
413 * Reap dead connection from `deleted_conns`
414 *
415 * @return the number of dead connections
416 *
417 * See "deleted_conns"
418 */
419 int reap_dead();
420
421 /**
422 * @} // AsyncMessenger Internals
423 */
424} ;
425
426#endif /* CEPH_ASYNCMESSENGER_H */