]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- |
2 | ||
3 | #include <boost/program_options/variables_map.hpp> | |
4 | #include <boost/program_options/parsers.hpp> | |
5 | ||
6 | #include "auth/Auth.h" | |
7 | #include "global/global_init.h" | |
8 | #include "messages/MPing.h" | |
9 | #include "msg/Dispatcher.h" | |
10 | #include "msg/Messenger.h" | |
11 | ||
12 | #include "auth/DummyAuth.h" | |
13 | ||
14 | enum class echo_role { | |
15 | as_server, | |
16 | as_client, | |
17 | }; | |
18 | ||
19 | namespace native_pingpong { | |
20 | ||
21 | constexpr int CEPH_OSD_PROTOCOL = 10; | |
22 | ||
23 | struct Server { | |
24 | Server(CephContext* cct, const entity_inst_t& entity) | |
25 | : dummy_auth(cct), dispatcher(cct) | |
26 | { | |
f67539c2 | 27 | msgr.reset(Messenger::create(cct, "async", entity.name, "pong", entity.addr.get_nonce())); |
11fdf7f2 TL |
28 | dummy_auth.auth_registry.refresh_config(); |
29 | msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL); | |
30 | msgr->set_default_policy(Messenger::Policy::stateless_server(0)); | |
31 | msgr->set_auth_client(&dummy_auth); | |
32 | msgr->set_auth_server(&dummy_auth); | |
9f95a23c | 33 | msgr->set_require_authorizer(false); |
11fdf7f2 TL |
34 | } |
35 | DummyAuthClientServer dummy_auth; | |
20effc67 | 36 | std::unique_ptr<Messenger> msgr; |
11fdf7f2 TL |
37 | struct ServerDispatcher : Dispatcher { |
38 | std::mutex mutex; | |
39 | std::condition_variable on_reply; | |
40 | bool replied = false; | |
41 | ServerDispatcher(CephContext* cct) | |
42 | : Dispatcher(cct) | |
43 | {} | |
44 | bool ms_can_fast_dispatch_any() const override { | |
45 | return true; | |
46 | } | |
47 | bool ms_can_fast_dispatch(const Message* m) const override { | |
48 | return m->get_type() == CEPH_MSG_PING; | |
49 | } | |
50 | void ms_fast_dispatch(Message* m) override { | |
51 | m->get_connection()->send_message(new MPing); | |
52 | m->put(); | |
53 | { | |
54 | std::lock_guard lock{mutex}; | |
55 | replied = true; | |
56 | } | |
57 | on_reply.notify_one(); | |
58 | } | |
59 | bool ms_dispatch(Message*) override { | |
60 | ceph_abort(); | |
61 | } | |
62 | bool ms_handle_reset(Connection*) override { | |
63 | return true; | |
64 | } | |
65 | void ms_handle_remote_reset(Connection*) override { | |
66 | } | |
67 | bool ms_handle_refused(Connection*) override { | |
68 | return true; | |
69 | } | |
70 | void echo() { | |
71 | replied = false; | |
72 | std::unique_lock lock{mutex}; | |
73 | return on_reply.wait(lock, [this] { return replied; }); | |
74 | } | |
75 | } dispatcher; | |
76 | void echo() { | |
77 | dispatcher.echo(); | |
78 | } | |
79 | }; | |
80 | ||
81 | struct Client { | |
20effc67 | 82 | std::unique_ptr<Messenger> msgr; |
11fdf7f2 TL |
83 | Client(CephContext *cct) |
84 | : dummy_auth(cct), dispatcher(cct) | |
85 | { | |
f67539c2 | 86 | msgr.reset(Messenger::create(cct, "async", entity_name_t::CLIENT(-1), "ping", getpid())); |
11fdf7f2 TL |
87 | dummy_auth.auth_registry.refresh_config(); |
88 | msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL); | |
89 | msgr->set_default_policy(Messenger::Policy::lossy_client(0)); | |
90 | msgr->set_auth_client(&dummy_auth); | |
91 | msgr->set_auth_server(&dummy_auth); | |
9f95a23c | 92 | msgr->set_require_authorizer(false); |
11fdf7f2 TL |
93 | } |
94 | DummyAuthClientServer dummy_auth; | |
95 | struct ClientDispatcher : Dispatcher { | |
96 | std::mutex mutex; | |
97 | std::condition_variable on_reply; | |
98 | bool replied = false; | |
99 | ||
100 | ClientDispatcher(CephContext* cct) | |
101 | : Dispatcher(cct) | |
102 | {} | |
103 | bool ms_can_fast_dispatch_any() const override { | |
104 | return true; | |
105 | } | |
106 | bool ms_can_fast_dispatch(const Message* m) const override { | |
107 | return m->get_type() == CEPH_MSG_PING; | |
108 | } | |
109 | void ms_fast_dispatch(Message* m) override { | |
110 | m->put(); | |
111 | { | |
112 | std::lock_guard lock{mutex}; | |
113 | replied = true; | |
114 | } | |
115 | on_reply.notify_one(); | |
116 | } | |
117 | bool ms_dispatch(Message*) override { | |
118 | ceph_abort(); | |
119 | } | |
120 | bool ms_handle_reset(Connection *) override { | |
121 | return true; | |
122 | } | |
123 | void ms_handle_remote_reset(Connection*) override { | |
124 | } | |
125 | bool ms_handle_refused(Connection*) override { | |
126 | return true; | |
127 | } | |
128 | bool ping(Messenger* msgr, const entity_inst_t& peer) { | |
20effc67 | 129 | using namespace std::chrono_literals; |
11fdf7f2 TL |
130 | auto conn = msgr->connect_to(peer.name.type(), |
131 | entity_addrvec_t{peer.addr}); | |
132 | replied = false; | |
133 | conn->send_message(new MPing); | |
134 | std::unique_lock lock{mutex}; | |
135 | return on_reply.wait_for(lock, 500ms, [&] { | |
136 | return replied; | |
137 | }); | |
138 | } | |
139 | } dispatcher; | |
140 | void ping(const entity_inst_t& peer) { | |
141 | dispatcher.ping(msgr.get(), peer); | |
142 | } | |
143 | }; | |
144 | } // namespace native_pingpong | |
145 | ||
146 | static void ceph_echo(CephContext* cct, | |
147 | entity_addr_t addr, echo_role role, unsigned count) | |
148 | { | |
149 | std::cout << "ceph/"; | |
150 | entity_inst_t entity{entity_name_t::OSD(0), addr}; | |
151 | if (role == echo_role::as_server) { | |
152 | std::cout << "server listening at " << addr << std::endl; | |
153 | native_pingpong::Server server{cct, entity}; | |
154 | server.msgr->bind(addr); | |
155 | server.msgr->add_dispatcher_head(&server.dispatcher); | |
156 | server.msgr->start(); | |
157 | for (unsigned i = 0; i < count; i++) { | |
158 | server.echo(); | |
159 | } | |
160 | server.msgr->shutdown(); | |
161 | server.msgr->wait(); | |
162 | } else { | |
163 | std::cout << "client sending to " << addr << std::endl; | |
164 | native_pingpong::Client client{cct}; | |
165 | client.msgr->add_dispatcher_head(&client.dispatcher); | |
166 | client.msgr->start(); | |
167 | auto conn = client.msgr->connect_to(entity.name.type(), | |
168 | entity_addrvec_t{entity.addr}); | |
169 | for (unsigned i = 0; i < count; i++) { | |
170 | std::cout << "seq=" << i << std::endl; | |
171 | client.ping(entity); | |
172 | } | |
173 | client.msgr->shutdown(); | |
174 | client.msgr->wait(); | |
175 | } | |
176 | } | |
177 | ||
178 | int main(int argc, char** argv) | |
179 | { | |
180 | namespace po = boost::program_options; | |
181 | po::options_description desc{"Allowed options"}; | |
182 | desc.add_options() | |
183 | ("help,h", "show help message") | |
184 | ("role", po::value<std::string>()->default_value("pong"), | |
185 | "role to play (ping | pong)") | |
186 | ("port", po::value<uint16_t>()->default_value(9010), | |
187 | "port #") | |
188 | ("nonce", po::value<uint32_t>()->default_value(42), | |
189 | "a unique number to identify the pong server") | |
190 | ("count", po::value<unsigned>()->default_value(10), | |
191 | "stop after sending/echoing <count> MPing messages") | |
192 | ("v2", po::value<bool>()->default_value(false), | |
193 | "using msgr v2 protocol"); | |
194 | po::variables_map vm; | |
195 | std::vector<std::string> unrecognized_options; | |
196 | try { | |
197 | auto parsed = po::command_line_parser(argc, argv) | |
198 | .options(desc) | |
199 | .allow_unregistered() | |
200 | .run(); | |
201 | po::store(parsed, vm); | |
202 | if (vm.count("help")) { | |
203 | std::cout << desc << std::endl; | |
204 | return 0; | |
205 | } | |
206 | po::notify(vm); | |
207 | unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional); | |
208 | } catch(const po::error& e) { | |
209 | std::cerr << "error: " << e.what() << std::endl; | |
210 | return 1; | |
211 | } | |
212 | ||
213 | entity_addr_t addr; | |
214 | if (vm["v2"].as<bool>()) { | |
215 | addr.set_type(entity_addr_t::TYPE_MSGR2); | |
216 | } else { | |
217 | addr.set_type(entity_addr_t::TYPE_LEGACY); | |
218 | } | |
219 | addr.set_family(AF_INET); | |
220 | addr.set_port(vm["port"].as<std::uint16_t>()); | |
221 | addr.set_nonce(vm["nonce"].as<std::uint32_t>()); | |
222 | ||
223 | echo_role role = echo_role::as_server; | |
224 | if (vm["role"].as<std::string>() == "ping") { | |
225 | role = echo_role::as_client; | |
226 | } | |
227 | ||
228 | auto count = vm["count"].as<unsigned>(); | |
229 | std::vector<const char*> args(argv, argv + argc); | |
230 | auto cct = global_init(nullptr, args, | |
231 | CEPH_ENTITY_TYPE_CLIENT, | |
232 | CODE_ENVIRONMENT_UTILITY, | |
233 | CINIT_FLAG_NO_MON_CONFIG); | |
234 | common_init_finish(cct.get()); | |
235 | ceph_echo(cct.get(), addr, role, count); | |
236 | } |