]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include <seastar/core/app-template.hh> | |
5 | #include <seastar/core/gate.hh> | |
6 | #include <seastar/core/sharded.hh> | |
7 | #include <seastar/core/sleep.hh> | |
f67539c2 TL |
8 | #include <seastar/core/when_all.hh> |
9 | #include <seastar/util/later.hh> | |
9f95a23c TL |
10 | |
11 | #include "crimson/common/log.h" | |
12 | #include "crimson/net/Errors.h" | |
13 | #include "crimson/net/Fwd.h" | |
14 | #include "crimson/net/Socket.h" | |
15 | ||
16 | namespace { | |
17 | ||
18 | using seastar::engine; | |
19 | using seastar::future; | |
20 | using crimson::net::error; | |
21 | using crimson::net::FixedCPUServerSocket; | |
22 | using crimson::net::Socket; | |
23 | using crimson::net::SocketRef; | |
24 | using crimson::net::stop_t; | |
25 | ||
26 | using SocketFRef = seastar::foreign_ptr<SocketRef>; | |
27 | ||
28 | static seastar::logger logger{"crimsontest"}; | |
f67539c2 TL |
29 | static entity_addr_t get_server_addr() { |
30 | static int port = 9020; | |
31 | ++port; | |
32 | ceph_assert(port < 9030 && "socket and messenger test ports should not overlap"); | |
9f95a23c | 33 | entity_addr_t saddr; |
f67539c2 TL |
34 | saddr.parse("127.0.0.1", nullptr); |
35 | saddr.set_port(port); | |
9f95a23c | 36 | return saddr; |
f67539c2 | 37 | } |
9f95a23c | 38 | |
f67539c2 TL |
39 | future<SocketRef> socket_connect(const entity_addr_t& saddr) { |
40 | logger.debug("socket_connect() to {} ...", saddr); | |
41 | return Socket::connect(saddr).then([] (auto socket) { | |
9f95a23c TL |
42 | logger.debug("socket_connect() connected"); |
43 | return socket; | |
44 | }); | |
45 | } | |
46 | ||
47 | future<> test_refused() { | |
48 | logger.info("test_refused()..."); | |
f67539c2 TL |
49 | auto saddr = get_server_addr(); |
50 | return socket_connect(saddr).discard_result().then([saddr] { | |
51 | logger.error("test_refused(): connection to {} is not refused", saddr); | |
52 | ceph_abort(); | |
9f95a23c TL |
53 | }).handle_exception_type([] (const std::system_error& e) { |
54 | if (e.code() != std::errc::connection_refused) { | |
55 | logger.error("test_refused() got unexpeted error {}", e); | |
56 | ceph_abort(); | |
57 | } else { | |
58 | logger.info("test_refused() ok\n"); | |
59 | } | |
60 | }).handle_exception([] (auto eptr) { | |
61 | logger.error("test_refused() got unexpeted exception {}", eptr); | |
62 | ceph_abort(); | |
63 | }); | |
64 | } | |
65 | ||
66 | future<> test_bind_same() { | |
67 | logger.info("test_bind_same()..."); | |
68 | return FixedCPUServerSocket::create().then([] (auto pss1) { | |
f67539c2 TL |
69 | auto saddr = get_server_addr(); |
70 | return pss1->listen(saddr).safe_then([saddr] { | |
9f95a23c | 71 | // try to bind the same address |
f67539c2 TL |
72 | return FixedCPUServerSocket::create().then([saddr] (auto pss2) { |
73 | return pss2->listen(saddr).safe_then([] { | |
74 | logger.error("test_bind_same() should raise address_in_use"); | |
75 | ceph_abort(); | |
76 | }, FixedCPUServerSocket::listen_ertr::all_same_way( | |
77 | [] (const std::error_code& e) { | |
78 | if (e == std::errc::address_in_use) { | |
79 | // successful! | |
9f95a23c | 80 | logger.info("test_bind_same() ok\n"); |
f67539c2 TL |
81 | } else { |
82 | logger.error("test_bind_same() got unexpected error {}", e); | |
83 | ceph_abort(); | |
9f95a23c | 84 | } |
f67539c2 TL |
85 | // Note: need to return a explicit ready future, or there will be a |
86 | // runtime error: member access within null pointer of type 'struct promise_base' | |
87 | return seastar::now(); | |
88 | })).then([pss2] { | |
89 | return pss2->destroy(); | |
9f95a23c TL |
90 | }); |
91 | }); | |
f67539c2 TL |
92 | }, FixedCPUServerSocket::listen_ertr::all_same_way( |
93 | [saddr] (const std::error_code& e) { | |
94 | logger.error("test_bind_same(): there is another instance running at {}", | |
95 | saddr); | |
96 | ceph_abort(); | |
97 | })).then([pss1] { | |
9f95a23c TL |
98 | return pss1->destroy(); |
99 | }).handle_exception([] (auto eptr) { | |
100 | logger.error("test_bind_same() got unexpeted exception {}", eptr); | |
101 | ceph_abort(); | |
102 | }); | |
103 | }); | |
104 | } | |
105 | ||
106 | future<> test_accept() { | |
107 | logger.info("test_accept()"); | |
108 | return FixedCPUServerSocket::create().then([] (auto pss) { | |
f67539c2 TL |
109 | auto saddr = get_server_addr(); |
110 | return pss->listen(saddr).safe_then([pss] { | |
9f95a23c TL |
111 | return pss->accept([] (auto socket, auto paddr) { |
112 | // simple accept | |
113 | return seastar::sleep(100ms).then([socket = std::move(socket)] () mutable { | |
114 | return socket->close().finally([cleanup = std::move(socket)] {}); | |
115 | }); | |
116 | }); | |
f67539c2 TL |
117 | }, FixedCPUServerSocket::listen_ertr::all_same_way( |
118 | [saddr] (const std::error_code& e) { | |
119 | logger.error("test_accept(): there is another instance running at {}", | |
120 | saddr); | |
121 | ceph_abort(); | |
122 | })).then([saddr] { | |
9f95a23c | 123 | return seastar::when_all( |
f67539c2 | 124 | socket_connect(saddr).then([] (auto socket) { |
9f95a23c | 125 | return socket->close().finally([cleanup = std::move(socket)] {}); }), |
f67539c2 | 126 | socket_connect(saddr).then([] (auto socket) { |
9f95a23c | 127 | return socket->close().finally([cleanup = std::move(socket)] {}); }), |
f67539c2 | 128 | socket_connect(saddr).then([] (auto socket) { |
9f95a23c TL |
129 | return socket->close().finally([cleanup = std::move(socket)] {}); }) |
130 | ).discard_result(); | |
131 | }).then([] { | |
132 | // should be enough to be connected locally | |
133 | return seastar::sleep(50ms); | |
134 | }).then([] { | |
135 | logger.info("test_accept() ok\n"); | |
f67539c2 | 136 | }).then([pss] { |
9f95a23c TL |
137 | return pss->destroy(); |
138 | }).handle_exception([] (auto eptr) { | |
139 | logger.error("test_accept() got unexpeted exception {}", eptr); | |
140 | ceph_abort(); | |
141 | }); | |
142 | }); | |
143 | } | |
144 | ||
145 | class SocketFactory { | |
146 | SocketRef client_socket; | |
147 | SocketFRef server_socket; | |
148 | FixedCPUServerSocket *pss = nullptr; | |
149 | seastar::promise<> server_connected; | |
150 | ||
151 | public: | |
152 | // cb_client() on CPU#0, cb_server() on CPU#1 | |
153 | template <typename FuncC, typename FuncS> | |
154 | static future<> dispatch_sockets(FuncC&& cb_client, FuncS&& cb_server) { | |
f67539c2 | 155 | assert(seastar::this_shard_id() == 0u); |
9f95a23c TL |
156 | auto owner = std::make_unique<SocketFactory>(); |
157 | auto psf = owner.get(); | |
f67539c2 TL |
158 | auto saddr = get_server_addr(); |
159 | return seastar::smp::submit_to(1u, [psf, saddr] { | |
160 | return FixedCPUServerSocket::create().then([psf, saddr] (auto pss) { | |
9f95a23c | 161 | psf->pss = pss; |
f67539c2 TL |
162 | return pss->listen(saddr |
163 | ).safe_then([]{}, FixedCPUServerSocket::listen_ertr::all_same_way( | |
164 | [saddr] (const std::error_code& e) { | |
165 | logger.error("dispatch_sockets(): there is another instance running at {}", | |
166 | saddr); | |
167 | ceph_abort(); | |
168 | })); | |
9f95a23c | 169 | }); |
f67539c2 | 170 | }).then([psf, saddr] { |
9f95a23c | 171 | return seastar::when_all_succeed( |
f67539c2 TL |
172 | seastar::smp::submit_to(0u, [psf, saddr] { |
173 | return socket_connect(saddr).then([psf] (auto socket) { | |
9f95a23c TL |
174 | psf->client_socket = std::move(socket); |
175 | }); | |
176 | }), | |
177 | seastar::smp::submit_to(1u, [psf] { | |
178 | return psf->pss->accept([psf] (auto socket, auto paddr) { | |
179 | psf->server_socket = seastar::make_foreign(std::move(socket)); | |
180 | return seastar::smp::submit_to(0u, [psf] { | |
181 | psf->server_connected.set_value(); | |
182 | }); | |
183 | }); | |
184 | }) | |
185 | ); | |
f67539c2 TL |
186 | }).then_unpack([] { |
187 | return seastar::now(); | |
9f95a23c TL |
188 | }).then([psf] { |
189 | return psf->server_connected.get_future(); | |
f67539c2 | 190 | }).then([psf] { |
9f95a23c TL |
191 | if (psf->pss) { |
192 | return seastar::smp::submit_to(1u, [psf] { | |
193 | return psf->pss->destroy(); | |
194 | }); | |
195 | } | |
196 | return seastar::now(); | |
197 | }).then([psf, | |
198 | cb_client = std::move(cb_client), | |
199 | cb_server = std::move(cb_server)] () mutable { | |
200 | logger.debug("dispatch_sockets(): client/server socket are ready"); | |
201 | return seastar::when_all_succeed( | |
202 | seastar::smp::submit_to(0u, [socket = psf->client_socket.get(), | |
203 | cb_client = std::move(cb_client)] { | |
f67539c2 | 204 | return cb_client(socket).then([socket] { |
9f95a23c TL |
205 | logger.debug("closing client socket..."); |
206 | return socket->close(); | |
207 | }).handle_exception([] (auto eptr) { | |
208 | logger.error("dispatch_sockets():" | |
209 | " cb_client() got unexpeted exception {}", eptr); | |
210 | ceph_abort(); | |
211 | }); | |
212 | }), | |
213 | seastar::smp::submit_to(1u, [socket = psf->server_socket.get(), | |
214 | cb_server = std::move(cb_server)] { | |
f67539c2 | 215 | return cb_server(socket).then([socket] { |
9f95a23c TL |
216 | logger.debug("closing server socket..."); |
217 | return socket->close(); | |
218 | }).handle_exception([] (auto eptr) { | |
219 | logger.error("dispatch_sockets():" | |
220 | " cb_server() got unexpeted exception {}", eptr); | |
221 | ceph_abort(); | |
222 | }); | |
223 | }) | |
224 | ); | |
f67539c2 TL |
225 | }).then_unpack([] { |
226 | return seastar::now(); | |
9f95a23c TL |
227 | }).finally([cleanup = std::move(owner)] {}); |
228 | } | |
229 | }; | |
230 | ||
231 | class Connection { | |
232 | static const uint64_t DATA_TAIL = 5327; | |
233 | static const unsigned DATA_SIZE = 4096; | |
234 | std::array<uint64_t, DATA_SIZE> data = {0}; | |
235 | ||
236 | void verify_data_read(const uint64_t read_data[]) { | |
237 | ceph_assert(read_data[0] == read_count); | |
238 | ceph_assert(data[DATA_SIZE - 1] = DATA_TAIL); | |
239 | } | |
240 | ||
241 | Socket* socket = nullptr; | |
242 | uint64_t write_count = 0; | |
243 | uint64_t read_count = 0; | |
244 | ||
245 | Connection(Socket* socket) : socket{socket} { | |
246 | assert(socket); | |
247 | data[DATA_SIZE - 1] = DATA_TAIL; | |
248 | } | |
249 | ||
250 | future<> dispatch_write(unsigned round = 0, bool force_shut = false) { | |
251 | logger.debug("dispatch_write(round={}, force_shut={})...", round, force_shut); | |
252 | return seastar::repeat([this, round, force_shut] { | |
253 | if (round != 0 && round <= write_count) { | |
f67539c2 | 254 | return seastar::futurize_invoke([this, force_shut] { |
9f95a23c TL |
255 | if (force_shut) { |
256 | logger.debug("dispatch_write() done, force shutdown output"); | |
257 | socket->force_shutdown_out(); | |
258 | } else { | |
259 | logger.debug("dispatch_write() done"); | |
260 | } | |
261 | }).then([] { | |
262 | return seastar::make_ready_future<stop_t>(stop_t::yes); | |
263 | }); | |
264 | } else { | |
265 | data[0] = write_count; | |
266 | return socket->write(seastar::net::packet( | |
267 | reinterpret_cast<const char*>(&data), sizeof(data)) | |
268 | ).then([this] { | |
269 | return socket->flush(); | |
270 | }).then([this] { | |
271 | write_count += 1; | |
272 | return seastar::make_ready_future<stop_t>(stop_t::no); | |
273 | }); | |
274 | } | |
275 | }); | |
276 | } | |
277 | ||
278 | future<> dispatch_write_unbounded() { | |
279 | return dispatch_write( | |
280 | ).then([] { | |
281 | ceph_abort(); | |
282 | }).handle_exception_type([this] (const std::system_error& e) { | |
283 | if (e.code() != std::errc::broken_pipe && | |
284 | e.code() != std::errc::connection_reset) { | |
285 | logger.error("dispatch_write_unbounded(): " | |
286 | "unexpected error {}", e); | |
287 | throw; | |
288 | } | |
289 | // successful | |
290 | logger.debug("dispatch_write_unbounded(): " | |
291 | "expected error {}", e); | |
292 | shutdown(); | |
293 | }); | |
294 | } | |
295 | ||
296 | future<> dispatch_read(unsigned round = 0, bool force_shut = false) { | |
297 | logger.debug("dispatch_read(round={}, force_shut={})...", round, force_shut); | |
298 | return seastar::repeat([this, round, force_shut] { | |
299 | if (round != 0 && round <= read_count) { | |
f67539c2 | 300 | return seastar::futurize_invoke([this, force_shut] { |
9f95a23c TL |
301 | if (force_shut) { |
302 | logger.debug("dispatch_read() done, force shutdown input"); | |
303 | socket->force_shutdown_in(); | |
304 | } else { | |
305 | logger.debug("dispatch_read() done"); | |
306 | } | |
307 | }).then([] { | |
308 | return seastar::make_ready_future<stop_t>(stop_t::yes); | |
309 | }); | |
310 | } else { | |
f67539c2 | 311 | return seastar::futurize_invoke([this] { |
9f95a23c TL |
312 | // we want to test both Socket::read() and Socket::read_exactly() |
313 | if (read_count % 2) { | |
314 | return socket->read(DATA_SIZE * sizeof(uint64_t) | |
315 | ).then([this] (ceph::bufferlist bl) { | |
316 | uint64_t read_data[DATA_SIZE]; | |
317 | auto p = bl.cbegin(); | |
318 | ::ceph::decode_raw(read_data, p); | |
319 | verify_data_read(read_data); | |
320 | }); | |
321 | } else { | |
322 | return socket->read_exactly(DATA_SIZE * sizeof(uint64_t) | |
323 | ).then([this] (auto buf) { | |
324 | auto read_data = reinterpret_cast<const uint64_t*>(buf.get()); | |
325 | verify_data_read(read_data); | |
326 | }); | |
327 | } | |
328 | }).then([this] { | |
329 | ++read_count; | |
330 | return seastar::make_ready_future<stop_t>(stop_t::no); | |
331 | }); | |
332 | } | |
333 | }); | |
334 | } | |
335 | ||
336 | future<> dispatch_read_unbounded() { | |
337 | return dispatch_read( | |
338 | ).then([] { | |
339 | ceph_abort(); | |
340 | }).handle_exception_type([this] (const std::system_error& e) { | |
341 | if (e.code() != error::read_eof | |
342 | && e.code() != std::errc::connection_reset) { | |
343 | logger.error("dispatch_read_unbounded(): " | |
344 | "unexpected error {}", e); | |
345 | throw; | |
346 | } | |
347 | // successful | |
348 | logger.debug("dispatch_read_unbounded(): " | |
349 | "expected error {}", e); | |
350 | shutdown(); | |
351 | }); | |
352 | } | |
353 | ||
354 | void shutdown() { | |
355 | socket->shutdown(); | |
356 | } | |
357 | ||
358 | public: | |
359 | static future<> dispatch_rw_bounded(Socket* socket, unsigned round, | |
360 | bool force_shut = false) { | |
361 | logger.debug("dispatch_rw_bounded(round={}, force_shut={})...", | |
362 | round, force_shut); | |
363 | return seastar::do_with(Connection{socket}, | |
364 | [round, force_shut] (auto& conn) { | |
365 | ceph_assert(round != 0); | |
366 | return seastar::when_all_succeed( | |
367 | conn.dispatch_write(round, force_shut), | |
368 | conn.dispatch_read(round, force_shut) | |
f67539c2 TL |
369 | ).then_unpack([] { |
370 | return seastar::now(); | |
371 | }); | |
9f95a23c TL |
372 | }); |
373 | } | |
374 | ||
375 | static future<> dispatch_rw_unbounded(Socket* socket, bool preemptive_shut = false) { | |
376 | logger.debug("dispatch_rw_unbounded(preemptive_shut={})...", preemptive_shut); | |
377 | return seastar::do_with(Connection{socket}, [preemptive_shut] (auto& conn) { | |
378 | return seastar::when_all_succeed( | |
379 | conn.dispatch_write_unbounded(), | |
380 | conn.dispatch_read_unbounded(), | |
f67539c2 | 381 | seastar::futurize_invoke([&conn, preemptive_shut] { |
9f95a23c TL |
382 | if (preemptive_shut) { |
383 | return seastar::sleep(100ms).then([&conn] { | |
384 | logger.debug("dispatch_rw_unbounded() shutdown socket preemptively(100ms)"); | |
385 | conn.shutdown(); | |
386 | }); | |
387 | } else { | |
388 | return seastar::now(); | |
389 | } | |
390 | }) | |
f67539c2 TL |
391 | ).then_unpack([] { |
392 | return seastar::now(); | |
393 | }); | |
9f95a23c TL |
394 | }); |
395 | } | |
396 | }; | |
397 | ||
398 | future<> test_read_write() { | |
399 | logger.info("test_read_write()..."); | |
400 | return SocketFactory::dispatch_sockets( | |
401 | [] (auto cs) { return Connection::dispatch_rw_bounded(cs, 128); }, | |
402 | [] (auto ss) { return Connection::dispatch_rw_bounded(ss, 128); } | |
403 | ).then([] { | |
404 | logger.info("test_read_write() ok\n"); | |
405 | }).handle_exception([] (auto eptr) { | |
406 | logger.error("test_read_write() got unexpeted exception {}", eptr); | |
407 | ceph_abort(); | |
408 | }); | |
409 | } | |
410 | ||
411 | future<> test_unexpected_down() { | |
412 | logger.info("test_unexpected_down()..."); | |
413 | return SocketFactory::dispatch_sockets( | |
414 | [] (auto cs) { | |
415 | return Connection::dispatch_rw_bounded(cs, 128, true | |
416 | ).handle_exception_type([] (const std::system_error& e) { | |
417 | logger.debug("test_unexpected_down(): client get error {}", e); | |
418 | ceph_assert(e.code() == error::read_eof); | |
419 | }); | |
420 | }, | |
421 | [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); } | |
422 | ).then([] { | |
423 | logger.info("test_unexpected_down() ok\n"); | |
424 | }).handle_exception([] (auto eptr) { | |
425 | logger.error("test_unexpected_down() got unexpeted exception {}", eptr); | |
426 | ceph_abort(); | |
427 | }); | |
428 | } | |
429 | ||
430 | future<> test_shutdown_propagated() { | |
431 | logger.info("test_shutdown_propagated()..."); | |
432 | return SocketFactory::dispatch_sockets( | |
433 | [] (auto cs) { | |
434 | logger.debug("test_shutdown_propagated() shutdown client socket"); | |
435 | cs->shutdown(); | |
436 | return seastar::now(); | |
437 | }, | |
438 | [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); } | |
439 | ).then([] { | |
440 | logger.info("test_shutdown_propagated() ok\n"); | |
441 | }).handle_exception([] (auto eptr) { | |
442 | logger.error("test_shutdown_propagated() got unexpeted exception {}", eptr); | |
443 | ceph_abort(); | |
444 | }); | |
445 | } | |
446 | ||
447 | future<> test_preemptive_down() { | |
448 | logger.info("test_preemptive_down()..."); | |
449 | return SocketFactory::dispatch_sockets( | |
450 | [] (auto cs) { return Connection::dispatch_rw_unbounded(cs, true); }, | |
451 | [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); } | |
452 | ).then([] { | |
453 | logger.info("test_preemptive_down() ok\n"); | |
454 | }).handle_exception([] (auto eptr) { | |
455 | logger.error("test_preemptive_down() got unexpeted exception {}", eptr); | |
456 | ceph_abort(); | |
457 | }); | |
458 | } | |
459 | ||
460 | } | |
461 | ||
462 | int main(int argc, char** argv) | |
463 | { | |
464 | seastar::app_template app; | |
465 | return app.run(argc, argv, [] { | |
f67539c2 | 466 | return seastar::futurize_invoke([] { |
9f95a23c TL |
467 | return test_refused(); |
468 | }).then([] { | |
469 | return test_bind_same(); | |
470 | }).then([] { | |
471 | return test_accept(); | |
472 | }).then([] { | |
473 | return test_read_write(); | |
474 | }).then([] { | |
475 | return test_unexpected_down(); | |
476 | }).then([] { | |
477 | return test_shutdown_propagated(); | |
478 | }).then([] { | |
479 | return test_preemptive_down(); | |
480 | }).then([] { | |
481 | logger.info("All tests succeeded"); | |
482 | // Seastar has bugs to have events undispatched during shutdown, | |
483 | // which will result in memory leak and thus fail LeakSanitizer. | |
484 | return seastar::sleep(100ms); | |
485 | }).handle_exception([] (auto eptr) { | |
486 | std::cout << "Test failure" << std::endl; | |
487 | return seastar::make_exception_future<>(eptr); | |
488 | }); | |
489 | }); | |
490 | } |