]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/crimson/test_async_echo.cc
buildsys: change download over to reef release
[ceph.git] / ceph / src / test / crimson / test_async_echo.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2
3#include <boost/program_options/variables_map.hpp>
4#include <boost/program_options/parsers.hpp>
5
6#include "auth/Auth.h"
7#include "global/global_init.h"
8#include "messages/MPing.h"
9#include "msg/Dispatcher.h"
10#include "msg/Messenger.h"
11
12#include "auth/DummyAuth.h"
13
14enum class echo_role {
15 as_server,
16 as_client,
17};
18
19namespace native_pingpong {
20
21constexpr int CEPH_OSD_PROTOCOL = 10;
22
23struct Server {
24 Server(CephContext* cct, const entity_inst_t& entity)
25 : dummy_auth(cct), dispatcher(cct)
26 {
f67539c2 27 msgr.reset(Messenger::create(cct, "async", entity.name, "pong", entity.addr.get_nonce()));
11fdf7f2
TL
28 dummy_auth.auth_registry.refresh_config();
29 msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
30 msgr->set_default_policy(Messenger::Policy::stateless_server(0));
31 msgr->set_auth_client(&dummy_auth);
32 msgr->set_auth_server(&dummy_auth);
9f95a23c 33 msgr->set_require_authorizer(false);
11fdf7f2
TL
34 }
35 DummyAuthClientServer dummy_auth;
20effc67 36 std::unique_ptr<Messenger> msgr;
11fdf7f2
TL
37 struct ServerDispatcher : Dispatcher {
38 std::mutex mutex;
39 std::condition_variable on_reply;
40 bool replied = false;
41 ServerDispatcher(CephContext* cct)
42 : Dispatcher(cct)
43 {}
44 bool ms_can_fast_dispatch_any() const override {
45 return true;
46 }
47 bool ms_can_fast_dispatch(const Message* m) const override {
48 return m->get_type() == CEPH_MSG_PING;
49 }
50 void ms_fast_dispatch(Message* m) override {
51 m->get_connection()->send_message(new MPing);
52 m->put();
53 {
54 std::lock_guard lock{mutex};
55 replied = true;
56 }
57 on_reply.notify_one();
58 }
59 bool ms_dispatch(Message*) override {
60 ceph_abort();
61 }
62 bool ms_handle_reset(Connection*) override {
63 return true;
64 }
65 void ms_handle_remote_reset(Connection*) override {
66 }
67 bool ms_handle_refused(Connection*) override {
68 return true;
69 }
70 void echo() {
71 replied = false;
72 std::unique_lock lock{mutex};
73 return on_reply.wait(lock, [this] { return replied; });
74 }
75 } dispatcher;
76 void echo() {
77 dispatcher.echo();
78 }
79};
80
81struct Client {
20effc67 82 std::unique_ptr<Messenger> msgr;
11fdf7f2
TL
83 Client(CephContext *cct)
84 : dummy_auth(cct), dispatcher(cct)
85 {
f67539c2 86 msgr.reset(Messenger::create(cct, "async", entity_name_t::CLIENT(-1), "ping", getpid()));
11fdf7f2
TL
87 dummy_auth.auth_registry.refresh_config();
88 msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
89 msgr->set_default_policy(Messenger::Policy::lossy_client(0));
90 msgr->set_auth_client(&dummy_auth);
91 msgr->set_auth_server(&dummy_auth);
9f95a23c 92 msgr->set_require_authorizer(false);
11fdf7f2
TL
93 }
94 DummyAuthClientServer dummy_auth;
95 struct ClientDispatcher : Dispatcher {
96 std::mutex mutex;
97 std::condition_variable on_reply;
98 bool replied = false;
99
100 ClientDispatcher(CephContext* cct)
101 : Dispatcher(cct)
102 {}
103 bool ms_can_fast_dispatch_any() const override {
104 return true;
105 }
106 bool ms_can_fast_dispatch(const Message* m) const override {
107 return m->get_type() == CEPH_MSG_PING;
108 }
109 void ms_fast_dispatch(Message* m) override {
110 m->put();
111 {
112 std::lock_guard lock{mutex};
113 replied = true;
114 }
115 on_reply.notify_one();
116 }
117 bool ms_dispatch(Message*) override {
118 ceph_abort();
119 }
120 bool ms_handle_reset(Connection *) override {
121 return true;
122 }
123 void ms_handle_remote_reset(Connection*) override {
124 }
125 bool ms_handle_refused(Connection*) override {
126 return true;
127 }
128 bool ping(Messenger* msgr, const entity_inst_t& peer) {
20effc67 129 using namespace std::chrono_literals;
11fdf7f2
TL
130 auto conn = msgr->connect_to(peer.name.type(),
131 entity_addrvec_t{peer.addr});
132 replied = false;
133 conn->send_message(new MPing);
134 std::unique_lock lock{mutex};
135 return on_reply.wait_for(lock, 500ms, [&] {
136 return replied;
137 });
138 }
139 } dispatcher;
140 void ping(const entity_inst_t& peer) {
141 dispatcher.ping(msgr.get(), peer);
142 }
143};
144} // namespace native_pingpong
145
146static void ceph_echo(CephContext* cct,
147 entity_addr_t addr, echo_role role, unsigned count)
148{
149 std::cout << "ceph/";
150 entity_inst_t entity{entity_name_t::OSD(0), addr};
151 if (role == echo_role::as_server) {
152 std::cout << "server listening at " << addr << std::endl;
153 native_pingpong::Server server{cct, entity};
154 server.msgr->bind(addr);
155 server.msgr->add_dispatcher_head(&server.dispatcher);
156 server.msgr->start();
157 for (unsigned i = 0; i < count; i++) {
158 server.echo();
159 }
160 server.msgr->shutdown();
161 server.msgr->wait();
162 } else {
163 std::cout << "client sending to " << addr << std::endl;
164 native_pingpong::Client client{cct};
165 client.msgr->add_dispatcher_head(&client.dispatcher);
166 client.msgr->start();
167 auto conn = client.msgr->connect_to(entity.name.type(),
168 entity_addrvec_t{entity.addr});
169 for (unsigned i = 0; i < count; i++) {
170 std::cout << "seq=" << i << std::endl;
171 client.ping(entity);
172 }
173 client.msgr->shutdown();
174 client.msgr->wait();
175 }
176}
177
178int main(int argc, char** argv)
179{
180 namespace po = boost::program_options;
181 po::options_description desc{"Allowed options"};
182 desc.add_options()
183 ("help,h", "show help message")
184 ("role", po::value<std::string>()->default_value("pong"),
185 "role to play (ping | pong)")
186 ("port", po::value<uint16_t>()->default_value(9010),
187 "port #")
188 ("nonce", po::value<uint32_t>()->default_value(42),
189 "a unique number to identify the pong server")
190 ("count", po::value<unsigned>()->default_value(10),
191 "stop after sending/echoing <count> MPing messages")
192 ("v2", po::value<bool>()->default_value(false),
193 "using msgr v2 protocol");
194 po::variables_map vm;
195 std::vector<std::string> unrecognized_options;
196 try {
197 auto parsed = po::command_line_parser(argc, argv)
198 .options(desc)
199 .allow_unregistered()
200 .run();
201 po::store(parsed, vm);
202 if (vm.count("help")) {
203 std::cout << desc << std::endl;
204 return 0;
205 }
206 po::notify(vm);
207 unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
208 } catch(const po::error& e) {
209 std::cerr << "error: " << e.what() << std::endl;
210 return 1;
211 }
212
213 entity_addr_t addr;
214 if (vm["v2"].as<bool>()) {
215 addr.set_type(entity_addr_t::TYPE_MSGR2);
216 } else {
217 addr.set_type(entity_addr_t::TYPE_LEGACY);
218 }
219 addr.set_family(AF_INET);
220 addr.set_port(vm["port"].as<std::uint16_t>());
221 addr.set_nonce(vm["nonce"].as<std::uint32_t>());
222
223 echo_role role = echo_role::as_server;
224 if (vm["role"].as<std::string>() == "ping") {
225 role = echo_role::as_client;
226 }
227
228 auto count = vm["count"].as<unsigned>();
229 std::vector<const char*> args(argv, argv + argc);
230 auto cct = global_init(nullptr, args,
231 CEPH_ENTITY_TYPE_CLIENT,
232 CODE_ENVIRONMENT_UTILITY,
233 CINIT_FLAG_NO_MON_CONFIG);
234 common_init_finish(cct.get());
235 ceph_echo(cct.get(), addr, role, count);
236}