1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #ifndef CEPH_ASYNCMESSENGER_H
18 #define CEPH_ASYNCMESSENGER_H
20 #include "include/types.h"
21 #include "include/xlist.h"
25 #include "include/unordered_map.h"
26 #include "include/unordered_set.h"
28 #include "common/Mutex.h"
29 #include "common/Cond.h"
30 #include "common/Thread.h"
32 #include "include/Spinlock.h"
34 #include "msg/SimplePolicyMessenger.h"
35 #include "msg/DispatchQueue.h"
36 #include "include/assert.h"
37 #include "AsyncConnection.h"
44 * If the Messenger binds to a specific address, the Processor runs
45 * and listens for incoming connections.
51 ServerSocket listen_socket
;
52 EventCallbackRef listen_handler
;
54 class C_processor_accept
;
57 Processor(AsyncMessenger
*r
, Worker
*w
, CephContext
*c
);
58 ~Processor() { delete listen_handler
; };
61 int bind(const entity_addr_t
&bind_addr
,
62 const set
<int>& avoid_ports
,
63 entity_addr_t
* bound_addr
);
69 * AsyncMessenger is represented for maintaining a set of asynchronous connections,
70 * it may own a bind address and the accepted connections will be managed by
75 class AsyncMessenger
: public SimplePolicyMessenger
{
76 // First we have the public Messenger interface implementation...
79 * Initialize the AsyncMessenger!
81 * @param cct The CephContext to use
82 * @param name The name to assign ourselves
83 * _nonce A unique ID to use for this AsyncMessenger. It should not
84 * be a value that will be repeated if the daemon restarts.
86 AsyncMessenger(CephContext
*cct
, entity_name_t name
, const std::string
&type
,
87 string mname
, uint64_t _nonce
);
90 * Destroy the AsyncMessenger. Pretty simple since all the work is done
93 ~AsyncMessenger() override
;
95 /** @defgroup Accessors
98 void set_addr_unknowns(const entity_addr_t
&addr
) override
;
100 int get_dispatch_queue_len() override
{
101 return dispatch_queue
.get_queue_len();
104 double get_dispatch_queue_max_age(utime_t now
) override
{
105 return dispatch_queue
.get_max_age(now
);
110 * @defgroup Configuration functions
113 void set_cluster_protocol(int p
) override
{
114 assert(!started
&& !did_bind
);
115 cluster_protocol
= p
;
118 int bind(const entity_addr_t
& bind_addr
) override
;
119 int rebind(const set
<int>& avoid_ports
) override
;
120 int client_bind(const entity_addr_t
& bind_addr
) override
;
122 /** @} Configuration functions */
125 * @defgroup Startup/Shutdown
128 int start() override
;
129 void wait() override
;
130 int shutdown() override
;
132 /** @} // Startup/Shutdown */
135 * @defgroup Messaging
138 int send_message(Message
*m
, const entity_inst_t
& dest
) override
{
139 Mutex::Locker
l(lock
);
141 return _send_message(m
, dest
);
144 /** @} // Messaging */
147 * @defgroup Connection Management
150 ConnectionRef
get_connection(const entity_inst_t
& dest
) override
;
151 ConnectionRef
get_loopback_connection() override
;
152 void mark_down(const entity_addr_t
& addr
) override
;
153 void mark_down_all() override
{
154 shutdown_connections(true);
156 /** @} // Connection Management */
159 * @defgroup Inner classes
164 * @} // Inner classes
169 * @defgroup Messenger Interfaces
173 * Start up the DispatchQueue thread once we have somebody to dispatch to.
175 void ready() override
;
176 /** @} // Messenger Interfaces */
181 * @defgroup Utility functions
186 * Create a connection associated with the given entity (of the given type).
187 * Initiate the connection. (This function returning does not guarantee
188 * connection success.)
190 * @param addr The address of the entity to connect to.
191 * @param type The peer type of the entity at the address.
193 * @return a pointer to the newly-created connection. Caller does not own a
194 * reference; take one if you need it.
196 AsyncConnectionRef
create_connect(const entity_addr_t
& addr
, int type
);
199 * Queue up a Message for delivery to the entity specified
200 * by addr and dest_type.
201 * submit_message() is responsible for creating
202 * new AsyncConnection (and closing old ones) as necessary.
204 * @param m The Message to queue up. This function eats a reference.
205 * @param con The existing Connection to use, or NULL if you don't know of one.
206 * @param dest_addr The address to send the Message to.
207 * @param dest_type The peer type of the address we're sending to
208 * just drop silently under failure.
210 void submit_message(Message
*m
, AsyncConnectionRef con
,
211 const entity_addr_t
& dest_addr
, int dest_type
);
213 int _send_message(Message
*m
, const entity_inst_t
& dest
);
214 void _finish_bind(const entity_addr_t
& bind_addr
,
215 const entity_addr_t
& listen_addr
);
218 static const uint64_t ReapDeadConnectionThreshold
= 5;
221 std::vector
<Processor
*> processors
;
222 friend class Processor
;
223 DispatchQueue dispatch_queue
;
225 // the worker run messenger's cron jobs
226 Worker
*local_worker
;
230 /// overall lock used for AsyncMessenger data structures
232 // AsyncMessenger stuff
233 /// approximately unique ID set by the Constructor for use in entity_addr_t
236 /// true, specifying we haven't learned our addr; set false when we find it.
237 // maybe this should be protected by the lock?
241 * set to bind address if bind was called before NetworkStack was ready to
244 entity_addr_t pending_bind_addr
;
247 * false; set to true if a pending bind exists
249 bool pending_bind
= false;
252 * The following aren't lock-protected since you shouldn't be able to race
257 * false; set to true if the AsyncMessenger bound to a specific address;
258 * and set false again by Accepter::stop().
261 /// counter for the global seq our connection protocol uses
263 /// lock to protect the global_seq
264 ceph_spinlock_t global_seq_lock
;
267 * hash map of addresses to Asyncconnection
269 * NOTE: a Asyncconnection* with state CLOSED may still be in the map but is considered
270 * invalid and can be replaced by anyone holding the msgr lock
272 ceph::unordered_map
<entity_addr_t
, AsyncConnectionRef
> conns
;
275 * list of connection are in teh process of accepting
277 * These are not yet in the conns map.
279 set
<AsyncConnectionRef
> accepting_conns
;
282 * list of connection are closed which need to be clean up
284 * Because AsyncMessenger and AsyncConnection follow a lock rule that
285 * we can lock AsyncMesenger::lock firstly then lock AsyncConnection::lock
286 * but can't reversed. This rule is aimed to avoid dead lock.
287 * So if AsyncConnection want to unregister itself from AsyncMessenger,
288 * we pick up this idea that just queue itself to this set and do lazy
289 * deleted for AsyncConnection. "_lookup_conn" must ensure not return a
290 * AsyncConnection in this set.
293 set
<AsyncConnectionRef
> deleted_conns
;
295 EventCallbackRef reap_handler
;
297 /// internal cluster protocol version, if any, for talking to entities of the same type.
298 int cluster_protocol
;
303 AsyncConnectionRef
_lookup_conn(const entity_addr_t
& k
) {
304 assert(lock
.is_locked());
305 ceph::unordered_map
<entity_addr_t
, AsyncConnectionRef
>::iterator p
= conns
.find(k
);
306 if (p
== conns
.end())
309 // lazy delete, see "deleted_conns"
310 Mutex::Locker
l(deleted_lock
);
311 if (deleted_conns
.erase(p
->second
)) {
312 p
->second
->get_perf_counter()->dec(l_msgr_active_connections
);
320 void _init_local_connection() {
321 assert(lock
.is_locked());
322 local_connection
->peer_addr
= my_inst
.addr
;
323 local_connection
->peer_type
= my_inst
.name
.type();
324 local_connection
->set_features(CEPH_FEATURES_ALL
);
325 ms_deliver_handle_fast_connect(local_connection
.get());
328 void shutdown_connections(bool queue_reset
);
332 /// con used for sending messages to ourselves
333 ConnectionRef local_connection
;
336 * @defgroup AsyncMessenger internals
340 * This wraps _lookup_conn.
342 AsyncConnectionRef
lookup_conn(const entity_addr_t
& k
) {
343 Mutex::Locker
l(lock
);
344 return _lookup_conn(k
);
347 int accept_conn(AsyncConnectionRef conn
) {
348 Mutex::Locker
l(lock
);
349 auto it
= conns
.find(conn
->peer_addr
);
350 if (it
!= conns
.end()) {
351 AsyncConnectionRef existing
= it
->second
;
353 // lazy delete, see "deleted_conns"
354 // If conn already in, we will return 0
355 Mutex::Locker
l(deleted_lock
);
356 if (deleted_conns
.erase(existing
)) {
357 existing
->get_perf_counter()->dec(l_msgr_active_connections
);
359 } else if (conn
!= existing
) {
363 conns
[conn
->peer_addr
] = conn
;
364 conn
->get_perf_counter()->inc(l_msgr_active_connections
);
365 accepting_conns
.erase(conn
);
369 void learned_addr(const entity_addr_t
&peer_addr_for_me
);
370 void add_accept(Worker
*w
, ConnectedSocket cli_socket
, entity_addr_t
&addr
);
371 NetworkStack
*get_stack() {
376 * This wraps ms_deliver_get_authorizer. We use it for AsyncConnection.
378 AuthAuthorizer
*get_authorizer(int peer_type
, bool force_new
) {
379 return ms_deliver_get_authorizer(peer_type
, force_new
);
383 * This wraps ms_deliver_verify_authorizer; we use it for AsyncConnection.
385 bool verify_authorizer(Connection
*con
, int peer_type
, int protocol
, bufferlist
& auth
, bufferlist
& auth_reply
,
386 bool& isvalid
, CryptoKey
& session_key
) {
387 return ms_deliver_verify_authorizer(con
, peer_type
, protocol
, auth
,
388 auth_reply
, isvalid
, session_key
);
391 * Increment the global sequence for this AsyncMessenger and return it.
392 * This is for the connect protocol, although it doesn't hurt if somebody
395 * @return a global sequence ID that nobody else has seen.
397 __u32
get_global_seq(__u32 old
=0) {
398 ceph_spin_lock(&global_seq_lock
);
399 if (old
> global_seq
)
401 __u32 ret
= ++global_seq
;
402 ceph_spin_unlock(&global_seq_lock
);
406 * Get the protocol version we support for the given peer type: either
407 * a peer protocol (if it matches our own), the protocol version for the
408 * peer (if we're connecting), or our protocol version (if we're accepting).
410 int get_proto_version(int peer_type
, bool connect
) const;
413 * Fill in the address and peer type for the local connection, which
414 * is used for delivering messages back to ourself.
416 void init_local_connection() {
417 Mutex::Locker
l(lock
);
418 _init_local_connection();
422 * Unregister connection from `conns`
424 * See "deleted_conns"
426 void unregister_conn(AsyncConnectionRef conn
) {
427 Mutex::Locker
l(deleted_lock
);
428 deleted_conns
.insert(conn
);
430 if (deleted_conns
.size() >= ReapDeadConnectionThreshold
) {
431 local_worker
->center
.dispatch_event_external(reap_handler
);
436 * Reap dead connection from `deleted_conns`
438 * @return the number of dead connections
440 * See "deleted_conns"
445 * @} // AsyncMessenger Internals
449 #endif /* CEPH_ASYNCMESSENGER_H */