1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
3 #include <boost/program_options/variables_map.hpp>
4 #include <boost/program_options/parsers.hpp>
7 #include "global/global_init.h"
8 #include "messages/MPing.h"
9 #include "msg/Dispatcher.h"
10 #include "msg/Messenger.h"
12 #include "auth/DummyAuth.h"
14 enum class echo_role
{
19 namespace native_pingpong
{
21 constexpr int CEPH_OSD_PROTOCOL
= 10;
24 Server(CephContext
* cct
, const entity_inst_t
& entity
)
25 : dummy_auth(cct
), dispatcher(cct
)
27 msgr
.reset(Messenger::create(cct
, "async", entity
.name
, "pong", entity
.addr
.get_nonce()));
28 dummy_auth
.auth_registry
.refresh_config();
29 msgr
->set_cluster_protocol(CEPH_OSD_PROTOCOL
);
30 msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
31 msgr
->set_auth_client(&dummy_auth
);
32 msgr
->set_auth_server(&dummy_auth
);
34 DummyAuthClientServer dummy_auth
;
35 std::unique_ptr
<Messenger
> msgr
;
36 struct ServerDispatcher
: Dispatcher
{
38 std::condition_variable on_reply
;
40 ServerDispatcher(CephContext
* cct
)
43 bool ms_can_fast_dispatch_any() const override
{
46 bool ms_can_fast_dispatch(const Message
* m
) const override
{
47 return m
->get_type() == CEPH_MSG_PING
;
49 void ms_fast_dispatch(Message
* m
) override
{
50 m
->get_connection()->send_message(new MPing
);
53 std::lock_guard lock
{mutex
};
56 on_reply
.notify_one();
58 bool ms_dispatch(Message
*) override
{
61 bool ms_handle_reset(Connection
*) override
{
64 void ms_handle_remote_reset(Connection
*) override
{
66 bool ms_handle_refused(Connection
*) override
{
71 std::unique_lock lock
{mutex
};
72 return on_reply
.wait(lock
, [this] { return replied
; });
81 std::unique_ptr
<Messenger
> msgr
;
82 Client(CephContext
*cct
)
83 : dummy_auth(cct
), dispatcher(cct
)
85 msgr
.reset(Messenger::create(cct
, "async", entity_name_t::CLIENT(-1), "ping", getpid()));
86 dummy_auth
.auth_registry
.refresh_config();
87 msgr
->set_cluster_protocol(CEPH_OSD_PROTOCOL
);
88 msgr
->set_default_policy(Messenger::Policy::lossy_client(0));
89 msgr
->set_auth_client(&dummy_auth
);
90 msgr
->set_auth_server(&dummy_auth
);
92 DummyAuthClientServer dummy_auth
;
93 struct ClientDispatcher
: Dispatcher
{
95 std::condition_variable on_reply
;
98 ClientDispatcher(CephContext
* cct
)
101 bool ms_can_fast_dispatch_any() const override
{
104 bool ms_can_fast_dispatch(const Message
* m
) const override
{
105 return m
->get_type() == CEPH_MSG_PING
;
107 void ms_fast_dispatch(Message
* m
) override
{
110 std::lock_guard lock
{mutex
};
113 on_reply
.notify_one();
115 bool ms_dispatch(Message
*) override
{
118 bool ms_handle_reset(Connection
*) override
{
121 void ms_handle_remote_reset(Connection
*) override
{
123 bool ms_handle_refused(Connection
*) override
{
126 bool ping(Messenger
* msgr
, const entity_inst_t
& peer
) {
127 using namespace std::chrono_literals
;
128 auto conn
= msgr
->connect_to(peer
.name
.type(),
129 entity_addrvec_t
{peer
.addr
});
131 conn
->send_message(new MPing
);
132 std::unique_lock lock
{mutex
};
133 return on_reply
.wait_for(lock
, 500ms
, [&] {
138 void ping(const entity_inst_t
& peer
) {
139 dispatcher
.ping(msgr
.get(), peer
);
142 } // namespace native_pingpong
144 static void ceph_echo(CephContext
* cct
,
145 entity_addr_t addr
, echo_role role
, unsigned count
)
147 std::cout
<< "ceph/";
148 entity_inst_t entity
{entity_name_t::OSD(0), addr
};
149 if (role
== echo_role::as_server
) {
150 std::cout
<< "server listening at " << addr
<< std::endl
;
151 native_pingpong::Server server
{cct
, entity
};
152 server
.msgr
->bind(addr
);
153 server
.msgr
->add_dispatcher_head(&server
.dispatcher
);
154 server
.msgr
->start();
155 for (unsigned i
= 0; i
< count
; i
++) {
158 server
.msgr
->shutdown();
161 std::cout
<< "client sending to " << addr
<< std::endl
;
162 native_pingpong::Client client
{cct
};
163 client
.msgr
->add_dispatcher_head(&client
.dispatcher
);
164 client
.msgr
->start();
165 auto conn
= client
.msgr
->connect_to(entity
.name
.type(),
166 entity_addrvec_t
{entity
.addr
});
167 for (unsigned i
= 0; i
< count
; i
++) {
168 std::cout
<< "seq=" << i
<< std::endl
;
171 client
.msgr
->shutdown();
176 int main(int argc
, char** argv
)
178 namespace po
= boost::program_options
;
179 po::options_description desc
{"Allowed options"};
181 ("help,h", "show help message")
182 ("role", po::value
<std::string
>()->default_value("pong"),
183 "role to play (ping | pong)")
184 ("port", po::value
<uint16_t>()->default_value(9010),
186 ("nonce", po::value
<uint32_t>()->default_value(42),
187 "a unique number to identify the pong server")
188 ("count", po::value
<unsigned>()->default_value(10),
189 "stop after sending/echoing <count> MPing messages")
190 ("v2", po::value
<bool>()->default_value(false),
191 "using msgr v2 protocol");
192 po::variables_map vm
;
193 std::vector
<std::string
> unrecognized_options
;
195 auto parsed
= po::command_line_parser(argc
, argv
)
197 .allow_unregistered()
199 po::store(parsed
, vm
);
200 if (vm
.count("help")) {
201 std::cout
<< desc
<< std::endl
;
205 unrecognized_options
= po::collect_unrecognized(parsed
.options
, po::include_positional
);
206 } catch(const po::error
& e
) {
207 std::cerr
<< "error: " << e
.what() << std::endl
;
212 if (vm
["v2"].as
<bool>()) {
213 addr
.set_type(entity_addr_t::TYPE_MSGR2
);
215 addr
.set_type(entity_addr_t::TYPE_LEGACY
);
217 addr
.set_family(AF_INET
);
218 addr
.set_port(vm
["port"].as
<std::uint16_t>());
219 addr
.set_nonce(vm
["nonce"].as
<std::uint32_t>());
221 echo_role role
= echo_role::as_server
;
222 if (vm
["role"].as
<std::string
>() == "ping") {
223 role
= echo_role::as_client
;
226 auto count
= vm
["count"].as
<unsigned>();
227 std::vector
<const char*> args(argv
, argv
+ argc
);
228 auto cct
= global_init(nullptr, args
,
229 CEPH_ENTITY_TYPE_CLIENT
,
230 CODE_ENVIRONMENT_UTILITY
,
231 CINIT_FLAG_NO_MON_CONFIG
);
232 common_init_finish(cct
.get());
233 ceph_echo(cct
.get(), addr
, role
, count
);