]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/Stack.h
update sources to v12.1.0
[ceph.git] / ceph / src / msg / async / Stack.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_MSG_ASYNC_STACK_H
18 #define CEPH_MSG_ASYNC_STACK_H
19
20 #include "include/Spinlock.h"
21 #include "common/perf_counters.h"
22 #include "common/simple_spin.h"
23 #include "msg/msg_types.h"
24 #include "msg/async/Event.h"
25
26 class Worker;
27 class ConnectedSocketImpl {
28 public:
29 virtual ~ConnectedSocketImpl() {}
30 virtual int is_connected() = 0;
31 virtual ssize_t read(char*, size_t) = 0;
32 virtual ssize_t zero_copy_read(bufferptr&) = 0;
33 virtual ssize_t send(bufferlist &bl, bool more) = 0;
34 virtual void shutdown() = 0;
35 virtual void close() = 0;
36 virtual int fd() const = 0;
37 };
38
39 class ConnectedSocket;
40 struct SocketOptions {
41 bool nonblock = true;
42 bool nodelay = true;
43 int rcbuf_size = 0;
44 int priority = -1;
45 entity_addr_t connect_bind_addr;
46 };
47
48 /// \cond internal
49 class ServerSocketImpl {
50 public:
51 virtual ~ServerSocketImpl() {}
52 virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) = 0;
53 virtual void abort_accept() = 0;
54 /// Get file descriptor
55 virtual int fd() const = 0;
56 };
57 /// \endcond
58
59 /// \addtogroup networking-module
60 /// @{
61
62 /// A TCP (or other stream-based protocol) connection.
63 ///
64 /// A \c ConnectedSocket represents a full-duplex stream between
65 /// two endpoints, a local endpoint and a remote endpoint.
66 class ConnectedSocket {
67 std::unique_ptr<ConnectedSocketImpl> _csi;
68
69 public:
70 /// Constructs a \c ConnectedSocket not corresponding to a connection
71 ConnectedSocket() {};
72 /// \cond internal
73 explicit ConnectedSocket(std::unique_ptr<ConnectedSocketImpl> csi)
74 : _csi(std::move(csi)) {}
75 /// \endcond
76 ~ConnectedSocket() {
77 if (_csi)
78 _csi->close();
79 }
80 /// Moves a \c ConnectedSocket object.
81 ConnectedSocket(ConnectedSocket&& cs) = default;
82 /// Move-assigns a \c ConnectedSocket object.
83 ConnectedSocket& operator=(ConnectedSocket&& cs) = default;
84
85 int is_connected() {
86 return _csi->is_connected();
87 }
88 /// Read the input stream with copy.
89 ///
90 /// Copy an object returning data sent from the remote endpoint.
91 ssize_t read(char* buf, size_t len) {
92 return _csi->read(buf, len);
93 }
94 /// Gets the input stream.
95 ///
96 /// Gets an object returning data sent from the remote endpoint.
97 ssize_t zero_copy_read(bufferptr &data) {
98 return _csi->zero_copy_read(data);
99 }
100 /// Gets the output stream.
101 ///
102 /// Gets an object that sends data to the remote endpoint.
103 ssize_t send(bufferlist &bl, bool more) {
104 return _csi->send(bl, more);
105 }
106 /// Disables output to the socket.
107 ///
108 /// Current or future writes that have not been successfully flushed
109 /// will immediately fail with an error. This is useful to abort
110 /// operations on a socket that is not making progress due to a
111 /// peer failure.
112 void shutdown() {
113 return _csi->shutdown();
114 }
115 /// Disables input from the socket.
116 ///
117 /// Current or future reads will immediately fail with an error.
118 /// This is useful to abort operations on a socket that is not making
119 /// progress due to a peer failure.
120 void close() {
121 _csi->close();
122 _csi.reset();
123 }
124
125 /// Get file descriptor
126 int fd() const {
127 return _csi->fd();
128 }
129
130 explicit operator bool() const {
131 return _csi.get();
132 }
133 };
134 /// @}
135
136 /// \addtogroup networking-module
137 /// @{
138
139 /// A listening socket, waiting to accept incoming network connections.
140 class ServerSocket {
141 std::unique_ptr<ServerSocketImpl> _ssi;
142 public:
143 /// Constructs a \c ServerSocket not corresponding to a connection
144 ServerSocket() {}
145 /// \cond internal
146 explicit ServerSocket(std::unique_ptr<ServerSocketImpl> ssi)
147 : _ssi(std::move(ssi)) {}
148 ~ServerSocket() {
149 if (_ssi)
150 _ssi->abort_accept();
151 }
152 /// \endcond
153 /// Moves a \c ServerSocket object.
154 ServerSocket(ServerSocket&& ss) = default;
155 /// Move-assigns a \c ServerSocket object.
156 ServerSocket& operator=(ServerSocket&& cs) = default;
157
158 /// Accepts the next connection to successfully connect to this socket.
159 ///
160 /// \Accepts a \ref ConnectedSocket representing the connection, and
161 /// a \ref entity_addr_t describing the remote endpoint.
162 int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
163 return _ssi->accept(sock, opt, out, w);
164 }
165
166 /// Stops any \ref accept() in progress.
167 ///
168 /// Current and future \ref accept() calls will terminate immediately
169 /// with an error.
170 void abort_accept() {
171 _ssi->abort_accept();
172 _ssi.reset();
173 }
174
175 /// Get file descriptor
176 int fd() const {
177 return _ssi->fd();
178 }
179
180 explicit operator bool() const {
181 return _ssi.get();
182 }
183 };
184 /// @}
185
186 class NetworkStack;
187
// Perf-counter indices for the per-worker messenger counters.
//
// l_msgr_first and l_msgr_last are the bounds given to the counter builder;
// the actual counters are the enumerators strictly between them. Values are
// assigned implicitly, so the order of the enumerators is significant.
enum {
  l_msgr_first = 94000,

  // message, byte and connection counters
  l_msgr_recv_messages,
  l_msgr_send_messages,
  l_msgr_recv_bytes,
  l_msgr_send_bytes,
  l_msgr_created_connections,
  l_msgr_active_connections,

  // accumulated time counters
  l_msgr_running_total_time,
  l_msgr_running_send_time,
  l_msgr_running_recv_time,
  l_msgr_running_fast_dispatch_time,

  l_msgr_last
};
204
205 class Worker {
206 std::mutex init_lock;
207 std::condition_variable init_cond;
208 bool init = false;
209
210 public:
211 bool done = false;
212
213 CephContext *cct;
214 PerfCounters *perf_logger;
215 unsigned id;
216
217 std::atomic_uint references;
218 EventCenter center;
219
220 Worker(const Worker&) = delete;
221 Worker& operator=(const Worker&) = delete;
222
223 Worker(CephContext *c, unsigned i)
224 : cct(c), perf_logger(NULL), id(i), references(0), center(c) {
225 char name[128];
226 sprintf(name, "AsyncMessenger::Worker-%u", id);
227 // initialize perf_logger
228 PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
229
230 plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
231 plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
232 plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
233 plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes");
234 plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
235 plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
236
237 plb.add_time(l_msgr_running_total_time, "msgr_running_total_time", "The total time of thread running");
238 plb.add_time(l_msgr_running_send_time, "msgr_running_send_time", "The total time of message sending");
239 plb.add_time(l_msgr_running_recv_time, "msgr_running_recv_time", "The total time of message receiving");
240 plb.add_time(l_msgr_running_fast_dispatch_time, "msgr_running_fast_dispatch_time", "The total time of fast dispatch");
241
242 perf_logger = plb.create_perf_counters();
243 cct->get_perfcounters_collection()->add(perf_logger);
244 }
245 virtual ~Worker() {
246 if (perf_logger) {
247 cct->get_perfcounters_collection()->remove(perf_logger);
248 delete perf_logger;
249 }
250 }
251
252 virtual int listen(entity_addr_t &addr,
253 const SocketOptions &opts, ServerSocket *) = 0;
254 virtual int connect(const entity_addr_t &addr,
255 const SocketOptions &opts, ConnectedSocket *socket) = 0;
256 virtual void destroy() {}
257
258 virtual void initialize() {}
259 PerfCounters *get_perf_counter() { return perf_logger; }
260 void release_worker() {
261 int oldref = references.fetch_sub(1);
262 assert(oldref > 0);
263 }
264 void init_done() {
265 init_lock.lock();
266 init = true;
267 init_cond.notify_all();
268 init_lock.unlock();
269 }
270 bool is_init() {
271 std::lock_guard<std::mutex> l(init_lock);
272 return init;
273 }
274 void wait_for_init() {
275 std::unique_lock<std::mutex> l(init_lock);
276 while (!init)
277 init_cond.wait(l);
278 }
279 void reset() {
280 init_lock.lock();
281 init = false;
282 init_cond.notify_all();
283 init_lock.unlock();
284 done = false;
285 }
286 };
287
288 class NetworkStack : public CephContext::ForkWatcher {
289 std::string type;
290 unsigned num_workers = 0;
291 Spinlock pool_spin;
292 bool started = false;
293
294 std::function<void ()> add_thread(unsigned i);
295
296 protected:
297 CephContext *cct;
298 vector<Worker*> workers;
299
300 explicit NetworkStack(CephContext *c, const string &t);
301 public:
302 NetworkStack(const NetworkStack &) = delete;
303 NetworkStack& operator=(const NetworkStack &) = delete;
304 ~NetworkStack() override {
305 for (auto &&w : workers)
306 delete w;
307 }
308
309 static std::shared_ptr<NetworkStack> create(
310 CephContext *c, const string &type);
311
312 static Worker* create_worker(
313 CephContext *c, const string &t, unsigned i);
314 // backend need to override this method if supports zero copy read
315 virtual bool support_zero_copy_read() const { return false; }
316 // backend need to override this method if backend doesn't support shared
317 // listen table.
318 // For example, posix backend has in kernel global listen table. If one
319 // thread bind a port, other threads also aware this.
320 // But for dpdk backend, we maintain listen table in each thread. So we
321 // need to let each thread do binding port.
322 virtual bool support_local_listen_table() const { return false; }
323 virtual bool nonblock_connect_need_writable_event() const { return true; }
324
325 void start();
326 void stop();
327 virtual Worker *get_worker();
328 Worker *get_worker(unsigned i) {
329 return workers[i];
330 }
331 void drain();
332 unsigned get_num_worker() const {
333 return num_workers;
334 }
335
336 // direct is used in tests only
337 virtual void spawn_worker(unsigned i, std::function<void ()> &&) = 0;
338 virtual void join_worker(unsigned i) = 0;
339
340 void handle_pre_fork() override {
341 stop();
342 }
343
344 void handle_post_fork() override {
345 start();
346 }
347
348 virtual bool is_ready() { return true; };
349 virtual void ready() { };
350 };
351
352 #endif //CEPH_MSG_ASYNC_STACK_H