git.proxmox.com Git - ceph.git/blob - ceph/src/tools/crimson/perf_async_msgr.cc
buildsys: change download over to reef release
[ceph.git] / ceph / src / tools / crimson / perf_async_msgr.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2
3 #include <boost/program_options/variables_map.hpp>
4 #include <boost/program_options/parsers.hpp>
5
6 #include "auth/Auth.h"
7 #include "global/global_init.h"
8 #include "msg/Dispatcher.h"
9 #include "msg/Messenger.h"
10 #include "messages/MOSDOp.h"
11
12 #include "auth/DummyAuth.h"
13
14 namespace {
15
16 constexpr int CEPH_OSD_PROTOCOL = 10;
17
18 struct Server {
19 Server(CephContext* cct, unsigned msg_len)
20 : dummy_auth(cct), dispatcher(cct, msg_len)
21 {
22 msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(0), "server", 0));
23 dummy_auth.auth_registry.refresh_config();
24 msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
25 msgr->set_default_policy(Messenger::Policy::stateless_server(0));
26 msgr->set_auth_client(&dummy_auth);
27 msgr->set_auth_server(&dummy_auth);
28 msgr->set_require_authorizer(false);
29 }
30 DummyAuthClientServer dummy_auth;
31 std::unique_ptr<Messenger> msgr;
32 struct ServerDispatcher : Dispatcher {
33 unsigned msg_len = 0;
34 bufferlist msg_data;
35
36 ServerDispatcher(CephContext* cct, unsigned msg_len)
37 : Dispatcher(cct), msg_len(msg_len)
38 {
39 msg_data.append_zero(msg_len);
40 }
41 bool ms_can_fast_dispatch_any() const override {
42 return true;
43 }
44 bool ms_can_fast_dispatch(const Message* m) const override {
45 return m->get_type() == CEPH_MSG_OSD_OP;
46 }
47 void ms_fast_dispatch(Message* m) override {
48 ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
49 const static pg_t pgid;
50 const static object_locator_t oloc;
51 const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
52 pgid.pool(), oloc.nspace);
53 static spg_t spgid(pgid);
54 MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
55 bufferlist data(msg_data);
56 rep->write(0, msg_len, data);
57 rep->set_tid(m->get_tid());
58 m->get_connection()->send_message(rep);
59 m->put();
60 }
61 bool ms_dispatch(Message*) override {
62 ceph_abort();
63 }
64 bool ms_handle_reset(Connection*) override {
65 return true;
66 }
67 void ms_handle_remote_reset(Connection*) override {
68 }
69 bool ms_handle_refused(Connection*) override {
70 return true;
71 }
72 } dispatcher;
73 };
74
75 }
76
77 static void run(CephContext* cct, entity_addr_t addr, unsigned bs)
78 {
79 std::cout << "async server listening at " << addr << std::endl;
80 Server server{cct, bs};
81 server.msgr->bind(addr);
82 server.msgr->add_dispatcher_head(&server.dispatcher);
83 server.msgr->start();
84 server.msgr->wait();
85 }
86
87 int main(int argc, char** argv)
88 {
89 namespace po = boost::program_options;
90 po::options_description desc{"Allowed options"};
91 desc.add_options()
92 ("help,h", "show help message")
93 ("addr", po::value<std::string>()->default_value("v1:127.0.0.1:9010"),
94 "server address")
95 ("bs", po::value<unsigned>()->default_value(0),
96 "server block size")
97 ("v1-crc-enabled", po::value<bool>()->default_value(false),
98 "enable v1 CRC checks");
99 po::variables_map vm;
100 std::vector<std::string> unrecognized_options;
101 try {
102 auto parsed = po::command_line_parser(argc, argv)
103 .options(desc)
104 .allow_unregistered()
105 .run();
106 po::store(parsed, vm);
107 if (vm.count("help")) {
108 std::cout << desc << std::endl;
109 return 0;
110 }
111 po::notify(vm);
112 unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
113 } catch(const po::error& e) {
114 std::cerr << "error: " << e.what() << std::endl;
115 return 1;
116 }
117
118 auto addr = vm["addr"].as<std::string>();
119 entity_addr_t target_addr;
120 target_addr.parse(addr.c_str(), nullptr);
121 auto bs = vm["bs"].as<unsigned>();
122 auto v1_crc_enabled = vm["v1-crc-enabled"].as<bool>();
123
124 std::vector<const char*> args(argv, argv + argc);
125 auto cct = global_init(nullptr, args,
126 CEPH_ENTITY_TYPE_CLIENT,
127 CODE_ENVIRONMENT_UTILITY,
128 CINIT_FLAG_NO_MON_CONFIG);
129 common_init_finish(cct.get());
130
131 if (v1_crc_enabled) {
132 cct->_conf.set_val("ms_crc_header", "true");
133 cct->_conf.set_val("ms_crc_data", "true");
134 } else {
135 cct->_conf.set_val("ms_crc_header", "false");
136 cct->_conf.set_val("ms_crc_data", "false");
137 }
138
139 run(cct.get(), target_addr, bs);
140 }