]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/crimson/test_async_echo.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / test / crimson / test_async_echo.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2
3 #include <boost/program_options/variables_map.hpp>
4 #include <boost/program_options/parsers.hpp>
5
6 #include "auth/Auth.h"
7 #include "global/global_init.h"
8 #include "messages/MPing.h"
9 #include "msg/Dispatcher.h"
10 #include "msg/Messenger.h"
11
12 #include "auth/DummyAuth.h"
13
14 enum class echo_role {
15 as_server,
16 as_client,
17 };
18
19 namespace native_pingpong {
20
21 constexpr int CEPH_OSD_PROTOCOL = 10;
22
23 struct Server {
24 Server(CephContext* cct, const entity_inst_t& entity)
25 : dummy_auth(cct), dispatcher(cct)
26 {
27 msgr.reset(Messenger::create(cct, "async", entity.name, "pong", entity.addr.get_nonce()));
28 dummy_auth.auth_registry.refresh_config();
29 msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
30 msgr->set_default_policy(Messenger::Policy::stateless_server(0));
31 msgr->set_auth_client(&dummy_auth);
32 msgr->set_auth_server(&dummy_auth);
33 }
34 DummyAuthClientServer dummy_auth;
35 std::unique_ptr<Messenger> msgr;
36 struct ServerDispatcher : Dispatcher {
37 std::mutex mutex;
38 std::condition_variable on_reply;
39 bool replied = false;
40 ServerDispatcher(CephContext* cct)
41 : Dispatcher(cct)
42 {}
43 bool ms_can_fast_dispatch_any() const override {
44 return true;
45 }
46 bool ms_can_fast_dispatch(const Message* m) const override {
47 return m->get_type() == CEPH_MSG_PING;
48 }
49 void ms_fast_dispatch(Message* m) override {
50 m->get_connection()->send_message(new MPing);
51 m->put();
52 {
53 std::lock_guard lock{mutex};
54 replied = true;
55 }
56 on_reply.notify_one();
57 }
58 bool ms_dispatch(Message*) override {
59 ceph_abort();
60 }
61 bool ms_handle_reset(Connection*) override {
62 return true;
63 }
64 void ms_handle_remote_reset(Connection*) override {
65 }
66 bool ms_handle_refused(Connection*) override {
67 return true;
68 }
69 void echo() {
70 replied = false;
71 std::unique_lock lock{mutex};
72 return on_reply.wait(lock, [this] { return replied; });
73 }
74 } dispatcher;
75 void echo() {
76 dispatcher.echo();
77 }
78 };
79
80 struct Client {
81 std::unique_ptr<Messenger> msgr;
82 Client(CephContext *cct)
83 : dummy_auth(cct), dispatcher(cct)
84 {
85 msgr.reset(Messenger::create(cct, "async", entity_name_t::CLIENT(-1), "ping", getpid()));
86 dummy_auth.auth_registry.refresh_config();
87 msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
88 msgr->set_default_policy(Messenger::Policy::lossy_client(0));
89 msgr->set_auth_client(&dummy_auth);
90 msgr->set_auth_server(&dummy_auth);
91 }
92 DummyAuthClientServer dummy_auth;
93 struct ClientDispatcher : Dispatcher {
94 std::mutex mutex;
95 std::condition_variable on_reply;
96 bool replied = false;
97
98 ClientDispatcher(CephContext* cct)
99 : Dispatcher(cct)
100 {}
101 bool ms_can_fast_dispatch_any() const override {
102 return true;
103 }
104 bool ms_can_fast_dispatch(const Message* m) const override {
105 return m->get_type() == CEPH_MSG_PING;
106 }
107 void ms_fast_dispatch(Message* m) override {
108 m->put();
109 {
110 std::lock_guard lock{mutex};
111 replied = true;
112 }
113 on_reply.notify_one();
114 }
115 bool ms_dispatch(Message*) override {
116 ceph_abort();
117 }
118 bool ms_handle_reset(Connection *) override {
119 return true;
120 }
121 void ms_handle_remote_reset(Connection*) override {
122 }
123 bool ms_handle_refused(Connection*) override {
124 return true;
125 }
126 bool ping(Messenger* msgr, const entity_inst_t& peer) {
127 using namespace std::chrono_literals;
128 auto conn = msgr->connect_to(peer.name.type(),
129 entity_addrvec_t{peer.addr});
130 replied = false;
131 conn->send_message(new MPing);
132 std::unique_lock lock{mutex};
133 return on_reply.wait_for(lock, 500ms, [&] {
134 return replied;
135 });
136 }
137 } dispatcher;
138 void ping(const entity_inst_t& peer) {
139 dispatcher.ping(msgr.get(), peer);
140 }
141 };
142 } // namespace native_pingpong
143
144 static void ceph_echo(CephContext* cct,
145 entity_addr_t addr, echo_role role, unsigned count)
146 {
147 std::cout << "ceph/";
148 entity_inst_t entity{entity_name_t::OSD(0), addr};
149 if (role == echo_role::as_server) {
150 std::cout << "server listening at " << addr << std::endl;
151 native_pingpong::Server server{cct, entity};
152 server.msgr->bind(addr);
153 server.msgr->add_dispatcher_head(&server.dispatcher);
154 server.msgr->start();
155 for (unsigned i = 0; i < count; i++) {
156 server.echo();
157 }
158 server.msgr->shutdown();
159 server.msgr->wait();
160 } else {
161 std::cout << "client sending to " << addr << std::endl;
162 native_pingpong::Client client{cct};
163 client.msgr->add_dispatcher_head(&client.dispatcher);
164 client.msgr->start();
165 auto conn = client.msgr->connect_to(entity.name.type(),
166 entity_addrvec_t{entity.addr});
167 for (unsigned i = 0; i < count; i++) {
168 std::cout << "seq=" << i << std::endl;
169 client.ping(entity);
170 }
171 client.msgr->shutdown();
172 client.msgr->wait();
173 }
174 }
175
176 int main(int argc, char** argv)
177 {
178 namespace po = boost::program_options;
179 po::options_description desc{"Allowed options"};
180 desc.add_options()
181 ("help,h", "show help message")
182 ("role", po::value<std::string>()->default_value("pong"),
183 "role to play (ping | pong)")
184 ("port", po::value<uint16_t>()->default_value(9010),
185 "port #")
186 ("nonce", po::value<uint32_t>()->default_value(42),
187 "a unique number to identify the pong server")
188 ("count", po::value<unsigned>()->default_value(10),
189 "stop after sending/echoing <count> MPing messages")
190 ("v2", po::value<bool>()->default_value(false),
191 "using msgr v2 protocol");
192 po::variables_map vm;
193 std::vector<std::string> unrecognized_options;
194 try {
195 auto parsed = po::command_line_parser(argc, argv)
196 .options(desc)
197 .allow_unregistered()
198 .run();
199 po::store(parsed, vm);
200 if (vm.count("help")) {
201 std::cout << desc << std::endl;
202 return 0;
203 }
204 po::notify(vm);
205 unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
206 } catch(const po::error& e) {
207 std::cerr << "error: " << e.what() << std::endl;
208 return 1;
209 }
210
211 entity_addr_t addr;
212 if (vm["v2"].as<bool>()) {
213 addr.set_type(entity_addr_t::TYPE_MSGR2);
214 } else {
215 addr.set_type(entity_addr_t::TYPE_LEGACY);
216 }
217 addr.set_family(AF_INET);
218 addr.set_port(vm["port"].as<std::uint16_t>());
219 addr.set_nonce(vm["nonce"].as<std::uint32_t>());
220
221 echo_role role = echo_role::as_server;
222 if (vm["role"].as<std::string>() == "ping") {
223 role = echo_role::as_client;
224 }
225
226 auto count = vm["count"].as<unsigned>();
227 std::vector<const char*> args(argv, argv + argc);
228 auto cct = global_init(nullptr, args,
229 CEPH_ENTITY_TYPE_CLIENT,
230 CODE_ENVIRONMENT_UTILITY,
231 CINIT_FLAG_NO_MON_CONFIG);
232 common_init_finish(cct.get());
233 ceph_echo(cct.get(), addr, role, count);
234 }