]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/Stack.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / msg / async / Stack.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17#ifndef CEPH_MSG_ASYNC_STACK_H
18#define CEPH_MSG_ASYNC_STACK_H
19
11fdf7f2 20#include "include/spinlock.h"
7c673cae 21#include "common/perf_counters.h"
7c673cae
FG
22#include "msg/msg_types.h"
23#include "msg/async/Event.h"
24
25class Worker;
/// Backend-specific implementation of a connected stream socket.
///
/// Concrete network stacks (posix, dpdk, rdma, ...) subclass this and the
/// public \c ConnectedSocket wrapper forwards every call to it.
class ConnectedSocketImpl {
 public:
  virtual ~ConnectedSocketImpl() {}
  /// Report connection state; semantics of the int are backend-defined
  /// (callers treat it like a tri-state: error / in-progress / connected).
  virtual int is_connected() = 0;
  /// Read up to the given number of bytes into the caller's buffer,
  /// returning bytes read or a negative error.
  virtual ssize_t read(char*, size_t) = 0;
  /// Send the buffered data; \c more hints that further data will follow
  /// (so the backend may delay flushing). Returns bytes sent or -error.
  virtual ssize_t send(bufferlist &bl, bool more) = 0;
  /// Stop further I/O without releasing the socket.
  virtual void shutdown() = 0;
  /// Release the underlying socket/resources.
  virtual void close() = 0;
  /// Underlying file descriptor (backend-specific meaning if not fd-based).
  virtual int fd() const = 0;
};
36
37class ConnectedSocket;
/// Options applied when creating/accepting a socket.
struct SocketOptions {
  bool nonblock = true;          ///< put the socket in non-blocking mode
  bool nodelay = true;           ///< disable Nagle (TCP_NODELAY-style behavior)
  int rcbuf_size = 0;            ///< receive buffer size; 0 = backend default
  int priority = -1;             ///< socket priority; -1 = leave unset
  entity_addr_t connect_bind_addr; ///< local address to bind before connecting
};
45
46/// \cond internal
/// Backend-specific implementation of a listening socket.
class ServerSocketImpl {
 public:
  unsigned addr_type; ///< entity_addr_t::TYPE_*
  unsigned addr_slot; ///< position of our addr in myaddrs().v
  ServerSocketImpl(unsigned type, unsigned slot)
    : addr_type(type), addr_slot(slot) {}
  virtual ~ServerSocketImpl() {}
  /// Accept one pending connection into *sock, filling *out with the peer
  /// address; the connection is serviced by Worker *w. Returns 0 or -error.
  virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) = 0;
  /// Cancel pending/future accepts on this socket.
  virtual void abort_accept() = 0;
  /// Get file descriptor
  virtual int fd() const = 0;
};
59/// \endcond
60
61/// \addtogroup networking-module
62/// @{
63
64/// A TCP (or other stream-based protocol) connection.
65///
66/// A \c ConnectedSocket represents a full-duplex stream between
67/// two endpoints, a local endpoint and a remote endpoint.
/// A TCP (or other stream-based protocol) connection.
///
/// A \c ConnectedSocket represents a full-duplex stream between
/// two endpoints, a local endpoint and a remote endpoint.
/// It owns its backend implementation and closes it on destruction (RAII).
class ConnectedSocket {
  std::unique_ptr<ConnectedSocketImpl> _csi; ///< owned backend impl; null when disconnected

 public:
  /// Constructs a \c ConnectedSocket not corresponding to a connection
  ConnectedSocket() {};
  /// \cond internal
  explicit ConnectedSocket(std::unique_ptr<ConnectedSocketImpl> csi)
    : _csi(std::move(csi)) {}
  /// \endcond
  /// Closes the underlying connection, if still open, when destroyed.
  ~ConnectedSocket() {
    if (_csi)
      _csi->close();
  }
  /// Moves a \c ConnectedSocket object.
  ConnectedSocket(ConnectedSocket&& cs) = default;
  /// Move-assigns a \c ConnectedSocket object.
  /// NOTE(review): default move-assign destroys any previously held impl
  /// without calling close() first — relies on impl destructors cleaning
  /// up; confirm against the backends.
  ConnectedSocket& operator=(ConnectedSocket&& cs) = default;

  /// Forwards to the impl's connection-state query.
  /// Precondition: *this holds a connection (_csi non-null).
  int is_connected() {
    return _csi->is_connected();
  }
  /// Read the input stream with copy.
  ///
  /// Copy an object returning data sent from the remote endpoint.
  ssize_t read(char* buf, size_t len) {
    return _csi->read(buf, len);
  }
  /// Gets the output stream.
  ///
  /// Gets an object that sends data to the remote endpoint.
  ssize_t send(bufferlist &bl, bool more) {
    return _csi->send(bl, more);
  }
  /// Disables output to the socket.
  ///
  /// Current or future writes that have not been successfully flushed
  /// will immediately fail with an error. This is useful to abort
  /// operations on a socket that is not making progress due to a
  /// peer failure.
  void shutdown() {
    return _csi->shutdown();
  }
  /// Closes the socket and releases the implementation.
  ///
  /// After this call the object no longer corresponds to a connection
  /// (operator bool() returns false); calling I/O methods afterwards
  /// dereferences a null impl.
  void close() {
    _csi->close();
    _csi.reset();
  }

  /// Get file descriptor
  int fd() const {
    return _csi->fd();
  }

  /// True while this object holds a live backend implementation.
  explicit operator bool() const {
    return _csi.get();
  }
};
130/// @}
131
132/// \addtogroup networking-module
133/// @{
134
135/// A listening socket, waiting to accept incoming network connections.
/// A listening socket, waiting to accept incoming network connections.
/// Owns its backend implementation; destruction aborts any pending accept.
class ServerSocket {
  std::unique_ptr<ServerSocketImpl> _ssi; ///< owned backend impl; null when unbound
 public:
  /// Constructs a \c ServerSocket not corresponding to a connection
  ServerSocket() {}
  /// \cond internal
  explicit ServerSocket(std::unique_ptr<ServerSocketImpl> ssi)
    : _ssi(std::move(ssi)) {}
  ~ServerSocket() {
    if (_ssi)
      _ssi->abort_accept();
  }
  /// \endcond
  /// Moves a \c ServerSocket object.
  ServerSocket(ServerSocket&& ss) = default;
  /// Move-assigns a \c ServerSocket object.
  ServerSocket& operator=(ServerSocket&& cs) = default;

  /// Accepts the next connection to successfully connect to this socket.
  ///
  /// Produces a \ref ConnectedSocket representing the connection, and
  /// a \ref entity_addr_t describing the remote endpoint.
  /// Returns 0 on success or a negative error code.
  int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
    return _ssi->accept(sock, opt, out, w);
  }

  /// Stops any \ref accept() in progress.
  ///
  /// Current and future \ref accept() calls will terminate immediately
  /// with an error. Also releases the implementation, so operator bool()
  /// becomes false afterwards.
  void abort_accept() {
    _ssi->abort_accept();
    _ssi.reset();
  }

  /// Get file descriptor
  int fd() const {
    return _ssi->fd();
  }

  /// get listen/bind addr
  unsigned get_addr_slot() {
    return _ssi->addr_slot;
  }

  /// True while this object holds a live backend implementation.
  explicit operator bool() const {
    return _ssi.get();
  }
};
185/// @}
186
187class NetworkStack;
188
// Perf-counter indices for the messenger workers; registered by Worker's
// constructor via PerfCountersBuilder with the range [l_msgr_first, l_msgr_last).
enum {
  l_msgr_first = 94000,          // base id reserved for msgr counters
  l_msgr_recv_messages,
  l_msgr_send_messages,
  l_msgr_recv_bytes,
  l_msgr_send_bytes,
  l_msgr_created_connections,
  l_msgr_active_connections,

  // cumulative worker-thread running times
  l_msgr_running_total_time,
  l_msgr_running_send_time,
  l_msgr_running_recv_time,
  l_msgr_running_fast_dispatch_time,

  // latency averages
  l_msgr_send_messages_queue_lat,
  l_msgr_handle_ack_lat,

  l_msgr_last,                   // one past the last valid counter
};
208
209class Worker {
210 std::mutex init_lock;
211 std::condition_variable init_cond;
212 bool init = false;
213
214 public:
215 bool done = false;
216
217 CephContext *cct;
218 PerfCounters *perf_logger;
219 unsigned id;
220
221 std::atomic_uint references;
222 EventCenter center;
223
224 Worker(const Worker&) = delete;
225 Worker& operator=(const Worker&) = delete;
226
9f95a23c
TL
227 Worker(CephContext *c, unsigned worker_id)
228 : cct(c), perf_logger(NULL), id(worker_id), references(0), center(c) {
7c673cae
FG
229 char name[128];
230 sprintf(name, "AsyncMessenger::Worker-%u", id);
231 // initialize perf_logger
232 PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
233
234 plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
235 plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
11fdf7f2
TL
236 plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes", NULL, 0, unit_t(UNIT_BYTES));
237 plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network sent bytes", NULL, 0, unit_t(UNIT_BYTES));
7c673cae
FG
238 plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
239 plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
240
31f18b77
FG
241 plb.add_time(l_msgr_running_total_time, "msgr_running_total_time", "The total time of thread running");
242 plb.add_time(l_msgr_running_send_time, "msgr_running_send_time", "The total time of message sending");
243 plb.add_time(l_msgr_running_recv_time, "msgr_running_recv_time", "The total time of message receiving");
244 plb.add_time(l_msgr_running_fast_dispatch_time, "msgr_running_fast_dispatch_time", "The total time of fast dispatch");
245
9f95a23c
TL
246 plb.add_time_avg(l_msgr_send_messages_queue_lat, "msgr_send_messages_queue_lat", "Network sent messages lat");
247 plb.add_time_avg(l_msgr_handle_ack_lat, "msgr_handle_ack_lat", "Connection handle ack lat");
248
7c673cae
FG
249 perf_logger = plb.create_perf_counters();
250 cct->get_perfcounters_collection()->add(perf_logger);
251 }
252 virtual ~Worker() {
253 if (perf_logger) {
254 cct->get_perfcounters_collection()->remove(perf_logger);
255 delete perf_logger;
256 }
257 }
258
11fdf7f2 259 virtual int listen(entity_addr_t &addr, unsigned addr_slot,
7c673cae
FG
260 const SocketOptions &opts, ServerSocket *) = 0;
261 virtual int connect(const entity_addr_t &addr,
262 const SocketOptions &opts, ConnectedSocket *socket) = 0;
263 virtual void destroy() {}
264
265 virtual void initialize() {}
266 PerfCounters *get_perf_counter() { return perf_logger; }
267 void release_worker() {
268 int oldref = references.fetch_sub(1);
11fdf7f2 269 ceph_assert(oldref > 0);
7c673cae
FG
270 }
271 void init_done() {
272 init_lock.lock();
273 init = true;
274 init_cond.notify_all();
275 init_lock.unlock();
276 }
277 bool is_init() {
278 std::lock_guard<std::mutex> l(init_lock);
279 return init;
280 }
281 void wait_for_init() {
282 std::unique_lock<std::mutex> l(init_lock);
283 while (!init)
284 init_cond.wait(l);
285 }
286 void reset() {
287 init_lock.lock();
288 init = false;
289 init_cond.notify_all();
290 init_lock.unlock();
291 done = false;
292 }
293};
294
11fdf7f2 295class NetworkStack {
7c673cae
FG
296 std::string type;
297 unsigned num_workers = 0;
11fdf7f2 298 ceph::spinlock pool_spin;
7c673cae
FG
299 bool started = false;
300
301 std::function<void ()> add_thread(unsigned i);
302
303 protected:
304 CephContext *cct;
305 vector<Worker*> workers;
306
307 explicit NetworkStack(CephContext *c, const string &t);
308 public:
309 NetworkStack(const NetworkStack &) = delete;
310 NetworkStack& operator=(const NetworkStack &) = delete;
11fdf7f2 311 virtual ~NetworkStack() {
7c673cae
FG
312 for (auto &&w : workers)
313 delete w;
314 }
315
316 static std::shared_ptr<NetworkStack> create(
317 CephContext *c, const string &type);
318
319 static Worker* create_worker(
320 CephContext *c, const string &t, unsigned i);
7c673cae
FG
321 // backend need to override this method if backend doesn't support shared
322 // listen table.
323 // For example, posix backend has in kernel global listen table. If one
324 // thread bind a port, other threads also aware this.
325 // But for dpdk backend, we maintain listen table in each thread. So we
326 // need to let each thread do binding port.
327 virtual bool support_local_listen_table() const { return false; }
328 virtual bool nonblock_connect_need_writable_event() const { return true; }
329
330 void start();
331 void stop();
332 virtual Worker *get_worker();
9f95a23c
TL
333 Worker *get_worker(unsigned worker_id) {
334 return workers[worker_id];
7c673cae
FG
335 }
336 void drain();
337 unsigned get_num_worker() const {
338 return num_workers;
339 }
340
341 // direct is used in tests only
342 virtual void spawn_worker(unsigned i, std::function<void ()> &&) = 0;
343 virtual void join_worker(unsigned i) = 0;
344
7c673cae
FG
345 virtual bool is_ready() { return true; };
346 virtual void ready() { };
347};
348
349#endif //CEPH_MSG_ASYNC_STACK_H