// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
17 #ifndef CEPH_MSG_ASYNC_STACK_H
18 #define CEPH_MSG_ASYNC_STACK_H
20 #include "include/spinlock.h"
21 #include "common/perf_counters.h"
22 #include "msg/msg_types.h"
23 #include "msg/async/Event.h"
26 class ConnectedSocketImpl
{
28 virtual ~ConnectedSocketImpl() {}
29 virtual int is_connected() = 0;
30 virtual ssize_t
read(char*, size_t) = 0;
31 virtual ssize_t
send(ceph::buffer::list
&bl
, bool more
) = 0;
32 virtual void shutdown() = 0;
33 virtual void close() = 0;
34 virtual int fd() const = 0;
// Forward declaration: ServerSocketImpl::accept() takes a ConnectedSocket*
// before the full class definition appears further down.
class ConnectedSocket;
// Options passed to Worker::listen(), Worker::connect() and
// ServerSocketImpl::accept().
// NOTE(review): only connect_bind_addr is present in this copy and the
// struct's closing "};" is missing; other fields appear to have been lost —
// confirm against upstream Stack.h.
38 struct SocketOptions
{
// Presumably the local address to bind outgoing connections to — confirm.
43 entity_addr_t connect_bind_addr
;
47 class ServerSocketImpl
{
49 unsigned addr_type
; ///< entity_addr_t::TYPE_*
50 unsigned addr_slot
; ///< position of our addr in myaddrs().v
51 ServerSocketImpl(unsigned type
, unsigned slot
)
52 : addr_type(type
), addr_slot(slot
) {}
53 virtual ~ServerSocketImpl() {}
54 virtual int accept(ConnectedSocket
*sock
, const SocketOptions
&opt
, entity_addr_t
*out
, Worker
*w
) = 0;
55 virtual void abort_accept() = 0;
56 /// Get file descriptor
57 virtual int fd() const = 0;
61 /// \addtogroup networking-module
64 /// A TCP (or other stream-based protocol) connection.
66 /// A \c ConnectedSocket represents a full-duplex stream between
67 /// two endpoints, a local endpoint and a remote endpoint.
// Thin owning wrapper: the operations below forward to the
// ConnectedSocketImpl held in _csi.
68 class ConnectedSocket
{
// Owned backend implementation that all operations forward to.
69 std::unique_ptr
<ConnectedSocketImpl
> _csi
;
72 /// Constructs a \c ConnectedSocket that takes ownership of \p csi.
75 explicit ConnectedSocket(std::unique_ptr
<ConnectedSocketImpl
> csi
)
76 : _csi(std::move(csi
)) {}
82 /// Moves a \c ConnectedSocket object.
83 ConnectedSocket(ConnectedSocket
&& cs
) = default;
84 /// Move-assigns a \c ConnectedSocket object.
85 ConnectedSocket
& operator=(ConnectedSocket
&& cs
) = default;
// NOTE(review): the header of the function this statement belongs to is
// missing in this copy; it forwards to ConnectedSocketImpl::is_connected().
88 return _csi
->is_connected();
90 /// Reads data sent by the remote endpoint into \p buf (up to \p len bytes, presumably).
92 /// Forwards to the implementation's read() and returns its result.
93 ssize_t
read(char* buf
, size_t len
) {
94 return _csi
->read(buf
, len
);
96 /// Sends \p bl to the remote endpoint.
98 /// Forwards \p bl and \p more to the implementation's send() and returns its result.
99 ssize_t
send(ceph::buffer::list
&bl
, bool more
) {
100 return _csi
->send(bl
, more
);
102 /// Disables output to the socket.
104 /// Current or future writes that have not been successfully flushed
105 /// will immediately fail with an error. This is useful to abort
106 /// operations on a socket that is not making progress due to a peer failure.
// NOTE(review): the enclosing function header is missing in this copy;
// this statement forwards to ConnectedSocketImpl::shutdown().
109 return _csi
->shutdown();
111 /// Disables input from the socket.
113 /// Current or future reads will immediately fail with an error.
114 /// This is useful to abort operations on a socket that is not making
115 /// progress due to a peer failure.
// NOTE(review): in this copy no function follows the input-shutdown docs
// above, and no fd() accessor follows the comment below — confirm upstream.
121 /// Get file descriptor
// Boolean test of this socket; its body is not present in this copy —
// presumably true when _csi holds an implementation (TODO confirm).
126 explicit operator bool() const {
132 /// \addtogroup networking-module
135 /// A listening socket, waiting to accept incoming network connections.
// Thin owning wrapper around the ServerSocketImpl held in _ssi.
// NOTE(review): the "class ServerSocket" header line is missing in this copy.
137 std::unique_ptr
<ServerSocketImpl
> _ssi
;
139 /// Constructs a \c ServerSocket that takes ownership of \p ssi.
142 explicit ServerSocket(std::unique_ptr
<ServerSocketImpl
> ssi
)
143 : _ssi(std::move(ssi
)) {}
// NOTE(review): the enclosing function header is missing in this copy
// (possibly the destructor); this statement calls abort_accept() on _ssi.
146 _ssi
->abort_accept();
149 /// Moves a \c ServerSocket object.
150 ServerSocket(ServerSocket
&& ss
) = default;
151 /// Move-assigns a \c ServerSocket object.
152 ServerSocket
& operator=(ServerSocket
&& cs
) = default;
154 /// Accepts the next connection to successfully connect to this socket.
156 /// Forwards \p sock, \p opt, \p out and \p w unchanged to the implementation's
157 /// accept() and returns its int result (presumably fills *sock and *out — confirm).
158 int accept(ConnectedSocket
*sock
, const SocketOptions
&opt
, entity_addr_t
*out
, Worker
*w
) {
159 return _ssi
->accept(sock
, opt
, out
, w
);
162 /// Stops any \ref accept() in progress.
164 /// Current and future \ref accept() calls will terminate immediately.
166 void abort_accept() {
167 _ssi
->abort_accept();
// NOTE(review): the fd() accessor this comment introduced is missing in this copy.
171 /// Get file descriptor
176 /// Returns the implementation's addr_slot (position of our addr in myaddrs().v).
177 unsigned get_addr_slot() {
178 return _ssi
->addr_slot
;
// Boolean test of this socket; its body is not present in this copy —
// presumably true when _ssi holds an implementation (TODO confirm).
181 explicit operator bool() const {
// Indices of the per-Worker perf counters. The Worker constructor registers
// them through PerfCountersBuilder(cct, name, l_msgr_first, l_msgr_last).
// NOTE(review): the enum header, its closing brace, and enumerators the
// constructor uses (l_msgr_recv_bytes, l_msgr_send_bytes, l_msgr_last) are
// missing in this copy — confirm against upstream.
190 l_msgr_first
= 94000,
191 l_msgr_recv_messages
,
192 l_msgr_send_messages
,
195 l_msgr_created_connections
,
196 l_msgr_active_connections
,
198 l_msgr_running_total_time
,
199 l_msgr_running_send_time
,
200 l_msgr_running_recv_time
,
201 l_msgr_running_fast_dispatch_time
,
203 l_msgr_send_messages_queue_lat
,
204 l_msgr_handle_ack_lat
,
// Mutex/condvar pair for worker initialization: wait_for_init() locks
// init_lock, and init_cond.notify_all() is called to wake waiters.
// NOTE(review): the "class Worker" header and members used by the
// constructor (cct, id, center) are not present in this copy.
210 std::mutex init_lock
;
211 std::condition_variable init_cond
;
// Created in the constructor via PerfCountersBuilder, registered with cct's
// perf-counter collection, and returned by get_perf_counter().
218 PerfCounters
*perf_logger
;
// Reference count: initialized to 0 by the constructor and decremented by
// release_worker().
221 std::atomic_uint references
;
// Workers are non-copyable.
224 Worker(const Worker
&) = delete;
225 Worker
& operator=(const Worker
&) = delete;
// Constructs worker number worker_id for context c, then builds its perf
// counters and registers them with c's perf-counter collection.
227 Worker(CephContext
*c
, unsigned worker_id
)
228 : cct(c
), perf_logger(NULL
), id(worker_id
), references(0), center(c
) {
// NOTE(review): `name` is not declared in this copy; presumably a local char
// buffer — confirm it is large enough for "AsyncMessenger::Worker-" plus a
// %u id.
230 sprintf(name
, "AsyncMessenger::Worker-%u", id
);
231 // initialize perf_logger
232 PerfCountersBuilder
plb(cct
, name
, l_msgr_first
, l_msgr_last
);
234 plb
.add_u64_counter(l_msgr_recv_messages
, "msgr_recv_messages", "Network received messages");
235 plb
.add_u64_counter(l_msgr_send_messages
, "msgr_send_messages", "Network sent messages");
236 plb
.add_u64_counter(l_msgr_recv_bytes
, "msgr_recv_bytes", "Network received bytes", NULL
, 0, unit_t(UNIT_BYTES
));
237 plb
.add_u64_counter(l_msgr_send_bytes
, "msgr_send_bytes", "Network sent bytes", NULL
, 0, unit_t(UNIT_BYTES
));
// NOTE(review): "active" connections can go down, which suggests a gauge
// (add_u64) rather than a counter — confirm intent.
238 plb
.add_u64_counter(l_msgr_active_connections
, "msgr_active_connections", "Active connection number");
239 plb
.add_u64_counter(l_msgr_created_connections
, "msgr_created_connections", "Created connection number");
241 plb
.add_time(l_msgr_running_total_time
, "msgr_running_total_time", "The total time of thread running");
242 plb
.add_time(l_msgr_running_send_time
, "msgr_running_send_time", "The total time of message sending");
243 plb
.add_time(l_msgr_running_recv_time
, "msgr_running_recv_time", "The total time of message receiving");
244 plb
.add_time(l_msgr_running_fast_dispatch_time
, "msgr_running_fast_dispatch_time", "The total time of fast dispatch");
246 plb
.add_time_avg(l_msgr_send_messages_queue_lat
, "msgr_send_messages_queue_lat", "Network sent messages lat");
247 plb
.add_time_avg(l_msgr_handle_ack_lat
, "msgr_handle_ack_lat", "Connection handle ack lat");
// Materialize the counters and register them with the context's collection.
249 perf_logger
= plb
.create_perf_counters();
250 cct
->get_perfcounters_collection()->add(perf_logger
);
// NOTE(review): the constructor's closing brace is missing in this copy.
// Unregisters perf_logger from cct's perf-counter collection, reversing the
// add() done in the constructor.
// NOTE(review): the enclosing function header is missing in this copy —
// presumably ~Worker(); confirm.
254 cct
->get_perfcounters_collection()->remove(perf_logger
);
259 virtual int listen(entity_addr_t
&addr
, unsigned addr_slot
,
260 const SocketOptions
&opts
, ServerSocket
*) = 0;
261 virtual int connect(const entity_addr_t
&addr
,
262 const SocketOptions
&opts
, ConnectedSocket
*socket
) = 0;
263 virtual void destroy() {}
265 virtual void initialize() {}
266 PerfCounters
*get_perf_counter() { return perf_logger
; }
// Drops one reference and asserts the count was positive beforehand.
267 void release_worker() {
// NOTE(review): fetch_sub returns the previous (unsigned) value; storing it
// in an int narrows for counts above INT_MAX.
268 int oldref
= references
.fetch_sub(1);
269 ceph_assert(oldref
> 0);
// NOTE(review): from here on, the closing brace above and the enclosing
// function headers are missing in this copy. This statement wakes every
// thread waiting on init_cond.
274 init_cond
.notify_all();
// Holds init_lock for the rest of its (not visible) scope.
278 std::lock_guard
<std::mutex
> l(init_lock
);
// Presumably blocks the caller until initialization is signalled via
// init_cond; the wait itself is not present in this copy — confirm.
281 void wait_for_init() {
282 std::unique_lock
<std::mutex
> l(init_lock
);
// Wakes every thread waiting on init_cond (enclosing function not visible).
289 init_cond
.notify_all();
296 ceph::spinlock pool_spin
;
297 bool started
= false;
299 std::function
<void ()> add_thread(Worker
* w
);
301 virtual Worker
* create_worker(CephContext
*c
, unsigned i
) = 0;
302 virtual void rename_thread(unsigned id
) {
303 static constexpr int TASK_COMM_LEN
= 16;
304 char tp_name
[TASK_COMM_LEN
];
305 sprintf(tp_name
, "msgr-worker-%u", id
);
306 ceph_pthread_setname(pthread_self(), tp_name
);
311 std::vector
<Worker
*> workers
;
313 explicit NetworkStack(CephContext
*c
);
315 NetworkStack(const NetworkStack
&) = delete;
316 NetworkStack
& operator=(const NetworkStack
&) = delete;
// Destructor: iterates over every entry in `workers`.
// NOTE(review): the loop body and closing brace are missing in this copy;
// presumably each worker is deleted — confirm against upstream.
317 virtual ~NetworkStack() {
318 for (auto &&w
: workers
)
322 static std::shared_ptr
<NetworkStack
> create(
323 CephContext
*c
, const std::string
&type
);
325 // backend need to override this method if backend doesn't support shared
327 // For example, posix backend has in kernel global listen table. If one
328 // thread bind a port, other threads also aware this.
329 // But for dpdk backend, we maintain listen table in each thread. So we
330 // need to let each thread do binding port.
331 virtual bool support_local_listen_table() const { return false; }
332 virtual bool nonblock_connect_need_writable_event() const { return true; }
336 virtual Worker
*get_worker();
337 Worker
*get_worker(unsigned worker_id
) {
338 return workers
[worker_id
];
341 unsigned get_num_worker() const {
342 return workers
.size();
345 // direct is used in tests only
346 virtual void spawn_worker(std::function
<void ()> &&) = 0;
347 virtual void join_worker(unsigned i
) = 0;
349 virtual bool is_ready() { return true; };
350 virtual void ready() { };
353 #endif //CEPH_MSG_ASYNC_STACK_H