]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 XSKY <haomai@xsky.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #ifndef CEPH_MSG_ASYNC_STACK_H | |
18 | #define CEPH_MSG_ASYNC_STACK_H | |
19 | ||
20 | #include "include/Spinlock.h" | |
21 | #include "common/perf_counters.h" | |
22 | #include "common/simple_spin.h" | |
23 | #include "msg/msg_types.h" | |
24 | #include "msg/async/Event.h" | |
25 | ||
26 | class Worker; | |
27 | class ConnectedSocketImpl { | |
28 | public: | |
29 | virtual ~ConnectedSocketImpl() {} | |
30 | virtual int is_connected() = 0; | |
31 | virtual ssize_t read(char*, size_t) = 0; | |
32 | virtual ssize_t zero_copy_read(bufferptr&) = 0; | |
33 | virtual ssize_t send(bufferlist &bl, bool more) = 0; | |
34 | virtual void shutdown() = 0; | |
35 | virtual void close() = 0; | |
36 | virtual int fd() const = 0; | |
37 | }; | |
38 | ||
39 | class ConnectedSocket; | |
40 | struct SocketOptions { | |
41 | bool nonblock = true; | |
42 | bool nodelay = true; | |
43 | int rcbuf_size = 0; | |
44 | int priority = -1; | |
45 | entity_addr_t connect_bind_addr; | |
46 | }; | |
47 | ||
48 | /// \cond internal | |
49 | class ServerSocketImpl { | |
50 | public: | |
51 | virtual ~ServerSocketImpl() {} | |
52 | virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) = 0; | |
53 | virtual void abort_accept() = 0; | |
54 | /// Get file descriptor | |
55 | virtual int fd() const = 0; | |
56 | }; | |
57 | /// \endcond | |
58 | ||
59 | /// \addtogroup networking-module | |
60 | /// @{ | |
61 | ||
62 | /// A TCP (or other stream-based protocol) connection. | |
63 | /// | |
64 | /// A \c ConnectedSocket represents a full-duplex stream between | |
65 | /// two endpoints, a local endpoint and a remote endpoint. | |
66 | class ConnectedSocket { | |
67 | std::unique_ptr<ConnectedSocketImpl> _csi; | |
68 | ||
69 | public: | |
70 | /// Constructs a \c ConnectedSocket not corresponding to a connection | |
71 | ConnectedSocket() {}; | |
72 | /// \cond internal | |
73 | explicit ConnectedSocket(std::unique_ptr<ConnectedSocketImpl> csi) | |
74 | : _csi(std::move(csi)) {} | |
75 | /// \endcond | |
76 | ~ConnectedSocket() { | |
77 | if (_csi) | |
78 | _csi->close(); | |
79 | } | |
80 | /// Moves a \c ConnectedSocket object. | |
81 | ConnectedSocket(ConnectedSocket&& cs) = default; | |
82 | /// Move-assigns a \c ConnectedSocket object. | |
83 | ConnectedSocket& operator=(ConnectedSocket&& cs) = default; | |
84 | ||
85 | int is_connected() { | |
86 | return _csi->is_connected(); | |
87 | } | |
88 | /// Read the input stream with copy. | |
89 | /// | |
90 | /// Copy an object returning data sent from the remote endpoint. | |
91 | ssize_t read(char* buf, size_t len) { | |
92 | return _csi->read(buf, len); | |
93 | } | |
94 | /// Gets the input stream. | |
95 | /// | |
96 | /// Gets an object returning data sent from the remote endpoint. | |
97 | ssize_t zero_copy_read(bufferptr &data) { | |
98 | return _csi->zero_copy_read(data); | |
99 | } | |
100 | /// Gets the output stream. | |
101 | /// | |
102 | /// Gets an object that sends data to the remote endpoint. | |
103 | ssize_t send(bufferlist &bl, bool more) { | |
104 | return _csi->send(bl, more); | |
105 | } | |
106 | /// Disables output to the socket. | |
107 | /// | |
108 | /// Current or future writes that have not been successfully flushed | |
109 | /// will immediately fail with an error. This is useful to abort | |
110 | /// operations on a socket that is not making progress due to a | |
111 | /// peer failure. | |
112 | void shutdown() { | |
113 | return _csi->shutdown(); | |
114 | } | |
115 | /// Disables input from the socket. | |
116 | /// | |
117 | /// Current or future reads will immediately fail with an error. | |
118 | /// This is useful to abort operations on a socket that is not making | |
119 | /// progress due to a peer failure. | |
120 | void close() { | |
121 | _csi->close(); | |
122 | _csi.reset(); | |
123 | } | |
124 | ||
125 | /// Get file descriptor | |
126 | int fd() const { | |
127 | return _csi->fd(); | |
128 | } | |
129 | ||
130 | explicit operator bool() const { | |
131 | return _csi.get(); | |
132 | } | |
133 | }; | |
134 | /// @} | |
135 | ||
136 | /// \addtogroup networking-module | |
137 | /// @{ | |
138 | ||
139 | /// A listening socket, waiting to accept incoming network connections. | |
140 | class ServerSocket { | |
141 | std::unique_ptr<ServerSocketImpl> _ssi; | |
142 | public: | |
143 | /// Constructs a \c ServerSocket not corresponding to a connection | |
144 | ServerSocket() {} | |
145 | /// \cond internal | |
146 | explicit ServerSocket(std::unique_ptr<ServerSocketImpl> ssi) | |
147 | : _ssi(std::move(ssi)) {} | |
148 | ~ServerSocket() { | |
149 | if (_ssi) | |
150 | _ssi->abort_accept(); | |
151 | } | |
152 | /// \endcond | |
153 | /// Moves a \c ServerSocket object. | |
154 | ServerSocket(ServerSocket&& ss) = default; | |
155 | /// Move-assigns a \c ServerSocket object. | |
156 | ServerSocket& operator=(ServerSocket&& cs) = default; | |
157 | ||
158 | /// Accepts the next connection to successfully connect to this socket. | |
159 | /// | |
160 | /// \Accepts a \ref ConnectedSocket representing the connection, and | |
161 | /// a \ref entity_addr_t describing the remote endpoint. | |
162 | int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) { | |
163 | return _ssi->accept(sock, opt, out, w); | |
164 | } | |
165 | ||
166 | /// Stops any \ref accept() in progress. | |
167 | /// | |
168 | /// Current and future \ref accept() calls will terminate immediately | |
169 | /// with an error. | |
170 | void abort_accept() { | |
171 | _ssi->abort_accept(); | |
172 | _ssi.reset(); | |
173 | } | |
174 | ||
175 | /// Get file descriptor | |
176 | int fd() const { | |
177 | return _ssi->fd(); | |
178 | } | |
179 | ||
180 | explicit operator bool() const { | |
181 | return _ssi.get(); | |
182 | } | |
183 | }; | |
184 | /// @} | |
185 | ||
186 | class NetworkStack; | |
187 | ||
// Perf-counter indices for the async messenger workers.
// l_msgr_first / l_msgr_last bracket the range handed to
// PerfCountersBuilder; the entries in between are numbered implicitly,
// so their order must not change.
enum {
  l_msgr_first = 94000,

  // message / byte / connection counters
  l_msgr_recv_messages,
  l_msgr_send_messages,
  l_msgr_recv_bytes,
  l_msgr_send_bytes,
  l_msgr_created_connections,
  l_msgr_active_connections,

  // time accounting
  l_msgr_running_total_time,
  l_msgr_running_send_time,
  l_msgr_running_recv_time,
  l_msgr_running_fast_dispatch_time,

  l_msgr_last
};
204 | ||
205 | class Worker { | |
206 | std::mutex init_lock; | |
207 | std::condition_variable init_cond; | |
208 | bool init = false; | |
209 | ||
210 | public: | |
211 | bool done = false; | |
212 | ||
213 | CephContext *cct; | |
214 | PerfCounters *perf_logger; | |
215 | unsigned id; | |
216 | ||
217 | std::atomic_uint references; | |
218 | EventCenter center; | |
219 | ||
220 | Worker(const Worker&) = delete; | |
221 | Worker& operator=(const Worker&) = delete; | |
222 | ||
223 | Worker(CephContext *c, unsigned i) | |
224 | : cct(c), perf_logger(NULL), id(i), references(0), center(c) { | |
225 | char name[128]; | |
226 | sprintf(name, "AsyncMessenger::Worker-%u", id); | |
227 | // initialize perf_logger | |
228 | PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last); | |
229 | ||
230 | plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages"); | |
231 | plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages"); | |
7c673cae | 232 | plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes"); |
c07f9fc5 | 233 | plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network sent bytes"); |
7c673cae FG |
234 | plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number"); |
235 | plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number"); | |
236 | ||
31f18b77 FG |
237 | plb.add_time(l_msgr_running_total_time, "msgr_running_total_time", "The total time of thread running"); |
238 | plb.add_time(l_msgr_running_send_time, "msgr_running_send_time", "The total time of message sending"); | |
239 | plb.add_time(l_msgr_running_recv_time, "msgr_running_recv_time", "The total time of message receiving"); | |
240 | plb.add_time(l_msgr_running_fast_dispatch_time, "msgr_running_fast_dispatch_time", "The total time of fast dispatch"); | |
241 | ||
7c673cae FG |
242 | perf_logger = plb.create_perf_counters(); |
243 | cct->get_perfcounters_collection()->add(perf_logger); | |
244 | } | |
245 | virtual ~Worker() { | |
246 | if (perf_logger) { | |
247 | cct->get_perfcounters_collection()->remove(perf_logger); | |
248 | delete perf_logger; | |
249 | } | |
250 | } | |
251 | ||
252 | virtual int listen(entity_addr_t &addr, | |
253 | const SocketOptions &opts, ServerSocket *) = 0; | |
254 | virtual int connect(const entity_addr_t &addr, | |
255 | const SocketOptions &opts, ConnectedSocket *socket) = 0; | |
256 | virtual void destroy() {} | |
257 | ||
258 | virtual void initialize() {} | |
259 | PerfCounters *get_perf_counter() { return perf_logger; } | |
260 | void release_worker() { | |
261 | int oldref = references.fetch_sub(1); | |
262 | assert(oldref > 0); | |
263 | } | |
264 | void init_done() { | |
265 | init_lock.lock(); | |
266 | init = true; | |
267 | init_cond.notify_all(); | |
268 | init_lock.unlock(); | |
269 | } | |
270 | bool is_init() { | |
271 | std::lock_guard<std::mutex> l(init_lock); | |
272 | return init; | |
273 | } | |
274 | void wait_for_init() { | |
275 | std::unique_lock<std::mutex> l(init_lock); | |
276 | while (!init) | |
277 | init_cond.wait(l); | |
278 | } | |
279 | void reset() { | |
280 | init_lock.lock(); | |
281 | init = false; | |
282 | init_cond.notify_all(); | |
283 | init_lock.unlock(); | |
284 | done = false; | |
285 | } | |
286 | }; | |
287 | ||
288 | class NetworkStack : public CephContext::ForkWatcher { | |
289 | std::string type; | |
290 | unsigned num_workers = 0; | |
291 | Spinlock pool_spin; | |
292 | bool started = false; | |
293 | ||
294 | std::function<void ()> add_thread(unsigned i); | |
295 | ||
296 | protected: | |
297 | CephContext *cct; | |
298 | vector<Worker*> workers; | |
299 | ||
300 | explicit NetworkStack(CephContext *c, const string &t); | |
301 | public: | |
302 | NetworkStack(const NetworkStack &) = delete; | |
303 | NetworkStack& operator=(const NetworkStack &) = delete; | |
304 | ~NetworkStack() override { | |
305 | for (auto &&w : workers) | |
306 | delete w; | |
307 | } | |
308 | ||
309 | static std::shared_ptr<NetworkStack> create( | |
310 | CephContext *c, const string &type); | |
311 | ||
312 | static Worker* create_worker( | |
313 | CephContext *c, const string &t, unsigned i); | |
314 | // backend need to override this method if supports zero copy read | |
315 | virtual bool support_zero_copy_read() const { return false; } | |
316 | // backend need to override this method if backend doesn't support shared | |
317 | // listen table. | |
318 | // For example, posix backend has in kernel global listen table. If one | |
319 | // thread bind a port, other threads also aware this. | |
320 | // But for dpdk backend, we maintain listen table in each thread. So we | |
321 | // need to let each thread do binding port. | |
322 | virtual bool support_local_listen_table() const { return false; } | |
323 | virtual bool nonblock_connect_need_writable_event() const { return true; } | |
324 | ||
325 | void start(); | |
326 | void stop(); | |
327 | virtual Worker *get_worker(); | |
328 | Worker *get_worker(unsigned i) { | |
329 | return workers[i]; | |
330 | } | |
331 | void drain(); | |
332 | unsigned get_num_worker() const { | |
333 | return num_workers; | |
334 | } | |
335 | ||
336 | // direct is used in tests only | |
337 | virtual void spawn_worker(unsigned i, std::function<void ()> &&) = 0; | |
338 | virtual void join_worker(unsigned i) = 0; | |
339 | ||
340 | void handle_pre_fork() override { | |
341 | stop(); | |
342 | } | |
343 | ||
344 | void handle_post_fork() override { | |
345 | start(); | |
346 | } | |
347 | ||
348 | virtual bool is_ready() { return true; }; | |
349 | virtual void ready() { }; | |
350 | }; | |
351 | ||
352 | #endif //CEPH_MSG_ASYNC_STACK_H |