]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/crimson/test_alien_echo.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / test / crimson / test_alien_echo.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2
3#include "auth/Auth.h"
4#include "messages/MPing.h"
f67539c2 5#include "common/ceph_argparse.h"
9f95a23c 6#include "crimson/auth/DummyAuth.h"
f67539c2 7#include "crimson/common/throttle.h"
11fdf7f2
TL
8#include "crimson/net/Connection.h"
9#include "crimson/net/Dispatcher.h"
10#include "crimson/net/Messenger.h"
11fdf7f2
TL
11
12#include <seastar/core/alien.hh>
13#include <seastar/core/app-template.hh>
14#include <seastar/core/future-util.hh>
f67539c2
TL
15#include <seastar/core/internal/pollable_fd.hh>
16#include <seastar/core/posix.hh>
11fdf7f2
TL
17#include <seastar/core/reactor.hh>
18
f67539c2 19using crimson::common::local_conf;
11fdf7f2
TL
20
// Which side of the ping/pong exchange this process plays.
enum class echo_role {
  as_server,  // listens and answers every incoming message with an MPing
  as_client,  // connects to the server and sends MPing messages
};
25
26namespace seastar_pingpong {
27struct DummyAuthAuthorizer : public AuthAuthorizer {
28 DummyAuthAuthorizer()
29 : AuthAuthorizer(CEPH_AUTH_CEPHX)
30 {}
31 bool verify_reply(bufferlist::const_iterator&,
32 std::string *connection_secret) override {
33 return true;
34 }
35 bool add_challenge(CephContext*, const bufferlist&) override {
36 return true;
37 }
38};
39
40struct Server {
f67539c2 41 crimson::common::Throttle byte_throttler;
9f95a23c
TL
42 crimson::net::MessengerRef msgr;
43 crimson::auth::DummyAuthClientServer dummy_auth;
f67539c2 44 struct ServerDispatcher final : crimson::net::Dispatcher {
11fdf7f2
TL
45 unsigned count = 0;
46 seastar::condition_variable on_reply;
f67539c2
TL
47 std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef c,
48 MessageRef m) final
49 {
11fdf7f2
TL
50 std::cout << "server got ping " << *m << std::endl;
51 // reply with a pong
20effc67 52 return c->send(crimson::make_message<MPing>()).then([this] {
11fdf7f2
TL
53 ++count;
54 on_reply.signal();
f67539c2 55 return seastar::now();
11fdf7f2
TL
56 });
57 }
11fdf7f2 58 } dispatcher;
9f95a23c 59 Server(crimson::net::MessengerRef msgr)
f67539c2 60 : byte_throttler(local_conf()->osd_client_message_size_cap),
11fdf7f2 61 msgr{msgr}
1e59de90 62 { }
11fdf7f2
TL
63};
64
65struct Client {
f67539c2 66 crimson::common::Throttle byte_throttler;
9f95a23c
TL
67 crimson::net::MessengerRef msgr;
68 crimson::auth::DummyAuthClientServer dummy_auth;
f67539c2 69 struct ClientDispatcher final : crimson::net::Dispatcher {
11fdf7f2
TL
70 unsigned count = 0;
71 seastar::condition_variable on_reply;
f67539c2
TL
72 std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef c,
73 MessageRef m) final
74 {
11fdf7f2
TL
75 std::cout << "client got pong " << *m << std::endl;
76 ++count;
77 on_reply.signal();
78 return seastar::now();
79 }
80 } dispatcher;
9f95a23c 81 Client(crimson::net::MessengerRef msgr)
f67539c2 82 : byte_throttler(local_conf()->osd_client_message_size_cap),
11fdf7f2 83 msgr{msgr}
1e59de90 84 { }
11fdf7f2
TL
85};
86} // namespace seastar_pingpong
87
88class SeastarContext {
f67539c2
TL
89 int begin_fd;
90 seastar::file_desc on_end;
11fdf7f2
TL
91
92public:
93 SeastarContext()
f67539c2
TL
94 : begin_fd{eventfd(0, 0)},
95 on_end{seastar::file_desc::eventfd(0, 0)}
11fdf7f2
TL
96 {}
97
98 template<class Func>
99 std::thread with_seastar(Func&& func) {
f67539c2
TL
100 return std::thread{[this, on_end = on_end.get(),
101 func = std::forward<Func>(func)] {
11fdf7f2
TL
102 // alien: are you ready?
103 wait_for_seastar();
104 // alien: could you help me apply(func)?
105 func();
106 // alien: i've sent my request. have you replied it?
107 // wait_for_seastar();
108 // alien: you are free to go!
f67539c2 109 ::eventfd_write(on_end, 1);
11fdf7f2
TL
110 }};
111 }
112
113 void run(seastar::app_template& app, int argc, char** argv) {
114 app.run(argc, argv, [this] {
f67539c2
TL
115 std::vector<const char*> args;
116 std::string cluster;
117 std::string conf_file_list;
118 auto init_params = ceph_argparse_early_args(args,
119 CEPH_ENTITY_TYPE_CLIENT,
120 &cluster,
121 &conf_file_list);
122 return crimson::common::sharded_conf().start(init_params.name, cluster)
123 .then([conf_file_list] {
124 return local_conf().parse_config_files(conf_file_list);
11fdf7f2 125 }).then([this] {
f67539c2
TL
126 return set_seastar_ready();
127 }).then([on_end = std::move(on_end)] () mutable {
11fdf7f2 128 // seastar: let me know once i am free to leave.
f67539c2
TL
129 return seastar::do_with(seastar::pollable_fd(std::move(on_end)), []
130 (seastar::pollable_fd& on_end_fds) {
131 return on_end_fds.readable().then([&on_end_fds] {
132 eventfd_t result = 0;
133 on_end_fds.get_file_desc().read(&result, sizeof(result));
134 return seastar::make_ready_future<>();
135 });
136 });
137 }).then([]() {
138 return crimson::common::sharded_conf().stop();
11fdf7f2
TL
139 }).handle_exception([](auto ep) {
140 std::cerr << "Error: " << ep << std::endl;
141 }).finally([] {
142 seastar::engine().exit(0);
143 });
144 });
145 }
146
147 seastar::future<> set_seastar_ready() {
148 // seastar: i am ready to serve!
f67539c2 149 ::eventfd_write(begin_fd, 1);
11fdf7f2
TL
150 return seastar::now();
151 }
152
153private:
154 void wait_for_seastar() {
155 eventfd_t result = 0;
f67539c2 156 if (int r = ::eventfd_read(begin_fd, &result); r < 0) {
11fdf7f2
TL
157 std::cerr << "unable to eventfd_read():" << errno << std::endl;
158 }
159 }
160};
161
162static seastar::future<>
163seastar_echo(const entity_addr_t addr, echo_role role, unsigned count)
164{
165 std::cout << "seastar/";
166 if (role == echo_role::as_server) {
9f95a23c
TL
167 return seastar::do_with(
168 seastar_pingpong::Server{crimson::net::Messenger::create(
aee94f69 169 entity_name_t::OSD(0), "server", addr.get_nonce(), true)},
9f95a23c
TL
170 [addr, count](auto& server) mutable {
171 std::cout << "server listening at " << addr << std::endl;
172 // bind the server
173 server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
174 server.msgr->set_policy_throttler(entity_name_t::TYPE_OSD,
175 &server.byte_throttler);
9f95a23c
TL
176 server.msgr->set_auth_client(&server.dummy_auth);
177 server.msgr->set_auth_server(&server.dummy_auth);
178 return server.msgr->bind(entity_addrvec_t{addr}
f67539c2
TL
179 ).safe_then([&server] {
180 return server.msgr->start({&server.dispatcher});
181 }, crimson::net::Messenger::bind_ertr::all_same_way([](auto& e) {
182 ceph_abort_msg("bind failed");
183 })).then([&dispatcher=server.dispatcher, count] {
9f95a23c
TL
184 return dispatcher.on_reply.wait([&dispatcher, count] {
185 return dispatcher.count >= count;
186 });
187 }).finally([&server] {
188 std::cout << "server shutting down" << std::endl;
20effc67 189 server.msgr->stop();
9f95a23c 190 return server.msgr->shutdown();
11fdf7f2 191 });
9f95a23c 192 });
11fdf7f2 193 } else {
9f95a23c
TL
194 return seastar::do_with(
195 seastar_pingpong::Client{crimson::net::Messenger::create(
aee94f69 196 entity_name_t::OSD(1), "client", addr.get_nonce(), true)},
9f95a23c
TL
197 [addr, count](auto& client) {
198 std::cout << "client sending to " << addr << std::endl;
199 client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
200 client.msgr->set_policy_throttler(entity_name_t::TYPE_OSD,
201 &client.byte_throttler);
9f95a23c
TL
202 client.msgr->set_auth_client(&client.dummy_auth);
203 client.msgr->set_auth_server(&client.dummy_auth);
f67539c2 204 return client.msgr->start({&client.dispatcher}).then(
9f95a23c
TL
205 [addr, &client, &disp=client.dispatcher, count] {
206 auto conn = client.msgr->connect(addr, entity_name_t::TYPE_OSD);
207 return seastar::do_until(
208 [&disp,count] { return disp.count >= count; },
209 [&disp,conn] {
20effc67 210 return conn->send(crimson::make_message<MPing>()).then([&] {
9f95a23c
TL
211 return disp.on_reply.wait();
212 });
213 }
214 );
215 }).finally([&client] {
216 std::cout << "client shutting down" << std::endl;
20effc67 217 client.msgr->stop();
9f95a23c 218 return client.msgr->shutdown();
11fdf7f2 219 });
9f95a23c 220 });
11fdf7f2
TL
221 }
222}
223
224int main(int argc, char** argv)
225{
226 namespace po = boost::program_options;
227 po::options_description desc{"Allowed options"};
228 desc.add_options()
229 ("help,h", "show help message")
230 ("role", po::value<std::string>()->default_value("pong"),
231 "role to play (ping | pong)")
232 ("port", po::value<uint16_t>()->default_value(9010),
233 "port #")
234 ("nonce", po::value<uint32_t>()->default_value(42),
235 "a unique number to identify the pong server")
236 ("count", po::value<unsigned>()->default_value(10),
20effc67 237 "stop after sending/echoing <count> MPing messages");
11fdf7f2
TL
238 po::variables_map vm;
239 std::vector<std::string> unrecognized_options;
240 try {
241 auto parsed = po::command_line_parser(argc, argv)
242 .options(desc)
243 .allow_unregistered()
244 .run();
245 po::store(parsed, vm);
246 if (vm.count("help")) {
247 std::cout << desc << std::endl;
248 return 0;
249 }
250 po::notify(vm);
251 unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
252 } catch(const po::error& e) {
253 std::cerr << "error: " << e.what() << std::endl;
254 return 1;
255 }
256
257 entity_addr_t addr;
20effc67 258 addr.set_type(entity_addr_t::TYPE_MSGR2);
11fdf7f2
TL
259 addr.set_family(AF_INET);
260 addr.set_port(vm["port"].as<std::uint16_t>());
261 addr.set_nonce(vm["nonce"].as<std::uint32_t>());
262
263 echo_role role = echo_role::as_server;
264 if (vm["role"].as<std::string>() == "ping") {
265 role = echo_role::as_client;
266 }
267
268 auto count = vm["count"].as<unsigned>();
269 seastar::app_template app;
270 SeastarContext sc;
271 auto job = sc.with_seastar([&] {
20effc67 272 auto fut = seastar::alien::submit_to(app.alien(), 0, [addr, role, count] {
11fdf7f2
TL
273 return seastar_echo(addr, role, count);
274 });
275 fut.wait();
276 });
277 std::vector<char*> av{argv[0]};
278 std::transform(begin(unrecognized_options),
279 end(unrecognized_options),
280 std::back_inserter(av),
281 [](auto& s) {
282 return const_cast<char*>(s.c_str());
283 });
284 sc.run(app, av.size(), av.data());
285 job.join();
286}
287
288/*
289 * Local Variables:
290 * compile-command: "make -j4 \
291 * -C ../../../build \
292 * unittest_seastar_echo"
293 * End:
294 */