]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/crimson/test_async_echo.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / test / crimson / test_async_echo.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2
3 #include <boost/program_options/variables_map.hpp>
4 #include <boost/program_options/parsers.hpp>
5
6 #include "auth/Auth.h"
7 #include "global/global_init.h"
8 #include "messages/MPing.h"
9 #include "msg/Dispatcher.h"
10 #include "msg/Messenger.h"
11
12 #include "auth/DummyAuth.h"
13
14 enum class echo_role {
15 as_server,
16 as_client,
17 };
18
19 namespace native_pingpong {
20
21 constexpr int CEPH_OSD_PROTOCOL = 10;
22
23 struct Server {
24 Server(CephContext* cct, const entity_inst_t& entity)
25 : dummy_auth(cct), dispatcher(cct)
26 {
27 msgr.reset(Messenger::create(cct, "async", entity.name, "pong", entity.addr.get_nonce()));
28 dummy_auth.auth_registry.refresh_config();
29 msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
30 msgr->set_default_policy(Messenger::Policy::stateless_server(0));
31 msgr->set_auth_client(&dummy_auth);
32 msgr->set_auth_server(&dummy_auth);
33 msgr->set_require_authorizer(false);
34 }
35 DummyAuthClientServer dummy_auth;
36 unique_ptr<Messenger> msgr;
37 struct ServerDispatcher : Dispatcher {
38 std::mutex mutex;
39 std::condition_variable on_reply;
40 bool replied = false;
41 ServerDispatcher(CephContext* cct)
42 : Dispatcher(cct)
43 {}
44 bool ms_can_fast_dispatch_any() const override {
45 return true;
46 }
47 bool ms_can_fast_dispatch(const Message* m) const override {
48 return m->get_type() == CEPH_MSG_PING;
49 }
50 void ms_fast_dispatch(Message* m) override {
51 m->get_connection()->send_message(new MPing);
52 m->put();
53 {
54 std::lock_guard lock{mutex};
55 replied = true;
56 }
57 on_reply.notify_one();
58 }
59 bool ms_dispatch(Message*) override {
60 ceph_abort();
61 }
62 bool ms_handle_reset(Connection*) override {
63 return true;
64 }
65 void ms_handle_remote_reset(Connection*) override {
66 }
67 bool ms_handle_refused(Connection*) override {
68 return true;
69 }
70 void echo() {
71 replied = false;
72 std::unique_lock lock{mutex};
73 return on_reply.wait(lock, [this] { return replied; });
74 }
75 } dispatcher;
76 void echo() {
77 dispatcher.echo();
78 }
79 };
80
81 struct Client {
82 unique_ptr<Messenger> msgr;
83 Client(CephContext *cct)
84 : dummy_auth(cct), dispatcher(cct)
85 {
86 msgr.reset(Messenger::create(cct, "async", entity_name_t::CLIENT(-1), "ping", getpid()));
87 dummy_auth.auth_registry.refresh_config();
88 msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
89 msgr->set_default_policy(Messenger::Policy::lossy_client(0));
90 msgr->set_auth_client(&dummy_auth);
91 msgr->set_auth_server(&dummy_auth);
92 msgr->set_require_authorizer(false);
93 }
94 DummyAuthClientServer dummy_auth;
95 struct ClientDispatcher : Dispatcher {
96 std::mutex mutex;
97 std::condition_variable on_reply;
98 bool replied = false;
99
100 ClientDispatcher(CephContext* cct)
101 : Dispatcher(cct)
102 {}
103 bool ms_can_fast_dispatch_any() const override {
104 return true;
105 }
106 bool ms_can_fast_dispatch(const Message* m) const override {
107 return m->get_type() == CEPH_MSG_PING;
108 }
109 void ms_fast_dispatch(Message* m) override {
110 m->put();
111 {
112 std::lock_guard lock{mutex};
113 replied = true;
114 }
115 on_reply.notify_one();
116 }
117 bool ms_dispatch(Message*) override {
118 ceph_abort();
119 }
120 bool ms_handle_reset(Connection *) override {
121 return true;
122 }
123 void ms_handle_remote_reset(Connection*) override {
124 }
125 bool ms_handle_refused(Connection*) override {
126 return true;
127 }
128 bool ping(Messenger* msgr, const entity_inst_t& peer) {
129 auto conn = msgr->connect_to(peer.name.type(),
130 entity_addrvec_t{peer.addr});
131 replied = false;
132 conn->send_message(new MPing);
133 std::unique_lock lock{mutex};
134 return on_reply.wait_for(lock, 500ms, [&] {
135 return replied;
136 });
137 }
138 } dispatcher;
139 void ping(const entity_inst_t& peer) {
140 dispatcher.ping(msgr.get(), peer);
141 }
142 };
143 } // namespace native_pingpong
144
145 static void ceph_echo(CephContext* cct,
146 entity_addr_t addr, echo_role role, unsigned count)
147 {
148 std::cout << "ceph/";
149 entity_inst_t entity{entity_name_t::OSD(0), addr};
150 if (role == echo_role::as_server) {
151 std::cout << "server listening at " << addr << std::endl;
152 native_pingpong::Server server{cct, entity};
153 server.msgr->bind(addr);
154 server.msgr->add_dispatcher_head(&server.dispatcher);
155 server.msgr->start();
156 for (unsigned i = 0; i < count; i++) {
157 server.echo();
158 }
159 server.msgr->shutdown();
160 server.msgr->wait();
161 } else {
162 std::cout << "client sending to " << addr << std::endl;
163 native_pingpong::Client client{cct};
164 client.msgr->add_dispatcher_head(&client.dispatcher);
165 client.msgr->start();
166 auto conn = client.msgr->connect_to(entity.name.type(),
167 entity_addrvec_t{entity.addr});
168 for (unsigned i = 0; i < count; i++) {
169 std::cout << "seq=" << i << std::endl;
170 client.ping(entity);
171 }
172 client.msgr->shutdown();
173 client.msgr->wait();
174 }
175 }
176
177 int main(int argc, char** argv)
178 {
179 namespace po = boost::program_options;
180 po::options_description desc{"Allowed options"};
181 desc.add_options()
182 ("help,h", "show help message")
183 ("role", po::value<std::string>()->default_value("pong"),
184 "role to play (ping | pong)")
185 ("port", po::value<uint16_t>()->default_value(9010),
186 "port #")
187 ("nonce", po::value<uint32_t>()->default_value(42),
188 "a unique number to identify the pong server")
189 ("count", po::value<unsigned>()->default_value(10),
190 "stop after sending/echoing <count> MPing messages")
191 ("v2", po::value<bool>()->default_value(false),
192 "using msgr v2 protocol");
193 po::variables_map vm;
194 std::vector<std::string> unrecognized_options;
195 try {
196 auto parsed = po::command_line_parser(argc, argv)
197 .options(desc)
198 .allow_unregistered()
199 .run();
200 po::store(parsed, vm);
201 if (vm.count("help")) {
202 std::cout << desc << std::endl;
203 return 0;
204 }
205 po::notify(vm);
206 unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
207 } catch(const po::error& e) {
208 std::cerr << "error: " << e.what() << std::endl;
209 return 1;
210 }
211
212 entity_addr_t addr;
213 if (vm["v2"].as<bool>()) {
214 addr.set_type(entity_addr_t::TYPE_MSGR2);
215 } else {
216 addr.set_type(entity_addr_t::TYPE_LEGACY);
217 }
218 addr.set_family(AF_INET);
219 addr.set_port(vm["port"].as<std::uint16_t>());
220 addr.set_nonce(vm["nonce"].as<std::uint32_t>());
221
222 echo_role role = echo_role::as_server;
223 if (vm["role"].as<std::string>() == "ping") {
224 role = echo_role::as_client;
225 }
226
227 auto count = vm["count"].as<unsigned>();
228 std::vector<const char*> args(argv, argv + argc);
229 auto cct = global_init(nullptr, args,
230 CEPH_ENTITY_TYPE_CLIENT,
231 CODE_ENVIRONMENT_UTILITY,
232 CINIT_FLAG_NO_MON_CONFIG);
233 common_init_finish(cct.get());
234 ceph_echo(cct.get(), addr, role, count);
235 }