]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/Stack.h
bump version to 12.0.3-pve3
[ceph.git] / ceph / src / msg / async / Stack.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_MSG_ASYNC_STACK_H
18 #define CEPH_MSG_ASYNC_STACK_H
19
20 #include "include/Spinlock.h"
21 #include "common/perf_counters.h"
22 #include "common/simple_spin.h"
23 #include "msg/msg_types.h"
24 #include "msg/async/Event.h"
25
26 class Worker;
27 class ConnectedSocketImpl {
28 public:
29 virtual ~ConnectedSocketImpl() {}
30 virtual int is_connected() = 0;
31 virtual ssize_t read(char*, size_t) = 0;
32 virtual ssize_t zero_copy_read(bufferptr&) = 0;
33 virtual ssize_t send(bufferlist &bl, bool more) = 0;
34 virtual void shutdown() = 0;
35 virtual void close() = 0;
36 virtual int fd() const = 0;
37 };
38
39 class ConnectedSocket;
40 struct SocketOptions {
41 bool nonblock = true;
42 bool nodelay = true;
43 int rcbuf_size = 0;
44 int priority = -1;
45 entity_addr_t connect_bind_addr;
46 };
47
48 /// \cond internal
49 class ServerSocketImpl {
50 public:
51 virtual ~ServerSocketImpl() {}
52 virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) = 0;
53 virtual void abort_accept() = 0;
54 /// Get file descriptor
55 virtual int fd() const = 0;
56 };
57 /// \endcond
58
59 /// \addtogroup networking-module
60 /// @{
61
62 /// A TCP (or other stream-based protocol) connection.
63 ///
64 /// A \c ConnectedSocket represents a full-duplex stream between
65 /// two endpoints, a local endpoint and a remote endpoint.
66 class ConnectedSocket {
67 std::unique_ptr<ConnectedSocketImpl> _csi;
68
69 public:
70 /// Constructs a \c ConnectedSocket not corresponding to a connection
71 ConnectedSocket() {};
72 /// \cond internal
73 explicit ConnectedSocket(std::unique_ptr<ConnectedSocketImpl> csi)
74 : _csi(std::move(csi)) {}
75 /// \endcond
76 ~ConnectedSocket() {
77 if (_csi)
78 _csi->close();
79 }
80 /// Moves a \c ConnectedSocket object.
81 ConnectedSocket(ConnectedSocket&& cs) = default;
82 /// Move-assigns a \c ConnectedSocket object.
83 ConnectedSocket& operator=(ConnectedSocket&& cs) = default;
84
85 int is_connected() {
86 return _csi->is_connected();
87 }
88 /// Read the input stream with copy.
89 ///
90 /// Copy an object returning data sent from the remote endpoint.
91 ssize_t read(char* buf, size_t len) {
92 return _csi->read(buf, len);
93 }
94 /// Gets the input stream.
95 ///
96 /// Gets an object returning data sent from the remote endpoint.
97 ssize_t zero_copy_read(bufferptr &data) {
98 return _csi->zero_copy_read(data);
99 }
100 /// Gets the output stream.
101 ///
102 /// Gets an object that sends data to the remote endpoint.
103 ssize_t send(bufferlist &bl, bool more) {
104 return _csi->send(bl, more);
105 }
106 /// Disables output to the socket.
107 ///
108 /// Current or future writes that have not been successfully flushed
109 /// will immediately fail with an error. This is useful to abort
110 /// operations on a socket that is not making progress due to a
111 /// peer failure.
112 void shutdown() {
113 return _csi->shutdown();
114 }
115 /// Disables input from the socket.
116 ///
117 /// Current or future reads will immediately fail with an error.
118 /// This is useful to abort operations on a socket that is not making
119 /// progress due to a peer failure.
120 void close() {
121 _csi->close();
122 _csi.reset();
123 }
124
125 /// Get file descriptor
126 int fd() const {
127 return _csi->fd();
128 }
129
130 explicit operator bool() const {
131 return _csi.get();
132 }
133 };
134 /// @}
135
136 /// \addtogroup networking-module
137 /// @{
138
139 /// A listening socket, waiting to accept incoming network connections.
140 class ServerSocket {
141 std::unique_ptr<ServerSocketImpl> _ssi;
142 public:
143 /// Constructs a \c ServerSocket not corresponding to a connection
144 ServerSocket() {}
145 /// \cond internal
146 explicit ServerSocket(std::unique_ptr<ServerSocketImpl> ssi)
147 : _ssi(std::move(ssi)) {}
148 ~ServerSocket() {
149 if (_ssi)
150 _ssi->abort_accept();
151 }
152 /// \endcond
153 /// Moves a \c ServerSocket object.
154 ServerSocket(ServerSocket&& ss) = default;
155 /// Move-assigns a \c ServerSocket object.
156 ServerSocket& operator=(ServerSocket&& cs) = default;
157
158 /// Accepts the next connection to successfully connect to this socket.
159 ///
160 /// \Accepts a \ref ConnectedSocket representing the connection, and
161 /// a \ref entity_addr_t describing the remote endpoint.
162 int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
163 return _ssi->accept(sock, opt, out, w);
164 }
165
166 /// Stops any \ref accept() in progress.
167 ///
168 /// Current and future \ref accept() calls will terminate immediately
169 /// with an error.
170 void abort_accept() {
171 _ssi->abort_accept();
172 _ssi.reset();
173 }
174
175 /// Get file descriptor
176 int fd() const {
177 return _ssi->fd();
178 }
179
180 explicit operator bool() const {
181 return _ssi.get();
182 }
183 };
184 /// @}
185
186 class NetworkStack;
187
// Perf-counter indices for the async messenger workers.
// l_msgr_first and l_msgr_last are exclusive bounds: Worker's constructor
// passes them to PerfCountersBuilder and registers the ids in between.
enum {
  l_msgr_first = 94000,          // lower bound, not a counter itself
  l_msgr_recv_messages,          // 94001
  l_msgr_send_messages,          // 94002
  l_msgr_send_messages_inline,   // 94003
  l_msgr_recv_bytes,             // 94004
  l_msgr_send_bytes,             // 94005
  l_msgr_created_connections,    // 94006
  l_msgr_active_connections,     // 94007
  l_msgr_last,                   // 94008, upper bound, not a counter itself
};
199
200 class Worker {
201 std::mutex init_lock;
202 std::condition_variable init_cond;
203 bool init = false;
204
205 public:
206 bool done = false;
207
208 CephContext *cct;
209 PerfCounters *perf_logger;
210 unsigned id;
211
212 std::atomic_uint references;
213 EventCenter center;
214
215 Worker(const Worker&) = delete;
216 Worker& operator=(const Worker&) = delete;
217
218 Worker(CephContext *c, unsigned i)
219 : cct(c), perf_logger(NULL), id(i), references(0), center(c) {
220 char name[128];
221 sprintf(name, "AsyncMessenger::Worker-%u", id);
222 // initialize perf_logger
223 PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
224
225 plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
226 plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
227 plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages");
228 plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
229 plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes");
230 plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
231 plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
232
233 perf_logger = plb.create_perf_counters();
234 cct->get_perfcounters_collection()->add(perf_logger);
235 }
236 virtual ~Worker() {
237 if (perf_logger) {
238 cct->get_perfcounters_collection()->remove(perf_logger);
239 delete perf_logger;
240 }
241 }
242
243 virtual int listen(entity_addr_t &addr,
244 const SocketOptions &opts, ServerSocket *) = 0;
245 virtual int connect(const entity_addr_t &addr,
246 const SocketOptions &opts, ConnectedSocket *socket) = 0;
247 virtual void destroy() {}
248
249 virtual void initialize() {}
250 PerfCounters *get_perf_counter() { return perf_logger; }
251 void release_worker() {
252 int oldref = references.fetch_sub(1);
253 assert(oldref > 0);
254 }
255 void init_done() {
256 init_lock.lock();
257 init = true;
258 init_cond.notify_all();
259 init_lock.unlock();
260 }
261 bool is_init() {
262 std::lock_guard<std::mutex> l(init_lock);
263 return init;
264 }
265 void wait_for_init() {
266 std::unique_lock<std::mutex> l(init_lock);
267 while (!init)
268 init_cond.wait(l);
269 }
270 void reset() {
271 init_lock.lock();
272 init = false;
273 init_cond.notify_all();
274 init_lock.unlock();
275 done = false;
276 }
277 };
278
279 class NetworkStack : public CephContext::ForkWatcher {
280 std::string type;
281 unsigned num_workers = 0;
282 Spinlock pool_spin;
283 bool started = false;
284
285 std::function<void ()> add_thread(unsigned i);
286
287 protected:
288 CephContext *cct;
289 vector<Worker*> workers;
290
291 explicit NetworkStack(CephContext *c, const string &t);
292 public:
293 NetworkStack(const NetworkStack &) = delete;
294 NetworkStack& operator=(const NetworkStack &) = delete;
295 ~NetworkStack() override {
296 for (auto &&w : workers)
297 delete w;
298 }
299
300 static std::shared_ptr<NetworkStack> create(
301 CephContext *c, const string &type);
302
303 static Worker* create_worker(
304 CephContext *c, const string &t, unsigned i);
305 // backend need to override this method if supports zero copy read
306 virtual bool support_zero_copy_read() const { return false; }
307 // backend need to override this method if backend doesn't support shared
308 // listen table.
309 // For example, posix backend has in kernel global listen table. If one
310 // thread bind a port, other threads also aware this.
311 // But for dpdk backend, we maintain listen table in each thread. So we
312 // need to let each thread do binding port.
313 virtual bool support_local_listen_table() const { return false; }
314 virtual bool nonblock_connect_need_writable_event() const { return true; }
315
316 void start();
317 void stop();
318 virtual Worker *get_worker();
319 Worker *get_worker(unsigned i) {
320 return workers[i];
321 }
322 void drain();
323 unsigned get_num_worker() const {
324 return num_workers;
325 }
326
327 // direct is used in tests only
328 virtual void spawn_worker(unsigned i, std::function<void ()> &&) = 0;
329 virtual void join_worker(unsigned i) = 0;
330
331 void handle_pre_fork() override {
332 stop();
333 }
334
335 void handle_post_fork() override {
336 start();
337 }
338
339 virtual bool is_ready() { return true; };
340 virtual void ready() { };
341 };
342
343 #endif //CEPH_MSG_ASYNC_STACK_H