1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #ifndef CEPH_MSG_ASYNC_STACK_H
18 #define CEPH_MSG_ASYNC_STACK_H
20 #include "include/Spinlock.h"
21 #include "common/perf_counters.h"
22 #include "common/simple_spin.h"
23 #include "msg/msg_types.h"
24 #include "msg/async/Event.h"
// Abstract backend for one connected stream socket. ConnectedSocket owns
// an implementation through std::unique_ptr and forwards its read, send
// and shutdown calls here. Every method declared here other than the
// destructor is pure virtual, so each network backend implements them all.
27 class ConnectedSocketImpl
{
// Virtual so a backend object is destroyed correctly through the owning
// std::unique_ptr<ConnectedSocketImpl>.
29 virtual ~ConnectedSocketImpl() {}
// Connection status as an int. NOTE(review): whether this is a plain
// boolean or may carry a negative error code is backend-defined - confirm.
30 virtual int is_connected() = 0;
// Copying read into a caller-supplied buffer of the given length.
31 virtual ssize_t
read(char*, size_t) = 0;
// Read that hands data back in a bufferptr instead of copying into a
// caller buffer. NOTE(review): only meaningful for backends that report
// zero-copy support - confirm behavior of the others.
32 virtual ssize_t
zero_copy_read(bufferptr
&) = 0;
// Send the contents of bl. `more` presumably hints that further data
// follows soon (MSG_MORE-style) - confirm against the backends.
33 virtual ssize_t
send(bufferlist
&bl
, bool more
) = 0;
// Socket teardown operations; exact semantics are defined per backend.
34 virtual void shutdown() = 0;
35 virtual void close() = 0;
// Underlying file descriptor.
36 virtual int fd() const = 0;
39 class ConnectedSocket
;
40 struct SocketOptions
{
45 entity_addr_t connect_bind_addr
;
// Abstract backend for a listening socket. ServerSocket owns one through
// std::unique_ptr and forwards accept() and abort_accept() to it.
49 class ServerSocketImpl
{
// Virtual so backends are destroyed correctly through the owning pointer.
51 virtual ~ServerSocketImpl() {}
// Accept the next incoming connection into *sock, using the options in
// opt; the remote endpoint's address is stored in *out.
// NOTE(review): `w` is presumably the Worker that will service the new
// connection, and the int result likely follows a 0 / -errno convention -
// confirm both against the backends.
52 virtual int accept(ConnectedSocket
*sock
, const SocketOptions
&opt
, entity_addr_t
*out
, Worker
*w
) = 0;
// Stop any accept() in progress.
53 virtual void abort_accept() = 0;
54 /// Get file descriptor
55 virtual int fd() const = 0;
59 /// \addtogroup networking-module
62 /// A TCP (or other stream-based protocol) connection.
64 /// A \c ConnectedSocket represents a full-duplex stream between
65 /// two endpoints, a local endpoint and a remote endpoint.
66 class ConnectedSocket
{
67 std::unique_ptr
<ConnectedSocketImpl
> _csi
;
70 /// Constructs a \c ConnectedSocket not corresponding to a connection
73 explicit ConnectedSocket(std::unique_ptr
<ConnectedSocketImpl
> csi
)
74 : _csi(std::move(csi
)) {}
80 /// Moves a \c ConnectedSocket object.
81 ConnectedSocket(ConnectedSocket
&& cs
) = default;
82 /// Move-assigns a \c ConnectedSocket object.
83 ConnectedSocket
& operator=(ConnectedSocket
&& cs
) = default;
86 return _csi
->is_connected();
88 /// Read the input stream with copy.
90 /// Copy an object returning data sent from the remote endpoint.
91 ssize_t
read(char* buf
, size_t len
) {
92 return _csi
->read(buf
, len
);
94 /// Gets the input stream.
96 /// Gets an object returning data sent from the remote endpoint.
97 ssize_t
zero_copy_read(bufferptr
&data
) {
98 return _csi
->zero_copy_read(data
);
100 /// Gets the output stream.
102 /// Gets an object that sends data to the remote endpoint.
103 ssize_t
send(bufferlist
&bl
, bool more
) {
104 return _csi
->send(bl
, more
);
106 /// Disables output to the socket.
108 /// Current or future writes that have not been successfully flushed
109 /// will immediately fail with an error. This is useful to abort
110 /// operations on a socket that is not making progress due to a
113 return _csi
->shutdown();
115 /// Disables input from the socket.
117 /// Current or future reads will immediately fail with an error.
118 /// This is useful to abort operations on a socket that is not making
119 /// progress due to a peer failure.
125 /// Get file descriptor
130 explicit operator bool() const {
136 /// \addtogroup networking-module
139 /// A listening socket, waiting to accept incoming network connections.
141 std::unique_ptr
<ServerSocketImpl
> _ssi
;
143 /// Constructs a \c ServerSocket not corresponding to a connection
146 explicit ServerSocket(std::unique_ptr
<ServerSocketImpl
> ssi
)
147 : _ssi(std::move(ssi
)) {}
150 _ssi
->abort_accept();
153 /// Moves a \c ServerSocket object.
154 ServerSocket(ServerSocket
&& ss
) = default;
155 /// Move-assigns a \c ServerSocket object.
156 ServerSocket
& operator=(ServerSocket
&& cs
) = default;
158 /// Accepts the next connection to successfully connect to this socket.
160 /// \Accepts a \ref ConnectedSocket representing the connection, and
161 /// a \ref entity_addr_t describing the remote endpoint.
162 int accept(ConnectedSocket
*sock
, const SocketOptions
&opt
, entity_addr_t
*out
, Worker
*w
) {
163 return _ssi
->accept(sock
, opt
, out
, w
);
166 /// Stops any \ref accept() in progress.
168 /// Current and future \ref accept() calls will terminate immediately
170 void abort_accept() {
171 _ssi
->abort_accept();
175 /// Get file descriptor
180 explicit operator bool() const {
189 l_msgr_first
= 94000,
190 l_msgr_recv_messages
,
191 l_msgr_send_messages
,
192 l_msgr_send_messages_inline
,
195 l_msgr_created_connections
,
196 l_msgr_active_connections
,
201 std::mutex init_lock
;
202 std::condition_variable init_cond
;
209 PerfCounters
*perf_logger
;
212 std::atomic_uint references
;
215 Worker(const Worker
&) = delete;
216 Worker
& operator=(const Worker
&) = delete;
218 Worker(CephContext
*c
, unsigned i
)
219 : cct(c
), perf_logger(NULL
), id(i
), references(0), center(c
) {
221 sprintf(name
, "AsyncMessenger::Worker-%u", id
);
222 // initialize perf_logger
223 PerfCountersBuilder
plb(cct
, name
, l_msgr_first
, l_msgr_last
);
225 plb
.add_u64_counter(l_msgr_recv_messages
, "msgr_recv_messages", "Network received messages");
226 plb
.add_u64_counter(l_msgr_send_messages
, "msgr_send_messages", "Network sent messages");
227 plb
.add_u64_counter(l_msgr_send_messages_inline
, "msgr_send_messages_inline", "Network sent inline messages");
228 plb
.add_u64_counter(l_msgr_recv_bytes
, "msgr_recv_bytes", "Network received bytes");
229 plb
.add_u64_counter(l_msgr_send_bytes
, "msgr_send_bytes", "Network received bytes");
230 plb
.add_u64_counter(l_msgr_active_connections
, "msgr_active_connections", "Active connection number");
231 plb
.add_u64_counter(l_msgr_created_connections
, "msgr_created_connections", "Created connection number");
233 perf_logger
= plb
.create_perf_counters();
234 cct
->get_perfcounters_collection()->add(perf_logger
);
238 cct
->get_perfcounters_collection()->remove(perf_logger
);
// Backend hook: start listening on addr with the given options and set up
// the supplied ServerSocket. NOTE(review): addr is a non-const reference,
// so a backend may presumably update it (e.g. with a chosen port), and the
// int result likely follows a 0 / -errno convention - confirm both.
243 virtual int listen(entity_addr_t
&addr
,
244 const SocketOptions
&opts
, ServerSocket
*) = 0;
// Backend hook: initiate a connection to addr with the given options and
// fill in *socket (presumably with the new connection - confirm). The
// int result presumably follows the same convention as listen().
245 virtual int connect(const entity_addr_t
&addr
,
246 const SocketOptions
&opts
, ConnectedSocket
*socket
) = 0;
// Optional lifecycle hooks; the base-class versions do nothing.
247 virtual void destroy() {}
249 virtual void initialize() {}
// This worker's perf counters (perf_logger), as created in the constructor.
250 PerfCounters
*get_perf_counter() { return perf_logger
; }
251 void release_worker() {
252 int oldref
= references
.fetch_sub(1);
258 init_cond
.notify_all();
262 std::lock_guard
<std::mutex
> l(init_lock
);
265 void wait_for_init() {
266 std::unique_lock
<std::mutex
> l(init_lock
);
273 init_cond
.notify_all();
279 class NetworkStack
: public CephContext::ForkWatcher
{
281 unsigned num_workers
= 0;
283 bool started
= false;
285 std::function
<void ()> add_thread(unsigned i
);
289 vector
<Worker
*> workers
;
291 explicit NetworkStack(CephContext
*c
, const string
&t
);
293 NetworkStack(const NetworkStack
&) = delete;
294 NetworkStack
& operator=(const NetworkStack
&) = delete;
295 ~NetworkStack() override
{
296 for (auto &&w
: workers
)
300 static std::shared_ptr
<NetworkStack
> create(
301 CephContext
*c
, const string
&type
);
303 static Worker
* create_worker(
304 CephContext
*c
, const string
&t
, unsigned i
);
305 // backend need to override this method if supports zero copy read
306 virtual bool support_zero_copy_read() const { return false; }
307 // backend need to override this method if backend doesn't support shared
309 // For example, posix backend has in kernel global listen table. If one
310 // thread bind a port, other threads also aware this.
311 // But for dpdk backend, we maintain listen table in each thread. So we
312 // need to let each thread do binding port.
313 virtual bool support_local_listen_table() const { return false; }
314 virtual bool nonblock_connect_need_writable_event() const { return true; }
318 virtual Worker
*get_worker();
319 Worker
*get_worker(unsigned i
) {
323 unsigned get_num_worker() const {
327 // direct is used in tests only
328 virtual void spawn_worker(unsigned i
, std::function
<void ()> &&) = 0;
329 virtual void join_worker(unsigned i
) = 0;
331 void handle_pre_fork() override
{
335 void handle_post_fork() override
{
339 virtual bool is_ready() { return true; };
340 virtual void ready() { };
343 #endif //CEPH_MSG_ASYNC_STACK_H