]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- |
2 | ||
3 | #include "auth/Auth.h" | |
4 | #include "messages/MPing.h" | |
f67539c2 | 5 | #include "common/ceph_argparse.h" |
9f95a23c | 6 | #include "crimson/auth/DummyAuth.h" |
f67539c2 | 7 | #include "crimson/common/throttle.h" |
11fdf7f2 TL |
8 | #include "crimson/net/Connection.h" |
9 | #include "crimson/net/Dispatcher.h" | |
10 | #include "crimson/net/Messenger.h" | |
11fdf7f2 TL |
11 | |
12 | #include <seastar/core/alien.hh> | |
13 | #include <seastar/core/app-template.hh> | |
14 | #include <seastar/core/future-util.hh> | |
f67539c2 TL |
15 | #include <seastar/core/internal/pollable_fd.hh> |
16 | #include <seastar/core/posix.hh> | |
11fdf7f2 TL |
17 | #include <seastar/core/reactor.hh> |
18 | ||
f67539c2 | 19 | using crimson::common::local_conf; |
11fdf7f2 TL |
20 | |
/// Selects which side of the ping/pong exchange this process plays.
enum class echo_role {
  as_server,  ///< bind, answer every received message, then shut down
  as_client,  ///< connect, send pings and wait for the replies
};
25 | ||
26 | namespace seastar_pingpong { | |
27 | struct DummyAuthAuthorizer : public AuthAuthorizer { | |
28 | DummyAuthAuthorizer() | |
29 | : AuthAuthorizer(CEPH_AUTH_CEPHX) | |
30 | {} | |
31 | bool verify_reply(bufferlist::const_iterator&, | |
32 | std::string *connection_secret) override { | |
33 | return true; | |
34 | } | |
35 | bool add_challenge(CephContext*, const bufferlist&) override { | |
36 | return true; | |
37 | } | |
38 | }; | |
39 | ||
40 | struct Server { | |
f67539c2 | 41 | crimson::common::Throttle byte_throttler; |
9f95a23c TL |
42 | crimson::net::MessengerRef msgr; |
43 | crimson::auth::DummyAuthClientServer dummy_auth; | |
f67539c2 | 44 | struct ServerDispatcher final : crimson::net::Dispatcher { |
11fdf7f2 TL |
45 | unsigned count = 0; |
46 | seastar::condition_variable on_reply; | |
f67539c2 TL |
47 | std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef c, |
48 | MessageRef m) final | |
49 | { | |
11fdf7f2 TL |
50 | std::cout << "server got ping " << *m << std::endl; |
51 | // reply with a pong | |
20effc67 | 52 | return c->send(crimson::make_message<MPing>()).then([this] { |
11fdf7f2 TL |
53 | ++count; |
54 | on_reply.signal(); | |
f67539c2 | 55 | return seastar::now(); |
11fdf7f2 TL |
56 | }); |
57 | } | |
11fdf7f2 | 58 | } dispatcher; |
9f95a23c | 59 | Server(crimson::net::MessengerRef msgr) |
f67539c2 | 60 | : byte_throttler(local_conf()->osd_client_message_size_cap), |
11fdf7f2 | 61 | msgr{msgr} |
1e59de90 | 62 | { } |
11fdf7f2 TL |
63 | }; |
64 | ||
65 | struct Client { | |
f67539c2 | 66 | crimson::common::Throttle byte_throttler; |
9f95a23c TL |
67 | crimson::net::MessengerRef msgr; |
68 | crimson::auth::DummyAuthClientServer dummy_auth; | |
f67539c2 | 69 | struct ClientDispatcher final : crimson::net::Dispatcher { |
11fdf7f2 TL |
70 | unsigned count = 0; |
71 | seastar::condition_variable on_reply; | |
f67539c2 TL |
72 | std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef c, |
73 | MessageRef m) final | |
74 | { | |
11fdf7f2 TL |
75 | std::cout << "client got pong " << *m << std::endl; |
76 | ++count; | |
77 | on_reply.signal(); | |
78 | return seastar::now(); | |
79 | } | |
80 | } dispatcher; | |
9f95a23c | 81 | Client(crimson::net::MessengerRef msgr) |
f67539c2 | 82 | : byte_throttler(local_conf()->osd_client_message_size_cap), |
11fdf7f2 | 83 | msgr{msgr} |
1e59de90 | 84 | { } |
11fdf7f2 TL |
85 | }; |
86 | } // namespace seastar_pingpong | |
87 | ||
88 | class SeastarContext { | |
f67539c2 TL |
89 | int begin_fd; |
90 | seastar::file_desc on_end; | |
11fdf7f2 TL |
91 | |
92 | public: | |
93 | SeastarContext() | |
f67539c2 TL |
94 | : begin_fd{eventfd(0, 0)}, |
95 | on_end{seastar::file_desc::eventfd(0, 0)} | |
11fdf7f2 TL |
96 | {} |
97 | ||
98 | template<class Func> | |
99 | std::thread with_seastar(Func&& func) { | |
f67539c2 TL |
100 | return std::thread{[this, on_end = on_end.get(), |
101 | func = std::forward<Func>(func)] { | |
11fdf7f2 TL |
102 | // alien: are you ready? |
103 | wait_for_seastar(); | |
104 | // alien: could you help me apply(func)? | |
105 | func(); | |
106 | // alien: i've sent my request. have you replied it? | |
107 | // wait_for_seastar(); | |
108 | // alien: you are free to go! | |
f67539c2 | 109 | ::eventfd_write(on_end, 1); |
11fdf7f2 TL |
110 | }}; |
111 | } | |
112 | ||
113 | void run(seastar::app_template& app, int argc, char** argv) { | |
114 | app.run(argc, argv, [this] { | |
f67539c2 TL |
115 | std::vector<const char*> args; |
116 | std::string cluster; | |
117 | std::string conf_file_list; | |
118 | auto init_params = ceph_argparse_early_args(args, | |
119 | CEPH_ENTITY_TYPE_CLIENT, | |
120 | &cluster, | |
121 | &conf_file_list); | |
122 | return crimson::common::sharded_conf().start(init_params.name, cluster) | |
123 | .then([conf_file_list] { | |
124 | return local_conf().parse_config_files(conf_file_list); | |
11fdf7f2 | 125 | }).then([this] { |
f67539c2 TL |
126 | return set_seastar_ready(); |
127 | }).then([on_end = std::move(on_end)] () mutable { | |
11fdf7f2 | 128 | // seastar: let me know once i am free to leave. |
f67539c2 TL |
129 | return seastar::do_with(seastar::pollable_fd(std::move(on_end)), [] |
130 | (seastar::pollable_fd& on_end_fds) { | |
131 | return on_end_fds.readable().then([&on_end_fds] { | |
132 | eventfd_t result = 0; | |
133 | on_end_fds.get_file_desc().read(&result, sizeof(result)); | |
134 | return seastar::make_ready_future<>(); | |
135 | }); | |
136 | }); | |
137 | }).then([]() { | |
138 | return crimson::common::sharded_conf().stop(); | |
11fdf7f2 TL |
139 | }).handle_exception([](auto ep) { |
140 | std::cerr << "Error: " << ep << std::endl; | |
141 | }).finally([] { | |
142 | seastar::engine().exit(0); | |
143 | }); | |
144 | }); | |
145 | } | |
146 | ||
147 | seastar::future<> set_seastar_ready() { | |
148 | // seastar: i am ready to serve! | |
f67539c2 | 149 | ::eventfd_write(begin_fd, 1); |
11fdf7f2 TL |
150 | return seastar::now(); |
151 | } | |
152 | ||
153 | private: | |
154 | void wait_for_seastar() { | |
155 | eventfd_t result = 0; | |
f67539c2 | 156 | if (int r = ::eventfd_read(begin_fd, &result); r < 0) { |
11fdf7f2 TL |
157 | std::cerr << "unable to eventfd_read():" << errno << std::endl; |
158 | } | |
159 | } | |
160 | }; | |
161 | ||
162 | static seastar::future<> | |
163 | seastar_echo(const entity_addr_t addr, echo_role role, unsigned count) | |
164 | { | |
165 | std::cout << "seastar/"; | |
166 | if (role == echo_role::as_server) { | |
9f95a23c TL |
167 | return seastar::do_with( |
168 | seastar_pingpong::Server{crimson::net::Messenger::create( | |
aee94f69 | 169 | entity_name_t::OSD(0), "server", addr.get_nonce(), true)}, |
9f95a23c TL |
170 | [addr, count](auto& server) mutable { |
171 | std::cout << "server listening at " << addr << std::endl; | |
172 | // bind the server | |
173 | server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); | |
174 | server.msgr->set_policy_throttler(entity_name_t::TYPE_OSD, | |
175 | &server.byte_throttler); | |
9f95a23c TL |
176 | server.msgr->set_auth_client(&server.dummy_auth); |
177 | server.msgr->set_auth_server(&server.dummy_auth); | |
178 | return server.msgr->bind(entity_addrvec_t{addr} | |
f67539c2 TL |
179 | ).safe_then([&server] { |
180 | return server.msgr->start({&server.dispatcher}); | |
181 | }, crimson::net::Messenger::bind_ertr::all_same_way([](auto& e) { | |
182 | ceph_abort_msg("bind failed"); | |
183 | })).then([&dispatcher=server.dispatcher, count] { | |
9f95a23c TL |
184 | return dispatcher.on_reply.wait([&dispatcher, count] { |
185 | return dispatcher.count >= count; | |
186 | }); | |
187 | }).finally([&server] { | |
188 | std::cout << "server shutting down" << std::endl; | |
20effc67 | 189 | server.msgr->stop(); |
9f95a23c | 190 | return server.msgr->shutdown(); |
11fdf7f2 | 191 | }); |
9f95a23c | 192 | }); |
11fdf7f2 | 193 | } else { |
9f95a23c TL |
194 | return seastar::do_with( |
195 | seastar_pingpong::Client{crimson::net::Messenger::create( | |
aee94f69 | 196 | entity_name_t::OSD(1), "client", addr.get_nonce(), true)}, |
9f95a23c TL |
197 | [addr, count](auto& client) { |
198 | std::cout << "client sending to " << addr << std::endl; | |
199 | client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); | |
200 | client.msgr->set_policy_throttler(entity_name_t::TYPE_OSD, | |
201 | &client.byte_throttler); | |
9f95a23c TL |
202 | client.msgr->set_auth_client(&client.dummy_auth); |
203 | client.msgr->set_auth_server(&client.dummy_auth); | |
f67539c2 | 204 | return client.msgr->start({&client.dispatcher}).then( |
9f95a23c TL |
205 | [addr, &client, &disp=client.dispatcher, count] { |
206 | auto conn = client.msgr->connect(addr, entity_name_t::TYPE_OSD); | |
207 | return seastar::do_until( | |
208 | [&disp,count] { return disp.count >= count; }, | |
209 | [&disp,conn] { | |
20effc67 | 210 | return conn->send(crimson::make_message<MPing>()).then([&] { |
9f95a23c TL |
211 | return disp.on_reply.wait(); |
212 | }); | |
213 | } | |
214 | ); | |
215 | }).finally([&client] { | |
216 | std::cout << "client shutting down" << std::endl; | |
20effc67 | 217 | client.msgr->stop(); |
9f95a23c | 218 | return client.msgr->shutdown(); |
11fdf7f2 | 219 | }); |
9f95a23c | 220 | }); |
11fdf7f2 TL |
221 | } |
222 | } | |
223 | ||
224 | int main(int argc, char** argv) | |
225 | { | |
226 | namespace po = boost::program_options; | |
227 | po::options_description desc{"Allowed options"}; | |
228 | desc.add_options() | |
229 | ("help,h", "show help message") | |
230 | ("role", po::value<std::string>()->default_value("pong"), | |
231 | "role to play (ping | pong)") | |
232 | ("port", po::value<uint16_t>()->default_value(9010), | |
233 | "port #") | |
234 | ("nonce", po::value<uint32_t>()->default_value(42), | |
235 | "a unique number to identify the pong server") | |
236 | ("count", po::value<unsigned>()->default_value(10), | |
20effc67 | 237 | "stop after sending/echoing <count> MPing messages"); |
11fdf7f2 TL |
238 | po::variables_map vm; |
239 | std::vector<std::string> unrecognized_options; | |
240 | try { | |
241 | auto parsed = po::command_line_parser(argc, argv) | |
242 | .options(desc) | |
243 | .allow_unregistered() | |
244 | .run(); | |
245 | po::store(parsed, vm); | |
246 | if (vm.count("help")) { | |
247 | std::cout << desc << std::endl; | |
248 | return 0; | |
249 | } | |
250 | po::notify(vm); | |
251 | unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional); | |
252 | } catch(const po::error& e) { | |
253 | std::cerr << "error: " << e.what() << std::endl; | |
254 | return 1; | |
255 | } | |
256 | ||
257 | entity_addr_t addr; | |
20effc67 | 258 | addr.set_type(entity_addr_t::TYPE_MSGR2); |
11fdf7f2 TL |
259 | addr.set_family(AF_INET); |
260 | addr.set_port(vm["port"].as<std::uint16_t>()); | |
261 | addr.set_nonce(vm["nonce"].as<std::uint32_t>()); | |
262 | ||
263 | echo_role role = echo_role::as_server; | |
264 | if (vm["role"].as<std::string>() == "ping") { | |
265 | role = echo_role::as_client; | |
266 | } | |
267 | ||
268 | auto count = vm["count"].as<unsigned>(); | |
269 | seastar::app_template app; | |
270 | SeastarContext sc; | |
271 | auto job = sc.with_seastar([&] { | |
20effc67 | 272 | auto fut = seastar::alien::submit_to(app.alien(), 0, [addr, role, count] { |
11fdf7f2 TL |
273 | return seastar_echo(addr, role, count); |
274 | }); | |
275 | fut.wait(); | |
276 | }); | |
277 | std::vector<char*> av{argv[0]}; | |
278 | std::transform(begin(unrecognized_options), | |
279 | end(unrecognized_options), | |
280 | std::back_inserter(av), | |
281 | [](auto& s) { | |
282 | return const_cast<char*>(s.c_str()); | |
283 | }); | |
284 | sc.run(app, av.size(), av.data()); | |
285 | job.join(); | |
286 | } | |
287 | ||
288 | /* | |
289 | * Local Variables: | |
290 | * compile-command: "make -j4 \ | |
291 | * -C ../../../build \ | |
292 | * unittest_seastar_echo" | |
293 | * End: | |
294 | */ |