]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/msgr/test_async_networkstack.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / test / msgr / test_async_networkstack.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSky <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #include <algorithm>
18 #include <atomic>
19 #include <iostream>
20 #include <random>
21 #include <string>
22 #include <set>
23 #include <vector>
24 #include <gtest/gtest.h>
25
26 #include "acconfig.h"
27 #include "include/Context.h"
28
29 #include "msg/async/Event.h"
30 #include "msg/async/Stack.h"
31
32
33 #if GTEST_HAS_PARAM_TEST
34
35 class NetworkWorkerTest : public ::testing::TestWithParam<const char*> {
36 public:
37 std::shared_ptr<NetworkStack> stack;
38 string addr, port_addr;
39
40 NetworkWorkerTest() {}
41 void SetUp() override {
42 cerr << __func__ << " start set up " << GetParam() << std::endl;
43 if (strncmp(GetParam(), "dpdk", 4)) {
44 g_ceph_context->_conf->set_val("ms_type", "async+posix", false);
45 addr = "127.0.0.1:15000";
46 port_addr = "127.0.0.1:15001";
47 } else {
48 g_ceph_context->_conf->set_val("ms_type", "async+dpdk", false);
49 g_ceph_context->_conf->set_val("ms_dpdk_debug_allow_loopback", "true", false);
50 g_ceph_context->_conf->set_val("ms_async_op_threads", "2", false);
51 g_ceph_context->_conf->set_val("ms_dpdk_coremask", "0x7", false);
52 g_ceph_context->_conf->set_val("ms_dpdk_host_ipv4_addr", "172.16.218.3", false);
53 g_ceph_context->_conf->set_val("ms_dpdk_gateway_ipv4_addr", "172.16.218.2", false);
54 g_ceph_context->_conf->set_val("ms_dpdk_netmask_ipv4_addr", "255.255.255.0", false);
55 addr = "172.16.218.3:15000";
56 port_addr = "172.16.218.3:15001";
57 }
58 stack = NetworkStack::create(g_ceph_context, GetParam());
59 stack->start();
60 }
61 void TearDown() override {
62 stack->stop();
63 }
64 string get_addr() const {
65 return addr;
66 }
67 string get_ip_different_port() const {
68 return port_addr;
69 }
70 string get_different_ip() const {
71 return "10.0.123.100:4323";
72 }
73 EventCenter *get_center(unsigned i) {
74 return &stack->get_worker(i)->center;
75 }
76 Worker *get_worker(unsigned i) {
77 return stack->get_worker(i);
78 }
79 template<typename func>
80 class C_dispatch : public EventCallback {
81 Worker *worker;
82 func f;
83 std::atomic_bool done;
84 public:
85 C_dispatch(Worker *w, func &&_f): worker(w), f(std::move(_f)), done(false) {}
86 void do_request(int id) override {
87 f(worker);
88 done = true;
89 }
90 void wait() {
91 int us = 1000 * 1000 * 1000;
92 while (!done) {
93 ASSERT_TRUE(us > 0);
94 usleep(100);
95 us -= 100;
96 }
97 }
98 };
99 template<typename func>
100 void exec_events(func &&f) {
101 std::vector<C_dispatch<func>*> dis;
102 for (unsigned i = 0; i < stack->get_num_worker(); ++i) {
103 Worker *w = stack->get_worker(i);
104 C_dispatch<func> *e = new C_dispatch<func>(w, std::move(f));
105 stack->get_worker(i)->center.dispatch_event_external(e);
106 dis.push_back(e);
107 }
108
109 for (auto &&e : dis) {
110 e->wait();
111 delete e;
112 }
113 }
114 };
115
116 class C_poll : public EventCallback {
117 EventCenter *center;
118 std::atomic<bool> woken;
119 static const int sleepus = 500;
120
121 public:
122 C_poll(EventCenter *c): center(c), woken(false) {}
123 void do_request(int r) override {
124 woken = true;
125 }
126 bool poll(int milliseconds) {
127 auto start = ceph::coarse_real_clock::now();
128 while (!woken) {
129 center->process_events(sleepus);
130 usleep(sleepus);
131 auto r = std::chrono::duration_cast<std::chrono::milliseconds>(
132 ceph::coarse_real_clock::now() - start);
133 if (r >= std::chrono::milliseconds(milliseconds))
134 break;
135 }
136 return woken;
137 }
138 void reset() {
139 woken = false;
140 }
141 };
142
143 TEST_P(NetworkWorkerTest, SimpleTest) {
144 entity_addr_t bind_addr;
145 ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
146 std::atomic_bool accepted(false);
147 std::atomic_bool *accepted_p = &accepted;
148
149 exec_events([this, accepted_p, bind_addr](Worker *worker) mutable {
150 entity_addr_t cli_addr;
151 SocketOptions options;
152 ServerSocket bind_socket;
153 EventCenter *center = &worker->center;
154 ssize_t r = 0;
155 if (stack->support_local_listen_table() || worker->id == 0)
156 r = worker->listen(bind_addr, options, &bind_socket);
157 ASSERT_EQ(0, r);
158
159 ConnectedSocket cli_socket, srv_socket;
160 if (worker->id == 0) {
161 r = worker->connect(bind_addr, options, &cli_socket);
162 ASSERT_EQ(0, r);
163 }
164
165 bool is_my_accept = false;
166 if (bind_socket) {
167 C_poll cb(center);
168 center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
169 if (cb.poll(500)) {
170 *accepted_p = true;
171 is_my_accept = true;
172 }
173 ASSERT_TRUE(*accepted_p);
174 center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
175 }
176
177 if (is_my_accept) {
178 r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
179 ASSERT_EQ(0, r);
180 ASSERT_TRUE(srv_socket.fd() > 0);
181 }
182
183 if (worker->id == 0) {
184 C_poll cb(center);
185 center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
186 r = cli_socket.is_connected();
187 if (r == 0) {
188 ASSERT_EQ(true, cb.poll(500));
189 r = cli_socket.is_connected();
190 }
191 ASSERT_EQ(1, r);
192 center->delete_file_event(cli_socket.fd(), EVENT_READABLE);
193 }
194
195 const char *message = "this is a new message";
196 int len = strlen(message);
197 bufferlist bl;
198 bl.append(message, len);
199 if (worker->id == 0) {
200 r = cli_socket.send(bl, false);
201 ASSERT_EQ(len, r);
202 }
203
204 char buf[1024];
205 C_poll cb(center);
206 if (is_my_accept) {
207 center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb);
208 {
209 r = srv_socket.read(buf, sizeof(buf));
210 while (r == -EAGAIN) {
211 ASSERT_TRUE(cb.poll(500));
212 r = srv_socket.read(buf, sizeof(buf));
213 cb.reset();
214 }
215 ASSERT_EQ(len, r);
216 ASSERT_EQ(0, memcmp(buf, message, len));
217 }
218 bind_socket.abort_accept();
219 }
220 if (worker->id == 0) {
221 cli_socket.shutdown();
222 // ack delay is 200 ms
223 }
224
225 bl.clear();
226 bl.append(message, len);
227 if (worker->id == 0) {
228 r = cli_socket.send(bl, false);
229 ASSERT_EQ(-EPIPE, r);
230 }
231 if (is_my_accept) {
232 cb.reset();
233 ASSERT_TRUE(cb.poll(500));
234 r = srv_socket.read(buf, sizeof(buf));
235 if (r == -EAGAIN) {
236 cb.reset();
237 ASSERT_TRUE(cb.poll(1000*500));
238 r = srv_socket.read(buf, sizeof(buf));
239 }
240 ASSERT_EQ(0, r);
241 center->delete_file_event(srv_socket.fd(), EVENT_READABLE);
242 srv_socket.close();
243 }
244 });
245 }
246
247 TEST_P(NetworkWorkerTest, ConnectFailedTest) {
248 entity_addr_t bind_addr;
249 ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
250
251 exec_events([this, bind_addr](Worker *worker) mutable {
252 EventCenter *center = &worker->center;
253 entity_addr_t cli_addr;
254 SocketOptions options;
255 ServerSocket bind_socket;
256 int r = 0;
257 if (stack->support_local_listen_table() || worker->id == 0)
258 r = worker->listen(bind_addr, options, &bind_socket);
259 ASSERT_EQ(0, r);
260
261 ConnectedSocket cli_socket1, cli_socket2;
262 if (worker->id == 0) {
263 ASSERT_TRUE(cli_addr.parse(get_ip_different_port().c_str()));
264 r = worker->connect(cli_addr, options, &cli_socket1);
265 ASSERT_EQ(0, r);
266 C_poll cb(center);
267 center->create_file_event(cli_socket1.fd(), EVENT_READABLE, &cb);
268 r = cli_socket1.is_connected();
269 if (r == 0) {
270 ASSERT_TRUE(cb.poll(500));
271 r = cli_socket1.is_connected();
272 }
273 ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET);
274 }
275
276 if (worker->id == 1) {
277 ASSERT_TRUE(cli_addr.parse(get_different_ip().c_str()));
278 r = worker->connect(cli_addr, options, &cli_socket2);
279 ASSERT_EQ(0, r);
280 C_poll cb(center);
281 center->create_file_event(cli_socket2.fd(), EVENT_READABLE, &cb);
282 r = cli_socket2.is_connected();
283 if (r == 0) {
284 cb.poll(500);
285 r = cli_socket2.is_connected();
286 }
287 ASSERT_TRUE(r != 1);
288 center->delete_file_event(cli_socket2.fd(), EVENT_READABLE);
289 }
290 });
291 }
292
293 TEST_P(NetworkWorkerTest, ListenTest) {
294 Worker *worker = get_worker(0);
295 entity_addr_t bind_addr;
296 ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
297 SocketOptions options;
298 ServerSocket bind_socket1, bind_socket2;
299 int r = worker->listen(bind_addr, options, &bind_socket1);
300 ASSERT_EQ(0, r);
301
302 r = worker->listen(bind_addr, options, &bind_socket2);
303 ASSERT_EQ(-EADDRINUSE, r);
304 }
305
306 TEST_P(NetworkWorkerTest, AcceptAndCloseTest) {
307 entity_addr_t bind_addr;
308 ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
309 std::atomic_bool accepted(false);
310 std::atomic_bool *accepted_p = &accepted;
311 std::atomic_int unbind_count(stack->get_num_worker());
312 std::atomic_int *count_p = &unbind_count;
313 exec_events([this, bind_addr, accepted_p, count_p](Worker *worker) mutable {
314 SocketOptions options;
315 EventCenter *center = &worker->center;
316 entity_addr_t cli_addr;
317 int r = 0;
318 {
319 ServerSocket bind_socket;
320 if (stack->support_local_listen_table() || worker->id == 0)
321 r = worker->listen(bind_addr, options, &bind_socket);
322 ASSERT_EQ(0, r);
323
324 ConnectedSocket srv_socket, cli_socket;
325 if (bind_socket) {
326 r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
327 ASSERT_EQ(-EAGAIN, r);
328 }
329
330 C_poll cb(center);
331 if (worker->id == 0) {
332 center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
333 r = worker->connect(bind_addr, options, &cli_socket);
334 ASSERT_EQ(0, r);
335 ASSERT_TRUE(cb.poll(500));
336 }
337
338 if (bind_socket) {
339 cb.reset();
340 cb.poll(500);
341 ConnectedSocket srv_socket2;
342 do {
343 r = bind_socket.accept(&srv_socket2, options, &cli_addr, worker);
344 usleep(100);
345 } while (r == -EAGAIN && !*accepted_p);
346 if (r == 0)
347 *accepted_p = true;
348 ASSERT_TRUE(*accepted_p);
349 // srv_socket2 closed
350 center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
351 }
352
353 if (worker->id == 0) {
354 char buf[100];
355 cb.reset();
356 center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
357 int i = 3;
358 while (!i--) {
359 ASSERT_TRUE(cb.poll(500));
360 r = cli_socket.read(buf, sizeof(buf));
361 if (r == 0)
362 break;
363 }
364 ASSERT_EQ(0, r);
365 center->delete_file_event(cli_socket.fd(), EVENT_READABLE);
366 }
367
368 if (bind_socket)
369 center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
370 if (worker->id == 0) {
371 *accepted_p = false;
372 r = worker->connect(bind_addr, options, &cli_socket);
373 ASSERT_EQ(0, r);
374 cb.reset();
375 ASSERT_TRUE(cb.poll(500));
376 cli_socket.close();
377 }
378
379 if (bind_socket) {
380 do {
381 r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
382 usleep(100);
383 } while (r == -EAGAIN && !*accepted_p);
384 if (r == 0)
385 *accepted_p = true;
386 ASSERT_TRUE(*accepted_p);
387 center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
388 }
389 // unbind
390 }
391
392 --*count_p;
393 while (*count_p > 0)
394 usleep(100);
395
396 ConnectedSocket cli_socket;
397 r = worker->connect(bind_addr, options, &cli_socket);
398 ASSERT_EQ(0, r);
399 {
400 C_poll cb(center);
401 center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
402 r = cli_socket.is_connected();
403 if (r == 0) {
404 ASSERT_TRUE(cb.poll(500));
405 r = cli_socket.is_connected();
406 }
407 ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET);
408 }
409 });
410 }
411
412 TEST_P(NetworkWorkerTest, ComplexTest) {
413 entity_addr_t bind_addr;
414 std::atomic_bool listen_done(false);
415 std::atomic_bool *listen_p = &listen_done;
416 std::atomic_bool accepted(false);
417 std::atomic_bool *accepted_p = &accepted;
418 std::atomic_bool done(false);
419 std::atomic_bool *done_p = &done;
420 ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
421 exec_events([this, bind_addr, listen_p, accepted_p, done_p](Worker *worker) mutable {
422 entity_addr_t cli_addr;
423 EventCenter *center = &worker->center;
424 SocketOptions options;
425 ServerSocket bind_socket;
426 int r = 0;
427 if (stack->support_local_listen_table() || worker->id == 0) {
428 r = worker->listen(bind_addr, options, &bind_socket);
429 ASSERT_EQ(0, r);
430 *listen_p = true;
431 }
432 ConnectedSocket cli_socket, srv_socket;
433 if (worker->id == 1) {
434 while (!*listen_p) {
435 usleep(50);
436 r = worker->connect(bind_addr, options, &cli_socket);
437 ASSERT_EQ(0, r);
438 }
439 }
440
441 if (bind_socket) {
442 C_poll cb(center);
443 center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
444 int count = 3;
445 while (count--) {
446 if (cb.poll(500)) {
447 r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
448 ASSERT_EQ(0, r);
449 *accepted_p = true;
450 break;
451 }
452 }
453 ASSERT_TRUE(*accepted_p);
454 center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
455 }
456
457 if (worker->id == 1) {
458 C_poll cb(center);
459 center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb);
460 r = cli_socket.is_connected();
461 if (r == 0) {
462 ASSERT_TRUE(cb.poll(500));
463 r = cli_socket.is_connected();
464 }
465 ASSERT_EQ(1, r);
466 center->delete_file_event(cli_socket.fd(), EVENT_WRITABLE);
467 }
468
469 const size_t message_size = 10240;
470 size_t count = 100;
471 string message(message_size, '!');
472 for (size_t i = 0; i < message_size; i += 100)
473 message[i] = ',';
474 size_t len = message_size * count;
475 C_poll cb(center);
476 if (worker->id == 1)
477 center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb);
478 if (srv_socket)
479 center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb);
480 size_t left = len;
481 len *= 2;
482 string read_string;
483 int again_count = 0;
484 int c = 2;
485 bufferlist bl;
486 for (size_t i = 0; i < count; ++i)
487 bl.push_back(bufferptr((char*)message.data(), message_size));
488 while (!*done_p) {
489 again_count = 0;
490 if (worker->id == 1) {
491 if (c > 0) {
492 ssize_t r = 0;
493 usleep(100);
494 if (left > 0) {
495 r = cli_socket.send(bl, false);
496 ASSERT_TRUE(r >= 0 || r == -EAGAIN);
497 if (r > 0)
498 left -= r;
499 if (r == -EAGAIN)
500 ++again_count;
501 }
502 if (left == 0) {
503 --c;
504 left = message_size * count;
505 ASSERT_EQ(0U, bl.length());
506 for (size_t i = 0; i < count; ++i)
507 bl.push_back(bufferptr((char*)message.data(), message_size));
508 }
509 }
510 }
511
512 if (srv_socket) {
513 char buf[1000];
514 if (len > 0) {
515 r = srv_socket.read(buf, sizeof(buf));
516 ASSERT_TRUE(r > 0 || r == -EAGAIN);
517 if (r > 0) {
518 read_string.append(buf, r);
519 len -= r;
520 } else if (r == -EAGAIN) {
521 ++again_count;
522 }
523 }
524 if (len == 0) {
525 for (size_t i = 0; i < read_string.size(); i += message_size)
526 ASSERT_EQ(0, memcmp(read_string.c_str()+i, message.c_str(), message_size));
527 *done_p = true;
528 }
529 }
530 if (again_count) {
531 cb.reset();
532 cb.poll(500);
533 }
534 }
535 if (worker->id == 1)
536 center->delete_file_event(cli_socket.fd(), EVENT_WRITABLE);
537 if (srv_socket)
538 center->delete_file_event(srv_socket.fd(), EVENT_READABLE);
539
540 if (bind_socket)
541 bind_socket.abort_accept();
542 if (srv_socket)
543 srv_socket.close();
544 if (worker->id == 1)
545 cli_socket.close();
546 });
547 }
548
549 class StressFactory {
550 struct Client;
551 struct Server;
552 struct ThreadData {
553 Worker *worker;
554 std::set<Client*> clients;
555 std::set<Server*> servers;
556 ~ThreadData() {
557 for (auto && i : clients)
558 delete i;
559 for (auto && i : servers)
560 delete i;
561 }
562 };
563
564 struct RandomString {
565 size_t slen;
566 vector<std::string> strs;
567 std::random_device rd;
568 std::default_random_engine rng;
569
570 RandomString(size_t s): slen(s), rng(rd()) {}
571 void prepare(size_t n) {
572 static const char alphabet[] =
573 "abcdefghijklmnopqrstuvwxyz"
574 "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
575 "0123456789";
576
577 std::uniform_int_distribution<> dist(
578 0, sizeof(alphabet) / sizeof(*alphabet) - 2);
579
580 strs.reserve(n);
581 std::generate_n(
582 std::back_inserter(strs), strs.capacity(), [&] {
583 std::string str;
584 str.reserve(slen);
585 std::generate_n(std::back_inserter(str), slen, [&]() {
586 return alphabet[dist(rng)];
587 });
588 return str;
589 }
590 );
591 }
592 std::string &get_random_string() {
593 std::uniform_int_distribution<> dist(
594 0, strs.size() - 1);
595 return strs[dist(rng)];
596 }
597 };
598 struct Message {
599 size_t idx;
600 size_t len;
601 std::string content;
602
603 explicit Message(RandomString &rs, size_t i, size_t l): idx(i) {
604 size_t slen = rs.slen;
605 len = std::max(slen, l);
606
607 std::vector<std::string> strs;
608 strs.reserve(len / slen);
609 std::generate_n(
610 std::back_inserter(strs), strs.capacity(), [&] {
611 return rs.get_random_string();
612 }
613 );
614 len = slen * strs.size();
615 content.reserve(len);
616 for (auto &&s : strs)
617 content.append(s);
618 }
619 bool verify(const char *b, size_t len = 0) const {
620 return content.compare(0, len, b, 0, len) == 0;
621 }
622 };
623
624 template <typename T>
625 class C_delete : public EventCallback {
626 T *ctxt;
627 public:
628 C_delete(T *c): ctxt(c) {}
629 void do_request(int id) override {
630 delete ctxt;
631 delete this;
632 }
633 };
634
635 class Client {
636 StressFactory *factory;
637 EventCenter *center;
638 ConnectedSocket socket;
639 std::deque<StressFactory::Message*> acking;
640 std::deque<StressFactory::Message*> writings;
641 std::string buffer;
642 size_t index = 0;
643 size_t left;
644 bool write_enabled = false;
645 size_t read_offset = 0, write_offset = 0;
646 bool first = true;
647 bool dead = false;
648 StressFactory::Message homeless_message;
649
650 class Client_read_handle : public EventCallback {
651 Client *c;
652 public:
653 Client_read_handle(Client *_c): c(_c) {}
654 void do_request(int id) override {
655 c->do_read_request();
656 }
657 } read_ctxt;
658
659 class Client_write_handle : public EventCallback {
660 Client *c;
661 public:
662 Client_write_handle(Client *_c): c(_c) {}
663 void do_request(int id) override {
664 c->do_write_request();
665 }
666 } write_ctxt;
667
668 public:
669 Client(StressFactory *f, EventCenter *cen, ConnectedSocket s, size_t c)
670 : factory(f), center(cen), socket(std::move(s)), left(c), homeless_message(factory->rs, -1, 1024),
671 read_ctxt(this), write_ctxt(this) {
672 center->create_file_event(
673 socket.fd(), EVENT_READABLE, &read_ctxt);
674 center->dispatch_event_external(&read_ctxt);
675 }
676 void close() {
677 ASSERT_FALSE(write_enabled);
678 dead = true;
679 socket.shutdown();
680 center->delete_file_event(socket.fd(), EVENT_READABLE);
681 center->dispatch_event_external(new C_delete<Client>(this));
682 }
683
684 void do_read_request() {
685 if (dead)
686 return ;
687 ASSERT_TRUE(socket.is_connected() >= 0);
688 if (!socket.is_connected())
689 return ;
690 ASSERT_TRUE(!acking.empty() || first);
691 if (first) {
692 first = false;
693 center->dispatch_event_external(&write_ctxt);
694 if (acking.empty())
695 return ;
696 }
697 StressFactory::Message *m = acking.front();
698 int r = 0;
699 if (buffer.empty())
700 buffer.resize(m->len);
701 bool must_no = false;
702 while (true) {
703 r = socket.read((char*)buffer.data() + read_offset,
704 m->len - read_offset);
705 ASSERT_TRUE(r == -EAGAIN || r > 0);
706 if (r == -EAGAIN)
707 break;
708 read_offset += r;
709
710 std::cerr << " client " << this << " receive " << m->idx << " len " << r << " content: " << std::endl;
711 ASSERT_FALSE(must_no);
712 if ((m->len - read_offset) == 0) {
713 ASSERT_TRUE(m->verify(buffer.data(), 0));
714 delete m;
715 acking.pop_front();
716 read_offset = 0;
717 buffer.clear();
718 if (acking.empty()) {
719 m = &homeless_message;
720 must_no = true;
721 } else {
722 m = acking.front();
723 buffer.resize(m->len);
724 }
725 }
726 }
727 if (acking.empty()) {
728 center->dispatch_event_external(&write_ctxt);
729 return ;
730 }
731 }
732
733 void do_write_request() {
734 if (dead)
735 return ;
736 ASSERT_TRUE(socket.is_connected() > 0);
737
738 while (left > 0 && factory->queue_depth > writings.size() + acking.size()) {
739 StressFactory::Message *m = new StressFactory::Message(
740 factory->rs, ++index,
741 factory->rd() % factory->max_message_length);
742 std::cerr << " client " << this << " generate message " << m->idx << " length " << m->len << std::endl;
743 ASSERT_EQ(m->len, m->content.size());
744 writings.push_back(m);
745 --left;
746 --factory->message_left;
747 }
748
749 while (!writings.empty()) {
750 StressFactory::Message *m = writings.front();
751 bufferlist bl;
752 bl.append(m->content.data() + write_offset, m->content.size() - write_offset);
753 ssize_t r = socket.send(bl, false);
754 if (r == 0)
755 break;
756 std::cerr << " client " << this << " send " << m->idx << " len " << r << " content: " << std::endl;
757 ASSERT_TRUE(r >= 0);
758 write_offset += r;
759 if (write_offset == m->content.size()) {
760 write_offset = 0;
761 writings.pop_front();
762 acking.push_back(m);
763 }
764 }
765 if (writings.empty() && write_enabled) {
766 center->delete_file_event(socket.fd(), EVENT_WRITABLE);
767 write_enabled = false;
768 } else if (!writings.empty() && !write_enabled) {
769 ASSERT_EQ(0, center->create_file_event(
770 socket.fd(), EVENT_WRITABLE, &write_ctxt));
771 write_enabled = true;
772 }
773 }
774
775 bool finish() const {
776 return left == 0 && acking.empty() && writings.empty();
777 }
778 };
779 friend class Client;
780
781 class Server {
782 StressFactory *factory;
783 EventCenter *center;
784 ConnectedSocket socket;
785 std::deque<std::string> buffers;
786 bool write_enabled = false;
787 bool dead = false;
788
789 class Server_read_handle : public EventCallback {
790 Server *s;
791 public:
792 Server_read_handle(Server *_s): s(_s) {}
793 void do_request(int id) override {
794 s->do_read_request();
795 }
796 } read_ctxt;
797
798 class Server_write_handle : public EventCallback {
799 Server *s;
800 public:
801 Server_write_handle(Server *_s): s(_s) {}
802 void do_request(int id) override {
803 s->do_write_request();
804 }
805 } write_ctxt;
806
807 public:
808 Server(StressFactory *f, EventCenter *c, ConnectedSocket s):
809 factory(f), center(c), socket(std::move(s)), read_ctxt(this), write_ctxt(this) {
810 center->create_file_event(socket.fd(), EVENT_READABLE, &read_ctxt);
811 center->dispatch_event_external(&read_ctxt);
812 }
813 void close() {
814 ASSERT_FALSE(write_enabled);
815 socket.shutdown();
816 center->delete_file_event(socket.fd(), EVENT_READABLE);
817 center->dispatch_event_external(new C_delete<Server>(this));
818 }
819 void do_read_request() {
820 if (dead)
821 return ;
822 int r = 0;
823 while (true) {
824 char buf[4096];
825 bufferptr data;
826 if (factory->zero_copy_read) {
827 r = socket.zero_copy_read(data);
828 } else {
829 r = socket.read(buf, sizeof(buf));
830 }
831 ASSERT_TRUE(r == -EAGAIN || (r >= 0 && (size_t)r <= sizeof(buf)));
832 if (r == 0) {
833 ASSERT_TRUE(buffers.empty());
834 dead = true;
835 return ;
836 } else if (r == -EAGAIN)
837 break;
838 if (factory->zero_copy_read) {
839 buffers.emplace_back(data.c_str(), 0, data.length());
840 } else {
841 buffers.emplace_back(buf, 0, r);
842 }
843 std::cerr << " server " << this << " receive " << r << " content: " << std::endl;
844 }
845 if (!buffers.empty() && !write_enabled)
846 center->dispatch_event_external(&write_ctxt);
847 }
848
849 void do_write_request() {
850 if (dead)
851 return ;
852
853 while (!buffers.empty()) {
854 bufferlist bl;
855 auto it = buffers.begin();
856 for (size_t i = 0; i < buffers.size(); ++i) {
857 bl.push_back(bufferptr((char*)it->data(), it->size()));
858 ++it;
859 }
860
861 ssize_t r = socket.send(bl, false);
862 std::cerr << " server " << this << " send " << r << std::endl;
863 if (r == 0)
864 break;
865 ASSERT_TRUE(r >= 0);
866 while (r > 0) {
867 ASSERT_TRUE(!buffers.empty());
868 string &buffer = buffers.front();
869 if (r >= (int)buffer.size()) {
870 r -= (int)buffer.size();
871 buffers.pop_front();
872 } else {
873 std::cerr << " server " << this << " sent " << r << std::endl;
874 buffer = buffer.substr(r, buffer.size());
875 break;
876 }
877 }
878 }
879 if (buffers.empty()) {
880 if (write_enabled) {
881 center->delete_file_event(socket.fd(), EVENT_WRITABLE);
882 write_enabled = false;
883 }
884 } else if (!write_enabled) {
885 ASSERT_EQ(0, center->create_file_event(
886 socket.fd(), EVENT_WRITABLE, &write_ctxt));
887 write_enabled = true;
888 }
889 }
890
891 bool finish() {
892 return dead;
893 }
894 };
895 friend class Server;
896
897 class C_accept : public EventCallback {
898 StressFactory *factory;
899 ServerSocket bind_socket;
900 ThreadData *t_data;
901 Worker *worker;
902
903 public:
904 C_accept(StressFactory *f, ServerSocket s, ThreadData *data, Worker *w)
905 : factory(f), bind_socket(std::move(s)), t_data(data), worker(w) {}
906 void do_request(int id) override {
907 while (true) {
908 entity_addr_t cli_addr;
909 ConnectedSocket srv_socket;
910 SocketOptions options;
911 int r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
912 if (r == -EAGAIN) {
913 break;
914 }
915 ASSERT_EQ(0, r);
916 ASSERT_TRUE(srv_socket.fd() > 0);
917 Server *cb = new Server(factory, &t_data->worker->center, std::move(srv_socket));
918 t_data->servers.insert(cb);
919 }
920 }
921 };
922 friend class C_accept;
923
924 public:
925 static const size_t min_client_send_messages = 100;
926 static const size_t max_client_send_messages = 1000;
927 std::shared_ptr<NetworkStack> stack;
928 RandomString rs;
929 std::random_device rd;
930 const size_t client_num, queue_depth, max_message_length;
931 atomic_int message_count, message_left;
932 entity_addr_t bind_addr;
933 std::atomic_bool already_bind = {false};
934 bool zero_copy_read;
935 SocketOptions options;
936
937 explicit StressFactory(std::shared_ptr<NetworkStack> s, const string &addr,
938 size_t cli, size_t qd, size_t mc, size_t l, bool zero_copy)
939 : stack(s), rs(128), client_num(cli), queue_depth(qd),
940 max_message_length(l), message_count(mc), message_left(mc),
941 zero_copy_read(zero_copy) {
942 bind_addr.parse(addr.c_str());
943 rs.prepare(100);
944 }
945 ~StressFactory() {
946 }
947
948 void add_client(ThreadData *t_data) {
949 static Mutex lock("add_client_lock");
950 Mutex::Locker l(lock);
951 ConnectedSocket sock;
952 int r = t_data->worker->connect(bind_addr, options, &sock);
953 std::default_random_engine rng(rd());
954 std::uniform_int_distribution<> dist(
955 min_client_send_messages, max_client_send_messages);
956 ASSERT_EQ(0, r);
957 int c = dist(rng);
958 if (c > message_count.load())
959 c = message_count.load();
960 Client *cb = new Client(this, &t_data->worker->center, std::move(sock), c);
961 t_data->clients.insert(cb);
962 message_count -= c;
963 }
964
965 void drop_client(ThreadData *t_data, Client *c) {
966 c->close();
967 ASSERT_EQ(1U, t_data->clients.erase(c));
968 }
969
970 void drop_server(ThreadData *t_data, Server *s) {
971 s->close();
972 ASSERT_EQ(1U, t_data->servers.erase(s));
973 }
974
975 void start(Worker *worker) {
976 int r = 0;
977 ThreadData t_data;
978 t_data.worker = worker;
979 ServerSocket bind_socket;
980 if (stack->support_local_listen_table() || worker->id == 0) {
981 r = worker->listen(bind_addr, options, &bind_socket);
982 ASSERT_EQ(0, r);
983 already_bind = true;
984 }
985 while (!already_bind)
986 usleep(50);
987 C_accept *accept_handler = nullptr;
988 int bind_fd = 0;
989 if (bind_socket) {
990 bind_fd = bind_socket.fd();
991 accept_handler = new C_accept(this, std::move(bind_socket), &t_data, worker);
992 ASSERT_EQ(0, worker->center.create_file_event(
993 bind_fd, EVENT_READABLE, accept_handler));
994 }
995
996 int echo_throttle = message_count;
997 while (message_count > 0 || !t_data.clients.empty() || !t_data.servers.empty()) {
998 if (message_count > 0 && t_data.clients.size() < client_num && t_data.servers.size() < client_num)
999 add_client(&t_data);
1000 for (auto &&c : t_data.clients) {
1001 if (c->finish()) {
1002 drop_client(&t_data, c);
1003 break;
1004 }
1005 }
1006 for (auto &&s : t_data.servers) {
1007 if (s->finish()) {
1008 drop_server(&t_data, s);
1009 break;
1010 }
1011 }
1012
1013 worker->center.process_events(1);
1014 if (echo_throttle > message_left) {
1015 std::cerr << " clients " << t_data.clients.size() << " servers " << t_data.servers.size()
1016 << " message count " << message_left << std::endl;
1017 echo_throttle -= 100;
1018 }
1019 }
1020 if (bind_fd)
1021 worker->center.delete_file_event(bind_fd, EVENT_READABLE);
1022 delete accept_handler;
1023 }
1024 };
1025
1026 TEST_P(NetworkWorkerTest, StressTest) {
1027 StressFactory factory(stack, get_addr(), 16, 16, 10000, 1024,
1028 strncmp(GetParam(), "dpdk", 4) == 0);
1029 StressFactory *f = &factory;
1030 exec_events([f](Worker *worker) mutable {
1031 f->start(worker);
1032 });
1033 ASSERT_EQ(0, factory.message_left);
1034 }
1035
1036
1037 INSTANTIATE_TEST_CASE_P(
1038 NetworkStack,
1039 NetworkWorkerTest,
1040 ::testing::Values(
1041 #ifdef HAVE_DPDK
1042 "dpdk",
1043 #endif
1044 "posix"
1045 )
1046 );
1047
1048 #else
1049
1050 // Google Test may not support value-parameterized tests with some
1051 // compilers. If we use conditional compilation to compile out all
1052 // code referring to the gtest_main library, MSVC linker will not link
1053 // that library at all and consequently complain about missing entry
1054 // point defined in that library (fatal error LNK1561: entry point
1055 // must be defined). This dummy test keeps gtest_main linked in.
1056 TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}
1057
1058 #endif
1059
1060
1061 /*
1062 * Local Variables:
1063 * compile-command: "cd ../.. ; make ceph_test_async_networkstack &&
1064 * ./ceph_test_async_networkstack
1065 *
1066 * End:
1067 */