git.proxmox.com Git - ceph.git/blame - ceph/src/test/crimson/test_alien_echo.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / test / crimson / test_alien_echo.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2
3#include "auth/Auth.h"
4#include "messages/MPing.h"
f67539c2 5#include "common/ceph_argparse.h"
9f95a23c 6#include "crimson/auth/DummyAuth.h"
f67539c2 7#include "crimson/common/throttle.h"
11fdf7f2
TL
8#include "crimson/net/Connection.h"
9#include "crimson/net/Dispatcher.h"
10#include "crimson/net/Messenger.h"
11fdf7f2
TL
11
12#include <seastar/core/alien.hh>
13#include <seastar/core/app-template.hh>
14#include <seastar/core/future-util.hh>
f67539c2
TL
15#include <seastar/core/internal/pollable_fd.hh>
16#include <seastar/core/posix.hh>
11fdf7f2
TL
17#include <seastar/core/reactor.hh>
18
f67539c2 19using crimson::common::local_conf;
11fdf7f2
TL
20
/// Which side of the ping/pong exchange this process plays.
enum class echo_role {
  as_server,  ///< bind, wait for messages and answer each one with an MPing
  as_client,  ///< connect to the server and send MPing messages
};
25
26namespace seastar_pingpong {
27struct DummyAuthAuthorizer : public AuthAuthorizer {
28 DummyAuthAuthorizer()
29 : AuthAuthorizer(CEPH_AUTH_CEPHX)
30 {}
31 bool verify_reply(bufferlist::const_iterator&,
32 std::string *connection_secret) override {
33 return true;
34 }
35 bool add_challenge(CephContext*, const bufferlist&) override {
36 return true;
37 }
38};
39
40struct Server {
f67539c2 41 crimson::common::Throttle byte_throttler;
9f95a23c
TL
42 crimson::net::MessengerRef msgr;
43 crimson::auth::DummyAuthClientServer dummy_auth;
f67539c2 44 struct ServerDispatcher final : crimson::net::Dispatcher {
11fdf7f2
TL
45 unsigned count = 0;
46 seastar::condition_variable on_reply;
f67539c2
TL
47 std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef c,
48 MessageRef m) final
49 {
11fdf7f2
TL
50 std::cout << "server got ping " << *m << std::endl;
51 // reply with a pong
20effc67 52 return c->send(crimson::make_message<MPing>()).then([this] {
11fdf7f2
TL
53 ++count;
54 on_reply.signal();
f67539c2 55 return seastar::now();
11fdf7f2
TL
56 });
57 }
11fdf7f2 58 } dispatcher;
9f95a23c 59 Server(crimson::net::MessengerRef msgr)
f67539c2 60 : byte_throttler(local_conf()->osd_client_message_size_cap),
11fdf7f2
TL
61 msgr{msgr}
62 {
9f95a23c
TL
63 msgr->set_crc_header();
64 msgr->set_crc_data();
11fdf7f2
TL
65 }
66};
67
68struct Client {
f67539c2 69 crimson::common::Throttle byte_throttler;
9f95a23c
TL
70 crimson::net::MessengerRef msgr;
71 crimson::auth::DummyAuthClientServer dummy_auth;
f67539c2 72 struct ClientDispatcher final : crimson::net::Dispatcher {
11fdf7f2
TL
73 unsigned count = 0;
74 seastar::condition_variable on_reply;
f67539c2
TL
75 std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef c,
76 MessageRef m) final
77 {
11fdf7f2
TL
78 std::cout << "client got pong " << *m << std::endl;
79 ++count;
80 on_reply.signal();
81 return seastar::now();
82 }
83 } dispatcher;
9f95a23c 84 Client(crimson::net::MessengerRef msgr)
f67539c2 85 : byte_throttler(local_conf()->osd_client_message_size_cap),
11fdf7f2
TL
86 msgr{msgr}
87 {
9f95a23c
TL
88 msgr->set_crc_header();
89 msgr->set_crc_data();
11fdf7f2
TL
90 }
91};
92} // namespace seastar_pingpong
93
94class SeastarContext {
f67539c2
TL
95 int begin_fd;
96 seastar::file_desc on_end;
11fdf7f2
TL
97
98public:
99 SeastarContext()
f67539c2
TL
100 : begin_fd{eventfd(0, 0)},
101 on_end{seastar::file_desc::eventfd(0, 0)}
11fdf7f2
TL
102 {}
103
104 template<class Func>
105 std::thread with_seastar(Func&& func) {
f67539c2
TL
106 return std::thread{[this, on_end = on_end.get(),
107 func = std::forward<Func>(func)] {
11fdf7f2
TL
108 // alien: are you ready?
109 wait_for_seastar();
110 // alien: could you help me apply(func)?
111 func();
112 // alien: i've sent my request. have you replied it?
113 // wait_for_seastar();
114 // alien: you are free to go!
f67539c2 115 ::eventfd_write(on_end, 1);
11fdf7f2
TL
116 }};
117 }
118
119 void run(seastar::app_template& app, int argc, char** argv) {
120 app.run(argc, argv, [this] {
f67539c2
TL
121 std::vector<const char*> args;
122 std::string cluster;
123 std::string conf_file_list;
124 auto init_params = ceph_argparse_early_args(args,
125 CEPH_ENTITY_TYPE_CLIENT,
126 &cluster,
127 &conf_file_list);
128 return crimson::common::sharded_conf().start(init_params.name, cluster)
129 .then([conf_file_list] {
130 return local_conf().parse_config_files(conf_file_list);
11fdf7f2 131 }).then([this] {
f67539c2
TL
132 return set_seastar_ready();
133 }).then([on_end = std::move(on_end)] () mutable {
11fdf7f2 134 // seastar: let me know once i am free to leave.
f67539c2
TL
135 return seastar::do_with(seastar::pollable_fd(std::move(on_end)), []
136 (seastar::pollable_fd& on_end_fds) {
137 return on_end_fds.readable().then([&on_end_fds] {
138 eventfd_t result = 0;
139 on_end_fds.get_file_desc().read(&result, sizeof(result));
140 return seastar::make_ready_future<>();
141 });
142 });
143 }).then([]() {
144 return crimson::common::sharded_conf().stop();
11fdf7f2
TL
145 }).handle_exception([](auto ep) {
146 std::cerr << "Error: " << ep << std::endl;
147 }).finally([] {
148 seastar::engine().exit(0);
149 });
150 });
151 }
152
153 seastar::future<> set_seastar_ready() {
154 // seastar: i am ready to serve!
f67539c2 155 ::eventfd_write(begin_fd, 1);
11fdf7f2
TL
156 return seastar::now();
157 }
158
159private:
160 void wait_for_seastar() {
161 eventfd_t result = 0;
f67539c2 162 if (int r = ::eventfd_read(begin_fd, &result); r < 0) {
11fdf7f2
TL
163 std::cerr << "unable to eventfd_read():" << errno << std::endl;
164 }
165 }
166};
167
168static seastar::future<>
169seastar_echo(const entity_addr_t addr, echo_role role, unsigned count)
170{
171 std::cout << "seastar/";
172 if (role == echo_role::as_server) {
9f95a23c
TL
173 return seastar::do_with(
174 seastar_pingpong::Server{crimson::net::Messenger::create(
175 entity_name_t::OSD(0), "server", addr.get_nonce())},
176 [addr, count](auto& server) mutable {
177 std::cout << "server listening at " << addr << std::endl;
178 // bind the server
179 server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
180 server.msgr->set_policy_throttler(entity_name_t::TYPE_OSD,
181 &server.byte_throttler);
182 server.msgr->set_require_authorizer(false);
183 server.msgr->set_auth_client(&server.dummy_auth);
184 server.msgr->set_auth_server(&server.dummy_auth);
185 return server.msgr->bind(entity_addrvec_t{addr}
f67539c2
TL
186 ).safe_then([&server] {
187 return server.msgr->start({&server.dispatcher});
188 }, crimson::net::Messenger::bind_ertr::all_same_way([](auto& e) {
189 ceph_abort_msg("bind failed");
190 })).then([&dispatcher=server.dispatcher, count] {
9f95a23c
TL
191 return dispatcher.on_reply.wait([&dispatcher, count] {
192 return dispatcher.count >= count;
193 });
194 }).finally([&server] {
195 std::cout << "server shutting down" << std::endl;
20effc67 196 server.msgr->stop();
9f95a23c 197 return server.msgr->shutdown();
11fdf7f2 198 });
9f95a23c 199 });
11fdf7f2 200 } else {
9f95a23c
TL
201 return seastar::do_with(
202 seastar_pingpong::Client{crimson::net::Messenger::create(
203 entity_name_t::OSD(1), "client", addr.get_nonce())},
204 [addr, count](auto& client) {
205 std::cout << "client sending to " << addr << std::endl;
206 client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
207 client.msgr->set_policy_throttler(entity_name_t::TYPE_OSD,
208 &client.byte_throttler);
209 client.msgr->set_require_authorizer(false);
210 client.msgr->set_auth_client(&client.dummy_auth);
211 client.msgr->set_auth_server(&client.dummy_auth);
f67539c2 212 return client.msgr->start({&client.dispatcher}).then(
9f95a23c
TL
213 [addr, &client, &disp=client.dispatcher, count] {
214 auto conn = client.msgr->connect(addr, entity_name_t::TYPE_OSD);
215 return seastar::do_until(
216 [&disp,count] { return disp.count >= count; },
217 [&disp,conn] {
20effc67 218 return conn->send(crimson::make_message<MPing>()).then([&] {
9f95a23c
TL
219 return disp.on_reply.wait();
220 });
221 }
222 );
223 }).finally([&client] {
224 std::cout << "client shutting down" << std::endl;
20effc67 225 client.msgr->stop();
9f95a23c 226 return client.msgr->shutdown();
11fdf7f2 227 });
9f95a23c 228 });
11fdf7f2
TL
229 }
230}
231
232int main(int argc, char** argv)
233{
234 namespace po = boost::program_options;
235 po::options_description desc{"Allowed options"};
236 desc.add_options()
237 ("help,h", "show help message")
238 ("role", po::value<std::string>()->default_value("pong"),
239 "role to play (ping | pong)")
240 ("port", po::value<uint16_t>()->default_value(9010),
241 "port #")
242 ("nonce", po::value<uint32_t>()->default_value(42),
243 "a unique number to identify the pong server")
244 ("count", po::value<unsigned>()->default_value(10),
20effc67 245 "stop after sending/echoing <count> MPing messages");
11fdf7f2
TL
246 po::variables_map vm;
247 std::vector<std::string> unrecognized_options;
248 try {
249 auto parsed = po::command_line_parser(argc, argv)
250 .options(desc)
251 .allow_unregistered()
252 .run();
253 po::store(parsed, vm);
254 if (vm.count("help")) {
255 std::cout << desc << std::endl;
256 return 0;
257 }
258 po::notify(vm);
259 unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
260 } catch(const po::error& e) {
261 std::cerr << "error: " << e.what() << std::endl;
262 return 1;
263 }
264
265 entity_addr_t addr;
20effc67 266 addr.set_type(entity_addr_t::TYPE_MSGR2);
11fdf7f2
TL
267 addr.set_family(AF_INET);
268 addr.set_port(vm["port"].as<std::uint16_t>());
269 addr.set_nonce(vm["nonce"].as<std::uint32_t>());
270
271 echo_role role = echo_role::as_server;
272 if (vm["role"].as<std::string>() == "ping") {
273 role = echo_role::as_client;
274 }
275
276 auto count = vm["count"].as<unsigned>();
277 seastar::app_template app;
278 SeastarContext sc;
279 auto job = sc.with_seastar([&] {
20effc67 280 auto fut = seastar::alien::submit_to(app.alien(), 0, [addr, role, count] {
11fdf7f2
TL
281 return seastar_echo(addr, role, count);
282 });
283 fut.wait();
284 });
285 std::vector<char*> av{argv[0]};
286 std::transform(begin(unrecognized_options),
287 end(unrecognized_options),
288 std::back_inserter(av),
289 [](auto& s) {
290 return const_cast<char*>(s.c_str());
291 });
292 sc.run(app, av.size(), av.data());
293 job.join();
294}
295
296/*
297 * Local Variables:
298 * compile-command: "make -j4 \
299 * -C ../../../build \
300 * unittest_seastar_echo"
301 * End:
302 */