]>
Commit | Line | Data |
---|---|---|
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- | |
2 | ||
3 | #include <boost/program_options/variables_map.hpp> | |
4 | #include <boost/program_options/parsers.hpp> | |
5 | ||
6 | #include "auth/Auth.h" | |
7 | #include "global/global_init.h" | |
8 | #include "msg/Dispatcher.h" | |
9 | #include "msg/Messenger.h" | |
10 | #include "messages/MOSDOp.h" | |
11 | ||
12 | #include "auth/DummyAuth.h" | |
13 | ||
14 | namespace { | |
15 | ||
16 | constexpr int CEPH_OSD_PROTOCOL = 10; | |
17 | ||
18 | struct Server { | |
19 | Server(CephContext* cct, unsigned msg_len) | |
20 | : dummy_auth(cct), dispatcher(cct, msg_len) | |
21 | { | |
22 | msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(0), "server", 0)); | |
23 | dummy_auth.auth_registry.refresh_config(); | |
24 | msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL); | |
25 | msgr->set_default_policy(Messenger::Policy::stateless_server(0)); | |
26 | msgr->set_auth_client(&dummy_auth); | |
27 | msgr->set_auth_server(&dummy_auth); | |
28 | } | |
29 | DummyAuthClientServer dummy_auth; | |
30 | std::unique_ptr<Messenger> msgr; | |
31 | struct ServerDispatcher : Dispatcher { | |
32 | unsigned msg_len = 0; | |
33 | bufferlist msg_data; | |
34 | ||
35 | ServerDispatcher(CephContext* cct, unsigned msg_len) | |
36 | : Dispatcher(cct), msg_len(msg_len) | |
37 | { | |
38 | msg_data.append_zero(msg_len); | |
39 | } | |
40 | bool ms_can_fast_dispatch_any() const override { | |
41 | return true; | |
42 | } | |
43 | bool ms_can_fast_dispatch(const Message* m) const override { | |
44 | return m->get_type() == CEPH_MSG_OSD_OP; | |
45 | } | |
46 | void ms_fast_dispatch(Message* m) override { | |
47 | ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); | |
48 | const static pg_t pgid; | |
49 | const static object_locator_t oloc; | |
50 | const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), | |
51 | pgid.pool(), oloc.nspace); | |
52 | static spg_t spgid(pgid); | |
53 | MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0); | |
54 | bufferlist data(msg_data); | |
55 | rep->write(0, msg_len, data); | |
56 | rep->set_tid(m->get_tid()); | |
57 | m->get_connection()->send_message(rep); | |
58 | m->put(); | |
59 | } | |
60 | bool ms_dispatch(Message*) override { | |
61 | ceph_abort(); | |
62 | } | |
63 | bool ms_handle_reset(Connection*) override { | |
64 | return true; | |
65 | } | |
66 | void ms_handle_remote_reset(Connection*) override { | |
67 | } | |
68 | bool ms_handle_refused(Connection*) override { | |
69 | return true; | |
70 | } | |
71 | } dispatcher; | |
72 | }; | |
73 | ||
74 | } | |
75 | ||
76 | static void run(CephContext* cct, entity_addr_t addr, unsigned bs) | |
77 | { | |
78 | std::cout << "async server listening at " << addr << std::endl; | |
79 | Server server{cct, bs}; | |
80 | server.msgr->bind(addr); | |
81 | server.msgr->add_dispatcher_head(&server.dispatcher); | |
82 | server.msgr->start(); | |
83 | server.msgr->wait(); | |
84 | } | |
85 | ||
86 | int main(int argc, char** argv) | |
87 | { | |
88 | namespace po = boost::program_options; | |
89 | po::options_description desc{"Allowed options"}; | |
90 | desc.add_options() | |
91 | ("help,h", "show help message") | |
92 | ("addr", po::value<std::string>()->default_value("v2:127.0.0.1:9010"), | |
93 | "server address(crimson only supports msgr v2 protocol)") | |
94 | ("bs", po::value<unsigned>()->default_value(0), | |
95 | "server block size") | |
96 | ("crc-enabled", po::value<bool>()->default_value(false), | |
97 | "enable CRC checks") | |
98 | ("threads", po::value<unsigned>()->default_value(3), | |
99 | "async messenger worker threads"); | |
100 | po::variables_map vm; | |
101 | std::vector<std::string> unrecognized_options; | |
102 | try { | |
103 | auto parsed = po::command_line_parser(argc, argv) | |
104 | .options(desc) | |
105 | .allow_unregistered() | |
106 | .run(); | |
107 | po::store(parsed, vm); | |
108 | if (vm.count("help")) { | |
109 | std::cout << desc << std::endl; | |
110 | return 0; | |
111 | } | |
112 | po::notify(vm); | |
113 | unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional); | |
114 | } catch(const po::error& e) { | |
115 | std::cerr << "error: " << e.what() << std::endl; | |
116 | return 1; | |
117 | } | |
118 | ||
119 | auto addr = vm["addr"].as<std::string>(); | |
120 | entity_addr_t target_addr; | |
121 | target_addr.parse(addr.c_str(), nullptr); | |
122 | ceph_assert_always(target_addr.is_msgr2()); | |
123 | auto bs = vm["bs"].as<unsigned>(); | |
124 | auto crc_enabled = vm["crc-enabled"].as<bool>(); | |
125 | auto worker_threads = vm["threads"].as<unsigned>(); | |
126 | ||
127 | std::vector<const char*> args(argv, argv + argc); | |
128 | auto cct = global_init(nullptr, args, | |
129 | CEPH_ENTITY_TYPE_CLIENT, | |
130 | CODE_ENVIRONMENT_UTILITY, | |
131 | CINIT_FLAG_NO_MON_CONFIG); | |
132 | common_init_finish(cct.get()); | |
133 | ||
134 | if (crc_enabled) { | |
135 | cct->_conf.set_val("ms_crc_header", "true"); | |
136 | cct->_conf.set_val("ms_crc_data", "true"); | |
137 | } else { | |
138 | cct->_conf.set_val("ms_crc_header", "false"); | |
139 | cct->_conf.set_val("ms_crc_data", "false"); | |
140 | } | |
141 | ||
142 | cct->_conf.set_val("ms_async_op_threads", fmt::format("{}", worker_threads)); | |
143 | ||
144 | std::cout << "server[" << addr | |
145 | << "](bs=" << bs | |
146 | << ", crc_enabled=" << crc_enabled | |
147 | << ", worker_threads=" << worker_threads | |
148 | << std::endl; | |
149 | ||
150 | run(cct.get(), target_addr, bs); | |
151 | } |