]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/tools/perf_async_msgr.cc
bump version to 19.2.0-pve1
[ceph.git] / ceph / src / crimson / tools / perf_async_msgr.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2
3#include <boost/program_options/variables_map.hpp>
4#include <boost/program_options/parsers.hpp>
5
6#include "auth/Auth.h"
7#include "global/global_init.h"
8#include "msg/Dispatcher.h"
9#include "msg/Messenger.h"
10#include "messages/MOSDOp.h"
11
12#include "auth/DummyAuth.h"
13
14namespace {
15
16constexpr int CEPH_OSD_PROTOCOL = 10;
17
18struct Server {
19 Server(CephContext* cct, unsigned msg_len)
20 : dummy_auth(cct), dispatcher(cct, msg_len)
21 {
f67539c2 22 msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(0), "server", 0));
9f95a23c
TL
23 dummy_auth.auth_registry.refresh_config();
24 msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
25 msgr->set_default_policy(Messenger::Policy::stateless_server(0));
26 msgr->set_auth_client(&dummy_auth);
27 msgr->set_auth_server(&dummy_auth);
9f95a23c
TL
28 }
29 DummyAuthClientServer dummy_auth;
20effc67 30 std::unique_ptr<Messenger> msgr;
9f95a23c
TL
31 struct ServerDispatcher : Dispatcher {
32 unsigned msg_len = 0;
33 bufferlist msg_data;
34
35 ServerDispatcher(CephContext* cct, unsigned msg_len)
36 : Dispatcher(cct), msg_len(msg_len)
37 {
38 msg_data.append_zero(msg_len);
39 }
40 bool ms_can_fast_dispatch_any() const override {
41 return true;
42 }
43 bool ms_can_fast_dispatch(const Message* m) const override {
44 return m->get_type() == CEPH_MSG_OSD_OP;
45 }
46 void ms_fast_dispatch(Message* m) override {
47 ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
48 const static pg_t pgid;
49 const static object_locator_t oloc;
50 const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
51 pgid.pool(), oloc.nspace);
52 static spg_t spgid(pgid);
53 MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
54 bufferlist data(msg_data);
55 rep->write(0, msg_len, data);
56 rep->set_tid(m->get_tid());
57 m->get_connection()->send_message(rep);
58 m->put();
59 }
60 bool ms_dispatch(Message*) override {
61 ceph_abort();
62 }
63 bool ms_handle_reset(Connection*) override {
64 return true;
65 }
66 void ms_handle_remote_reset(Connection*) override {
67 }
68 bool ms_handle_refused(Connection*) override {
69 return true;
70 }
71 } dispatcher;
72};
73
74}
75
76static void run(CephContext* cct, entity_addr_t addr, unsigned bs)
77{
78 std::cout << "async server listening at " << addr << std::endl;
79 Server server{cct, bs};
80 server.msgr->bind(addr);
81 server.msgr->add_dispatcher_head(&server.dispatcher);
82 server.msgr->start();
83 server.msgr->wait();
84}
85
86int main(int argc, char** argv)
87{
88 namespace po = boost::program_options;
89 po::options_description desc{"Allowed options"};
90 desc.add_options()
91 ("help,h", "show help message")
1e59de90
TL
92 ("addr", po::value<std::string>()->default_value("v2:127.0.0.1:9010"),
93 "server address(crimson only supports msgr v2 protocol)")
9f95a23c
TL
94 ("bs", po::value<unsigned>()->default_value(0),
95 "server block size")
1e59de90 96 ("crc-enabled", po::value<bool>()->default_value(false),
aee94f69
TL
97 "enable CRC checks")
98 ("threads", po::value<unsigned>()->default_value(3),
99 "async messenger worker threads");
9f95a23c
TL
100 po::variables_map vm;
101 std::vector<std::string> unrecognized_options;
102 try {
103 auto parsed = po::command_line_parser(argc, argv)
104 .options(desc)
105 .allow_unregistered()
106 .run();
107 po::store(parsed, vm);
108 if (vm.count("help")) {
109 std::cout << desc << std::endl;
110 return 0;
111 }
112 po::notify(vm);
113 unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
114 } catch(const po::error& e) {
115 std::cerr << "error: " << e.what() << std::endl;
116 return 1;
117 }
118
119 auto addr = vm["addr"].as<std::string>();
120 entity_addr_t target_addr;
121 target_addr.parse(addr.c_str(), nullptr);
1e59de90 122 ceph_assert_always(target_addr.is_msgr2());
9f95a23c 123 auto bs = vm["bs"].as<unsigned>();
1e59de90 124 auto crc_enabled = vm["crc-enabled"].as<bool>();
aee94f69 125 auto worker_threads = vm["threads"].as<unsigned>();
9f95a23c
TL
126
127 std::vector<const char*> args(argv, argv + argc);
128 auto cct = global_init(nullptr, args,
129 CEPH_ENTITY_TYPE_CLIENT,
130 CODE_ENVIRONMENT_UTILITY,
131 CINIT_FLAG_NO_MON_CONFIG);
132 common_init_finish(cct.get());
133
1e59de90 134 if (crc_enabled) {
9f95a23c
TL
135 cct->_conf.set_val("ms_crc_header", "true");
136 cct->_conf.set_val("ms_crc_data", "true");
137 } else {
138 cct->_conf.set_val("ms_crc_header", "false");
139 cct->_conf.set_val("ms_crc_data", "false");
140 }
141
aee94f69
TL
142 cct->_conf.set_val("ms_async_op_threads", fmt::format("{}", worker_threads));
143
144 std::cout << "server[" << addr
145 << "](bs=" << bs
146 << ", crc_enabled=" << crc_enabled
147 << ", worker_threads=" << worker_threads
148 << std::endl;
149
9f95a23c
TL
150 run(cct.get(), target_addr, bs);
151}