]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/crimson/test_alien_echo.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / test / crimson / test_alien_echo.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2
3#include "auth/Auth.h"
4#include "messages/MPing.h"
9f95a23c 5#include "crimson/auth/DummyAuth.h"
11fdf7f2
TL
6#include "crimson/net/Connection.h"
7#include "crimson/net/Dispatcher.h"
8#include "crimson/net/Messenger.h"
9#include "crimson/net/Config.h"
11fdf7f2
TL
10#include "crimson/thread/Throttle.h"
11
12#include <seastar/core/alien.hh>
13#include <seastar/core/app-template.hh>
14#include <seastar/core/future-util.hh>
15#include <seastar/core/reactor.hh>
16
17
/// Which end of the ping-pong exchange this process plays.
enum class echo_role {
  as_server,  ///< listen for pings and answer each one
  as_client,  ///< connect to the server and send pings
};
22
23namespace seastar_pingpong {
24struct DummyAuthAuthorizer : public AuthAuthorizer {
25 DummyAuthAuthorizer()
26 : AuthAuthorizer(CEPH_AUTH_CEPHX)
27 {}
28 bool verify_reply(bufferlist::const_iterator&,
29 std::string *connection_secret) override {
30 return true;
31 }
32 bool add_challenge(CephContext*, const bufferlist&) override {
33 return true;
34 }
35};
36
37struct Server {
9f95a23c
TL
38 crimson::thread::Throttle byte_throttler;
39 crimson::net::MessengerRef msgr;
40 crimson::auth::DummyAuthClientServer dummy_auth;
41 struct ServerDispatcher : crimson::net::Dispatcher {
11fdf7f2
TL
42 unsigned count = 0;
43 seastar::condition_variable on_reply;
9f95a23c 44 seastar::future<> ms_dispatch(crimson::net::Connection* c,
11fdf7f2
TL
45 MessageRef m) override {
46 std::cout << "server got ping " << *m << std::endl;
47 // reply with a pong
9f95a23c 48 return c->send(make_message<MPing>()).then([this] {
11fdf7f2
TL
49 ++count;
50 on_reply.signal();
51 });
52 }
9f95a23c
TL
53 seastar::future<crimson::net::msgr_tag_t, bufferlist>
54 ms_verify_authorizer(entity_type_t peer_type,
11fdf7f2
TL
55 auth_proto_t protocol,
56 bufferlist& auth) override {
9f95a23c 57 return seastar::make_ready_future<crimson::net::msgr_tag_t, bufferlist>(
11fdf7f2
TL
58 0, bufferlist{});
59 }
11fdf7f2 60 } dispatcher;
9f95a23c
TL
61 Server(crimson::net::MessengerRef msgr)
62 : byte_throttler(crimson::net::conf.osd_client_message_size_cap),
11fdf7f2
TL
63 msgr{msgr}
64 {
9f95a23c
TL
65 msgr->set_crc_header();
66 msgr->set_crc_data();
11fdf7f2
TL
67 }
68};
69
70struct Client {
9f95a23c
TL
71 crimson::thread::Throttle byte_throttler;
72 crimson::net::MessengerRef msgr;
73 crimson::auth::DummyAuthClientServer dummy_auth;
74 struct ClientDispatcher : crimson::net::Dispatcher {
11fdf7f2
TL
75 unsigned count = 0;
76 seastar::condition_variable on_reply;
9f95a23c 77 seastar::future<> ms_dispatch(crimson::net::Connection* c,
11fdf7f2
TL
78 MessageRef m) override {
79 std::cout << "client got pong " << *m << std::endl;
80 ++count;
81 on_reply.signal();
82 return seastar::now();
83 }
84 } dispatcher;
9f95a23c
TL
85 Client(crimson::net::MessengerRef msgr)
86 : byte_throttler(crimson::net::conf.osd_client_message_size_cap),
11fdf7f2
TL
87 msgr{msgr}
88 {
9f95a23c
TL
89 msgr->set_crc_header();
90 msgr->set_crc_data();
11fdf7f2
TL
91 }
92};
93} // namespace seastar_pingpong
94
95class SeastarContext {
96 seastar::file_desc begin_fd;
9f95a23c 97 seastar::readable_eventfd on_end;
11fdf7f2
TL
98
99public:
100 SeastarContext()
101 : begin_fd{seastar::file_desc::eventfd(0, 0)}
102 {}
103
104 template<class Func>
105 std::thread with_seastar(Func&& func) {
106 return std::thread{[this, func = std::forward<Func>(func)] {
107 // alien: are you ready?
108 wait_for_seastar();
109 // alien: could you help me apply(func)?
110 func();
111 // alien: i've sent my request. have you replied it?
112 // wait_for_seastar();
113 // alien: you are free to go!
9f95a23c 114 on_end.write_side().signal(1);
11fdf7f2
TL
115 }};
116 }
117
118 void run(seastar::app_template& app, int argc, char** argv) {
119 app.run(argc, argv, [this] {
120 return seastar::now().then([this] {
121 return set_seastar_ready();
122 }).then([this] {
123 // seastar: let me know once i am free to leave.
9f95a23c 124 return on_end.wait().then([](size_t){});
11fdf7f2
TL
125 }).handle_exception([](auto ep) {
126 std::cerr << "Error: " << ep << std::endl;
127 }).finally([] {
128 seastar::engine().exit(0);
129 });
130 });
131 }
132
133 seastar::future<> set_seastar_ready() {
134 // seastar: i am ready to serve!
135 ::eventfd_write(begin_fd.get(), 1);
136 return seastar::now();
137 }
138
139private:
140 void wait_for_seastar() {
141 eventfd_t result = 0;
142 if (int r = ::eventfd_read(begin_fd.get(), &result); r < 0) {
143 std::cerr << "unable to eventfd_read():" << errno << std::endl;
144 }
145 }
146};
147
148static seastar::future<>
149seastar_echo(const entity_addr_t addr, echo_role role, unsigned count)
150{
151 std::cout << "seastar/";
152 if (role == echo_role::as_server) {
9f95a23c
TL
153 return seastar::do_with(
154 seastar_pingpong::Server{crimson::net::Messenger::create(
155 entity_name_t::OSD(0), "server", addr.get_nonce())},
156 [addr, count](auto& server) mutable {
157 std::cout << "server listening at " << addr << std::endl;
158 // bind the server
159 server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
160 server.msgr->set_policy_throttler(entity_name_t::TYPE_OSD,
161 &server.byte_throttler);
162 server.msgr->set_require_authorizer(false);
163 server.msgr->set_auth_client(&server.dummy_auth);
164 server.msgr->set_auth_server(&server.dummy_auth);
165 return server.msgr->bind(entity_addrvec_t{addr}
166 ).then([&server] {
167 return server.msgr->start(&server.dispatcher);
168 }).then([&dispatcher=server.dispatcher, count] {
169 return dispatcher.on_reply.wait([&dispatcher, count] {
170 return dispatcher.count >= count;
171 });
172 }).finally([&server] {
173 std::cout << "server shutting down" << std::endl;
174 return server.msgr->shutdown();
11fdf7f2 175 });
9f95a23c 176 });
11fdf7f2 177 } else {
9f95a23c
TL
178 return seastar::do_with(
179 seastar_pingpong::Client{crimson::net::Messenger::create(
180 entity_name_t::OSD(1), "client", addr.get_nonce())},
181 [addr, count](auto& client) {
182 std::cout << "client sending to " << addr << std::endl;
183 client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
184 client.msgr->set_policy_throttler(entity_name_t::TYPE_OSD,
185 &client.byte_throttler);
186 client.msgr->set_require_authorizer(false);
187 client.msgr->set_auth_client(&client.dummy_auth);
188 client.msgr->set_auth_server(&client.dummy_auth);
189 return client.msgr->start(&client.dispatcher).then(
190 [addr, &client, &disp=client.dispatcher, count] {
191 auto conn = client.msgr->connect(addr, entity_name_t::TYPE_OSD);
192 return seastar::do_until(
193 [&disp,count] { return disp.count >= count; },
194 [&disp,conn] {
195 return conn->send(make_message<MPing>()).then([&] {
196 return disp.on_reply.wait();
197 });
198 }
199 );
200 }).finally([&client] {
201 std::cout << "client shutting down" << std::endl;
202 return client.msgr->shutdown();
11fdf7f2 203 });
9f95a23c 204 });
11fdf7f2
TL
205 }
206}
207
208int main(int argc, char** argv)
209{
210 namespace po = boost::program_options;
211 po::options_description desc{"Allowed options"};
212 desc.add_options()
213 ("help,h", "show help message")
214 ("role", po::value<std::string>()->default_value("pong"),
215 "role to play (ping | pong)")
216 ("port", po::value<uint16_t>()->default_value(9010),
217 "port #")
218 ("nonce", po::value<uint32_t>()->default_value(42),
219 "a unique number to identify the pong server")
220 ("count", po::value<unsigned>()->default_value(10),
221 "stop after sending/echoing <count> MPing messages")
222 ("v2", po::value<bool>()->default_value(false),
223 "using msgr v2 protocol");
224 po::variables_map vm;
225 std::vector<std::string> unrecognized_options;
226 try {
227 auto parsed = po::command_line_parser(argc, argv)
228 .options(desc)
229 .allow_unregistered()
230 .run();
231 po::store(parsed, vm);
232 if (vm.count("help")) {
233 std::cout << desc << std::endl;
234 return 0;
235 }
236 po::notify(vm);
237 unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
238 } catch(const po::error& e) {
239 std::cerr << "error: " << e.what() << std::endl;
240 return 1;
241 }
242
243 entity_addr_t addr;
244 if (vm["v2"].as<bool>()) {
245 addr.set_type(entity_addr_t::TYPE_MSGR2);
246 } else {
247 addr.set_type(entity_addr_t::TYPE_LEGACY);
248 }
249 addr.set_family(AF_INET);
250 addr.set_port(vm["port"].as<std::uint16_t>());
251 addr.set_nonce(vm["nonce"].as<std::uint32_t>());
252
253 echo_role role = echo_role::as_server;
254 if (vm["role"].as<std::string>() == "ping") {
255 role = echo_role::as_client;
256 }
257
258 auto count = vm["count"].as<unsigned>();
259 seastar::app_template app;
260 SeastarContext sc;
261 auto job = sc.with_seastar([&] {
262 auto fut = seastar::alien::submit_to(0, [addr, role, count] {
263 return seastar_echo(addr, role, count);
264 });
265 fut.wait();
266 });
267 std::vector<char*> av{argv[0]};
268 std::transform(begin(unrecognized_options),
269 end(unrecognized_options),
270 std::back_inserter(av),
271 [](auto& s) {
272 return const_cast<char*>(s.c_str());
273 });
274 sc.run(app, av.size(), av.data());
275 job.join();
276}
277
278/*
279 * Local Variables:
280 * compile-command: "make -j4 \
281 * -C ../../../build \
282 * unittest_seastar_echo"
283 * End:
284 */