]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- |
2 | ||
3 | #include "auth/Auth.h" | |
4 | #include "messages/MPing.h" | |
9f95a23c | 5 | #include "crimson/auth/DummyAuth.h" |
11fdf7f2 TL |
6 | #include "crimson/net/Connection.h" |
7 | #include "crimson/net/Dispatcher.h" | |
8 | #include "crimson/net/Messenger.h" | |
9 | #include "crimson/net/Config.h" | |
11fdf7f2 TL |
10 | #include "crimson/thread/Throttle.h" |
11 | ||
12 | #include <seastar/core/alien.hh> | |
13 | #include <seastar/core/app-template.hh> | |
14 | #include <seastar/core/future-util.hh> | |
15 | #include <seastar/core/reactor.hh> | |
16 | ||
17 | ||
/// Side of the ping/pong exchange this process plays.
enum class echo_role {
  /// Handled by the server branch of seastar_echo().
  as_server,
  /// Handled by the client branch of seastar_echo().
  as_client,
};
22 | ||
23 | namespace seastar_pingpong { | |
24 | struct DummyAuthAuthorizer : public AuthAuthorizer { | |
25 | DummyAuthAuthorizer() | |
26 | : AuthAuthorizer(CEPH_AUTH_CEPHX) | |
27 | {} | |
28 | bool verify_reply(bufferlist::const_iterator&, | |
29 | std::string *connection_secret) override { | |
30 | return true; | |
31 | } | |
32 | bool add_challenge(CephContext*, const bufferlist&) override { | |
33 | return true; | |
34 | } | |
35 | }; | |
36 | ||
37 | struct Server { | |
9f95a23c TL |
38 | crimson::thread::Throttle byte_throttler; |
39 | crimson::net::MessengerRef msgr; | |
40 | crimson::auth::DummyAuthClientServer dummy_auth; | |
41 | struct ServerDispatcher : crimson::net::Dispatcher { | |
11fdf7f2 TL |
42 | unsigned count = 0; |
43 | seastar::condition_variable on_reply; | |
9f95a23c | 44 | seastar::future<> ms_dispatch(crimson::net::Connection* c, |
11fdf7f2 TL |
45 | MessageRef m) override { |
46 | std::cout << "server got ping " << *m << std::endl; | |
47 | // reply with a pong | |
9f95a23c | 48 | return c->send(make_message<MPing>()).then([this] { |
11fdf7f2 TL |
49 | ++count; |
50 | on_reply.signal(); | |
51 | }); | |
52 | } | |
9f95a23c TL |
53 | seastar::future<crimson::net::msgr_tag_t, bufferlist> |
54 | ms_verify_authorizer(entity_type_t peer_type, | |
11fdf7f2 TL |
55 | auth_proto_t protocol, |
56 | bufferlist& auth) override { | |
9f95a23c | 57 | return seastar::make_ready_future<crimson::net::msgr_tag_t, bufferlist>( |
11fdf7f2 TL |
58 | 0, bufferlist{}); |
59 | } | |
11fdf7f2 | 60 | } dispatcher; |
9f95a23c TL |
61 | Server(crimson::net::MessengerRef msgr) |
62 | : byte_throttler(crimson::net::conf.osd_client_message_size_cap), | |
11fdf7f2 TL |
63 | msgr{msgr} |
64 | { | |
9f95a23c TL |
65 | msgr->set_crc_header(); |
66 | msgr->set_crc_data(); | |
11fdf7f2 TL |
67 | } |
68 | }; | |
69 | ||
70 | struct Client { | |
9f95a23c TL |
71 | crimson::thread::Throttle byte_throttler; |
72 | crimson::net::MessengerRef msgr; | |
73 | crimson::auth::DummyAuthClientServer dummy_auth; | |
74 | struct ClientDispatcher : crimson::net::Dispatcher { | |
11fdf7f2 TL |
75 | unsigned count = 0; |
76 | seastar::condition_variable on_reply; | |
9f95a23c | 77 | seastar::future<> ms_dispatch(crimson::net::Connection* c, |
11fdf7f2 TL |
78 | MessageRef m) override { |
79 | std::cout << "client got pong " << *m << std::endl; | |
80 | ++count; | |
81 | on_reply.signal(); | |
82 | return seastar::now(); | |
83 | } | |
84 | } dispatcher; | |
9f95a23c TL |
85 | Client(crimson::net::MessengerRef msgr) |
86 | : byte_throttler(crimson::net::conf.osd_client_message_size_cap), | |
11fdf7f2 TL |
87 | msgr{msgr} |
88 | { | |
9f95a23c TL |
89 | msgr->set_crc_header(); |
90 | msgr->set_crc_data(); | |
11fdf7f2 TL |
91 | } |
92 | }; | |
93 | } // namespace seastar_pingpong | |
94 | ||
95 | class SeastarContext { | |
96 | seastar::file_desc begin_fd; | |
9f95a23c | 97 | seastar::readable_eventfd on_end; |
11fdf7f2 TL |
98 | |
99 | public: | |
100 | SeastarContext() | |
101 | : begin_fd{seastar::file_desc::eventfd(0, 0)} | |
102 | {} | |
103 | ||
104 | template<class Func> | |
105 | std::thread with_seastar(Func&& func) { | |
106 | return std::thread{[this, func = std::forward<Func>(func)] { | |
107 | // alien: are you ready? | |
108 | wait_for_seastar(); | |
109 | // alien: could you help me apply(func)? | |
110 | func(); | |
111 | // alien: i've sent my request. have you replied it? | |
112 | // wait_for_seastar(); | |
113 | // alien: you are free to go! | |
9f95a23c | 114 | on_end.write_side().signal(1); |
11fdf7f2 TL |
115 | }}; |
116 | } | |
117 | ||
118 | void run(seastar::app_template& app, int argc, char** argv) { | |
119 | app.run(argc, argv, [this] { | |
120 | return seastar::now().then([this] { | |
121 | return set_seastar_ready(); | |
122 | }).then([this] { | |
123 | // seastar: let me know once i am free to leave. | |
9f95a23c | 124 | return on_end.wait().then([](size_t){}); |
11fdf7f2 TL |
125 | }).handle_exception([](auto ep) { |
126 | std::cerr << "Error: " << ep << std::endl; | |
127 | }).finally([] { | |
128 | seastar::engine().exit(0); | |
129 | }); | |
130 | }); | |
131 | } | |
132 | ||
133 | seastar::future<> set_seastar_ready() { | |
134 | // seastar: i am ready to serve! | |
135 | ::eventfd_write(begin_fd.get(), 1); | |
136 | return seastar::now(); | |
137 | } | |
138 | ||
139 | private: | |
140 | void wait_for_seastar() { | |
141 | eventfd_t result = 0; | |
142 | if (int r = ::eventfd_read(begin_fd.get(), &result); r < 0) { | |
143 | std::cerr << "unable to eventfd_read():" << errno << std::endl; | |
144 | } | |
145 | } | |
146 | }; | |
147 | ||
148 | static seastar::future<> | |
149 | seastar_echo(const entity_addr_t addr, echo_role role, unsigned count) | |
150 | { | |
151 | std::cout << "seastar/"; | |
152 | if (role == echo_role::as_server) { | |
9f95a23c TL |
153 | return seastar::do_with( |
154 | seastar_pingpong::Server{crimson::net::Messenger::create( | |
155 | entity_name_t::OSD(0), "server", addr.get_nonce())}, | |
156 | [addr, count](auto& server) mutable { | |
157 | std::cout << "server listening at " << addr << std::endl; | |
158 | // bind the server | |
159 | server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); | |
160 | server.msgr->set_policy_throttler(entity_name_t::TYPE_OSD, | |
161 | &server.byte_throttler); | |
162 | server.msgr->set_require_authorizer(false); | |
163 | server.msgr->set_auth_client(&server.dummy_auth); | |
164 | server.msgr->set_auth_server(&server.dummy_auth); | |
165 | return server.msgr->bind(entity_addrvec_t{addr} | |
166 | ).then([&server] { | |
167 | return server.msgr->start(&server.dispatcher); | |
168 | }).then([&dispatcher=server.dispatcher, count] { | |
169 | return dispatcher.on_reply.wait([&dispatcher, count] { | |
170 | return dispatcher.count >= count; | |
171 | }); | |
172 | }).finally([&server] { | |
173 | std::cout << "server shutting down" << std::endl; | |
174 | return server.msgr->shutdown(); | |
11fdf7f2 | 175 | }); |
9f95a23c | 176 | }); |
11fdf7f2 | 177 | } else { |
9f95a23c TL |
178 | return seastar::do_with( |
179 | seastar_pingpong::Client{crimson::net::Messenger::create( | |
180 | entity_name_t::OSD(1), "client", addr.get_nonce())}, | |
181 | [addr, count](auto& client) { | |
182 | std::cout << "client sending to " << addr << std::endl; | |
183 | client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); | |
184 | client.msgr->set_policy_throttler(entity_name_t::TYPE_OSD, | |
185 | &client.byte_throttler); | |
186 | client.msgr->set_require_authorizer(false); | |
187 | client.msgr->set_auth_client(&client.dummy_auth); | |
188 | client.msgr->set_auth_server(&client.dummy_auth); | |
189 | return client.msgr->start(&client.dispatcher).then( | |
190 | [addr, &client, &disp=client.dispatcher, count] { | |
191 | auto conn = client.msgr->connect(addr, entity_name_t::TYPE_OSD); | |
192 | return seastar::do_until( | |
193 | [&disp,count] { return disp.count >= count; }, | |
194 | [&disp,conn] { | |
195 | return conn->send(make_message<MPing>()).then([&] { | |
196 | return disp.on_reply.wait(); | |
197 | }); | |
198 | } | |
199 | ); | |
200 | }).finally([&client] { | |
201 | std::cout << "client shutting down" << std::endl; | |
202 | return client.msgr->shutdown(); | |
11fdf7f2 | 203 | }); |
9f95a23c | 204 | }); |
11fdf7f2 TL |
205 | } |
206 | } | |
207 | ||
208 | int main(int argc, char** argv) | |
209 | { | |
210 | namespace po = boost::program_options; | |
211 | po::options_description desc{"Allowed options"}; | |
212 | desc.add_options() | |
213 | ("help,h", "show help message") | |
214 | ("role", po::value<std::string>()->default_value("pong"), | |
215 | "role to play (ping | pong)") | |
216 | ("port", po::value<uint16_t>()->default_value(9010), | |
217 | "port #") | |
218 | ("nonce", po::value<uint32_t>()->default_value(42), | |
219 | "a unique number to identify the pong server") | |
220 | ("count", po::value<unsigned>()->default_value(10), | |
221 | "stop after sending/echoing <count> MPing messages") | |
222 | ("v2", po::value<bool>()->default_value(false), | |
223 | "using msgr v2 protocol"); | |
224 | po::variables_map vm; | |
225 | std::vector<std::string> unrecognized_options; | |
226 | try { | |
227 | auto parsed = po::command_line_parser(argc, argv) | |
228 | .options(desc) | |
229 | .allow_unregistered() | |
230 | .run(); | |
231 | po::store(parsed, vm); | |
232 | if (vm.count("help")) { | |
233 | std::cout << desc << std::endl; | |
234 | return 0; | |
235 | } | |
236 | po::notify(vm); | |
237 | unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional); | |
238 | } catch(const po::error& e) { | |
239 | std::cerr << "error: " << e.what() << std::endl; | |
240 | return 1; | |
241 | } | |
242 | ||
243 | entity_addr_t addr; | |
244 | if (vm["v2"].as<bool>()) { | |
245 | addr.set_type(entity_addr_t::TYPE_MSGR2); | |
246 | } else { | |
247 | addr.set_type(entity_addr_t::TYPE_LEGACY); | |
248 | } | |
249 | addr.set_family(AF_INET); | |
250 | addr.set_port(vm["port"].as<std::uint16_t>()); | |
251 | addr.set_nonce(vm["nonce"].as<std::uint32_t>()); | |
252 | ||
253 | echo_role role = echo_role::as_server; | |
254 | if (vm["role"].as<std::string>() == "ping") { | |
255 | role = echo_role::as_client; | |
256 | } | |
257 | ||
258 | auto count = vm["count"].as<unsigned>(); | |
259 | seastar::app_template app; | |
260 | SeastarContext sc; | |
261 | auto job = sc.with_seastar([&] { | |
262 | auto fut = seastar::alien::submit_to(0, [addr, role, count] { | |
263 | return seastar_echo(addr, role, count); | |
264 | }); | |
265 | fut.wait(); | |
266 | }); | |
267 | std::vector<char*> av{argv[0]}; | |
268 | std::transform(begin(unrecognized_options), | |
269 | end(unrecognized_options), | |
270 | std::back_inserter(av), | |
271 | [](auto& s) { | |
272 | return const_cast<char*>(s.c_str()); | |
273 | }); | |
274 | sc.run(app, av.size(), av.data()); | |
275 | job.join(); | |
276 | } | |
277 | ||
278 | /* | |
279 | * Local Variables: | |
280 | * compile-command: "make -j4 \ | |
281 | * -C ../../../build \ | |
282 | * unittest_seastar_echo" | |
283 | * End: | |
284 | */ |