// Source: ceph/src/msg/async/Stack.h (ceph.git, quincy beta 17.1.0 import)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_MSG_ASYNC_STACK_H
18 #define CEPH_MSG_ASYNC_STACK_H
19
20 #include "include/spinlock.h"
21 #include "common/perf_counters.h"
22 #include "msg/msg_types.h"
23 #include "msg/async/Event.h"
24
25 class Worker;
26 class ConnectedSocketImpl {
27 public:
28 virtual ~ConnectedSocketImpl() {}
29 virtual int is_connected() = 0;
30 virtual ssize_t read(char*, size_t) = 0;
31 virtual ssize_t send(ceph::buffer::list &bl, bool more) = 0;
32 virtual void shutdown() = 0;
33 virtual void close() = 0;
34 virtual int fd() const = 0;
35 };
36
37 class ConnectedSocket;
/// Options applied by backends when connecting or accepting a socket.
struct SocketOptions {
  bool nonblock = true;   ///< open the socket in non-blocking mode
  bool nodelay = true;    ///< presumably maps to TCP_NODELAY — confirm in backend
  int rcbuf_size = 0;     ///< receive buffer size; 0 = leave system default
  int priority = -1;      ///< socket priority; -1 = unset
  entity_addr_t connect_bind_addr;  ///< local address to bind before connecting
};
45
46 /// \cond internal
47 class ServerSocketImpl {
48 public:
49 unsigned addr_type; ///< entity_addr_t::TYPE_*
50 unsigned addr_slot; ///< position of our addr in myaddrs().v
51 ServerSocketImpl(unsigned type, unsigned slot)
52 : addr_type(type), addr_slot(slot) {}
53 virtual ~ServerSocketImpl() {}
54 virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) = 0;
55 virtual void abort_accept() = 0;
56 /// Get file descriptor
57 virtual int fd() const = 0;
58 };
59 /// \endcond
60
61 /// \addtogroup networking-module
62 /// @{
63
64 /// A TCP (or other stream-based protocol) connection.
65 ///
66 /// A \c ConnectedSocket represents a full-duplex stream between
67 /// two endpoints, a local endpoint and a remote endpoint.
68 class ConnectedSocket {
69 std::unique_ptr<ConnectedSocketImpl> _csi;
70
71 public:
72 /// Constructs a \c ConnectedSocket not corresponding to a connection
73 ConnectedSocket() {};
74 /// \cond internal
75 explicit ConnectedSocket(std::unique_ptr<ConnectedSocketImpl> csi)
76 : _csi(std::move(csi)) {}
77 /// \endcond
78 ~ConnectedSocket() {
79 if (_csi)
80 _csi->close();
81 }
82 /// Moves a \c ConnectedSocket object.
83 ConnectedSocket(ConnectedSocket&& cs) = default;
84 /// Move-assigns a \c ConnectedSocket object.
85 ConnectedSocket& operator=(ConnectedSocket&& cs) = default;
86
87 int is_connected() {
88 return _csi->is_connected();
89 }
90 /// Read the input stream with copy.
91 ///
92 /// Copy an object returning data sent from the remote endpoint.
93 ssize_t read(char* buf, size_t len) {
94 return _csi->read(buf, len);
95 }
96 /// Gets the output stream.
97 ///
98 /// Gets an object that sends data to the remote endpoint.
99 ssize_t send(ceph::buffer::list &bl, bool more) {
100 return _csi->send(bl, more);
101 }
102 /// Disables output to the socket.
103 ///
104 /// Current or future writes that have not been successfully flushed
105 /// will immediately fail with an error. This is useful to abort
106 /// operations on a socket that is not making progress due to a
107 /// peer failure.
108 void shutdown() {
109 return _csi->shutdown();
110 }
111 /// Disables input from the socket.
112 ///
113 /// Current or future reads will immediately fail with an error.
114 /// This is useful to abort operations on a socket that is not making
115 /// progress due to a peer failure.
116 void close() {
117 _csi->close();
118 _csi.reset();
119 }
120
121 /// Get file descriptor
122 int fd() const {
123 return _csi->fd();
124 }
125
126 explicit operator bool() const {
127 return _csi.get();
128 }
129 };
130 /// @}
131
132 /// \addtogroup networking-module
133 /// @{
134
135 /// A listening socket, waiting to accept incoming network connections.
136 class ServerSocket {
137 std::unique_ptr<ServerSocketImpl> _ssi;
138 public:
139 /// Constructs a \c ServerSocket not corresponding to a connection
140 ServerSocket() {}
141 /// \cond internal
142 explicit ServerSocket(std::unique_ptr<ServerSocketImpl> ssi)
143 : _ssi(std::move(ssi)) {}
144 ~ServerSocket() {
145 if (_ssi)
146 _ssi->abort_accept();
147 }
148 /// \endcond
149 /// Moves a \c ServerSocket object.
150 ServerSocket(ServerSocket&& ss) = default;
151 /// Move-assigns a \c ServerSocket object.
152 ServerSocket& operator=(ServerSocket&& cs) = default;
153
154 /// Accepts the next connection to successfully connect to this socket.
155 ///
156 /// \Accepts a \ref ConnectedSocket representing the connection, and
157 /// a \ref entity_addr_t describing the remote endpoint.
158 int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
159 return _ssi->accept(sock, opt, out, w);
160 }
161
162 /// Stops any \ref accept() in progress.
163 ///
164 /// Current and future \ref accept() calls will terminate immediately
165 /// with an error.
166 void abort_accept() {
167 _ssi->abort_accept();
168 _ssi.reset();
169 }
170
171 /// Get file descriptor
172 int fd() const {
173 return _ssi->fd();
174 }
175
176 /// get listen/bind addr
177 unsigned get_addr_slot() {
178 return _ssi->addr_slot;
179 }
180
181 explicit operator bool() const {
182 return _ssi.get();
183 }
184 };
185 /// @}
186
187 class NetworkStack;
188
// Performance-counter indices for the async messenger.  Worker's
// constructor registers one counter per id via PerfCountersBuilder, so
// the implicit enumerator values matter: append new counters only
// immediately before l_msgr_last.
enum {
  l_msgr_first = 94000,
  l_msgr_recv_messages,        // messages received
  l_msgr_send_messages,        // messages sent
  l_msgr_recv_bytes,           // bytes received
  l_msgr_send_bytes,           // bytes sent
  l_msgr_created_connections,  // connections created
  l_msgr_active_connections,   // currently active connections

  l_msgr_running_total_time,          // total worker-thread running time
  l_msgr_running_send_time,           // total time spent sending
  l_msgr_running_recv_time,           // total time spent receiving
  l_msgr_running_fast_dispatch_time,  // total time in fast dispatch

  l_msgr_send_messages_queue_lat,  // avg send-queue latency
  l_msgr_handle_ack_lat,           // avg ack-handling latency

  l_msgr_last,
};
208
209 class Worker {
210 std::mutex init_lock;
211 std::condition_variable init_cond;
212 bool init = false;
213
214 public:
215 bool done = false;
216
217 CephContext *cct;
218 PerfCounters *perf_logger;
219 unsigned id;
220
221 std::atomic_uint references;
222 EventCenter center;
223
224 Worker(const Worker&) = delete;
225 Worker& operator=(const Worker&) = delete;
226
227 Worker(CephContext *c, unsigned worker_id)
228 : cct(c), perf_logger(NULL), id(worker_id), references(0), center(c) {
229 char name[128];
230 sprintf(name, "AsyncMessenger::Worker-%u", id);
231 // initialize perf_logger
232 PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
233
234 plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
235 plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
236 plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes", NULL, 0, unit_t(UNIT_BYTES));
237 plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network sent bytes", NULL, 0, unit_t(UNIT_BYTES));
238 plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
239 plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
240
241 plb.add_time(l_msgr_running_total_time, "msgr_running_total_time", "The total time of thread running");
242 plb.add_time(l_msgr_running_send_time, "msgr_running_send_time", "The total time of message sending");
243 plb.add_time(l_msgr_running_recv_time, "msgr_running_recv_time", "The total time of message receiving");
244 plb.add_time(l_msgr_running_fast_dispatch_time, "msgr_running_fast_dispatch_time", "The total time of fast dispatch");
245
246 plb.add_time_avg(l_msgr_send_messages_queue_lat, "msgr_send_messages_queue_lat", "Network sent messages lat");
247 plb.add_time_avg(l_msgr_handle_ack_lat, "msgr_handle_ack_lat", "Connection handle ack lat");
248
249 perf_logger = plb.create_perf_counters();
250 cct->get_perfcounters_collection()->add(perf_logger);
251 }
252 virtual ~Worker() {
253 if (perf_logger) {
254 cct->get_perfcounters_collection()->remove(perf_logger);
255 delete perf_logger;
256 }
257 }
258
259 virtual int listen(entity_addr_t &addr, unsigned addr_slot,
260 const SocketOptions &opts, ServerSocket *) = 0;
261 virtual int connect(const entity_addr_t &addr,
262 const SocketOptions &opts, ConnectedSocket *socket) = 0;
263 virtual void destroy() {}
264
265 virtual void initialize() {}
266 PerfCounters *get_perf_counter() { return perf_logger; }
267 void release_worker() {
268 int oldref = references.fetch_sub(1);
269 ceph_assert(oldref > 0);
270 }
271 void init_done() {
272 init_lock.lock();
273 init = true;
274 init_cond.notify_all();
275 init_lock.unlock();
276 }
277 bool is_init() {
278 std::lock_guard<std::mutex> l(init_lock);
279 return init;
280 }
281 void wait_for_init() {
282 std::unique_lock<std::mutex> l(init_lock);
283 while (!init)
284 init_cond.wait(l);
285 }
286 void reset() {
287 init_lock.lock();
288 init = false;
289 init_cond.notify_all();
290 init_lock.unlock();
291 done = false;
292 }
293 };
294
295 class NetworkStack {
296 ceph::spinlock pool_spin;
297 bool started = false;
298
299 std::function<void ()> add_thread(Worker* w);
300
301 virtual Worker* create_worker(CephContext *c, unsigned i) = 0;
302 virtual void rename_thread(unsigned id) {
303 static constexpr int TASK_COMM_LEN = 16;
304 char tp_name[TASK_COMM_LEN];
305 sprintf(tp_name, "msgr-worker-%u", id);
306 ceph_pthread_setname(pthread_self(), tp_name);
307 }
308
309 protected:
310 CephContext *cct;
311 std::vector<Worker*> workers;
312
313 explicit NetworkStack(CephContext *c);
314 public:
315 NetworkStack(const NetworkStack &) = delete;
316 NetworkStack& operator=(const NetworkStack &) = delete;
317 virtual ~NetworkStack() {
318 for (auto &&w : workers)
319 delete w;
320 }
321
322 static std::shared_ptr<NetworkStack> create(
323 CephContext *c, const std::string &type);
324
325 // backend need to override this method if backend doesn't support shared
326 // listen table.
327 // For example, posix backend has in kernel global listen table. If one
328 // thread bind a port, other threads also aware this.
329 // But for dpdk backend, we maintain listen table in each thread. So we
330 // need to let each thread do binding port.
331 virtual bool support_local_listen_table() const { return false; }
332 virtual bool nonblock_connect_need_writable_event() const { return true; }
333
334 void start();
335 void stop();
336 virtual Worker *get_worker();
337 Worker *get_worker(unsigned worker_id) {
338 return workers[worker_id];
339 }
340 void drain();
341 unsigned get_num_worker() const {
342 return workers.size();
343 }
344
345 // direct is used in tests only
346 virtual void spawn_worker(std::function<void ()> &&) = 0;
347 virtual void join_worker(unsigned i) = 0;
348
349 virtual bool is_ready() { return true; };
350 virtual void ready() { };
351 };
352
353 #endif //CEPH_MSG_ASYNC_STACK_H