]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/crimson/test_socket.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / test / crimson / test_socket.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
1e59de90
TL
4#include "common/ceph_argparse.h"
5#include <fmt/os.h>
9f95a23c
TL
6#include <seastar/core/app-template.hh>
7#include <seastar/core/gate.hh>
8#include <seastar/core/sharded.hh>
9#include <seastar/core/sleep.hh>
f67539c2
TL
10#include <seastar/core/when_all.hh>
11#include <seastar/util/later.hh>
9f95a23c
TL
12
13#include "crimson/common/log.h"
14#include "crimson/net/Errors.h"
15#include "crimson/net/Fwd.h"
16#include "crimson/net/Socket.h"
17
1e59de90
TL
18using crimson::common::local_conf;
19
9f95a23c
TL
20namespace {
21
20effc67
TL
22using namespace std::chrono_literals;
23
9f95a23c
TL
24using seastar::engine;
25using seastar::future;
26using crimson::net::error;
20effc67 27using crimson::net::listen_ertr;
aee94f69 28using crimson::net::ShardedServerSocket;
9f95a23c
TL
29using crimson::net::Socket;
30using crimson::net::SocketRef;
31using crimson::net::stop_t;
32
33using SocketFRef = seastar::foreign_ptr<SocketRef>;
34
aee94f69
TL
35seastar::logger &logger() {
36 return crimson::get_logger(ceph_subsys_test);
37}
38
39entity_addr_t get_server_addr() {
9f95a23c 40 entity_addr_t saddr;
f67539c2 41 saddr.parse("127.0.0.1", nullptr);
aee94f69 42 saddr.set_port(9020);
9f95a23c 43 return saddr;
f67539c2 44}
9f95a23c 45
f67539c2 46future<SocketRef> socket_connect(const entity_addr_t& saddr) {
aee94f69
TL
47 logger().debug("socket_connect() to {} ...", saddr);
48 return Socket::connect(saddr).then([](auto socket) {
49 logger().debug("socket_connect() connected");
9f95a23c
TL
50 return socket;
51 });
52}
53
54future<> test_refused() {
aee94f69 55 logger().info("test_refused()...");
f67539c2
TL
56 auto saddr = get_server_addr();
57 return socket_connect(saddr).discard_result().then([saddr] {
aee94f69 58 logger().error("test_refused(): connection to {} is not refused", saddr);
f67539c2 59 ceph_abort();
aee94f69 60 }).handle_exception_type([](const std::system_error& e) {
9f95a23c 61 if (e.code() != std::errc::connection_refused) {
aee94f69 62 logger().error("test_refused() got unexpeted error {}", e);
9f95a23c
TL
63 ceph_abort();
64 } else {
aee94f69 65 logger().info("test_refused() ok\n");
9f95a23c 66 }
aee94f69
TL
67 }).handle_exception([](auto eptr) {
68 logger().error("test_refused() got unexpeted exception {}", eptr);
9f95a23c
TL
69 ceph_abort();
70 });
71}
72
aee94f69
TL
73future<> test_bind_same(bool is_fixed_cpu) {
74 logger().info("test_bind_same()...");
75 return ShardedServerSocket::create(is_fixed_cpu
76 ).then([is_fixed_cpu](auto pss1) {
f67539c2 77 auto saddr = get_server_addr();
aee94f69 78 return pss1->listen(saddr).safe_then([saddr, is_fixed_cpu] {
9f95a23c 79 // try to bind the same address
aee94f69
TL
80 return ShardedServerSocket::create(is_fixed_cpu
81 ).then([saddr](auto pss2) {
f67539c2 82 return pss2->listen(saddr).safe_then([] {
aee94f69 83 logger().error("test_bind_same() should raise address_in_use");
f67539c2 84 ceph_abort();
20effc67 85 }, listen_ertr::all_same_way(
aee94f69 86 [](const std::error_code& e) {
f67539c2
TL
87 if (e == std::errc::address_in_use) {
88 // successful!
aee94f69 89 logger().info("test_bind_same() ok\n");
f67539c2 90 } else {
aee94f69 91 logger().error("test_bind_same() got unexpected error {}", e);
f67539c2 92 ceph_abort();
9f95a23c 93 }
f67539c2
TL
94 // Note: need to return a explicit ready future, or there will be a
95 // runtime error: member access within null pointer of type 'struct promise_base'
96 return seastar::now();
97 })).then([pss2] {
aee94f69 98 return pss2->shutdown_destroy();
9f95a23c
TL
99 });
100 });
20effc67 101 }, listen_ertr::all_same_way(
aee94f69
TL
102 [saddr](const std::error_code& e) {
103 logger().error("test_bind_same(): there is another instance running at {}",
104 saddr);
f67539c2
TL
105 ceph_abort();
106 })).then([pss1] {
aee94f69
TL
107 return pss1->shutdown_destroy();
108 }).handle_exception([](auto eptr) {
109 logger().error("test_bind_same() got unexpeted exception {}", eptr);
9f95a23c
TL
110 ceph_abort();
111 });
112 });
113}
114
aee94f69
TL
115future<> test_accept(bool is_fixed_cpu) {
116 logger().info("test_accept()");
117 return ShardedServerSocket::create(is_fixed_cpu
118 ).then([](auto pss) {
f67539c2 119 auto saddr = get_server_addr();
aee94f69
TL
120 return pss->listen(saddr
121 ).safe_then([pss] {
122 return pss->accept([](auto socket, auto paddr) {
123 logger().info("test_accept(): accepted at shard {}", seastar::this_shard_id());
9f95a23c 124 // simple accept
aee94f69
TL
125 return seastar::sleep(100ms
126 ).then([socket = std::move(socket)]() mutable {
127 return socket->close(
128 ).finally([cleanup = std::move(socket)] {});
9f95a23c
TL
129 });
130 });
20effc67 131 }, listen_ertr::all_same_way(
aee94f69
TL
132 [saddr](const std::error_code& e) {
133 logger().error("test_accept(): there is another instance running at {}",
134 saddr);
f67539c2
TL
135 ceph_abort();
136 })).then([saddr] {
9f95a23c 137 return seastar::when_all(
aee94f69 138 socket_connect(saddr).then([](auto socket) {
9f95a23c 139 return socket->close().finally([cleanup = std::move(socket)] {}); }),
aee94f69 140 socket_connect(saddr).then([](auto socket) {
9f95a23c 141 return socket->close().finally([cleanup = std::move(socket)] {}); }),
aee94f69 142 socket_connect(saddr).then([](auto socket) {
9f95a23c
TL
143 return socket->close().finally([cleanup = std::move(socket)] {}); })
144 ).discard_result();
145 }).then([] {
146 // should be enough to be connected locally
147 return seastar::sleep(50ms);
148 }).then([] {
aee94f69 149 logger().info("test_accept() ok\n");
f67539c2 150 }).then([pss] {
aee94f69
TL
151 return pss->shutdown_destroy();
152 }).handle_exception([](auto eptr) {
153 logger().error("test_accept() got unexpeted exception {}", eptr);
9f95a23c
TL
154 ceph_abort();
155 });
156 });
157}
158
159class SocketFactory {
aee94f69 160 static constexpr seastar::shard_id CLIENT_CPU = 0u;
9f95a23c 161 SocketRef client_socket;
9f95a23c
TL
162 seastar::promise<> server_connected;
163
aee94f69
TL
164 static constexpr seastar::shard_id SERVER_CPU = 1u;
165 ShardedServerSocket *pss = nullptr;
166
167 seastar::shard_id server_socket_CPU;
168 SocketFRef server_socket;
169
9f95a23c 170 public:
9f95a23c 171 template <typename FuncC, typename FuncS>
aee94f69
TL
172 static future<> dispatch_sockets(
173 bool is_fixed_cpu,
174 FuncC&& cb_client,
175 FuncS&& cb_server) {
176 ceph_assert_always(seastar::this_shard_id() == CLIENT_CPU);
9f95a23c
TL
177 auto owner = std::make_unique<SocketFactory>();
178 auto psf = owner.get();
f67539c2 179 auto saddr = get_server_addr();
aee94f69
TL
180 return seastar::smp::submit_to(SERVER_CPU, [psf, saddr, is_fixed_cpu] {
181 return ShardedServerSocket::create(is_fixed_cpu
182 ).then([psf, saddr](auto pss) {
9f95a23c 183 psf->pss = pss;
f67539c2 184 return pss->listen(saddr
aee94f69
TL
185 ).safe_then([] {
186 }, listen_ertr::all_same_way([saddr](const std::error_code& e) {
187 logger().error("dispatch_sockets(): there is another instance running at {}",
188 saddr);
f67539c2
TL
189 ceph_abort();
190 }));
9f95a23c 191 });
f67539c2 192 }).then([psf, saddr] {
9f95a23c 193 return seastar::when_all_succeed(
aee94f69
TL
194 seastar::smp::submit_to(CLIENT_CPU, [psf, saddr] {
195 return socket_connect(saddr).then([psf](auto socket) {
196 ceph_assert_always(seastar::this_shard_id() == CLIENT_CPU);
9f95a23c
TL
197 psf->client_socket = std::move(socket);
198 });
199 }),
aee94f69
TL
200 seastar::smp::submit_to(SERVER_CPU, [psf] {
201 return psf->pss->accept([psf](auto _socket, auto paddr) {
202 logger().info("dispatch_sockets(): accepted at shard {}",
203 seastar::this_shard_id());
204 psf->server_socket_CPU = seastar::this_shard_id();
205 if (psf->pss->is_fixed_shard_dispatching()) {
206 ceph_assert_always(SERVER_CPU == seastar::this_shard_id());
207 }
208 SocketFRef socket = seastar::make_foreign(std::move(_socket));
209 psf->server_socket = std::move(socket);
210 return seastar::smp::submit_to(CLIENT_CPU, [psf] {
9f95a23c
TL
211 psf->server_connected.set_value();
212 });
213 });
214 })
215 );
f67539c2
TL
216 }).then_unpack([] {
217 return seastar::now();
9f95a23c
TL
218 }).then([psf] {
219 return psf->server_connected.get_future();
f67539c2 220 }).then([psf] {
9f95a23c 221 if (psf->pss) {
aee94f69
TL
222 return seastar::smp::submit_to(SERVER_CPU, [psf] {
223 return psf->pss->shutdown_destroy();
9f95a23c
TL
224 });
225 }
226 return seastar::now();
227 }).then([psf,
228 cb_client = std::move(cb_client),
aee94f69
TL
229 cb_server = std::move(cb_server)]() mutable {
230 logger().debug("dispatch_sockets(): client/server socket are ready");
9f95a23c 231 return seastar::when_all_succeed(
aee94f69
TL
232 seastar::smp::submit_to(CLIENT_CPU,
233 [socket = psf->client_socket.get(), cb_client = std::move(cb_client)] {
f67539c2 234 return cb_client(socket).then([socket] {
aee94f69 235 logger().debug("closing client socket...");
9f95a23c 236 return socket->close();
aee94f69
TL
237 }).handle_exception([](auto eptr) {
238 logger().error("dispatch_sockets():"
239 " cb_client() got unexpeted exception {}", eptr);
9f95a23c
TL
240 ceph_abort();
241 });
242 }),
aee94f69
TL
243 seastar::smp::submit_to(psf->server_socket_CPU,
244 [socket = psf->server_socket.get(), cb_server = std::move(cb_server)] {
f67539c2 245 return cb_server(socket).then([socket] {
aee94f69 246 logger().debug("closing server socket...");
9f95a23c 247 return socket->close();
aee94f69
TL
248 }).handle_exception([](auto eptr) {
249 logger().error("dispatch_sockets():"
250 " cb_server() got unexpeted exception {}", eptr);
9f95a23c
TL
251 ceph_abort();
252 });
253 })
254 );
f67539c2
TL
255 }).then_unpack([] {
256 return seastar::now();
9f95a23c
TL
257 }).finally([cleanup = std::move(owner)] {});
258 }
259};
260
261class Connection {
262 static const uint64_t DATA_TAIL = 5327;
263 static const unsigned DATA_SIZE = 4096;
264 std::array<uint64_t, DATA_SIZE> data = {0};
265
266 void verify_data_read(const uint64_t read_data[]) {
267 ceph_assert(read_data[0] == read_count);
268 ceph_assert(data[DATA_SIZE - 1] = DATA_TAIL);
269 }
270
271 Socket* socket = nullptr;
272 uint64_t write_count = 0;
273 uint64_t read_count = 0;
274
275 Connection(Socket* socket) : socket{socket} {
276 assert(socket);
277 data[DATA_SIZE - 1] = DATA_TAIL;
278 }
279
280 future<> dispatch_write(unsigned round = 0, bool force_shut = false) {
aee94f69 281 logger().debug("dispatch_write(round={}, force_shut={})...", round, force_shut);
9f95a23c
TL
282 return seastar::repeat([this, round, force_shut] {
283 if (round != 0 && round <= write_count) {
f67539c2 284 return seastar::futurize_invoke([this, force_shut] {
9f95a23c 285 if (force_shut) {
aee94f69 286 logger().debug("dispatch_write() done, force shutdown output");
9f95a23c
TL
287 socket->force_shutdown_out();
288 } else {
aee94f69 289 logger().debug("dispatch_write() done");
9f95a23c
TL
290 }
291 }).then([] {
292 return seastar::make_ready_future<stop_t>(stop_t::yes);
293 });
294 } else {
295 data[0] = write_count;
aee94f69
TL
296 bufferlist bl;
297 bl.append(buffer::copy(
298 reinterpret_cast<const char*>(&data), sizeof(data)));
299 return socket->write(bl
9f95a23c
TL
300 ).then([this] {
301 return socket->flush();
302 }).then([this] {
303 write_count += 1;
304 return seastar::make_ready_future<stop_t>(stop_t::no);
305 });
306 }
307 });
308 }
309
310 future<> dispatch_write_unbounded() {
311 return dispatch_write(
312 ).then([] {
313 ceph_abort();
aee94f69 314 }).handle_exception_type([this](const std::system_error& e) {
9f95a23c
TL
315 if (e.code() != std::errc::broken_pipe &&
316 e.code() != std::errc::connection_reset) {
aee94f69
TL
317 logger().error("dispatch_write_unbounded(): "
318 "unexpected error {}", e);
9f95a23c
TL
319 throw;
320 }
321 // successful
aee94f69
TL
322 logger().debug("dispatch_write_unbounded(): "
323 "expected error {}", e);
9f95a23c
TL
324 shutdown();
325 });
326 }
327
328 future<> dispatch_read(unsigned round = 0, bool force_shut = false) {
aee94f69 329 logger().debug("dispatch_read(round={}, force_shut={})...", round, force_shut);
9f95a23c
TL
330 return seastar::repeat([this, round, force_shut] {
331 if (round != 0 && round <= read_count) {
f67539c2 332 return seastar::futurize_invoke([this, force_shut] {
9f95a23c 333 if (force_shut) {
aee94f69 334 logger().debug("dispatch_read() done, force shutdown input");
9f95a23c
TL
335 socket->force_shutdown_in();
336 } else {
aee94f69 337 logger().debug("dispatch_read() done");
9f95a23c
TL
338 }
339 }).then([] {
340 return seastar::make_ready_future<stop_t>(stop_t::yes);
341 });
342 } else {
f67539c2 343 return seastar::futurize_invoke([this] {
9f95a23c
TL
344 // we want to test both Socket::read() and Socket::read_exactly()
345 if (read_count % 2) {
346 return socket->read(DATA_SIZE * sizeof(uint64_t)
aee94f69 347 ).then([this](ceph::bufferlist bl) {
9f95a23c
TL
348 uint64_t read_data[DATA_SIZE];
349 auto p = bl.cbegin();
350 ::ceph::decode_raw(read_data, p);
351 verify_data_read(read_data);
352 });
353 } else {
354 return socket->read_exactly(DATA_SIZE * sizeof(uint64_t)
aee94f69
TL
355 ).then([this](auto bptr) {
356 uint64_t read_data[DATA_SIZE];
357 std::memcpy(read_data, bptr.c_str(), DATA_SIZE * sizeof(uint64_t));
9f95a23c
TL
358 verify_data_read(read_data);
359 });
360 }
361 }).then([this] {
362 ++read_count;
363 return seastar::make_ready_future<stop_t>(stop_t::no);
364 });
365 }
366 });
367 }
368
369 future<> dispatch_read_unbounded() {
370 return dispatch_read(
371 ).then([] {
372 ceph_abort();
aee94f69 373 }).handle_exception_type([this](const std::system_error& e) {
9f95a23c
TL
374 if (e.code() != error::read_eof
375 && e.code() != std::errc::connection_reset) {
aee94f69
TL
376 logger().error("dispatch_read_unbounded(): "
377 "unexpected error {}", e);
9f95a23c
TL
378 throw;
379 }
380 // successful
aee94f69
TL
381 logger().debug("dispatch_read_unbounded(): "
382 "expected error {}", e);
9f95a23c
TL
383 shutdown();
384 });
385 }
386
387 void shutdown() {
388 socket->shutdown();
389 }
390
391 public:
392 static future<> dispatch_rw_bounded(Socket* socket, unsigned round,
393 bool force_shut = false) {
aee94f69
TL
394 logger().debug("dispatch_rw_bounded(round={}, force_shut={})...",
395 round, force_shut);
9f95a23c 396 return seastar::do_with(Connection{socket},
aee94f69 397 [round, force_shut](auto& conn) {
9f95a23c
TL
398 ceph_assert(round != 0);
399 return seastar::when_all_succeed(
400 conn.dispatch_write(round, force_shut),
401 conn.dispatch_read(round, force_shut)
f67539c2
TL
402 ).then_unpack([] {
403 return seastar::now();
404 });
9f95a23c
TL
405 });
406 }
407
408 static future<> dispatch_rw_unbounded(Socket* socket, bool preemptive_shut = false) {
aee94f69
TL
409 logger().debug("dispatch_rw_unbounded(preemptive_shut={})...", preemptive_shut);
410 return seastar::do_with(Connection{socket}, [preemptive_shut](auto& conn) {
9f95a23c
TL
411 return seastar::when_all_succeed(
412 conn.dispatch_write_unbounded(),
413 conn.dispatch_read_unbounded(),
f67539c2 414 seastar::futurize_invoke([&conn, preemptive_shut] {
9f95a23c
TL
415 if (preemptive_shut) {
416 return seastar::sleep(100ms).then([&conn] {
aee94f69 417 logger().debug("dispatch_rw_unbounded() shutdown socket preemptively(100ms)");
9f95a23c
TL
418 conn.shutdown();
419 });
420 } else {
421 return seastar::now();
422 }
423 })
f67539c2
TL
424 ).then_unpack([] {
425 return seastar::now();
426 });
9f95a23c
TL
427 });
428 }
429};
430
aee94f69
TL
431future<> test_read_write(bool is_fixed_cpu) {
432 logger().info("test_read_write()...");
9f95a23c 433 return SocketFactory::dispatch_sockets(
aee94f69
TL
434 is_fixed_cpu,
435 [](auto cs) { return Connection::dispatch_rw_bounded(cs, 128); },
436 [](auto ss) { return Connection::dispatch_rw_bounded(ss, 128); }
9f95a23c 437 ).then([] {
aee94f69
TL
438 logger().info("test_read_write() ok\n");
439 }).handle_exception([](auto eptr) {
440 logger().error("test_read_write() got unexpeted exception {}", eptr);
9f95a23c
TL
441 ceph_abort();
442 });
443}
444
aee94f69
TL
445future<> test_unexpected_down(bool is_fixed_cpu) {
446 logger().info("test_unexpected_down()...");
9f95a23c 447 return SocketFactory::dispatch_sockets(
aee94f69
TL
448 is_fixed_cpu,
449 [](auto cs) {
9f95a23c 450 return Connection::dispatch_rw_bounded(cs, 128, true
aee94f69
TL
451 ).handle_exception_type([](const std::system_error& e) {
452 logger().debug("test_unexpected_down(): client get error {}", e);
9f95a23c
TL
453 ceph_assert(e.code() == error::read_eof);
454 });
455 },
aee94f69 456 [](auto ss) { return Connection::dispatch_rw_unbounded(ss); }
9f95a23c 457 ).then([] {
aee94f69
TL
458 logger().info("test_unexpected_down() ok\n");
459 }).handle_exception([](auto eptr) {
460 logger().error("test_unexpected_down() got unexpeted exception {}", eptr);
9f95a23c
TL
461 ceph_abort();
462 });
463}
464
aee94f69
TL
465future<> test_shutdown_propagated(bool is_fixed_cpu) {
466 logger().info("test_shutdown_propagated()...");
9f95a23c 467 return SocketFactory::dispatch_sockets(
aee94f69
TL
468 is_fixed_cpu,
469 [](auto cs) {
470 logger().debug("test_shutdown_propagated() shutdown client socket");
9f95a23c
TL
471 cs->shutdown();
472 return seastar::now();
473 },
aee94f69 474 [](auto ss) { return Connection::dispatch_rw_unbounded(ss); }
9f95a23c 475 ).then([] {
aee94f69
TL
476 logger().info("test_shutdown_propagated() ok\n");
477 }).handle_exception([](auto eptr) {
478 logger().error("test_shutdown_propagated() got unexpeted exception {}", eptr);
9f95a23c
TL
479 ceph_abort();
480 });
481}
482
aee94f69
TL
483future<> test_preemptive_down(bool is_fixed_cpu) {
484 logger().info("test_preemptive_down()...");
9f95a23c 485 return SocketFactory::dispatch_sockets(
aee94f69
TL
486 is_fixed_cpu,
487 [](auto cs) { return Connection::dispatch_rw_unbounded(cs, true); },
488 [](auto ss) { return Connection::dispatch_rw_unbounded(ss); }
9f95a23c 489 ).then([] {
aee94f69
TL
490 logger().info("test_preemptive_down() ok\n");
491 }).handle_exception([](auto eptr) {
492 logger().error("test_preemptive_down() got unexpeted exception {}", eptr);
9f95a23c
TL
493 ceph_abort();
494 });
495}
496
aee94f69
TL
497future<> do_test_with_type(bool is_fixed_cpu) {
498 return test_bind_same(is_fixed_cpu
499 ).then([is_fixed_cpu] {
500 return test_accept(is_fixed_cpu);
501 }).then([is_fixed_cpu] {
502 return test_read_write(is_fixed_cpu);
503 }).then([is_fixed_cpu] {
504 return test_unexpected_down(is_fixed_cpu);
505 }).then([is_fixed_cpu] {
506 return test_shutdown_propagated(is_fixed_cpu);
507 }).then([is_fixed_cpu] {
508 return test_preemptive_down(is_fixed_cpu);
509 });
510}
511
9f95a23c
TL
512}
513
1e59de90 514seastar::future<int> do_test(seastar::app_template& app)
9f95a23c 515{
1e59de90
TL
516 std::vector<const char*> args;
517 std::string cluster;
518 std::string conf_file_list;
519 auto init_params = ceph_argparse_early_args(args,
520 CEPH_ENTITY_TYPE_CLIENT,
521 &cluster,
522 &conf_file_list);
aee94f69
TL
523 return crimson::common::sharded_conf().start(
524 init_params.name, cluster
525 ).then([] {
526 return local_conf().start();
527 }).then([conf_file_list] {
1e59de90
TL
528 return local_conf().parse_config_files(conf_file_list);
529 }).then([] {
aee94f69
TL
530 return local_conf().set_val("ms_inject_internal_delays", "0");
531 }).then([] {
532 return test_refused();
533 }).then([] {
534 return do_test_with_type(true);
535 }).then([] {
536 return do_test_with_type(false);
537 }).then([] {
538 logger().info("All tests succeeded");
539 // Seastar has bugs to have events undispatched during shutdown,
540 // which will result in memory leak and thus fail LeakSanitizer.
541 return seastar::sleep(100ms);
1e59de90
TL
542 }).then([] {
543 return crimson::common::sharded_conf().stop();
544 }).then([] {
545 return 0;
aee94f69
TL
546 }).handle_exception([](auto eptr) {
547 logger().error("Test failed: got exception {}", eptr);
1e59de90
TL
548 return 1;
549 });
550}
551
552int main(int argc, char** argv)
553{
554 seastar::app_template app;
555 return app.run(argc, argv, [&app] {
556 return do_test(app);
9f95a23c
TL
557 });
558}