]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 XSKY <haomai@xsky.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #ifndef CEPH_MSG_ASYNC_STACK_H | |
18 | #define CEPH_MSG_ASYNC_STACK_H | |
19 | ||
11fdf7f2 | 20 | #include "include/spinlock.h" |
7c673cae | 21 | #include "common/perf_counters.h" |
7c673cae FG |
22 | #include "msg/msg_types.h" |
23 | #include "msg/async/Event.h" | |
24 | ||
25 | class Worker; | |
26 | class ConnectedSocketImpl { | |
27 | public: | |
28 | virtual ~ConnectedSocketImpl() {} | |
29 | virtual int is_connected() = 0; | |
30 | virtual ssize_t read(char*, size_t) = 0; | |
7c673cae FG |
31 | virtual ssize_t send(bufferlist &bl, bool more) = 0; |
32 | virtual void shutdown() = 0; | |
33 | virtual void close() = 0; | |
34 | virtual int fd() const = 0; | |
35 | }; | |
36 | ||
37 | class ConnectedSocket; | |
38 | struct SocketOptions { | |
39 | bool nonblock = true; | |
40 | bool nodelay = true; | |
41 | int rcbuf_size = 0; | |
42 | int priority = -1; | |
43 | entity_addr_t connect_bind_addr; | |
44 | }; | |
45 | ||
46 | /// \cond internal | |
47 | class ServerSocketImpl { | |
48 | public: | |
11fdf7f2 TL |
49 | unsigned addr_type; ///< entity_addr_t::TYPE_* |
50 | unsigned addr_slot; ///< position of our addr in myaddrs().v | |
51 | ServerSocketImpl(unsigned type, unsigned slot) | |
52 | : addr_type(type), addr_slot(slot) {} | |
7c673cae FG |
53 | virtual ~ServerSocketImpl() {} |
54 | virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) = 0; | |
55 | virtual void abort_accept() = 0; | |
56 | /// Get file descriptor | |
57 | virtual int fd() const = 0; | |
58 | }; | |
59 | /// \endcond | |
60 | ||
61 | /// \addtogroup networking-module | |
62 | /// @{ | |
63 | ||
64 | /// A TCP (or other stream-based protocol) connection. | |
65 | /// | |
66 | /// A \c ConnectedSocket represents a full-duplex stream between | |
67 | /// two endpoints, a local endpoint and a remote endpoint. | |
68 | class ConnectedSocket { | |
69 | std::unique_ptr<ConnectedSocketImpl> _csi; | |
70 | ||
71 | public: | |
72 | /// Constructs a \c ConnectedSocket not corresponding to a connection | |
73 | ConnectedSocket() {}; | |
74 | /// \cond internal | |
75 | explicit ConnectedSocket(std::unique_ptr<ConnectedSocketImpl> csi) | |
76 | : _csi(std::move(csi)) {} | |
77 | /// \endcond | |
78 | ~ConnectedSocket() { | |
79 | if (_csi) | |
80 | _csi->close(); | |
81 | } | |
82 | /// Moves a \c ConnectedSocket object. | |
83 | ConnectedSocket(ConnectedSocket&& cs) = default; | |
84 | /// Move-assigns a \c ConnectedSocket object. | |
85 | ConnectedSocket& operator=(ConnectedSocket&& cs) = default; | |
86 | ||
87 | int is_connected() { | |
88 | return _csi->is_connected(); | |
89 | } | |
90 | /// Read the input stream with copy. | |
91 | /// | |
92 | /// Copy an object returning data sent from the remote endpoint. | |
93 | ssize_t read(char* buf, size_t len) { | |
94 | return _csi->read(buf, len); | |
95 | } | |
7c673cae FG |
96 | /// Gets the output stream. |
97 | /// | |
98 | /// Gets an object that sends data to the remote endpoint. | |
99 | ssize_t send(bufferlist &bl, bool more) { | |
100 | return _csi->send(bl, more); | |
101 | } | |
102 | /// Disables output to the socket. | |
103 | /// | |
104 | /// Current or future writes that have not been successfully flushed | |
105 | /// will immediately fail with an error. This is useful to abort | |
106 | /// operations on a socket that is not making progress due to a | |
107 | /// peer failure. | |
108 | void shutdown() { | |
109 | return _csi->shutdown(); | |
110 | } | |
111 | /// Disables input from the socket. | |
112 | /// | |
113 | /// Current or future reads will immediately fail with an error. | |
114 | /// This is useful to abort operations on a socket that is not making | |
115 | /// progress due to a peer failure. | |
116 | void close() { | |
117 | _csi->close(); | |
118 | _csi.reset(); | |
119 | } | |
120 | ||
121 | /// Get file descriptor | |
122 | int fd() const { | |
123 | return _csi->fd(); | |
124 | } | |
125 | ||
126 | explicit operator bool() const { | |
127 | return _csi.get(); | |
128 | } | |
129 | }; | |
130 | /// @} | |
131 | ||
132 | /// \addtogroup networking-module | |
133 | /// @{ | |
134 | ||
135 | /// A listening socket, waiting to accept incoming network connections. | |
136 | class ServerSocket { | |
137 | std::unique_ptr<ServerSocketImpl> _ssi; | |
138 | public: | |
139 | /// Constructs a \c ServerSocket not corresponding to a connection | |
140 | ServerSocket() {} | |
141 | /// \cond internal | |
142 | explicit ServerSocket(std::unique_ptr<ServerSocketImpl> ssi) | |
143 | : _ssi(std::move(ssi)) {} | |
144 | ~ServerSocket() { | |
145 | if (_ssi) | |
146 | _ssi->abort_accept(); | |
147 | } | |
148 | /// \endcond | |
149 | /// Moves a \c ServerSocket object. | |
150 | ServerSocket(ServerSocket&& ss) = default; | |
151 | /// Move-assigns a \c ServerSocket object. | |
152 | ServerSocket& operator=(ServerSocket&& cs) = default; | |
153 | ||
154 | /// Accepts the next connection to successfully connect to this socket. | |
155 | /// | |
156 | /// \Accepts a \ref ConnectedSocket representing the connection, and | |
157 | /// a \ref entity_addr_t describing the remote endpoint. | |
158 | int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) { | |
159 | return _ssi->accept(sock, opt, out, w); | |
160 | } | |
161 | ||
162 | /// Stops any \ref accept() in progress. | |
163 | /// | |
164 | /// Current and future \ref accept() calls will terminate immediately | |
165 | /// with an error. | |
166 | void abort_accept() { | |
167 | _ssi->abort_accept(); | |
168 | _ssi.reset(); | |
169 | } | |
170 | ||
171 | /// Get file descriptor | |
172 | int fd() const { | |
173 | return _ssi->fd(); | |
174 | } | |
175 | ||
11fdf7f2 TL |
176 | /// get listen/bind addr |
177 | unsigned get_addr_slot() { | |
178 | return _ssi->addr_slot; | |
179 | } | |
180 | ||
7c673cae FG |
181 | explicit operator bool() const { |
182 | return _ssi.get(); | |
183 | } | |
184 | }; | |
185 | /// @} | |
186 | ||
class NetworkStack;

/// Perf-counter indices used by \c Worker.
///
/// \c l_msgr_first and \c l_msgr_last are the exclusive lower/upper bounds
/// handed to \c PerfCountersBuilder; every real counter lies strictly
/// between them and takes the next consecutive value.
enum {
  l_msgr_first = 94000,

  // message / byte / connection counters
  l_msgr_recv_messages,
  l_msgr_send_messages,
  l_msgr_recv_bytes,
  l_msgr_send_bytes,
  l_msgr_created_connections,
  l_msgr_active_connections,

  // accumulated running times
  l_msgr_running_total_time,
  l_msgr_running_send_time,
  l_msgr_running_recv_time,
  l_msgr_running_fast_dispatch_time,

  // averaged latencies
  l_msgr_send_messages_queue_lat,
  l_msgr_handle_ack_lat,

  l_msgr_last,
};
208 | ||
209 | class Worker { | |
210 | std::mutex init_lock; | |
211 | std::condition_variable init_cond; | |
212 | bool init = false; | |
213 | ||
214 | public: | |
215 | bool done = false; | |
216 | ||
217 | CephContext *cct; | |
218 | PerfCounters *perf_logger; | |
219 | unsigned id; | |
220 | ||
221 | std::atomic_uint references; | |
222 | EventCenter center; | |
223 | ||
224 | Worker(const Worker&) = delete; | |
225 | Worker& operator=(const Worker&) = delete; | |
226 | ||
9f95a23c TL |
227 | Worker(CephContext *c, unsigned worker_id) |
228 | : cct(c), perf_logger(NULL), id(worker_id), references(0), center(c) { | |
7c673cae FG |
229 | char name[128]; |
230 | sprintf(name, "AsyncMessenger::Worker-%u", id); | |
231 | // initialize perf_logger | |
232 | PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last); | |
233 | ||
234 | plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages"); | |
235 | plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages"); | |
11fdf7f2 TL |
236 | plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes", NULL, 0, unit_t(UNIT_BYTES)); |
237 | plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network sent bytes", NULL, 0, unit_t(UNIT_BYTES)); | |
7c673cae FG |
238 | plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number"); |
239 | plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number"); | |
240 | ||
31f18b77 FG |
241 | plb.add_time(l_msgr_running_total_time, "msgr_running_total_time", "The total time of thread running"); |
242 | plb.add_time(l_msgr_running_send_time, "msgr_running_send_time", "The total time of message sending"); | |
243 | plb.add_time(l_msgr_running_recv_time, "msgr_running_recv_time", "The total time of message receiving"); | |
244 | plb.add_time(l_msgr_running_fast_dispatch_time, "msgr_running_fast_dispatch_time", "The total time of fast dispatch"); | |
245 | ||
9f95a23c TL |
246 | plb.add_time_avg(l_msgr_send_messages_queue_lat, "msgr_send_messages_queue_lat", "Network sent messages lat"); |
247 | plb.add_time_avg(l_msgr_handle_ack_lat, "msgr_handle_ack_lat", "Connection handle ack lat"); | |
248 | ||
7c673cae FG |
249 | perf_logger = plb.create_perf_counters(); |
250 | cct->get_perfcounters_collection()->add(perf_logger); | |
251 | } | |
252 | virtual ~Worker() { | |
253 | if (perf_logger) { | |
254 | cct->get_perfcounters_collection()->remove(perf_logger); | |
255 | delete perf_logger; | |
256 | } | |
257 | } | |
258 | ||
11fdf7f2 | 259 | virtual int listen(entity_addr_t &addr, unsigned addr_slot, |
7c673cae FG |
260 | const SocketOptions &opts, ServerSocket *) = 0; |
261 | virtual int connect(const entity_addr_t &addr, | |
262 | const SocketOptions &opts, ConnectedSocket *socket) = 0; | |
263 | virtual void destroy() {} | |
264 | ||
265 | virtual void initialize() {} | |
266 | PerfCounters *get_perf_counter() { return perf_logger; } | |
267 | void release_worker() { | |
268 | int oldref = references.fetch_sub(1); | |
11fdf7f2 | 269 | ceph_assert(oldref > 0); |
7c673cae FG |
270 | } |
271 | void init_done() { | |
272 | init_lock.lock(); | |
273 | init = true; | |
274 | init_cond.notify_all(); | |
275 | init_lock.unlock(); | |
276 | } | |
277 | bool is_init() { | |
278 | std::lock_guard<std::mutex> l(init_lock); | |
279 | return init; | |
280 | } | |
281 | void wait_for_init() { | |
282 | std::unique_lock<std::mutex> l(init_lock); | |
283 | while (!init) | |
284 | init_cond.wait(l); | |
285 | } | |
286 | void reset() { | |
287 | init_lock.lock(); | |
288 | init = false; | |
289 | init_cond.notify_all(); | |
290 | init_lock.unlock(); | |
291 | done = false; | |
292 | } | |
293 | }; | |
294 | ||
11fdf7f2 | 295 | class NetworkStack { |
7c673cae FG |
296 | std::string type; |
297 | unsigned num_workers = 0; | |
11fdf7f2 | 298 | ceph::spinlock pool_spin; |
7c673cae FG |
299 | bool started = false; |
300 | ||
301 | std::function<void ()> add_thread(unsigned i); | |
302 | ||
303 | protected: | |
304 | CephContext *cct; | |
305 | vector<Worker*> workers; | |
306 | ||
307 | explicit NetworkStack(CephContext *c, const string &t); | |
308 | public: | |
309 | NetworkStack(const NetworkStack &) = delete; | |
310 | NetworkStack& operator=(const NetworkStack &) = delete; | |
11fdf7f2 | 311 | virtual ~NetworkStack() { |
7c673cae FG |
312 | for (auto &&w : workers) |
313 | delete w; | |
314 | } | |
315 | ||
316 | static std::shared_ptr<NetworkStack> create( | |
317 | CephContext *c, const string &type); | |
318 | ||
319 | static Worker* create_worker( | |
320 | CephContext *c, const string &t, unsigned i); | |
7c673cae FG |
321 | // backend need to override this method if backend doesn't support shared |
322 | // listen table. | |
323 | // For example, posix backend has in kernel global listen table. If one | |
324 | // thread bind a port, other threads also aware this. | |
325 | // But for dpdk backend, we maintain listen table in each thread. So we | |
326 | // need to let each thread do binding port. | |
327 | virtual bool support_local_listen_table() const { return false; } | |
328 | virtual bool nonblock_connect_need_writable_event() const { return true; } | |
329 | ||
330 | void start(); | |
331 | void stop(); | |
332 | virtual Worker *get_worker(); | |
9f95a23c TL |
333 | Worker *get_worker(unsigned worker_id) { |
334 | return workers[worker_id]; | |
7c673cae FG |
335 | } |
336 | void drain(); | |
337 | unsigned get_num_worker() const { | |
338 | return num_workers; | |
339 | } | |
340 | ||
341 | // direct is used in tests only | |
342 | virtual void spawn_worker(unsigned i, std::function<void ()> &&) = 0; | |
343 | virtual void join_worker(unsigned i) = 0; | |
344 | ||
7c673cae FG |
345 | virtual bool is_ready() { return true; }; |
346 | virtual void ready() { }; | |
347 | }; | |
348 | ||
349 | #endif //CEPH_MSG_ASYNC_STACK_H |