]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/crimson/test_socket.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / test / crimson / test_socket.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <seastar/core/app-template.hh>
5#include <seastar/core/gate.hh>
6#include <seastar/core/sharded.hh>
7#include <seastar/core/sleep.hh>
f67539c2
TL
8#include <seastar/core/when_all.hh>
9#include <seastar/util/later.hh>
9f95a23c
TL
10
11#include "crimson/common/log.h"
12#include "crimson/net/Errors.h"
13#include "crimson/net/Fwd.h"
14#include "crimson/net/Socket.h"
15
16namespace {
17
18using seastar::engine;
19using seastar::future;
20using crimson::net::error;
21using crimson::net::FixedCPUServerSocket;
22using crimson::net::Socket;
23using crimson::net::SocketRef;
24using crimson::net::stop_t;
25
26using SocketFRef = seastar::foreign_ptr<SocketRef>;
27
28static seastar::logger logger{"crimsontest"};
f67539c2
TL
29static entity_addr_t get_server_addr() {
30 static int port = 9020;
31 ++port;
32 ceph_assert(port < 9030 && "socket and messenger test ports should not overlap");
9f95a23c 33 entity_addr_t saddr;
f67539c2
TL
34 saddr.parse("127.0.0.1", nullptr);
35 saddr.set_port(port);
9f95a23c 36 return saddr;
f67539c2 37}
9f95a23c 38
f67539c2
TL
39future<SocketRef> socket_connect(const entity_addr_t& saddr) {
40 logger.debug("socket_connect() to {} ...", saddr);
41 return Socket::connect(saddr).then([] (auto socket) {
9f95a23c
TL
42 logger.debug("socket_connect() connected");
43 return socket;
44 });
45}
46
47future<> test_refused() {
48 logger.info("test_refused()...");
f67539c2
TL
49 auto saddr = get_server_addr();
50 return socket_connect(saddr).discard_result().then([saddr] {
51 logger.error("test_refused(): connection to {} is not refused", saddr);
52 ceph_abort();
9f95a23c
TL
53 }).handle_exception_type([] (const std::system_error& e) {
54 if (e.code() != std::errc::connection_refused) {
55 logger.error("test_refused() got unexpeted error {}", e);
56 ceph_abort();
57 } else {
58 logger.info("test_refused() ok\n");
59 }
60 }).handle_exception([] (auto eptr) {
61 logger.error("test_refused() got unexpeted exception {}", eptr);
62 ceph_abort();
63 });
64}
65
66future<> test_bind_same() {
67 logger.info("test_bind_same()...");
68 return FixedCPUServerSocket::create().then([] (auto pss1) {
f67539c2
TL
69 auto saddr = get_server_addr();
70 return pss1->listen(saddr).safe_then([saddr] {
9f95a23c 71 // try to bind the same address
f67539c2
TL
72 return FixedCPUServerSocket::create().then([saddr] (auto pss2) {
73 return pss2->listen(saddr).safe_then([] {
74 logger.error("test_bind_same() should raise address_in_use");
75 ceph_abort();
76 }, FixedCPUServerSocket::listen_ertr::all_same_way(
77 [] (const std::error_code& e) {
78 if (e == std::errc::address_in_use) {
79 // successful!
9f95a23c 80 logger.info("test_bind_same() ok\n");
f67539c2
TL
81 } else {
82 logger.error("test_bind_same() got unexpected error {}", e);
83 ceph_abort();
9f95a23c 84 }
f67539c2
TL
85 // Note: need to return a explicit ready future, or there will be a
86 // runtime error: member access within null pointer of type 'struct promise_base'
87 return seastar::now();
88 })).then([pss2] {
89 return pss2->destroy();
9f95a23c
TL
90 });
91 });
f67539c2
TL
92 }, FixedCPUServerSocket::listen_ertr::all_same_way(
93 [saddr] (const std::error_code& e) {
94 logger.error("test_bind_same(): there is another instance running at {}",
95 saddr);
96 ceph_abort();
97 })).then([pss1] {
9f95a23c
TL
98 return pss1->destroy();
99 }).handle_exception([] (auto eptr) {
100 logger.error("test_bind_same() got unexpeted exception {}", eptr);
101 ceph_abort();
102 });
103 });
104}
105
106future<> test_accept() {
107 logger.info("test_accept()");
108 return FixedCPUServerSocket::create().then([] (auto pss) {
f67539c2
TL
109 auto saddr = get_server_addr();
110 return pss->listen(saddr).safe_then([pss] {
9f95a23c
TL
111 return pss->accept([] (auto socket, auto paddr) {
112 // simple accept
113 return seastar::sleep(100ms).then([socket = std::move(socket)] () mutable {
114 return socket->close().finally([cleanup = std::move(socket)] {});
115 });
116 });
f67539c2
TL
117 }, FixedCPUServerSocket::listen_ertr::all_same_way(
118 [saddr] (const std::error_code& e) {
119 logger.error("test_accept(): there is another instance running at {}",
120 saddr);
121 ceph_abort();
122 })).then([saddr] {
9f95a23c 123 return seastar::when_all(
f67539c2 124 socket_connect(saddr).then([] (auto socket) {
9f95a23c 125 return socket->close().finally([cleanup = std::move(socket)] {}); }),
f67539c2 126 socket_connect(saddr).then([] (auto socket) {
9f95a23c 127 return socket->close().finally([cleanup = std::move(socket)] {}); }),
f67539c2 128 socket_connect(saddr).then([] (auto socket) {
9f95a23c
TL
129 return socket->close().finally([cleanup = std::move(socket)] {}); })
130 ).discard_result();
131 }).then([] {
132 // should be enough to be connected locally
133 return seastar::sleep(50ms);
134 }).then([] {
135 logger.info("test_accept() ok\n");
f67539c2 136 }).then([pss] {
9f95a23c
TL
137 return pss->destroy();
138 }).handle_exception([] (auto eptr) {
139 logger.error("test_accept() got unexpeted exception {}", eptr);
140 ceph_abort();
141 });
142 });
143}
144
145class SocketFactory {
146 SocketRef client_socket;
147 SocketFRef server_socket;
148 FixedCPUServerSocket *pss = nullptr;
149 seastar::promise<> server_connected;
150
151 public:
152 // cb_client() on CPU#0, cb_server() on CPU#1
153 template <typename FuncC, typename FuncS>
154 static future<> dispatch_sockets(FuncC&& cb_client, FuncS&& cb_server) {
f67539c2 155 assert(seastar::this_shard_id() == 0u);
9f95a23c
TL
156 auto owner = std::make_unique<SocketFactory>();
157 auto psf = owner.get();
f67539c2
TL
158 auto saddr = get_server_addr();
159 return seastar::smp::submit_to(1u, [psf, saddr] {
160 return FixedCPUServerSocket::create().then([psf, saddr] (auto pss) {
9f95a23c 161 psf->pss = pss;
f67539c2
TL
162 return pss->listen(saddr
163 ).safe_then([]{}, FixedCPUServerSocket::listen_ertr::all_same_way(
164 [saddr] (const std::error_code& e) {
165 logger.error("dispatch_sockets(): there is another instance running at {}",
166 saddr);
167 ceph_abort();
168 }));
9f95a23c 169 });
f67539c2 170 }).then([psf, saddr] {
9f95a23c 171 return seastar::when_all_succeed(
f67539c2
TL
172 seastar::smp::submit_to(0u, [psf, saddr] {
173 return socket_connect(saddr).then([psf] (auto socket) {
9f95a23c
TL
174 psf->client_socket = std::move(socket);
175 });
176 }),
177 seastar::smp::submit_to(1u, [psf] {
178 return psf->pss->accept([psf] (auto socket, auto paddr) {
179 psf->server_socket = seastar::make_foreign(std::move(socket));
180 return seastar::smp::submit_to(0u, [psf] {
181 psf->server_connected.set_value();
182 });
183 });
184 })
185 );
f67539c2
TL
186 }).then_unpack([] {
187 return seastar::now();
9f95a23c
TL
188 }).then([psf] {
189 return psf->server_connected.get_future();
f67539c2 190 }).then([psf] {
9f95a23c
TL
191 if (psf->pss) {
192 return seastar::smp::submit_to(1u, [psf] {
193 return psf->pss->destroy();
194 });
195 }
196 return seastar::now();
197 }).then([psf,
198 cb_client = std::move(cb_client),
199 cb_server = std::move(cb_server)] () mutable {
200 logger.debug("dispatch_sockets(): client/server socket are ready");
201 return seastar::when_all_succeed(
202 seastar::smp::submit_to(0u, [socket = psf->client_socket.get(),
203 cb_client = std::move(cb_client)] {
f67539c2 204 return cb_client(socket).then([socket] {
9f95a23c
TL
205 logger.debug("closing client socket...");
206 return socket->close();
207 }).handle_exception([] (auto eptr) {
208 logger.error("dispatch_sockets():"
209 " cb_client() got unexpeted exception {}", eptr);
210 ceph_abort();
211 });
212 }),
213 seastar::smp::submit_to(1u, [socket = psf->server_socket.get(),
214 cb_server = std::move(cb_server)] {
f67539c2 215 return cb_server(socket).then([socket] {
9f95a23c
TL
216 logger.debug("closing server socket...");
217 return socket->close();
218 }).handle_exception([] (auto eptr) {
219 logger.error("dispatch_sockets():"
220 " cb_server() got unexpeted exception {}", eptr);
221 ceph_abort();
222 });
223 })
224 );
f67539c2
TL
225 }).then_unpack([] {
226 return seastar::now();
9f95a23c
TL
227 }).finally([cleanup = std::move(owner)] {});
228 }
229};
230
231class Connection {
232 static const uint64_t DATA_TAIL = 5327;
233 static const unsigned DATA_SIZE = 4096;
234 std::array<uint64_t, DATA_SIZE> data = {0};
235
236 void verify_data_read(const uint64_t read_data[]) {
237 ceph_assert(read_data[0] == read_count);
238 ceph_assert(data[DATA_SIZE - 1] = DATA_TAIL);
239 }
240
241 Socket* socket = nullptr;
242 uint64_t write_count = 0;
243 uint64_t read_count = 0;
244
245 Connection(Socket* socket) : socket{socket} {
246 assert(socket);
247 data[DATA_SIZE - 1] = DATA_TAIL;
248 }
249
250 future<> dispatch_write(unsigned round = 0, bool force_shut = false) {
251 logger.debug("dispatch_write(round={}, force_shut={})...", round, force_shut);
252 return seastar::repeat([this, round, force_shut] {
253 if (round != 0 && round <= write_count) {
f67539c2 254 return seastar::futurize_invoke([this, force_shut] {
9f95a23c
TL
255 if (force_shut) {
256 logger.debug("dispatch_write() done, force shutdown output");
257 socket->force_shutdown_out();
258 } else {
259 logger.debug("dispatch_write() done");
260 }
261 }).then([] {
262 return seastar::make_ready_future<stop_t>(stop_t::yes);
263 });
264 } else {
265 data[0] = write_count;
266 return socket->write(seastar::net::packet(
267 reinterpret_cast<const char*>(&data), sizeof(data))
268 ).then([this] {
269 return socket->flush();
270 }).then([this] {
271 write_count += 1;
272 return seastar::make_ready_future<stop_t>(stop_t::no);
273 });
274 }
275 });
276 }
277
278 future<> dispatch_write_unbounded() {
279 return dispatch_write(
280 ).then([] {
281 ceph_abort();
282 }).handle_exception_type([this] (const std::system_error& e) {
283 if (e.code() != std::errc::broken_pipe &&
284 e.code() != std::errc::connection_reset) {
285 logger.error("dispatch_write_unbounded(): "
286 "unexpected error {}", e);
287 throw;
288 }
289 // successful
290 logger.debug("dispatch_write_unbounded(): "
291 "expected error {}", e);
292 shutdown();
293 });
294 }
295
296 future<> dispatch_read(unsigned round = 0, bool force_shut = false) {
297 logger.debug("dispatch_read(round={}, force_shut={})...", round, force_shut);
298 return seastar::repeat([this, round, force_shut] {
299 if (round != 0 && round <= read_count) {
f67539c2 300 return seastar::futurize_invoke([this, force_shut] {
9f95a23c
TL
301 if (force_shut) {
302 logger.debug("dispatch_read() done, force shutdown input");
303 socket->force_shutdown_in();
304 } else {
305 logger.debug("dispatch_read() done");
306 }
307 }).then([] {
308 return seastar::make_ready_future<stop_t>(stop_t::yes);
309 });
310 } else {
f67539c2 311 return seastar::futurize_invoke([this] {
9f95a23c
TL
312 // we want to test both Socket::read() and Socket::read_exactly()
313 if (read_count % 2) {
314 return socket->read(DATA_SIZE * sizeof(uint64_t)
315 ).then([this] (ceph::bufferlist bl) {
316 uint64_t read_data[DATA_SIZE];
317 auto p = bl.cbegin();
318 ::ceph::decode_raw(read_data, p);
319 verify_data_read(read_data);
320 });
321 } else {
322 return socket->read_exactly(DATA_SIZE * sizeof(uint64_t)
323 ).then([this] (auto buf) {
324 auto read_data = reinterpret_cast<const uint64_t*>(buf.get());
325 verify_data_read(read_data);
326 });
327 }
328 }).then([this] {
329 ++read_count;
330 return seastar::make_ready_future<stop_t>(stop_t::no);
331 });
332 }
333 });
334 }
335
336 future<> dispatch_read_unbounded() {
337 return dispatch_read(
338 ).then([] {
339 ceph_abort();
340 }).handle_exception_type([this] (const std::system_error& e) {
341 if (e.code() != error::read_eof
342 && e.code() != std::errc::connection_reset) {
343 logger.error("dispatch_read_unbounded(): "
344 "unexpected error {}", e);
345 throw;
346 }
347 // successful
348 logger.debug("dispatch_read_unbounded(): "
349 "expected error {}", e);
350 shutdown();
351 });
352 }
353
354 void shutdown() {
355 socket->shutdown();
356 }
357
358 public:
359 static future<> dispatch_rw_bounded(Socket* socket, unsigned round,
360 bool force_shut = false) {
361 logger.debug("dispatch_rw_bounded(round={}, force_shut={})...",
362 round, force_shut);
363 return seastar::do_with(Connection{socket},
364 [round, force_shut] (auto& conn) {
365 ceph_assert(round != 0);
366 return seastar::when_all_succeed(
367 conn.dispatch_write(round, force_shut),
368 conn.dispatch_read(round, force_shut)
f67539c2
TL
369 ).then_unpack([] {
370 return seastar::now();
371 });
9f95a23c
TL
372 });
373 }
374
375 static future<> dispatch_rw_unbounded(Socket* socket, bool preemptive_shut = false) {
376 logger.debug("dispatch_rw_unbounded(preemptive_shut={})...", preemptive_shut);
377 return seastar::do_with(Connection{socket}, [preemptive_shut] (auto& conn) {
378 return seastar::when_all_succeed(
379 conn.dispatch_write_unbounded(),
380 conn.dispatch_read_unbounded(),
f67539c2 381 seastar::futurize_invoke([&conn, preemptive_shut] {
9f95a23c
TL
382 if (preemptive_shut) {
383 return seastar::sleep(100ms).then([&conn] {
384 logger.debug("dispatch_rw_unbounded() shutdown socket preemptively(100ms)");
385 conn.shutdown();
386 });
387 } else {
388 return seastar::now();
389 }
390 })
f67539c2
TL
391 ).then_unpack([] {
392 return seastar::now();
393 });
9f95a23c
TL
394 });
395 }
396};
397
398future<> test_read_write() {
399 logger.info("test_read_write()...");
400 return SocketFactory::dispatch_sockets(
401 [] (auto cs) { return Connection::dispatch_rw_bounded(cs, 128); },
402 [] (auto ss) { return Connection::dispatch_rw_bounded(ss, 128); }
403 ).then([] {
404 logger.info("test_read_write() ok\n");
405 }).handle_exception([] (auto eptr) {
406 logger.error("test_read_write() got unexpeted exception {}", eptr);
407 ceph_abort();
408 });
409}
410
411future<> test_unexpected_down() {
412 logger.info("test_unexpected_down()...");
413 return SocketFactory::dispatch_sockets(
414 [] (auto cs) {
415 return Connection::dispatch_rw_bounded(cs, 128, true
416 ).handle_exception_type([] (const std::system_error& e) {
417 logger.debug("test_unexpected_down(): client get error {}", e);
418 ceph_assert(e.code() == error::read_eof);
419 });
420 },
421 [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); }
422 ).then([] {
423 logger.info("test_unexpected_down() ok\n");
424 }).handle_exception([] (auto eptr) {
425 logger.error("test_unexpected_down() got unexpeted exception {}", eptr);
426 ceph_abort();
427 });
428}
429
430future<> test_shutdown_propagated() {
431 logger.info("test_shutdown_propagated()...");
432 return SocketFactory::dispatch_sockets(
433 [] (auto cs) {
434 logger.debug("test_shutdown_propagated() shutdown client socket");
435 cs->shutdown();
436 return seastar::now();
437 },
438 [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); }
439 ).then([] {
440 logger.info("test_shutdown_propagated() ok\n");
441 }).handle_exception([] (auto eptr) {
442 logger.error("test_shutdown_propagated() got unexpeted exception {}", eptr);
443 ceph_abort();
444 });
445}
446
447future<> test_preemptive_down() {
448 logger.info("test_preemptive_down()...");
449 return SocketFactory::dispatch_sockets(
450 [] (auto cs) { return Connection::dispatch_rw_unbounded(cs, true); },
451 [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); }
452 ).then([] {
453 logger.info("test_preemptive_down() ok\n");
454 }).handle_exception([] (auto eptr) {
455 logger.error("test_preemptive_down() got unexpeted exception {}", eptr);
456 ceph_abort();
457 });
458}
459
460}
461
462int main(int argc, char** argv)
463{
464 seastar::app_template app;
465 return app.run(argc, argv, [] {
f67539c2 466 return seastar::futurize_invoke([] {
9f95a23c
TL
467 return test_refused();
468 }).then([] {
469 return test_bind_same();
470 }).then([] {
471 return test_accept();
472 }).then([] {
473 return test_read_write();
474 }).then([] {
475 return test_unexpected_down();
476 }).then([] {
477 return test_shutdown_propagated();
478 }).then([] {
479 return test_preemptive_down();
480 }).then([] {
481 logger.info("All tests succeeded");
482 // Seastar has bugs to have events undispatched during shutdown,
483 // which will result in memory leak and thus fail LeakSanitizer.
484 return seastar::sleep(100ms);
485 }).handle_exception([] (auto eptr) {
486 std::cout << "Test failure" << std::endl;
487 return seastar::make_exception_future<>(eptr);
488 });
489 });
490}