1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
3 #include <boost/program_options/variables_map.hpp>
4 #include <boost/program_options/parsers.hpp>
7 #include "global/global_init.h"
8 #include "messages/MPing.h"
9 #include "msg/Dispatcher.h"
10 #include "msg/Messenger.h"
12 #include "auth/DummyAuth.h"
// Selects which side of the ping/pong exchange this process plays.
// main() picks a value and ceph_echo() branches on it
// (echo_role::as_server / echo_role::as_client are the values used below).
14 enum class echo_role
{
// Messenger-based ping/pong endpoints (Server and Client) used by ceph_echo().
19 namespace native_pingpong
{
// Protocol id handed to Messenger::set_cluster_protocol() by both the Server
// and the Client constructors in this namespace.
21 constexpr int CEPH_OSD_PROTOCOL
= 10;
// Server constructor.
//
// @param cct     Ceph context forwarded to dummy_auth, the dispatcher and
//                Messenger::create().
// @param entity  Identity of this endpoint; entity.name and the nonce from
//                entity.addr are passed to Messenger::create().
//
// Creates an "async" messenger labelled "pong", refreshes the dummy auth
// registry, and then configures the messenger: cluster protocol, a
// stateless_server default policy, dummy_auth as both auth client and auth
// server, and authorizers not required.
24 Server(CephContext
* cct
, const entity_inst_t
& entity
)
25 : dummy_auth(cct
), dispatcher(cct
)
27 msgr
.reset(Messenger::create(cct
, "async", entity
.name
, "pong", entity
.addr
.get_nonce()));
28 dummy_auth
.auth_registry
.refresh_config();
29 msgr
->set_cluster_protocol(CEPH_OSD_PROTOCOL
);
30 msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
// The same DummyAuthClientServer object serves both auth directions.
31 msgr
->set_auth_client(&dummy_auth
);
32 msgr
->set_auth_server(&dummy_auth
);
33 msgr
->set_require_authorizer(false);
// Auth handler installed on msgr as both auth client and auth server by the
// constructor.
35 DummyAuthClientServer dummy_auth
;
// Messenger owned by this Server; populated in the constructor via
// Messenger::create().
36 unique_ptr
<Messenger
> msgr
;
// Dispatcher for the pong side. CEPH_MSG_PING messages are eligible for fast
// dispatch; each one is answered with a new MPing on the connection it
// arrived on, after which on_reply is notified.
// NOTE(review): several member declarations and method bodies (e.g. `mutex`,
// `replied`, the bodies of the ms_* handlers) are not visible in this
// excerpt, so they are left undocumented here.
37 struct ServerDispatcher
: Dispatcher
{
// Signalled by ms_fast_dispatch(); waited on (with `replied` as predicate)
// at the end of this struct.
39 std::condition_variable on_reply
;
41 ServerDispatcher(CephContext
* cct
)
44 bool ms_can_fast_dispatch_any() const override
{
// Only CEPH_MSG_PING messages qualify for fast dispatch.
47 bool ms_can_fast_dispatch(const Message
* m
) const override
{
48 return m
->get_type() == CEPH_MSG_PING
;
// Fast path: echo a fresh MPing back on the originating connection, then
// wake one waiter on on_reply.
50 void ms_fast_dispatch(Message
* m
) override
{
51 m
->get_connection()->send_message(new MPing
);
// `mutex` is held for the (not visible) statement that follows.
54 std::lock_guard lock
{mutex
};
57 on_reply
.notify_one();
59 bool ms_dispatch(Message
*) override
{
62 bool ms_handle_reset(Connection
*) override
{
65 void ms_handle_remote_reset(Connection
*) override
{
67 bool ms_handle_refused(Connection
*) override
{
// Blocks on on_reply under `mutex` until `replied` is true.
72 std::unique_lock lock
{mutex
};
73 return on_reply
.wait(lock
, [this] { return replied
; });
// Messenger owned by the Client; populated in the Client constructor via
// Messenger::create().
82 unique_ptr
<Messenger
> msgr
;
// Client constructor.
//
// @param cct  Ceph context forwarded to dummy_auth, the dispatcher and
//             Messenger::create().
//
// Creates an "async" messenger named entity_name_t::CLIENT(-1), labelled
// "ping", with getpid() as the nonce argument, refreshes the dummy auth
// registry, and then configures the messenger: cluster protocol, a
// lossy_client default policy, dummy_auth as both auth client and auth
// server, and authorizers not required.
83 Client(CephContext
*cct
)
84 : dummy_auth(cct
), dispatcher(cct
)
86 msgr
.reset(Messenger::create(cct
, "async", entity_name_t::CLIENT(-1), "ping", getpid()));
87 dummy_auth
.auth_registry
.refresh_config();
88 msgr
->set_cluster_protocol(CEPH_OSD_PROTOCOL
);
89 msgr
->set_default_policy(Messenger::Policy::lossy_client(0));
// The same DummyAuthClientServer object serves both auth directions.
90 msgr
->set_auth_client(&dummy_auth
);
91 msgr
->set_auth_server(&dummy_auth
);
92 msgr
->set_require_authorizer(false);
// Auth handler installed on msgr as both auth client and auth server by the
// Client constructor.
94 DummyAuthClientServer dummy_auth
;
// Dispatcher for the ping side. CEPH_MSG_PING messages are eligible for fast
// dispatch; receiving one notifies on_reply, which ping() waits on.
// NOTE(review): `mutex`, any reply flag, and the bodies of most ms_* handlers
// are not visible in this excerpt, so they are left undocumented here.
95 struct ClientDispatcher
: Dispatcher
{
// Signalled by ms_fast_dispatch(); waited on by ping() with a timeout.
97 std::condition_variable on_reply
;
100 ClientDispatcher(CephContext
* cct
)
103 bool ms_can_fast_dispatch_any() const override
{
// Only CEPH_MSG_PING messages qualify for fast dispatch.
106 bool ms_can_fast_dispatch(const Message
* m
) const override
{
107 return m
->get_type() == CEPH_MSG_PING
;
// Fast path for an incoming reply: take `mutex` for the (not visible)
// guarded statement, then wake one waiter on on_reply.
109 void ms_fast_dispatch(Message
* m
) override
{
112 std::lock_guard lock
{mutex
};
115 on_reply
.notify_one();
117 bool ms_dispatch(Message
*) override
{
120 bool ms_handle_reset(Connection
*) override
{
123 void ms_handle_remote_reset(Connection
*) override
{
125 bool ms_handle_refused(Connection
*) override
{
// Sends one MPing to `peer` and waits for the reply notification.
//
// @param msgr  Messenger used to obtain the connection.
// @param peer  Target; peer.name.type() and an addrvec holding peer.addr
//              are passed to connect_to().
// @return      The result of the timed wait on on_reply (500ms limit); the
//              wait predicate body is not visible in this excerpt.
128 bool ping(Messenger
* msgr
, const entity_inst_t
& peer
) {
129 auto conn
= msgr
->connect_to(peer
.name
.type(),
130 entity_addrvec_t
{peer
.addr
});
132 conn
->send_message(new MPing
);
133 std::unique_lock lock
{mutex
};
134 return on_reply
.wait_for(lock
, 500ms
, [&] {
// Convenience wrapper: pings `peer` through this Client's own messenger by
// delegating to dispatcher.ping(). The bool returned by the dispatcher is
// not propagated (this overload returns void).
139 void ping(const entity_inst_t
& peer
) {
140 dispatcher
.ping(msgr
.get(), peer
);
143 } // namespace native_pingpong
// Runs one side of the echo exchange.
//
// @param cct    Ceph context passed to the Server/Client being constructed.
// @param addr   Address the server binds to / the client connects to.
// @param role   echo_role::as_server runs the server path; the client path
//               is the other set of statements below.
// @param count  Upper bound of the per-message loop on either path.
//
// Both paths wrap the same entity (entity_name_t::OSD(0) at `addr`), register
// the endpoint's dispatcher at the head of the messenger's dispatcher list,
// start the messenger, iterate `count` times, and shut the messenger down.
// NOTE(review): the loop bodies and the `else` separating the two paths are
// not visible in this excerpt.
145 static void ceph_echo(CephContext
* cct
,
146 entity_addr_t addr
, echo_role role
, unsigned count
)
148 std::cout
<< "ceph/";
149 entity_inst_t entity
{entity_name_t::OSD(0), addr
};
150 if (role
== echo_role::as_server
) {
151 std::cout
<< "server listening at " << addr
<< std::endl
;
// Server path: bind to addr before starting the messenger.
152 native_pingpong::Server server
{cct
, entity
};
153 server
.msgr
->bind(addr
);
154 server
.msgr
->add_dispatcher_head(&server
.dispatcher
);
155 server
.msgr
->start();
156 for (unsigned i
= 0; i
< count
; i
++) {
159 server
.msgr
->shutdown();
// Client path: no bind; the messenger connects out to `entity`.
162 std::cout
<< "client sending to " << addr
<< std::endl
;
163 native_pingpong::Client client
{cct
};
164 client
.msgr
->add_dispatcher_head(&client
.dispatcher
);
165 client
.msgr
->start();
// NOTE(review): `conn` is not referenced again in the visible lines --
// confirm whether this connect_to() call is needed.
166 auto conn
= client
.msgr
->connect_to(entity
.name
.type(),
167 entity_addrvec_t
{entity
.addr
});
// Prints the sequence number of each iteration.
168 for (unsigned i
= 0; i
< count
; i
++) {
169 std::cout
<< "seq=" << i
<< std::endl
;
172 client
.msgr
->shutdown();
// Entry point of the echo tool.
//
// Parses the command line with boost::program_options, builds the
// entity_addr_t to use, initialises the Ceph context through global_init()
// and common_init_finish(), and finally calls ceph_echo().
//
// Options declared below: help/h, role (default "pong"), port (default 9010),
// nonce (default 42), count (default 10) and v2 (default false).
// NOTE(review): some original lines (e.g. the start of the option chain, the
// `try`, and the return statements) are not visible in this excerpt.
177 int main(int argc
, char** argv
)
179 namespace po
= boost::program_options
;
180 po::options_description desc
{"Allowed options"};
182 ("help,h", "show help message")
183 ("role", po::value
<std::string
>()->default_value("pong"),
184 "role to play (ping | pong)")
185 ("port", po::value
<uint16_t>()->default_value(9010),
187 ("nonce", po::value
<uint32_t>()->default_value(42),
188 "a unique number to identify the pong server")
189 ("count", po::value
<unsigned>()->default_value(10),
190 "stop after sending/echoing <count> MPing messages")
191 ("v2", po::value
<bool>()->default_value(false),
192 "using msgr v2 protocol");
193 po::variables_map vm
;
// Receives the options the parser did not recognise (see
// collect_unrecognized below).
194 std::vector
<std::string
> unrecognized_options
;
// Unregistered options are tolerated by the parser rather than rejected.
196 auto parsed
= po::command_line_parser(argc
, argv
)
198 .allow_unregistered()
200 po::store(parsed
, vm
);
// --help prints the option summary.
201 if (vm
.count("help")) {
202 std::cout
<< desc
<< std::endl
;
// Gather everything not registered in desc, positional arguments included.
206 unrecognized_options
= po::collect_unrecognized(parsed
.options
, po::include_positional
);
// Parsing errors from program_options are reported on stderr.
207 } catch(const po::error
& e
) {
208 std::cerr
<< "error: " << e
.what() << std::endl
;
// Address type follows --v2: TYPE_MSGR2 when set; the TYPE_LEGACY call is
// presumably the else branch (the `else` itself is not visible here).
213 if (vm
["v2"].as
<bool>()) {
214 addr
.set_type(entity_addr_t::TYPE_MSGR2
);
216 addr
.set_type(entity_addr_t::TYPE_LEGACY
);
// IPv4 address with port and nonce taken from the command line.
218 addr
.set_family(AF_INET
);
219 addr
.set_port(vm
["port"].as
<std::uint16_t>());
220 addr
.set_nonce(vm
["nonce"].as
<std::uint32_t>());
// Server is the default role; --role=ping switches to the client role.
222 echo_role role
= echo_role::as_server
;
223 if (vm
["role"].as
<std::string
>() == "ping") {
224 role
= echo_role::as_client
;
227 auto count
= vm
["count"].as
<unsigned>();
// The complete argv is forwarded to global_init() together with
// CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY and
// CINIT_FLAG_NO_MON_CONFIG.
228 std::vector
<const char*> args(argv
, argv
+ argc
);
229 auto cct
= global_init(nullptr, args
,
230 CEPH_ENTITY_TYPE_CLIENT
,
231 CODE_ENVIRONMENT_UTILITY
,
232 CINIT_FLAG_NO_MON_CONFIG
);
233 common_init_finish(cct
.get());
234 ceph_echo(cct
.get(), addr
, role
, count
);