1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
3 #include <boost/program_options/variables_map.hpp>
4 #include <boost/program_options/parsers.hpp>
7 #include "global/global_init.h"
8 #include "messages/MPing.h"
9 #include "msg/Dispatcher.h"
10 #include "msg/Messenger.h"
12 #include "auth/DummyAuth.h"
14 enum class echo_role
{
19 namespace native_pingpong
{
21 constexpr int CEPH_OSD_PROTOCOL
= 10;
24 Server(CephContext
* cct
, const entity_inst_t
& entity
)
25 : dummy_auth(cct
), dispatcher(cct
)
27 msgr
.reset(Messenger::create(cct
, "async",
28 entity
.name
, "pong", entity
.addr
.get_nonce(), 0));
29 dummy_auth
.auth_registry
.refresh_config();
30 msgr
->set_cluster_protocol(CEPH_OSD_PROTOCOL
);
31 msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
32 msgr
->set_auth_client(&dummy_auth
);
33 msgr
->set_auth_server(&dummy_auth
);
34 dispatcher
.ms_set_require_authorizer(false);
36 DummyAuthClientServer dummy_auth
;
37 unique_ptr
<Messenger
> msgr
;
38 struct ServerDispatcher
: Dispatcher
{
40 std::condition_variable on_reply
;
42 ServerDispatcher(CephContext
* cct
)
45 bool ms_can_fast_dispatch_any() const override
{
48 bool ms_can_fast_dispatch(const Message
* m
) const override
{
49 return m
->get_type() == CEPH_MSG_PING
;
51 void ms_fast_dispatch(Message
* m
) override
{
52 m
->get_connection()->send_message(new MPing
);
55 std::lock_guard lock
{mutex
};
58 on_reply
.notify_one();
60 bool ms_dispatch(Message
*) override
{
63 bool ms_handle_reset(Connection
*) override
{
66 void ms_handle_remote_reset(Connection
*) override
{
68 bool ms_handle_refused(Connection
*) override
{
73 std::unique_lock lock
{mutex
};
74 return on_reply
.wait(lock
, [this] { return replied
; });
83 unique_ptr
<Messenger
> msgr
;
84 Client(CephContext
*cct
)
85 : dummy_auth(cct
), dispatcher(cct
)
87 msgr
.reset(Messenger::create(cct
, "async",
88 entity_name_t::CLIENT(-1), "ping",
90 dummy_auth
.auth_registry
.refresh_config();
91 msgr
->set_cluster_protocol(CEPH_OSD_PROTOCOL
);
92 msgr
->set_default_policy(Messenger::Policy::lossy_client(0));
93 msgr
->set_auth_client(&dummy_auth
);
94 msgr
->set_auth_server(&dummy_auth
);
95 dispatcher
.ms_set_require_authorizer(false);
97 DummyAuthClientServer dummy_auth
;
98 struct ClientDispatcher
: Dispatcher
{
100 std::condition_variable on_reply
;
101 bool replied
= false;
103 ClientDispatcher(CephContext
* cct
)
106 bool ms_can_fast_dispatch_any() const override
{
109 bool ms_can_fast_dispatch(const Message
* m
) const override
{
110 return m
->get_type() == CEPH_MSG_PING
;
112 void ms_fast_dispatch(Message
* m
) override
{
115 std::lock_guard lock
{mutex
};
118 on_reply
.notify_one();
120 bool ms_dispatch(Message
*) override
{
123 bool ms_handle_reset(Connection
*) override
{
126 void ms_handle_remote_reset(Connection
*) override
{
128 bool ms_handle_refused(Connection
*) override
{
131 bool ping(Messenger
* msgr
, const entity_inst_t
& peer
) {
132 auto conn
= msgr
->connect_to(peer
.name
.type(),
133 entity_addrvec_t
{peer
.addr
});
135 conn
->send_message(new MPing
);
136 std::unique_lock lock
{mutex
};
137 return on_reply
.wait_for(lock
, 500ms
, [&] {
142 void ping(const entity_inst_t
& peer
) {
143 dispatcher
.ping(msgr
.get(), peer
);
146 } // namespace native_pingpong
148 static void ceph_echo(CephContext
* cct
,
149 entity_addr_t addr
, echo_role role
, unsigned count
)
151 std::cout
<< "ceph/";
152 entity_inst_t entity
{entity_name_t::OSD(0), addr
};
153 if (role
== echo_role::as_server
) {
154 std::cout
<< "server listening at " << addr
<< std::endl
;
155 native_pingpong::Server server
{cct
, entity
};
156 server
.msgr
->bind(addr
);
157 server
.msgr
->add_dispatcher_head(&server
.dispatcher
);
158 server
.msgr
->start();
159 for (unsigned i
= 0; i
< count
; i
++) {
162 server
.msgr
->shutdown();
165 std::cout
<< "client sending to " << addr
<< std::endl
;
166 native_pingpong::Client client
{cct
};
167 client
.msgr
->add_dispatcher_head(&client
.dispatcher
);
168 client
.msgr
->start();
169 auto conn
= client
.msgr
->connect_to(entity
.name
.type(),
170 entity_addrvec_t
{entity
.addr
});
171 for (unsigned i
= 0; i
< count
; i
++) {
172 std::cout
<< "seq=" << i
<< std::endl
;
175 client
.msgr
->shutdown();
180 int main(int argc
, char** argv
)
182 namespace po
= boost::program_options
;
183 po::options_description desc
{"Allowed options"};
185 ("help,h", "show help message")
186 ("role", po::value
<std::string
>()->default_value("pong"),
187 "role to play (ping | pong)")
188 ("port", po::value
<uint16_t>()->default_value(9010),
190 ("nonce", po::value
<uint32_t>()->default_value(42),
191 "a unique number to identify the pong server")
192 ("count", po::value
<unsigned>()->default_value(10),
193 "stop after sending/echoing <count> MPing messages")
194 ("v2", po::value
<bool>()->default_value(false),
195 "using msgr v2 protocol");
196 po::variables_map vm
;
197 std::vector
<std::string
> unrecognized_options
;
199 auto parsed
= po::command_line_parser(argc
, argv
)
201 .allow_unregistered()
203 po::store(parsed
, vm
);
204 if (vm
.count("help")) {
205 std::cout
<< desc
<< std::endl
;
209 unrecognized_options
= po::collect_unrecognized(parsed
.options
, po::include_positional
);
210 } catch(const po::error
& e
) {
211 std::cerr
<< "error: " << e
.what() << std::endl
;
216 if (vm
["v2"].as
<bool>()) {
217 addr
.set_type(entity_addr_t::TYPE_MSGR2
);
219 addr
.set_type(entity_addr_t::TYPE_LEGACY
);
221 addr
.set_family(AF_INET
);
222 addr
.set_port(vm
["port"].as
<std::uint16_t>());
223 addr
.set_nonce(vm
["nonce"].as
<std::uint32_t>());
225 echo_role role
= echo_role::as_server
;
226 if (vm
["role"].as
<std::string
>() == "ping") {
227 role
= echo_role::as_client
;
230 auto count
= vm
["count"].as
<unsigned>();
231 std::vector
<const char*> args(argv
, argv
+ argc
);
232 auto cct
= global_init(nullptr, args
,
233 CEPH_ENTITY_TYPE_CLIENT
,
234 CODE_ENVIRONMENT_UTILITY
,
235 CINIT_FLAG_NO_MON_CONFIG
);
236 common_init_finish(cct
.get());
237 ceph_echo(cct
.get(), addr
, role
, count
);