1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
4 #include "messages/MPing.h"
5 #include "common/ceph_argparse.h"
6 #include "crimson/auth/DummyAuth.h"
7 #include "crimson/common/throttle.h"
8 #include "crimson/net/Connection.h"
9 #include "crimson/net/Dispatcher.h"
10 #include "crimson/net/Messenger.h"
12 #include <seastar/core/alien.hh>
13 #include <seastar/core/app-template.hh>
14 #include <seastar/core/future-util.hh>
15 #include <seastar/core/internal/pollable_fd.hh>
16 #include <seastar/core/posix.hh>
17 #include <seastar/core/reactor.hh>
19 using crimson::common::local_conf
;
21 enum class echo_role
{
26 namespace seastar_pingpong
{
27 struct DummyAuthAuthorizer
: public AuthAuthorizer
{
29 : AuthAuthorizer(CEPH_AUTH_CEPHX
)
31 bool verify_reply(bufferlist::const_iterator
&,
32 std::string
*connection_secret
) override
{
35 bool add_challenge(CephContext
*, const bufferlist
&) override
{
41 crimson::common::Throttle byte_throttler
;
42 crimson::net::MessengerRef msgr
;
43 crimson::auth::DummyAuthClientServer dummy_auth
;
44 struct ServerDispatcher final
: crimson::net::Dispatcher
{
46 seastar::condition_variable on_reply
;
47 std::optional
<seastar::future
<>> ms_dispatch(crimson::net::ConnectionRef c
,
50 std::cout
<< "server got ping " << *m
<< std::endl
;
52 return c
->send(crimson::make_message
<MPing
>()).then([this] {
55 return seastar::now();
59 Server(crimson::net::MessengerRef msgr
)
60 : byte_throttler(local_conf()->osd_client_message_size_cap
),
66 crimson::common::Throttle byte_throttler
;
67 crimson::net::MessengerRef msgr
;
68 crimson::auth::DummyAuthClientServer dummy_auth
;
69 struct ClientDispatcher final
: crimson::net::Dispatcher
{
71 seastar::condition_variable on_reply
;
72 std::optional
<seastar::future
<>> ms_dispatch(crimson::net::ConnectionRef c
,
75 std::cout
<< "client got pong " << *m
<< std::endl
;
78 return seastar::now();
81 Client(crimson::net::MessengerRef msgr
)
82 : byte_throttler(local_conf()->osd_client_message_size_cap
),
86 } // namespace seastar_pingpong
88 class SeastarContext
{
90 seastar::file_desc on_end
;
94 : begin_fd
{eventfd(0, 0)},
95 on_end
{seastar::file_desc::eventfd(0, 0)}
99 std::thread
with_seastar(Func
&& func
) {
100 return std::thread
{[this, on_end
= on_end
.get(),
101 func
= std::forward
<Func
>(func
)] {
102 // alien: are you ready?
104 // alien: could you help me apply(func)?
106 // alien: i've sent my request. have you replied it?
107 // wait_for_seastar();
108 // alien: you are free to go!
109 ::eventfd_write(on_end
, 1);
113 void run(seastar::app_template
& app
, int argc
, char** argv
) {
114 app
.run(argc
, argv
, [this] {
115 std::vector
<const char*> args
;
117 std::string conf_file_list
;
118 auto init_params
= ceph_argparse_early_args(args
,
119 CEPH_ENTITY_TYPE_CLIENT
,
122 return crimson::common::sharded_conf().start(init_params
.name
, cluster
)
123 .then([conf_file_list
] {
124 return local_conf().parse_config_files(conf_file_list
);
126 return set_seastar_ready();
127 }).then([on_end
= std::move(on_end
)] () mutable {
128 // seastar: let me know once i am free to leave.
129 return seastar::do_with(seastar::pollable_fd(std::move(on_end
)), []
130 (seastar::pollable_fd
& on_end_fds
) {
131 return on_end_fds
.readable().then([&on_end_fds
] {
132 eventfd_t result
= 0;
133 on_end_fds
.get_file_desc().read(&result
, sizeof(result
));
134 return seastar::make_ready_future
<>();
138 return crimson::common::sharded_conf().stop();
139 }).handle_exception([](auto ep
) {
140 std::cerr
<< "Error: " << ep
<< std::endl
;
142 seastar::engine().exit(0);
147 seastar::future
<> set_seastar_ready() {
148 // seastar: i am ready to serve!
149 ::eventfd_write(begin_fd
, 1);
150 return seastar::now();
154 void wait_for_seastar() {
155 eventfd_t result
= 0;
156 if (int r
= ::eventfd_read(begin_fd
, &result
); r
< 0) {
157 std::cerr
<< "unable to eventfd_read():" << errno
<< std::endl
;
162 static seastar::future
<>
163 seastar_echo(const entity_addr_t addr
, echo_role role
, unsigned count
)
165 std::cout
<< "seastar/";
166 if (role
== echo_role::as_server
) {
167 return seastar::do_with(
168 seastar_pingpong::Server
{crimson::net::Messenger::create(
169 entity_name_t::OSD(0), "server", addr
.get_nonce())},
170 [addr
, count
](auto& server
) mutable {
171 std::cout
<< "server listening at " << addr
<< std::endl
;
173 server
.msgr
->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
174 server
.msgr
->set_policy_throttler(entity_name_t::TYPE_OSD
,
175 &server
.byte_throttler
);
176 server
.msgr
->set_auth_client(&server
.dummy_auth
);
177 server
.msgr
->set_auth_server(&server
.dummy_auth
);
178 return server
.msgr
->bind(entity_addrvec_t
{addr
}
179 ).safe_then([&server
] {
180 return server
.msgr
->start({&server
.dispatcher
});
181 }, crimson::net::Messenger::bind_ertr::all_same_way([](auto& e
) {
182 ceph_abort_msg("bind failed");
183 })).then([&dispatcher
=server
.dispatcher
, count
] {
184 return dispatcher
.on_reply
.wait([&dispatcher
, count
] {
185 return dispatcher
.count
>= count
;
187 }).finally([&server
] {
188 std::cout
<< "server shutting down" << std::endl
;
190 return server
.msgr
->shutdown();
194 return seastar::do_with(
195 seastar_pingpong::Client
{crimson::net::Messenger::create(
196 entity_name_t::OSD(1), "client", addr
.get_nonce())},
197 [addr
, count
](auto& client
) {
198 std::cout
<< "client sending to " << addr
<< std::endl
;
199 client
.msgr
->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
200 client
.msgr
->set_policy_throttler(entity_name_t::TYPE_OSD
,
201 &client
.byte_throttler
);
202 client
.msgr
->set_auth_client(&client
.dummy_auth
);
203 client
.msgr
->set_auth_server(&client
.dummy_auth
);
204 return client
.msgr
->start({&client
.dispatcher
}).then(
205 [addr
, &client
, &disp
=client
.dispatcher
, count
] {
206 auto conn
= client
.msgr
->connect(addr
, entity_name_t::TYPE_OSD
);
207 return seastar::do_until(
208 [&disp
,count
] { return disp
.count
>= count
; },
210 return conn
->send(crimson::make_message
<MPing
>()).then([&] {
211 return disp
.on_reply
.wait();
215 }).finally([&client
] {
216 std::cout
<< "client shutting down" << std::endl
;
218 return client
.msgr
->shutdown();
224 int main(int argc
, char** argv
)
226 namespace po
= boost::program_options
;
227 po::options_description desc
{"Allowed options"};
229 ("help,h", "show help message")
230 ("role", po::value
<std::string
>()->default_value("pong"),
231 "role to play (ping | pong)")
232 ("port", po::value
<uint16_t>()->default_value(9010),
234 ("nonce", po::value
<uint32_t>()->default_value(42),
235 "a unique number to identify the pong server")
236 ("count", po::value
<unsigned>()->default_value(10),
237 "stop after sending/echoing <count> MPing messages");
238 po::variables_map vm
;
239 std::vector
<std::string
> unrecognized_options
;
241 auto parsed
= po::command_line_parser(argc
, argv
)
243 .allow_unregistered()
245 po::store(parsed
, vm
);
246 if (vm
.count("help")) {
247 std::cout
<< desc
<< std::endl
;
251 unrecognized_options
= po::collect_unrecognized(parsed
.options
, po::include_positional
);
252 } catch(const po::error
& e
) {
253 std::cerr
<< "error: " << e
.what() << std::endl
;
258 addr
.set_type(entity_addr_t::TYPE_MSGR2
);
259 addr
.set_family(AF_INET
);
260 addr
.set_port(vm
["port"].as
<std::uint16_t>());
261 addr
.set_nonce(vm
["nonce"].as
<std::uint32_t>());
263 echo_role role
= echo_role::as_server
;
264 if (vm
["role"].as
<std::string
>() == "ping") {
265 role
= echo_role::as_client
;
268 auto count
= vm
["count"].as
<unsigned>();
269 seastar::app_template app
;
271 auto job
= sc
.with_seastar([&] {
272 auto fut
= seastar::alien::submit_to(app
.alien(), 0, [addr
, role
, count
] {
273 return seastar_echo(addr
, role
, count
);
277 std::vector
<char*> av
{argv
[0]};
278 std::transform(begin(unrecognized_options
),
279 end(unrecognized_options
),
280 std::back_inserter(av
),
282 return const_cast<char*>(s
.c_str());
284 sc
.run(app
, av
.size(), av
.data());
290 * compile-command: "make -j4 \
291 * -C ../../../build \
292 * unittest_seastar_echo"