]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/crimson/test_alien_echo.cc
e75d3d03c59697c01add391f81bf5a68302be26f
[ceph.git] / ceph / src / test / crimson / test_alien_echo.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2
3 #include "auth/Auth.h"
4 #include "messages/MPing.h"
5 #include "common/ceph_argparse.h"
6 #include "crimson/auth/DummyAuth.h"
7 #include "crimson/common/throttle.h"
8 #include "crimson/net/Connection.h"
9 #include "crimson/net/Dispatcher.h"
10 #include "crimson/net/Messenger.h"
11
12 #include <seastar/core/alien.hh>
13 #include <seastar/core/app-template.hh>
14 #include <seastar/core/future-util.hh>
15 #include <seastar/core/internal/pollable_fd.hh>
16 #include <seastar/core/posix.hh>
17 #include <seastar/core/reactor.hh>
18
19 using crimson::common::local_conf;
20
/// Which side of the ping/pong exchange this process plays.
enum class echo_role {
  as_server,  ///< listen for pings and answer each one
  as_client,  ///< connect to the server and send pings
};
25
26 namespace seastar_pingpong {
27 struct DummyAuthAuthorizer : public AuthAuthorizer {
28 DummyAuthAuthorizer()
29 : AuthAuthorizer(CEPH_AUTH_CEPHX)
30 {}
31 bool verify_reply(bufferlist::const_iterator&,
32 std::string *connection_secret) override {
33 return true;
34 }
35 bool add_challenge(CephContext*, const bufferlist&) override {
36 return true;
37 }
38 };
39
40 struct Server {
41 crimson::common::Throttle byte_throttler;
42 crimson::net::MessengerRef msgr;
43 crimson::auth::DummyAuthClientServer dummy_auth;
44 struct ServerDispatcher final : crimson::net::Dispatcher {
45 unsigned count = 0;
46 seastar::condition_variable on_reply;
47 std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef c,
48 MessageRef m) final
49 {
50 std::cout << "server got ping " << *m << std::endl;
51 // reply with a pong
52 return c->send(crimson::make_message<MPing>()).then([this] {
53 ++count;
54 on_reply.signal();
55 return seastar::now();
56 });
57 }
58 } dispatcher;
59 Server(crimson::net::MessengerRef msgr)
60 : byte_throttler(local_conf()->osd_client_message_size_cap),
61 msgr{msgr}
62 { }
63 };
64
65 struct Client {
66 crimson::common::Throttle byte_throttler;
67 crimson::net::MessengerRef msgr;
68 crimson::auth::DummyAuthClientServer dummy_auth;
69 struct ClientDispatcher final : crimson::net::Dispatcher {
70 unsigned count = 0;
71 seastar::condition_variable on_reply;
72 std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef c,
73 MessageRef m) final
74 {
75 std::cout << "client got pong " << *m << std::endl;
76 ++count;
77 on_reply.signal();
78 return seastar::now();
79 }
80 } dispatcher;
81 Client(crimson::net::MessengerRef msgr)
82 : byte_throttler(local_conf()->osd_client_message_size_cap),
83 msgr{msgr}
84 { }
85 };
86 } // namespace seastar_pingpong
87
88 class SeastarContext {
89 int begin_fd;
90 seastar::file_desc on_end;
91
92 public:
93 SeastarContext()
94 : begin_fd{eventfd(0, 0)},
95 on_end{seastar::file_desc::eventfd(0, 0)}
96 {}
97
98 template<class Func>
99 std::thread with_seastar(Func&& func) {
100 return std::thread{[this, on_end = on_end.get(),
101 func = std::forward<Func>(func)] {
102 // alien: are you ready?
103 wait_for_seastar();
104 // alien: could you help me apply(func)?
105 func();
106 // alien: i've sent my request. have you replied it?
107 // wait_for_seastar();
108 // alien: you are free to go!
109 ::eventfd_write(on_end, 1);
110 }};
111 }
112
113 void run(seastar::app_template& app, int argc, char** argv) {
114 app.run(argc, argv, [this] {
115 std::vector<const char*> args;
116 std::string cluster;
117 std::string conf_file_list;
118 auto init_params = ceph_argparse_early_args(args,
119 CEPH_ENTITY_TYPE_CLIENT,
120 &cluster,
121 &conf_file_list);
122 return crimson::common::sharded_conf().start(init_params.name, cluster)
123 .then([conf_file_list] {
124 return local_conf().parse_config_files(conf_file_list);
125 }).then([this] {
126 return set_seastar_ready();
127 }).then([on_end = std::move(on_end)] () mutable {
128 // seastar: let me know once i am free to leave.
129 return seastar::do_with(seastar::pollable_fd(std::move(on_end)), []
130 (seastar::pollable_fd& on_end_fds) {
131 return on_end_fds.readable().then([&on_end_fds] {
132 eventfd_t result = 0;
133 on_end_fds.get_file_desc().read(&result, sizeof(result));
134 return seastar::make_ready_future<>();
135 });
136 });
137 }).then([]() {
138 return crimson::common::sharded_conf().stop();
139 }).handle_exception([](auto ep) {
140 std::cerr << "Error: " << ep << std::endl;
141 }).finally([] {
142 seastar::engine().exit(0);
143 });
144 });
145 }
146
147 seastar::future<> set_seastar_ready() {
148 // seastar: i am ready to serve!
149 ::eventfd_write(begin_fd, 1);
150 return seastar::now();
151 }
152
153 private:
154 void wait_for_seastar() {
155 eventfd_t result = 0;
156 if (int r = ::eventfd_read(begin_fd, &result); r < 0) {
157 std::cerr << "unable to eventfd_read():" << errno << std::endl;
158 }
159 }
160 };
161
162 static seastar::future<>
163 seastar_echo(const entity_addr_t addr, echo_role role, unsigned count)
164 {
165 std::cout << "seastar/";
166 if (role == echo_role::as_server) {
167 return seastar::do_with(
168 seastar_pingpong::Server{crimson::net::Messenger::create(
169 entity_name_t::OSD(0), "server", addr.get_nonce())},
170 [addr, count](auto& server) mutable {
171 std::cout << "server listening at " << addr << std::endl;
172 // bind the server
173 server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
174 server.msgr->set_policy_throttler(entity_name_t::TYPE_OSD,
175 &server.byte_throttler);
176 server.msgr->set_auth_client(&server.dummy_auth);
177 server.msgr->set_auth_server(&server.dummy_auth);
178 return server.msgr->bind(entity_addrvec_t{addr}
179 ).safe_then([&server] {
180 return server.msgr->start({&server.dispatcher});
181 }, crimson::net::Messenger::bind_ertr::all_same_way([](auto& e) {
182 ceph_abort_msg("bind failed");
183 })).then([&dispatcher=server.dispatcher, count] {
184 return dispatcher.on_reply.wait([&dispatcher, count] {
185 return dispatcher.count >= count;
186 });
187 }).finally([&server] {
188 std::cout << "server shutting down" << std::endl;
189 server.msgr->stop();
190 return server.msgr->shutdown();
191 });
192 });
193 } else {
194 return seastar::do_with(
195 seastar_pingpong::Client{crimson::net::Messenger::create(
196 entity_name_t::OSD(1), "client", addr.get_nonce())},
197 [addr, count](auto& client) {
198 std::cout << "client sending to " << addr << std::endl;
199 client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
200 client.msgr->set_policy_throttler(entity_name_t::TYPE_OSD,
201 &client.byte_throttler);
202 client.msgr->set_auth_client(&client.dummy_auth);
203 client.msgr->set_auth_server(&client.dummy_auth);
204 return client.msgr->start({&client.dispatcher}).then(
205 [addr, &client, &disp=client.dispatcher, count] {
206 auto conn = client.msgr->connect(addr, entity_name_t::TYPE_OSD);
207 return seastar::do_until(
208 [&disp,count] { return disp.count >= count; },
209 [&disp,conn] {
210 return conn->send(crimson::make_message<MPing>()).then([&] {
211 return disp.on_reply.wait();
212 });
213 }
214 );
215 }).finally([&client] {
216 std::cout << "client shutting down" << std::endl;
217 client.msgr->stop();
218 return client.msgr->shutdown();
219 });
220 });
221 }
222 }
223
224 int main(int argc, char** argv)
225 {
226 namespace po = boost::program_options;
227 po::options_description desc{"Allowed options"};
228 desc.add_options()
229 ("help,h", "show help message")
230 ("role", po::value<std::string>()->default_value("pong"),
231 "role to play (ping | pong)")
232 ("port", po::value<uint16_t>()->default_value(9010),
233 "port #")
234 ("nonce", po::value<uint32_t>()->default_value(42),
235 "a unique number to identify the pong server")
236 ("count", po::value<unsigned>()->default_value(10),
237 "stop after sending/echoing <count> MPing messages");
238 po::variables_map vm;
239 std::vector<std::string> unrecognized_options;
240 try {
241 auto parsed = po::command_line_parser(argc, argv)
242 .options(desc)
243 .allow_unregistered()
244 .run();
245 po::store(parsed, vm);
246 if (vm.count("help")) {
247 std::cout << desc << std::endl;
248 return 0;
249 }
250 po::notify(vm);
251 unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
252 } catch(const po::error& e) {
253 std::cerr << "error: " << e.what() << std::endl;
254 return 1;
255 }
256
257 entity_addr_t addr;
258 addr.set_type(entity_addr_t::TYPE_MSGR2);
259 addr.set_family(AF_INET);
260 addr.set_port(vm["port"].as<std::uint16_t>());
261 addr.set_nonce(vm["nonce"].as<std::uint32_t>());
262
263 echo_role role = echo_role::as_server;
264 if (vm["role"].as<std::string>() == "ping") {
265 role = echo_role::as_client;
266 }
267
268 auto count = vm["count"].as<unsigned>();
269 seastar::app_template app;
270 SeastarContext sc;
271 auto job = sc.with_seastar([&] {
272 auto fut = seastar::alien::submit_to(app.alien(), 0, [addr, role, count] {
273 return seastar_echo(addr, role, count);
274 });
275 fut.wait();
276 });
277 std::vector<char*> av{argv[0]};
278 std::transform(begin(unrecognized_options),
279 end(unrecognized_options),
280 std::back_inserter(av),
281 [](auto& s) {
282 return const_cast<char*>(s.c_str());
283 });
284 sc.run(app, av.size(), av.data());
285 job.join();
286 }
287
288 /*
289 * Local Variables:
290 * compile-command: "make -j4 \
291 * -C ../../../build \
292 * unittest_seastar_echo"
293 * End:
294 */