]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- |
2 | ||
3 | #include "auth/Auth.h" | |
4 | #include "messages/MPing.h" | |
f67539c2 | 5 | #include "common/ceph_argparse.h" |
9f95a23c | 6 | #include "crimson/auth/DummyAuth.h" |
f67539c2 | 7 | #include "crimson/common/throttle.h" |
11fdf7f2 TL |
8 | #include "crimson/net/Connection.h" |
9 | #include "crimson/net/Dispatcher.h" | |
10 | #include "crimson/net/Messenger.h" | |
11fdf7f2 TL |
11 | |
12 | #include <seastar/core/alien.hh> | |
13 | #include <seastar/core/app-template.hh> | |
14 | #include <seastar/core/future-util.hh> | |
f67539c2 TL |
15 | #include <seastar/core/internal/pollable_fd.hh> |
16 | #include <seastar/core/posix.hh> | |
11fdf7f2 TL |
17 | #include <seastar/core/reactor.hh> |
18 | ||
f67539c2 | 19 | using crimson::common::local_conf; |
11fdf7f2 TL |
20 | |
21 | enum class echo_role { | |
22 | as_server, | |
23 | as_client, | |
24 | }; | |
25 | ||
26 | namespace seastar_pingpong { | |
27 | struct DummyAuthAuthorizer : public AuthAuthorizer { | |
28 | DummyAuthAuthorizer() | |
29 | : AuthAuthorizer(CEPH_AUTH_CEPHX) | |
30 | {} | |
31 | bool verify_reply(bufferlist::const_iterator&, | |
32 | std::string *connection_secret) override { | |
33 | return true; | |
34 | } | |
35 | bool add_challenge(CephContext*, const bufferlist&) override { | |
36 | return true; | |
37 | } | |
38 | }; | |
39 | ||
40 | struct Server { | |
f67539c2 | 41 | crimson::common::Throttle byte_throttler; |
9f95a23c TL |
42 | crimson::net::MessengerRef msgr; |
43 | crimson::auth::DummyAuthClientServer dummy_auth; | |
f67539c2 | 44 | struct ServerDispatcher final : crimson::net::Dispatcher { |
11fdf7f2 TL |
45 | unsigned count = 0; |
46 | seastar::condition_variable on_reply; | |
f67539c2 TL |
47 | std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef c, |
48 | MessageRef m) final | |
49 | { | |
11fdf7f2 TL |
50 | std::cout << "server got ping " << *m << std::endl; |
51 | // reply with a pong | |
20effc67 | 52 | return c->send(crimson::make_message<MPing>()).then([this] { |
11fdf7f2 TL |
53 | ++count; |
54 | on_reply.signal(); | |
f67539c2 | 55 | return seastar::now(); |
11fdf7f2 TL |
56 | }); |
57 | } | |
11fdf7f2 | 58 | } dispatcher; |
9f95a23c | 59 | Server(crimson::net::MessengerRef msgr) |
f67539c2 | 60 | : byte_throttler(local_conf()->osd_client_message_size_cap), |
11fdf7f2 TL |
61 | msgr{msgr} |
62 | { | |
9f95a23c TL |
63 | msgr->set_crc_header(); |
64 | msgr->set_crc_data(); | |
11fdf7f2 TL |
65 | } |
66 | }; | |
67 | ||
68 | struct Client { | |
f67539c2 | 69 | crimson::common::Throttle byte_throttler; |
9f95a23c TL |
70 | crimson::net::MessengerRef msgr; |
71 | crimson::auth::DummyAuthClientServer dummy_auth; | |
f67539c2 | 72 | struct ClientDispatcher final : crimson::net::Dispatcher { |
11fdf7f2 TL |
73 | unsigned count = 0; |
74 | seastar::condition_variable on_reply; | |
f67539c2 TL |
75 | std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef c, |
76 | MessageRef m) final | |
77 | { | |
11fdf7f2 TL |
78 | std::cout << "client got pong " << *m << std::endl; |
79 | ++count; | |
80 | on_reply.signal(); | |
81 | return seastar::now(); | |
82 | } | |
83 | } dispatcher; | |
9f95a23c | 84 | Client(crimson::net::MessengerRef msgr) |
f67539c2 | 85 | : byte_throttler(local_conf()->osd_client_message_size_cap), |
11fdf7f2 TL |
86 | msgr{msgr} |
87 | { | |
9f95a23c TL |
88 | msgr->set_crc_header(); |
89 | msgr->set_crc_data(); | |
11fdf7f2 TL |
90 | } |
91 | }; | |
92 | } // namespace seastar_pingpong | |
93 | ||
94 | class SeastarContext { | |
f67539c2 TL |
95 | int begin_fd; |
96 | seastar::file_desc on_end; | |
11fdf7f2 TL |
97 | |
98 | public: | |
99 | SeastarContext() | |
f67539c2 TL |
100 | : begin_fd{eventfd(0, 0)}, |
101 | on_end{seastar::file_desc::eventfd(0, 0)} | |
11fdf7f2 TL |
102 | {} |
103 | ||
104 | template<class Func> | |
105 | std::thread with_seastar(Func&& func) { | |
f67539c2 TL |
106 | return std::thread{[this, on_end = on_end.get(), |
107 | func = std::forward<Func>(func)] { | |
11fdf7f2 TL |
108 | // alien: are you ready? |
109 | wait_for_seastar(); | |
110 | // alien: could you help me apply(func)? | |
111 | func(); | |
112 | // alien: i've sent my request. have you replied it? | |
113 | // wait_for_seastar(); | |
114 | // alien: you are free to go! | |
f67539c2 | 115 | ::eventfd_write(on_end, 1); |
11fdf7f2 TL |
116 | }}; |
117 | } | |
118 | ||
119 | void run(seastar::app_template& app, int argc, char** argv) { | |
120 | app.run(argc, argv, [this] { | |
f67539c2 TL |
121 | std::vector<const char*> args; |
122 | std::string cluster; | |
123 | std::string conf_file_list; | |
124 | auto init_params = ceph_argparse_early_args(args, | |
125 | CEPH_ENTITY_TYPE_CLIENT, | |
126 | &cluster, | |
127 | &conf_file_list); | |
128 | return crimson::common::sharded_conf().start(init_params.name, cluster) | |
129 | .then([conf_file_list] { | |
130 | return local_conf().parse_config_files(conf_file_list); | |
11fdf7f2 | 131 | }).then([this] { |
f67539c2 TL |
132 | return set_seastar_ready(); |
133 | }).then([on_end = std::move(on_end)] () mutable { | |
11fdf7f2 | 134 | // seastar: let me know once i am free to leave. |
f67539c2 TL |
135 | return seastar::do_with(seastar::pollable_fd(std::move(on_end)), [] |
136 | (seastar::pollable_fd& on_end_fds) { | |
137 | return on_end_fds.readable().then([&on_end_fds] { | |
138 | eventfd_t result = 0; | |
139 | on_end_fds.get_file_desc().read(&result, sizeof(result)); | |
140 | return seastar::make_ready_future<>(); | |
141 | }); | |
142 | }); | |
143 | }).then([]() { | |
144 | return crimson::common::sharded_conf().stop(); | |
11fdf7f2 TL |
145 | }).handle_exception([](auto ep) { |
146 | std::cerr << "Error: " << ep << std::endl; | |
147 | }).finally([] { | |
148 | seastar::engine().exit(0); | |
149 | }); | |
150 | }); | |
151 | } | |
152 | ||
153 | seastar::future<> set_seastar_ready() { | |
154 | // seastar: i am ready to serve! | |
f67539c2 | 155 | ::eventfd_write(begin_fd, 1); |
11fdf7f2 TL |
156 | return seastar::now(); |
157 | } | |
158 | ||
159 | private: | |
160 | void wait_for_seastar() { | |
161 | eventfd_t result = 0; | |
f67539c2 | 162 | if (int r = ::eventfd_read(begin_fd, &result); r < 0) { |
11fdf7f2 TL |
163 | std::cerr << "unable to eventfd_read():" << errno << std::endl; |
164 | } | |
165 | } | |
166 | }; | |
167 | ||
168 | static seastar::future<> | |
169 | seastar_echo(const entity_addr_t addr, echo_role role, unsigned count) | |
170 | { | |
171 | std::cout << "seastar/"; | |
172 | if (role == echo_role::as_server) { | |
9f95a23c TL |
173 | return seastar::do_with( |
174 | seastar_pingpong::Server{crimson::net::Messenger::create( | |
175 | entity_name_t::OSD(0), "server", addr.get_nonce())}, | |
176 | [addr, count](auto& server) mutable { | |
177 | std::cout << "server listening at " << addr << std::endl; | |
178 | // bind the server | |
179 | server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); | |
180 | server.msgr->set_policy_throttler(entity_name_t::TYPE_OSD, | |
181 | &server.byte_throttler); | |
182 | server.msgr->set_require_authorizer(false); | |
183 | server.msgr->set_auth_client(&server.dummy_auth); | |
184 | server.msgr->set_auth_server(&server.dummy_auth); | |
185 | return server.msgr->bind(entity_addrvec_t{addr} | |
f67539c2 TL |
186 | ).safe_then([&server] { |
187 | return server.msgr->start({&server.dispatcher}); | |
188 | }, crimson::net::Messenger::bind_ertr::all_same_way([](auto& e) { | |
189 | ceph_abort_msg("bind failed"); | |
190 | })).then([&dispatcher=server.dispatcher, count] { | |
9f95a23c TL |
191 | return dispatcher.on_reply.wait([&dispatcher, count] { |
192 | return dispatcher.count >= count; | |
193 | }); | |
194 | }).finally([&server] { | |
195 | std::cout << "server shutting down" << std::endl; | |
20effc67 | 196 | server.msgr->stop(); |
9f95a23c | 197 | return server.msgr->shutdown(); |
11fdf7f2 | 198 | }); |
9f95a23c | 199 | }); |
11fdf7f2 | 200 | } else { |
9f95a23c TL |
201 | return seastar::do_with( |
202 | seastar_pingpong::Client{crimson::net::Messenger::create( | |
203 | entity_name_t::OSD(1), "client", addr.get_nonce())}, | |
204 | [addr, count](auto& client) { | |
205 | std::cout << "client sending to " << addr << std::endl; | |
206 | client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); | |
207 | client.msgr->set_policy_throttler(entity_name_t::TYPE_OSD, | |
208 | &client.byte_throttler); | |
209 | client.msgr->set_require_authorizer(false); | |
210 | client.msgr->set_auth_client(&client.dummy_auth); | |
211 | client.msgr->set_auth_server(&client.dummy_auth); | |
f67539c2 | 212 | return client.msgr->start({&client.dispatcher}).then( |
9f95a23c TL |
213 | [addr, &client, &disp=client.dispatcher, count] { |
214 | auto conn = client.msgr->connect(addr, entity_name_t::TYPE_OSD); | |
215 | return seastar::do_until( | |
216 | [&disp,count] { return disp.count >= count; }, | |
217 | [&disp,conn] { | |
20effc67 | 218 | return conn->send(crimson::make_message<MPing>()).then([&] { |
9f95a23c TL |
219 | return disp.on_reply.wait(); |
220 | }); | |
221 | } | |
222 | ); | |
223 | }).finally([&client] { | |
224 | std::cout << "client shutting down" << std::endl; | |
20effc67 | 225 | client.msgr->stop(); |
9f95a23c | 226 | return client.msgr->shutdown(); |
11fdf7f2 | 227 | }); |
9f95a23c | 228 | }); |
11fdf7f2 TL |
229 | } |
230 | } | |
231 | ||
232 | int main(int argc, char** argv) | |
233 | { | |
234 | namespace po = boost::program_options; | |
235 | po::options_description desc{"Allowed options"}; | |
236 | desc.add_options() | |
237 | ("help,h", "show help message") | |
238 | ("role", po::value<std::string>()->default_value("pong"), | |
239 | "role to play (ping | pong)") | |
240 | ("port", po::value<uint16_t>()->default_value(9010), | |
241 | "port #") | |
242 | ("nonce", po::value<uint32_t>()->default_value(42), | |
243 | "a unique number to identify the pong server") | |
244 | ("count", po::value<unsigned>()->default_value(10), | |
20effc67 | 245 | "stop after sending/echoing <count> MPing messages"); |
11fdf7f2 TL |
246 | po::variables_map vm; |
247 | std::vector<std::string> unrecognized_options; | |
248 | try { | |
249 | auto parsed = po::command_line_parser(argc, argv) | |
250 | .options(desc) | |
251 | .allow_unregistered() | |
252 | .run(); | |
253 | po::store(parsed, vm); | |
254 | if (vm.count("help")) { | |
255 | std::cout << desc << std::endl; | |
256 | return 0; | |
257 | } | |
258 | po::notify(vm); | |
259 | unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional); | |
260 | } catch(const po::error& e) { | |
261 | std::cerr << "error: " << e.what() << std::endl; | |
262 | return 1; | |
263 | } | |
264 | ||
265 | entity_addr_t addr; | |
20effc67 | 266 | addr.set_type(entity_addr_t::TYPE_MSGR2); |
11fdf7f2 TL |
267 | addr.set_family(AF_INET); |
268 | addr.set_port(vm["port"].as<std::uint16_t>()); | |
269 | addr.set_nonce(vm["nonce"].as<std::uint32_t>()); | |
270 | ||
271 | echo_role role = echo_role::as_server; | |
272 | if (vm["role"].as<std::string>() == "ping") { | |
273 | role = echo_role::as_client; | |
274 | } | |
275 | ||
276 | auto count = vm["count"].as<unsigned>(); | |
277 | seastar::app_template app; | |
278 | SeastarContext sc; | |
279 | auto job = sc.with_seastar([&] { | |
20effc67 | 280 | auto fut = seastar::alien::submit_to(app.alien(), 0, [addr, role, count] { |
11fdf7f2 TL |
281 | return seastar_echo(addr, role, count); |
282 | }); | |
283 | fut.wait(); | |
284 | }); | |
285 | std::vector<char*> av{argv[0]}; | |
286 | std::transform(begin(unrecognized_options), | |
287 | end(unrecognized_options), | |
288 | std::back_inserter(av), | |
289 | [](auto& s) { | |
290 | return const_cast<char*>(s.c_str()); | |
291 | }); | |
292 | sc.run(app, av.size(), av.data()); | |
293 | job.join(); | |
294 | } | |
295 | ||
296 | /* | |
297 | * Local Variables: | |
298 | * compile-command: "make -j4 \ | |
299 | * -C ../../../build \ | |
300 | * unittest_seastar_echo" | |
301 | * End: | |
302 | */ |