]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/crimson/test_async_echo.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / test / crimson / test_async_echo.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2
3 #include <boost/program_options/variables_map.hpp>
4 #include <boost/program_options/parsers.hpp>
5
6 #include "auth/Auth.h"
7 #include "global/global_init.h"
8 #include "messages/MPing.h"
9 #include "msg/Dispatcher.h"
10 #include "msg/Messenger.h"
11
12 #include "auth/DummyAuth.h"
13
14 enum class echo_role {
15 as_server,
16 as_client,
17 };
18
19 namespace native_pingpong {
20
21 constexpr int CEPH_OSD_PROTOCOL = 10;
22
23 struct Server {
24 Server(CephContext* cct, const entity_inst_t& entity)
25 : dummy_auth(cct), dispatcher(cct)
26 {
27 msgr.reset(Messenger::create(cct, "async",
28 entity.name, "pong", entity.addr.get_nonce(), 0));
29 dummy_auth.auth_registry.refresh_config();
30 msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
31 msgr->set_default_policy(Messenger::Policy::stateless_server(0));
32 msgr->set_auth_client(&dummy_auth);
33 msgr->set_auth_server(&dummy_auth);
34 dispatcher.ms_set_require_authorizer(false);
35 }
36 DummyAuthClientServer dummy_auth;
37 unique_ptr<Messenger> msgr;
38 struct ServerDispatcher : Dispatcher {
39 std::mutex mutex;
40 std::condition_variable on_reply;
41 bool replied = false;
42 ServerDispatcher(CephContext* cct)
43 : Dispatcher(cct)
44 {}
45 bool ms_can_fast_dispatch_any() const override {
46 return true;
47 }
48 bool ms_can_fast_dispatch(const Message* m) const override {
49 return m->get_type() == CEPH_MSG_PING;
50 }
51 void ms_fast_dispatch(Message* m) override {
52 m->get_connection()->send_message(new MPing);
53 m->put();
54 {
55 std::lock_guard lock{mutex};
56 replied = true;
57 }
58 on_reply.notify_one();
59 }
60 bool ms_dispatch(Message*) override {
61 ceph_abort();
62 }
63 bool ms_handle_reset(Connection*) override {
64 return true;
65 }
66 void ms_handle_remote_reset(Connection*) override {
67 }
68 bool ms_handle_refused(Connection*) override {
69 return true;
70 }
71 void echo() {
72 replied = false;
73 std::unique_lock lock{mutex};
74 return on_reply.wait(lock, [this] { return replied; });
75 }
76 } dispatcher;
77 void echo() {
78 dispatcher.echo();
79 }
80 };
81
82 struct Client {
83 unique_ptr<Messenger> msgr;
84 Client(CephContext *cct)
85 : dummy_auth(cct), dispatcher(cct)
86 {
87 msgr.reset(Messenger::create(cct, "async",
88 entity_name_t::CLIENT(-1), "ping",
89 getpid(), 0));
90 dummy_auth.auth_registry.refresh_config();
91 msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
92 msgr->set_default_policy(Messenger::Policy::lossy_client(0));
93 msgr->set_auth_client(&dummy_auth);
94 msgr->set_auth_server(&dummy_auth);
95 dispatcher.ms_set_require_authorizer(false);
96 }
97 DummyAuthClientServer dummy_auth;
98 struct ClientDispatcher : Dispatcher {
99 std::mutex mutex;
100 std::condition_variable on_reply;
101 bool replied = false;
102
103 ClientDispatcher(CephContext* cct)
104 : Dispatcher(cct)
105 {}
106 bool ms_can_fast_dispatch_any() const override {
107 return true;
108 }
109 bool ms_can_fast_dispatch(const Message* m) const override {
110 return m->get_type() == CEPH_MSG_PING;
111 }
112 void ms_fast_dispatch(Message* m) override {
113 m->put();
114 {
115 std::lock_guard lock{mutex};
116 replied = true;
117 }
118 on_reply.notify_one();
119 }
120 bool ms_dispatch(Message*) override {
121 ceph_abort();
122 }
123 bool ms_handle_reset(Connection *) override {
124 return true;
125 }
126 void ms_handle_remote_reset(Connection*) override {
127 }
128 bool ms_handle_refused(Connection*) override {
129 return true;
130 }
131 bool ping(Messenger* msgr, const entity_inst_t& peer) {
132 auto conn = msgr->connect_to(peer.name.type(),
133 entity_addrvec_t{peer.addr});
134 replied = false;
135 conn->send_message(new MPing);
136 std::unique_lock lock{mutex};
137 return on_reply.wait_for(lock, 500ms, [&] {
138 return replied;
139 });
140 }
141 } dispatcher;
142 void ping(const entity_inst_t& peer) {
143 dispatcher.ping(msgr.get(), peer);
144 }
145 };
146 } // namespace native_pingpong
147
148 static void ceph_echo(CephContext* cct,
149 entity_addr_t addr, echo_role role, unsigned count)
150 {
151 std::cout << "ceph/";
152 entity_inst_t entity{entity_name_t::OSD(0), addr};
153 if (role == echo_role::as_server) {
154 std::cout << "server listening at " << addr << std::endl;
155 native_pingpong::Server server{cct, entity};
156 server.msgr->bind(addr);
157 server.msgr->add_dispatcher_head(&server.dispatcher);
158 server.msgr->start();
159 for (unsigned i = 0; i < count; i++) {
160 server.echo();
161 }
162 server.msgr->shutdown();
163 server.msgr->wait();
164 } else {
165 std::cout << "client sending to " << addr << std::endl;
166 native_pingpong::Client client{cct};
167 client.msgr->add_dispatcher_head(&client.dispatcher);
168 client.msgr->start();
169 auto conn = client.msgr->connect_to(entity.name.type(),
170 entity_addrvec_t{entity.addr});
171 for (unsigned i = 0; i < count; i++) {
172 std::cout << "seq=" << i << std::endl;
173 client.ping(entity);
174 }
175 client.msgr->shutdown();
176 client.msgr->wait();
177 }
178 }
179
180 int main(int argc, char** argv)
181 {
182 namespace po = boost::program_options;
183 po::options_description desc{"Allowed options"};
184 desc.add_options()
185 ("help,h", "show help message")
186 ("role", po::value<std::string>()->default_value("pong"),
187 "role to play (ping | pong)")
188 ("port", po::value<uint16_t>()->default_value(9010),
189 "port #")
190 ("nonce", po::value<uint32_t>()->default_value(42),
191 "a unique number to identify the pong server")
192 ("count", po::value<unsigned>()->default_value(10),
193 "stop after sending/echoing <count> MPing messages")
194 ("v2", po::value<bool>()->default_value(false),
195 "using msgr v2 protocol");
196 po::variables_map vm;
197 std::vector<std::string> unrecognized_options;
198 try {
199 auto parsed = po::command_line_parser(argc, argv)
200 .options(desc)
201 .allow_unregistered()
202 .run();
203 po::store(parsed, vm);
204 if (vm.count("help")) {
205 std::cout << desc << std::endl;
206 return 0;
207 }
208 po::notify(vm);
209 unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
210 } catch(const po::error& e) {
211 std::cerr << "error: " << e.what() << std::endl;
212 return 1;
213 }
214
215 entity_addr_t addr;
216 if (vm["v2"].as<bool>()) {
217 addr.set_type(entity_addr_t::TYPE_MSGR2);
218 } else {
219 addr.set_type(entity_addr_t::TYPE_LEGACY);
220 }
221 addr.set_family(AF_INET);
222 addr.set_port(vm["port"].as<std::uint16_t>());
223 addr.set_nonce(vm["nonce"].as<std::uint32_t>());
224
225 echo_role role = echo_role::as_server;
226 if (vm["role"].as<std::string>() == "ping") {
227 role = echo_role::as_client;
228 }
229
230 auto count = vm["count"].as<unsigned>();
231 std::vector<const char*> args(argv, argv + argc);
232 auto cct = global_init(nullptr, args,
233 CEPH_ENTITY_TYPE_CLIENT,
234 CODE_ENVIRONMENT_UTILITY,
235 CINIT_FLAG_NO_MON_CONFIG);
236 common_init_finish(cct.get());
237 ceph_echo(cct.get(), addr, role, count);
238 }