1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
3 #include <boost/program_options/variables_map.hpp>
4 #include <boost/program_options/parsers.hpp>
7 #include "global/global_init.h"
8 #include "msg/Dispatcher.h"
9 #include "msg/Messenger.h"
10 #include "messages/MOSDOp.h"
12 #include "auth/DummyAuth.h"
// Protocol number the server registers with its messenger through
// Messenger::set_cluster_protocol().
constexpr int CEPH_OSD_PROTOCOL = 10;
19 Server(CephContext
* cct
, unsigned msg_len
)
20 : dummy_auth(cct
), dispatcher(cct
, msg_len
)
22 msgr
.reset(Messenger::create(cct
, "async", entity_name_t::OSD(0), "server", 0));
23 dummy_auth
.auth_registry
.refresh_config();
24 msgr
->set_cluster_protocol(CEPH_OSD_PROTOCOL
);
25 msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
26 msgr
->set_auth_client(&dummy_auth
);
27 msgr
->set_auth_server(&dummy_auth
);
29 DummyAuthClientServer dummy_auth
;
30 std::unique_ptr
<Messenger
> msgr
;
31 struct ServerDispatcher
: Dispatcher
{
35 ServerDispatcher(CephContext
* cct
, unsigned msg_len
)
36 : Dispatcher(cct
), msg_len(msg_len
)
38 msg_data
.append_zero(msg_len
);
// Dispatcher override that takes no arguments and returns bool.
// NOTE(review): the body of this function is not present here (the opening
// brace below is the last visible token) — restore it from the complete
// file before building.
40 bool ms_can_fast_dispatch_any() const override
{
43 bool ms_can_fast_dispatch(const Message
* m
) const override
{
44 return m
->get_type() == CEPH_MSG_OSD_OP
;
46 void ms_fast_dispatch(Message
* m
) override
{
47 ceph_assert(m
->get_type() == CEPH_MSG_OSD_OP
);
48 const static pg_t pgid
;
49 const static object_locator_t oloc
;
50 const static hobject_t
hobj(object_t(), oloc
.key
, CEPH_NOSNAP
, pgid
.ps(),
51 pgid
.pool(), oloc
.nspace
);
52 static spg_t
spgid(pgid
);
53 MOSDOp
*rep
= new MOSDOp(0, 0, hobj
, spgid
, 0, 0, 0);
54 bufferlist
data(msg_data
);
55 rep
->write(0, msg_len
, data
);
56 rep
->set_tid(m
->get_tid());
57 m
->get_connection()->send_message(rep
);
// Dispatcher override returning bool; the Message* parameter is unnamed.
// NOTE(review): the body of this function is not present here — restore it
// from the complete file before building.
60 bool ms_dispatch(Message
*) override
{
// Dispatcher override returning bool; the Connection* parameter is unnamed.
// NOTE(review): the body of this function is not present here — restore it
// from the complete file before building.
63 bool ms_handle_reset(Connection
*) override
{
// Dispatcher override returning void; the Connection* parameter is unnamed.
// NOTE(review): the closing brace is not present here. The embedded
// numbering jumps from 66 straight to 68, so the body is presumably empty —
// confirm against the complete file.
66 void ms_handle_remote_reset(Connection
*) override
{
// Dispatcher override returning bool; the Connection* parameter is unnamed.
// NOTE(review): the body of this function is not present here — restore it
// from the complete file before building.
68 bool ms_handle_refused(Connection
*) override
{
// Print the listening address, construct a Server with message length bs,
// bind its messenger to addr and register the server's dispatcher through
// add_dispatcher_head().
//
// cct   Ceph context used to construct the Server.
// addr  address printed and passed to Messenger::bind().
// bs    message length forwarded to the Server constructor.
//
// NOTE(review): the function's braces and whatever followed
// add_dispatcher_head() are not present here — presumably the messenger is
// started and waited on; confirm against the complete file.
76 static void run(CephContext
* cct
, entity_addr_t addr
, unsigned bs
)
78 std::cout
<< "async server listening at " << addr
<< std::endl
;
79 Server server
{cct
, bs
};
80 server
.msgr
->bind(addr
);
81 server
.msgr
->add_dispatcher_head(&server
.dispatcher
);
// Entry point: parse the command line with boost::program_options,
// initialise a Ceph context, apply the CRC and worker-thread settings to its
// configuration, print a summary and call run().
//
// NOTE(review): several lines of this function are not present here (the
// braces, the call that opens the option list, the try-block opener, the
// parser's trailing calls, the early returns and the condition that selects
// between the two CRC branches). The comments below describe only what is
// visible.
86 int main(int argc
, char** argv
)
88 namespace po
= boost::program_options
;
// Options and their defaults: --help/-h, --addr ("v2:127.0.0.1:9010"),
// --bs (0), --crc-enabled (false), --threads (3).
// NOTE(review): the help strings for "bs" and "crc-enabled" are not visible.
89 po::options_description desc
{"Allowed options"};
91 ("help,h", "show help message")
92 ("addr", po::value
<std::string
>()->default_value("v2:127.0.0.1:9010"),
93 "server address(crimson only supports msgr v2 protocol)")
94 ("bs", po::value
<unsigned>()->default_value(0),
96 ("crc-enabled", po::value
<bool>()->default_value(false),
98 ("threads", po::value
<unsigned>()->default_value(3),
99 "async messenger worker threads");
100 po::variables_map vm
;
101 std::vector
<std::string
> unrecognized_options
;
// Unregistered options are tolerated by the parser and gathered, together
// with positional ones, into unrecognized_options; a po::error is reported
// on std::cerr.
103 auto parsed
= po::command_line_parser(argc
, argv
)
105 .allow_unregistered()
107 po::store(parsed
, vm
);
108 if (vm
.count("help")) {
109 std::cout
<< desc
<< std::endl
;
113 unrecognized_options
= po::collect_unrecognized(parsed
.options
, po::include_positional
);
114 } catch(const po::error
& e
) {
115 std::cerr
<< "error: " << e
.what() << std::endl
;
// The result of parse() is not inspected; the address is only required,
// via ceph_assert_always, to be a msgr2 address.
119 auto addr
= vm
["addr"].as
<std::string
>();
120 entity_addr_t target_addr
;
121 target_addr
.parse(addr
.c_str(), nullptr);
122 ceph_assert_always(target_addr
.is_msgr2());
123 auto bs
= vm
["bs"].as
<unsigned>();
124 auto crc_enabled
= vm
["crc-enabled"].as
<bool>();
125 auto worker_threads
= vm
["threads"].as
<unsigned>();
// The complete argv is handed to global_init().
127 std::vector
<const char*> args(argv
, argv
+ argc
);
128 auto cct
= global_init(nullptr, args
,
129 CEPH_ENTITY_TYPE_CLIENT
,
130 CODE_ENVIRONMENT_UTILITY
,
131 CINIT_FLAG_NO_MON_CONFIG
);
132 common_init_finish(cct
.get());
// ms_crc_header and ms_crc_data are set together, to "true" or to "false".
// NOTE(review): the selecting condition is not visible — presumably
// crc_enabled; confirm.
135 cct
->_conf
.set_val("ms_crc_header", "true");
136 cct
->_conf
.set_val("ms_crc_data", "true");
138 cct
->_conf
.set_val("ms_crc_header", "false");
139 cct
->_conf
.set_val("ms_crc_data", "false");
// The worker-thread count is written to ms_async_op_threads as a string.
142 cct
->_conf
.set_val("ms_async_op_threads", fmt::format("{}", worker_threads
));
144 std::cout
<< "server[" << addr
146 << ", crc_enabled=" << crc_enabled
147 << ", worker_threads=" << worker_threads
// Hand over to run() with the parsed address and message length.
150 run(cct
.get(), target_addr
, bs
);