1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #ifndef CEPH_ASYNCMESSENGER_H
18 #define CEPH_ASYNCMESSENGER_H
23 #include "include/types.h"
24 #include "include/xlist.h"
25 #include "include/spinlock.h"
26 #include "include/unordered_map.h"
27 #include "include/unordered_set.h"
29 #include "common/ceph_mutex.h"
30 #include "common/Cond.h"
31 #include "common/Thread.h"
33 #include "msg/SimplePolicyMessenger.h"
34 #include "msg/DispatchQueue.h"
35 #include "AsyncConnection.h"
38 #include "include/ceph_assert.h"
43 * If the Messenger binds to a specific address, the Processor runs
44 * and listens for incoming connections.
50 std::vector
<ServerSocket
> listen_sockets
;
51 EventCallbackRef listen_handler
;
53 class C_processor_accept
;
56 Processor(AsyncMessenger
*r
, Worker
*w
, CephContext
*c
);
57 ~Processor() { delete listen_handler
; };
60 int bind(const entity_addrvec_t
&bind_addrs
,
61 const std::set
<int>& avoid_ports
,
62 entity_addrvec_t
* bound_addrs
);
68 * AsyncMessenger maintains a set of asynchronous connections;
69 * it may own a bind address, and the accepted connections will be managed by
74 class AsyncMessenger
: public SimplePolicyMessenger
{
75 // First we have the public Messenger interface implementation...
78 * Initialize the AsyncMessenger!
80 * @param cct The CephContext to use
81 * @param name The name to assign ourselves
82 * @param _nonce A unique ID to use for this AsyncMessenger. It should not
83 * be a value that will be repeated if the daemon restarts.
85 AsyncMessenger(CephContext
*cct
, entity_name_t name
, const std::string
&type
,
86 std::string mname
, uint64_t _nonce
);
89 * Destroy the AsyncMessenger. Pretty simple since all the work is done
92 ~AsyncMessenger() override
;
94 /** @defgroup Accessors
97 bool set_addr_unknowns(const entity_addrvec_t
&addr
) override
;
99 int get_dispatch_queue_len() override
{
100 return dispatch_queue
.get_queue_len();
103 double get_dispatch_queue_max_age(utime_t now
) override
{
104 return dispatch_queue
.get_max_age(now
);
109 * @defgroup Configuration functions
112 void set_cluster_protocol(int p
) override
{
113 ceph_assert(!started
&& !did_bind
);
114 cluster_protocol
= p
;
117 int bind(const entity_addr_t
& bind_addr
,
118 std::optional
<entity_addrvec_t
> public_addrs
=std::nullopt
) override
;
119 int rebind(const std::set
<int>& avoid_ports
) override
;
120 int bindv(const entity_addrvec_t
& bind_addrs
,
121 std::optional
<entity_addrvec_t
> public_addrs
=std::nullopt
) override
;
123 int client_bind(const entity_addr_t
& bind_addr
) override
;
125 int client_reset() override
;
127 bool should_use_msgr2() override
;
129 /** @} Configuration functions */
132 * @defgroup Startup/Shutdown
135 int start() override
;
136 void wait() override
;
137 int shutdown() override
;
139 /** @} // Startup/Shutdown */
142 * @defgroup Messaging
145 int send_to(Message
*m
, int type
, const entity_addrvec_t
& addrs
) override
;
147 /** @} // Messaging */
150 * @defgroup Connection Management
153 ConnectionRef
connect_to(int type
,
154 const entity_addrvec_t
& addrs
,
155 bool anon
, bool not_local_dest
=false) override
;
156 ConnectionRef
get_loopback_connection() override
;
157 void mark_down(const entity_addr_t
& addr
) override
{
158 mark_down_addrs(entity_addrvec_t(addr
));
160 void mark_down_addrs(const entity_addrvec_t
& addrs
) override
;
161 void mark_down_all() override
{
162 shutdown_connections(true);
164 /** @} // Connection Management */
167 * @defgroup Inner classes
172 * @} // Inner classes
177 * @defgroup Messenger Interfaces
181 * Start up the DispatchQueue thread once we have somebody to dispatch to.
183 void ready() override
;
184 /** @} // Messenger Interfaces */
189 * @defgroup Utility functions
194 * Create a connection associated with the given entity (of the given type).
195 * Initiate the connection. (This function returning does not guarantee
196 * connection success.)
198 * @param addrs The address(es) of the entity to connect to.
199 * @param type The peer type of the entity at the address.
201 * @return a pointer to the newly-created connection. Caller does not own a
202 * reference; take one if you need it.
204 AsyncConnectionRef
create_connect(const entity_addrvec_t
& addrs
, int type
,
208 void _finish_bind(const entity_addrvec_t
& bind_addrs
,
209 const entity_addrvec_t
& listen_addrs
);
211 entity_addrvec_t
_filter_addrs(const entity_addrvec_t
& addrs
);
215 std::vector
<Processor
*> processors
;
216 friend class Processor
;
217 DispatchQueue dispatch_queue
;
219 // the worker that runs the messenger's cron jobs
220 Worker
*local_worker
;
224 /// overall lock used for AsyncMessenger data structures
225 ceph::mutex lock
= ceph::make_mutex("AsyncMessenger::lock");
226 // AsyncMessenger stuff
227 /// approximately unique ID set by the Constructor for use in entity_addr_t
230 /// true, specifying we haven't learned our addr; set false when we find it.
231 // maybe this should be protected by the lock?
232 bool need_addr
= true;
235 * set to bind addresses if bind or bindv were called before NetworkStack
238 entity_addrvec_t pending_bind_addrs
;
241 * set to public addresses (those announced by the msgr's protocols).
242 * they are stored to handle the cases when either:
243 * a) bind or bindv were called before NetworkStack was ready to bind,
244 * b) rebind is called down the road.
246 std::optional
<entity_addrvec_t
> saved_public_addrs
;
249 * false; set to true if a pending bind exists
251 bool pending_bind
= false;
254 * The following aren't lock-protected since you shouldn't be able to race
259 * false; set to true if the AsyncMessenger bound to a specific address;
260 * and set false again by Accepter::stop().
262 bool did_bind
= false;
263 /// counter for the global seq our connection protocol uses
264 __u32 global_seq
= 0;
265 /// lock to protect the global_seq
266 ceph::spinlock global_seq_lock
;
269 * hash map of addresses to AsyncConnection
271 * NOTE: an AsyncConnection* with state CLOSED may still be in the map but is considered
272 * invalid and can be replaced by anyone holding the msgr lock
274 ceph::unordered_map
<entity_addrvec_t
, AsyncConnectionRef
> conns
;
277 * set of connections that are in the process of being accepted
279 * These are not yet in the conns map.
281 std::set
<AsyncConnectionRef
> accepting_conns
;
283 /// anonymous outgoing connections
284 std::set
<AsyncConnectionRef
> anon_conns
;
287 * set of connections that are closed and need to be cleaned up
289 * AsyncMessenger and AsyncConnection follow a lock-ordering rule: one may
290 * lock AsyncMessenger::lock first and then AsyncConnection::lock, but never
291 * in the reverse order. This rule exists to avoid deadlock.
292 * So when an AsyncConnection wants to unregister itself from AsyncMessenger,
293 * it just queues itself into this set, and the AsyncConnection is deleted
294 * lazily later. "_lookup_conn" must ensure it never returns an
295 * AsyncConnection that is in this set.
297 ceph::mutex deleted_lock
= ceph::make_mutex("AsyncMessenger::deleted_lock");
298 std::set
<AsyncConnectionRef
> deleted_conns
;
300 EventCallbackRef reap_handler
;
302 /// internal cluster protocol version, if any, for talking to entities of the same type.
303 int cluster_protocol
= 0;
305 ceph::condition_variable stop_cond
;
308 /* You must hold this->lock for the duration of use! */
309 const auto& _lookup_conn(const entity_addrvec_t
& k
) {
310 static const AsyncConnectionRef nullref
;
311 ceph_assert(ceph_mutex_is_locked(lock
));
312 auto p
= conns
.find(k
);
313 if (p
== conns
.end()) {
317 // lazy delete, see "deleted_conns"
318 // no need to worry if this case is missed; Connection::send_message can handle it.
319 if (p
->second
->is_unregistered()) {
320 std::lock_guard l
{deleted_lock
};
321 if (deleted_conns
.erase(p
->second
)) {
322 p
->second
->get_perf_counter()->dec(l_msgr_active_connections
);
331 void _init_local_connection() {
332 ceph_assert(ceph_mutex_is_locked(lock
));
333 local_connection
->peer_addrs
= *my_addrs
;
334 local_connection
->peer_type
= my_name
.type();
335 local_connection
->set_features(CEPH_FEATURES_ALL
);
336 ms_deliver_handle_fast_connect(local_connection
.get());
339 void shutdown_connections(bool queue_reset
);
343 /// con used for sending messages to ourselves
344 AsyncConnectionRef local_connection
;
347 * @defgroup AsyncMessenger internals
351 * This wraps _lookup_conn.
353 AsyncConnectionRef
lookup_conn(const entity_addrvec_t
& k
) {
354 std::lock_guard l
{lock
};
355 return _lookup_conn(k
); /* make new ref! */
358 int accept_conn(const AsyncConnectionRef
& conn
);
359 bool learned_addr(const entity_addr_t
&peer_addr_for_me
);
360 void add_accept(Worker
*w
, ConnectedSocket cli_socket
,
361 const entity_addr_t
&listen_addr
,
362 const entity_addr_t
&peer_addr
);
363 NetworkStack
*get_stack() {
367 uint64_t get_nonce() const {
372 * Increment the global sequence for this AsyncMessenger and return it.
373 * This is for the connect protocol, although it doesn't hurt if somebody
376 * @return a global sequence ID that nobody else has seen.
378 __u32
get_global_seq(__u32 old
=0) {
379 std::lock_guard
<ceph::spinlock
> lg(global_seq_lock
);
381 if (old
> global_seq
)
383 __u32 ret
= ++global_seq
;
388 * Get the protocol version we support for the given peer type: either
389 * a peer protocol (if it matches our own), the protocol version for the
390 * peer (if we're connecting), or our protocol version (if we're accepting).
392 int get_proto_version(int peer_type
, bool connect
) const;
395 * Fill in the address and peer type for the local connection, which
396 * is used for delivering messages back to ourself.
398 void init_local_connection() {
399 std::lock_guard l
{lock
};
400 local_connection
->is_loopback
= true;
401 _init_local_connection();
405 * Unregister connection from `conns`
407 * See "deleted_conns"
409 void unregister_conn(const AsyncConnectionRef
& conn
) {
410 std::lock_guard l
{deleted_lock
};
411 deleted_conns
.emplace(std::move(conn
));
414 if (deleted_conns
.size() >= cct
->_conf
->ms_async_reap_threshold
) {
415 local_worker
->center
.dispatch_event_external(reap_handler
);
420 * Reap dead connections from `deleted_conns`
422 * @return the number of dead connections
424 * See "deleted_conns"
429 * @} // AsyncMessenger Internals
433 #endif /* CEPH_ASYNCMESSENGER_H */