]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/Stack.h
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / msg / async / Stack.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_MSG_ASYNC_STACK_H
18 #define CEPH_MSG_ASYNC_STACK_H
19
20 #include "common/perf_counters.h"
21 #include "common/perf_counters_key.h"
22 #include "include/spinlock.h"
23 #include "msg/async/Event.h"
24 #include "msg/msg_types.h"
25 #include <string>
26
27 class Worker;
28 class ConnectedSocketImpl {
29 public:
30 virtual ~ConnectedSocketImpl() {}
31 virtual int is_connected() = 0;
32 virtual ssize_t read(char*, size_t) = 0;
33 virtual ssize_t send(ceph::buffer::list &bl, bool more) = 0;
34 virtual void shutdown() = 0;
35 virtual void close() = 0;
36 virtual int fd() const = 0;
37 virtual void set_priority(int sd, int prio, int domain) = 0;
38 };
39
40 class ConnectedSocket;
/// Options applied to a socket when connecting or accepting.
struct SocketOptions {
  bool nonblock = true;    // request non-blocking mode (presumably O_NONBLOCK; backend-dependent)
  bool nodelay = true;     // request no-delay sends (presumably TCP_NODELAY; backend-dependent)
  int rcbuf_size = 0;      // receive buffer size; 0 leaves the backend default
  int priority = -1;       // socket priority; -1 = unset
  entity_addr_t connect_bind_addr;  // local address to bind before connecting
};
48
49 /// \cond internal
50 class ServerSocketImpl {
51 public:
52 unsigned addr_type; ///< entity_addr_t::TYPE_*
53 unsigned addr_slot; ///< position of our addr in myaddrs().v
54 ServerSocketImpl(unsigned type, unsigned slot)
55 : addr_type(type), addr_slot(slot) {}
56 virtual ~ServerSocketImpl() {}
57 virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) = 0;
58 virtual void abort_accept() = 0;
59 /// Get file descriptor
60 virtual int fd() const = 0;
61 };
62 /// \endcond
63
64 /// \addtogroup networking-module
65 /// @{
66
67 /// A TCP (or other stream-based protocol) connection.
68 ///
69 /// A \c ConnectedSocket represents a full-duplex stream between
70 /// two endpoints, a local endpoint and a remote endpoint.
71 class ConnectedSocket {
72 std::unique_ptr<ConnectedSocketImpl> _csi;
73
74 public:
75 /// Constructs a \c ConnectedSocket not corresponding to a connection
76 ConnectedSocket() {};
77 /// \cond internal
78 explicit ConnectedSocket(std::unique_ptr<ConnectedSocketImpl> csi)
79 : _csi(std::move(csi)) {}
80 /// \endcond
81 ~ConnectedSocket() {
82 if (_csi)
83 _csi->close();
84 }
85 /// Moves a \c ConnectedSocket object.
86 ConnectedSocket(ConnectedSocket&& cs) = default;
87 /// Move-assigns a \c ConnectedSocket object.
88 ConnectedSocket& operator=(ConnectedSocket&& cs) = default;
89
90 int is_connected() {
91 return _csi->is_connected();
92 }
93 /// Read the input stream with copy.
94 ///
95 /// Copy an object returning data sent from the remote endpoint.
96 ssize_t read(char* buf, size_t len) {
97 return _csi->read(buf, len);
98 }
99 /// Gets the output stream.
100 ///
101 /// Gets an object that sends data to the remote endpoint.
102 ssize_t send(ceph::buffer::list &bl, bool more) {
103 return _csi->send(bl, more);
104 }
105 /// Disables output to the socket.
106 ///
107 /// Current or future writes that have not been successfully flushed
108 /// will immediately fail with an error. This is useful to abort
109 /// operations on a socket that is not making progress due to a
110 /// peer failure.
111 void shutdown() {
112 return _csi->shutdown();
113 }
114 /// Disables input from the socket.
115 ///
116 /// Current or future reads will immediately fail with an error.
117 /// This is useful to abort operations on a socket that is not making
118 /// progress due to a peer failure.
119 void close() {
120 _csi->close();
121 _csi.reset();
122 }
123
124 /// Get file descriptor
125 int fd() const {
126 return _csi->fd();
127 }
128
129 void set_priority(int sd, int prio, int domain) {
130 _csi->set_priority(sd, prio, domain);
131 }
132
133 explicit operator bool() const {
134 return _csi.get();
135 }
136 };
137 /// @}
138
139 /// \addtogroup networking-module
140 /// @{
141
142 /// A listening socket, waiting to accept incoming network connections.
143 class ServerSocket {
144 std::unique_ptr<ServerSocketImpl> _ssi;
145 public:
146 /// Constructs a \c ServerSocket not corresponding to a connection
147 ServerSocket() {}
148 /// \cond internal
149 explicit ServerSocket(std::unique_ptr<ServerSocketImpl> ssi)
150 : _ssi(std::move(ssi)) {}
151 ~ServerSocket() {
152 if (_ssi)
153 _ssi->abort_accept();
154 }
155 /// \endcond
156 /// Moves a \c ServerSocket object.
157 ServerSocket(ServerSocket&& ss) = default;
158 /// Move-assigns a \c ServerSocket object.
159 ServerSocket& operator=(ServerSocket&& cs) = default;
160
161 /// Accepts the next connection to successfully connect to this socket.
162 ///
163 /// \Accepts a \ref ConnectedSocket representing the connection, and
164 /// a \ref entity_addr_t describing the remote endpoint.
165 int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
166 return _ssi->accept(sock, opt, out, w);
167 }
168
169 /// Stops any \ref accept() in progress.
170 ///
171 /// Current and future \ref accept() calls will terminate immediately
172 /// with an error.
173 void abort_accept() {
174 _ssi->abort_accept();
175 _ssi.reset();
176 }
177
178 /// Get file descriptor
179 int fd() const {
180 return _ssi->fd();
181 }
182
183 /// get listen/bind addr
184 unsigned get_addr_slot() {
185 return _ssi->addr_slot;
186 }
187
188 explicit operator bool() const {
189 return _ssi.get();
190 }
191 };
192 /// @}
193
194 class NetworkStack;
195
// Indices of the unlabeled per-worker perf counters; the counters are
// registered (with their descriptions) in Worker's constructor below.
enum {
  l_msgr_first = 94000,   // range sentinel, not a counter
  l_msgr_recv_messages,   // "Network received messages"
  l_msgr_send_messages,   // "Network sent messages"
  l_msgr_recv_bytes,      // "Network received bytes"
  l_msgr_send_bytes,      // "Network sent bytes"
  l_msgr_created_connections,  // "Created connection number"
  l_msgr_active_connections,   // "Active connection number"

  // cumulative thread-time accounting (registered via add_time)
  l_msgr_running_total_time,
  l_msgr_running_send_time,
  l_msgr_running_recv_time,
  l_msgr_running_fast_dispatch_time,

  // latency averages (registered via add_time_avg)
  l_msgr_send_messages_queue_lat,
  l_msgr_handle_ack_lat,

  l_msgr_recv_encrypted_bytes,  // "Network received encrypted bytes"
  l_msgr_send_encrypted_bytes,  // "Network sent encrypted bytes"

  l_msgr_last,            // range sentinel, not a counter
};
218
// Indices of the labeled (keyed by worker id) perf counters, registered in
// Worker's constructor; the range starts just past the unlabeled one above.
enum {
  l_msgr_labeled_first = l_msgr_last + 1,

  l_msgr_connection_ready_timeouts,  // "Number of not yet ready connections declared as dead"
  l_msgr_connection_idle_timeouts,   // "Number of connections closed due to idleness"

  l_msgr_labeled_last,  // range sentinel, not a counter
};
227
228 class Worker {
229 std::mutex init_lock;
230 std::condition_variable init_cond;
231 bool init = false;
232
233 public:
234 bool done = false;
235
236 CephContext *cct;
237 PerfCounters *perf_logger;
238 PerfCounters *perf_labeled_logger;
239 unsigned id;
240
241 std::atomic_uint references;
242 EventCenter center;
243
244 Worker(const Worker&) = delete;
245 Worker& operator=(const Worker&) = delete;
246
247 Worker(CephContext *c, unsigned worker_id)
248 : cct(c), id(worker_id), references(0), center(c) {
249 char name[128];
250 char name_prefix[] = "AsyncMessenger::Worker";
251 sprintf(name, "%s-%u", name_prefix, id);
252
253 // initialize perf_logger
254 PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
255
256 plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
257 plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
258 plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes", NULL, 0, unit_t(UNIT_BYTES));
259 plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network sent bytes", NULL, 0, unit_t(UNIT_BYTES));
260 plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
261 plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
262
263 plb.add_time(l_msgr_running_total_time, "msgr_running_total_time", "The total time of thread running");
264 plb.add_time(l_msgr_running_send_time, "msgr_running_send_time", "The total time of message sending");
265 plb.add_time(l_msgr_running_recv_time, "msgr_running_recv_time", "The total time of message receiving");
266 plb.add_time(l_msgr_running_fast_dispatch_time, "msgr_running_fast_dispatch_time", "The total time of fast dispatch");
267
268 plb.add_time_avg(l_msgr_send_messages_queue_lat, "msgr_send_messages_queue_lat", "Network sent messages lat");
269 plb.add_time_avg(l_msgr_handle_ack_lat, "msgr_handle_ack_lat", "Connection handle ack lat");
270
271 plb.add_u64_counter(l_msgr_recv_encrypted_bytes, "msgr_recv_encrypted_bytes", "Network received encrypted bytes", NULL, 0, unit_t(UNIT_BYTES));
272 plb.add_u64_counter(l_msgr_send_encrypted_bytes, "msgr_send_encrypted_bytes", "Network sent encrypted bytes", NULL, 0, unit_t(UNIT_BYTES));
273
274 perf_logger = plb.create_perf_counters();
275 cct->get_perfcounters_collection()->add(perf_logger);
276
277 // Add labeled perfcounters
278 std::string labels = ceph::perf_counters::key_create(
279 name_prefix, {{"id", std::to_string(id)}});
280 PerfCountersBuilder plb_labeled(
281 cct, labels, l_msgr_labeled_first,
282 l_msgr_labeled_last);
283
284 plb_labeled.add_u64_counter(
285 l_msgr_connection_ready_timeouts, "msgr_connection_ready_timeouts",
286 "Number of not yet ready connections declared as dead", NULL,
287 PerfCountersBuilder::PRIO_USEFUL);
288 plb_labeled.add_u64_counter(
289 l_msgr_connection_idle_timeouts, "msgr_connection_idle_timeouts",
290 "Number of connections closed due to idleness", NULL,
291 PerfCountersBuilder::PRIO_USEFUL);
292
293 perf_labeled_logger = plb_labeled.create_perf_counters();
294 cct->get_perfcounters_collection()->add(perf_labeled_logger);
295 }
296 virtual ~Worker() {
297 if (perf_logger) {
298 cct->get_perfcounters_collection()->remove(perf_logger);
299 delete perf_logger;
300 }
301 if (perf_labeled_logger) {
302 cct->get_perfcounters_collection()->remove(perf_labeled_logger);
303 delete perf_labeled_logger;
304 }
305 }
306
307 virtual int listen(entity_addr_t &addr, unsigned addr_slot,
308 const SocketOptions &opts, ServerSocket *) = 0;
309 virtual int connect(const entity_addr_t &addr,
310 const SocketOptions &opts, ConnectedSocket *socket) = 0;
311 virtual void destroy() {}
312
313 virtual void initialize() {}
314 PerfCounters *get_perf_counter() { return perf_logger; }
315 PerfCounters *get_labeled_perf_counter() { return perf_labeled_logger; }
316 void release_worker() {
317 int oldref = references.fetch_sub(1);
318 ceph_assert(oldref > 0);
319 }
320 void init_done() {
321 init_lock.lock();
322 init = true;
323 init_cond.notify_all();
324 init_lock.unlock();
325 }
326 bool is_init() {
327 std::lock_guard<std::mutex> l(init_lock);
328 return init;
329 }
330 void wait_for_init() {
331 std::unique_lock<std::mutex> l(init_lock);
332 while (!init)
333 init_cond.wait(l);
334 }
335 void reset() {
336 init_lock.lock();
337 init = false;
338 init_cond.notify_all();
339 init_lock.unlock();
340 done = false;
341 }
342 };
343
344 class NetworkStack {
345 ceph::spinlock pool_spin;
346 bool started = false;
347
348 std::function<void ()> add_thread(Worker* w);
349
350 virtual Worker* create_worker(CephContext *c, unsigned i) = 0;
351 virtual void rename_thread(unsigned id) {
352 static constexpr int TASK_COMM_LEN = 16;
353 char tp_name[TASK_COMM_LEN];
354 sprintf(tp_name, "msgr-worker-%u", id);
355 ceph_pthread_setname(pthread_self(), tp_name);
356 }
357
358 protected:
359 CephContext *cct;
360 std::vector<Worker*> workers;
361
362 explicit NetworkStack(CephContext *c);
363 public:
364 NetworkStack(const NetworkStack &) = delete;
365 NetworkStack& operator=(const NetworkStack &) = delete;
366 virtual ~NetworkStack() {
367 for (auto &&w : workers)
368 delete w;
369 }
370
371 static std::shared_ptr<NetworkStack> create(
372 CephContext *c, const std::string &type);
373
374 // backend need to override this method if backend doesn't support shared
375 // listen table.
376 // For example, posix backend has in kernel global listen table. If one
377 // thread bind a port, other threads also aware this.
378 // But for dpdk backend, we maintain listen table in each thread. So we
379 // need to let each thread do binding port.
380 virtual bool support_local_listen_table() const { return false; }
381 virtual bool nonblock_connect_need_writable_event() const { return true; }
382
383 void start();
384 void stop();
385 virtual Worker *get_worker();
386 Worker *get_worker(unsigned worker_id) {
387 return workers[worker_id];
388 }
389 void drain();
390 unsigned get_num_worker() const {
391 return workers.size();
392 }
393
394 // direct is used in tests only
395 virtual void spawn_worker(std::function<void ()> &&) = 0;
396 virtual void join_worker(unsigned i) = 0;
397
398 virtual bool is_ready() { return true; };
399 virtual void ready() { };
400 };
401
402 #endif //CEPH_MSG_ASYNC_STACK_H