1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
6 * This tool exposes crimson object store internals as an nbd server
7 * for use with fio in basic benchmarking.
11 * $ ./bin/crimson-store-nbd --device-path /dev/nvme1n1 -c 1 --mkfs true --uds-path /tmp/store_nbd_socket.sock
16 * uri=nbd+unix:///?socket=/tmp/store_nbd_socket.sock
32 #include <boost/program_options/variables_map.hpp>
33 #include <boost/program_options/parsers.hpp>
35 #include <linux/nbd.h>
38 #include <seastar/apps/lib/stop_signal.hh>
39 #include <seastar/core/app-template.hh>
40 #include <seastar/core/byteorder.hh>
41 #include <seastar/core/future-util.hh>
42 #include <seastar/core/gate.hh>
43 #include <seastar/core/reactor.hh>
44 #include <seastar/core/rwlock.hh>
45 #include <seastar/core/thread.hh>
46 #include <seastar/util/defer.hh>
48 #include "crimson/common/config_proxy.h"
49 #include "crimson/common/log.h"
51 #include "block_driver.h"
53 namespace po
= boost::program_options
;
58 seastar::logger
& logger() {
59 return crimson::get_logger(ceph_subsys_test
);
63 struct request_context_t
{
73 std::optional
<bufferptr
> in_buffer
;
74 std::optional
<bufferlist
> out_buffer
;
76 using ref
= std::unique_ptr
<request_context_t
>;
77 static ref
make_ref() {
78 return std::make_unique
<request_context_t
>();
81 bool check_magic() const {
82 auto ret
= magic
== NBD_REQUEST_MAGIC
;
85 "Invalid magic {} should be {}",
92 uint32_t get_command() const {
96 bool has_input_buffer() const {
97 return get_command() == NBD_CMD_WRITE
;
100 seastar::future
<> read_request(seastar::input_stream
<char> &in
) {
101 return in
.read_exactly(sizeof(struct nbd_request
)
102 ).then([this, &in
](auto buf
) {
103 if (buf
.size() < sizeof(struct nbd_request
)) {
104 throw std::system_error(
105 std::make_error_code(
106 std::errc::connection_reset
));
109 magic
= seastar::consume_be
<uint32_t>(p
);
110 type
= seastar::consume_be
<uint32_t>(p
);
111 memcpy(handle
, p
, sizeof(handle
));
113 from
= seastar::consume_be
<uint64_t>(p
);
114 len
= seastar::consume_be
<uint32_t>(p
);
116 "Got request, magic {}, type {}, from {}, len {}",
117 magic
, type
, from
, len
);
119 if (!check_magic()) {
120 throw std::system_error(
121 std::make_error_code(
122 std::errc::invalid_argument
));
125 if (has_input_buffer()) {
126 return in
.read_exactly(len
).then([this](auto buf
) {
127 in_buffer
= ceph::buffer::create_page_aligned(len
);
128 in_buffer
->copy_in(0, len
, buf
.get());
129 return seastar::now();
132 return seastar::now();
137 seastar::future
<> write_reply(seastar::output_stream
<char> &out
) {
138 seastar::temporary_buffer
<char> buffer
{sizeof(struct nbd_reply
)};
139 auto p
= buffer
.get_write();
140 seastar::produce_be
<uint32_t>(p
, NBD_REPLY_MAGIC
);
141 seastar::produce_be
<uint32_t>(p
, err
);
142 logger().debug("write_reply writing err {}", err
);
143 memcpy(p
, handle
, sizeof(handle
));
144 return out
.write(std::move(buffer
)).then([this, &out
] {
146 return seastar::do_for_each(
147 out_buffer
->mut_buffers(),
148 [&out
](bufferptr
&ptr
) {
149 logger().debug("write_reply writing {}", ptr
.length());
151 seastar::temporary_buffer
<char>(
154 seastar::make_deleter([ptr
](){}))
158 return seastar::now();
166 struct RequestWriter
{
167 seastar::rwlock lock
;
168 seastar::output_stream
<char> stream
;
172 seastar::output_stream
<char> &&stream
) : stream(std::move(stream
)) {}
173 RequestWriter(RequestWriter
&&) = default;
175 seastar::future
<> complete(request_context_t::ref
&&req
) {
176 auto &request
= *req
;
177 return lock
.write_lock(
178 ).then([&request
, this] {
179 return request
.write_reply(stream
);
180 }).finally([&, this, req
=std::move(req
)] {
182 logger().debug("complete");
183 return seastar::now();
187 seastar::future
<> close() {
188 return gate
.close().then([this] {
189 return stream
.close();
197 * Simple NBD server for throughput testing: serves concurrent,
198 * single-threaded reads and writes against a BlockDriver.
201 BlockDriver
&backend
;
202 std::string uds_path
;
203 std::optional
<seastar::server_socket
> server_socket
;
204 std::optional
<seastar::connected_socket
> connected_socket
;
208 std::string uds_path
;
210 void populate_options(
211 po::options_description
&desc
)
215 po::value
<std::string
>()
216 ->default_value("/tmp/store_nbd_socket.sock")
217 ->notifier([this](auto s
) {
220 "Path to domain socket for nbd"
226 BlockDriver
&backend
,
229 uds_path(config
.uds_path
)
233 seastar::future
<> stop();
236 int main(int argc
, char** argv
)
238 po::options_description desc
{"Allowed options"};
241 ("help,h", "show help message")
242 ("debug", po::value
<bool>(&debug
)->default_value(false),
245 po::options_description nbd_pattern_options
{"NBD Pattern Options"};
246 NBDHandler::config_t nbd_config
;
247 nbd_config
.populate_options(nbd_pattern_options
);
248 desc
.add(nbd_pattern_options
);
250 po::options_description backend_pattern_options
{"Backend Options"};
251 BlockDriver::config_t backend_config
;
252 backend_config
.populate_options(backend_pattern_options
);
253 desc
.add(backend_pattern_options
);
255 po::variables_map vm
;
256 std::vector
<std::string
> unrecognized_options
;
258 auto parsed
= po::command_line_parser(argc
, argv
)
260 .allow_unregistered()
262 po::store(parsed
, vm
);
263 if (vm
.count("help")) {
264 std::cout
<< desc
<< std::endl
;
269 unrecognized_options
=
270 po::collect_unrecognized(parsed
.options
, po::include_positional
);
271 } catch(const po::error
& e
) {
272 std::cerr
<< "error: " << e
.what() << std::endl
;
275 std::vector
<const char*> args(argv
, argv
+ argc
);
277 seastar::app_template::config app_cfg
;
278 app_cfg
.name
= "crimson-store-nbd";
279 app_cfg
.auto_handle_sigint_sigterm
= false;
280 seastar::app_template
app(std::move(app_cfg
));
282 std::vector
<char*> av
{argv
[0]};
283 std::transform(begin(unrecognized_options
),
284 end(unrecognized_options
),
285 std::back_inserter(av
),
287 return const_cast<char*>(s
.c_str());
289 return app
.run(av
.size(), av
.data(), [&] {
291 seastar::global_logger_registry().set_all_loggers_level(
292 seastar::log_level::debug
295 return seastar::async([&] {
296 seastar_apps_lib::stop_signal should_stop
;
297 crimson::common::sharded_conf()
298 .start(EntityName
{}, std::string_view
{"ceph"}).get();
299 auto stop_conf
= seastar::defer([] {
300 crimson::common::sharded_conf().stop().get();
303 auto backend
= get_backend(backend_config
);
304 NBDHandler
nbd(*backend
, nbd_config
);
305 backend
->mount().get();
306 auto close_backend
= seastar::defer([&] {
307 backend
->close().get();
310 logger().debug("Running nbd server...");
312 auto stop_nbd
= seastar::defer([&] {
315 should_stop
.wait().get();
321 class nbd_oldstyle_negotiation_t
{
322 uint64_t magic
= seastar::cpu_to_be(0x4e42444d41474943); // "NBDMAGIC"
323 uint64_t magic2
= seastar::cpu_to_be(0x00420281861253); // "IHAVEOPT"
325 uint32_t flags
= seastar::cpu_to_be(0);
326 char reserved
[124] = {0};
329 nbd_oldstyle_negotiation_t(uint64_t size
, uint32_t flags
)
330 : size(seastar::cpu_to_be(size
)), flags(seastar::cpu_to_be(flags
)) {}
331 } __attribute__((packed
));
333 seastar::future
<> send_negotiation(
335 seastar::output_stream
<char>& out
)
337 seastar::temporary_buffer
<char> buf
{sizeof(nbd_oldstyle_negotiation_t
)};
338 new (buf
.get_write()) nbd_oldstyle_negotiation_t(size
, 1);
339 return out
.write(std::move(buf
)
345 seastar::future
<> handle_command(
346 BlockDriver
&backend
,
347 request_context_t::ref request_ref
,
350 auto &request
= *request_ref
;
351 logger().debug("got command {}", request
.get_command());
353 switch (request
.get_command()) {
355 return backend
.write(
361 request
.len
).then([&] (auto buffer
) {
362 logger().debug("read returned buffer len {}", buffer
.length());
363 request
.out_buffer
= buffer
;
366 throw std::system_error(std::make_error_code(std::errc::bad_message
));
368 throw std::system_error(std::make_error_code(std::errc::bad_message
));
370 throw std::system_error(std::make_error_code(std::errc::bad_message
));
372 })().then([&, request_ref
=std::move(request_ref
)]() mutable {
373 logger().debug("handle_command complete");
374 return out
.complete(std::move(request_ref
));
379 seastar::future
<> handle_commands(
380 BlockDriver
&backend
,
381 seastar::input_stream
<char>& in
,
384 logger().debug("handle_commands");
385 return seastar::keep_doing([&] {
386 logger().debug("waiting for command");
387 auto request_ref
= request_context_t::make_ref();
388 auto &request
= *request_ref
;
389 return request
.read_request(in
).then(
390 [&, request_ref
=std::move(request_ref
)]() mutable {
391 // keep running in background
392 (void)seastar::try_with_gate(out
.gate
,
393 [&backend
, &out
, request_ref
=std::move(request_ref
)]() mutable {
394 return handle_command(backend
, std::move(request_ref
), out
);
396 logger().debug("handle_commands after fork");
398 }).handle_exception_type([](const seastar::gate_closed_exception
&) {});
401 void NBDHandler::run()
403 logger().debug("About to listen on {}", uds_path
);
404 server_socket
= seastar::engine().listen(
405 seastar::socket_address
{
406 seastar::unix_domain_addr
{uds_path
}});
408 // keep running in background
409 (void)seastar::keep_doing([this] {
410 return seastar::try_with_gate(gate
, [this] {
411 return server_socket
->accept().then([this](auto acc
) {
412 logger().debug("Accepted");
413 connected_socket
= std::move(acc
.connection
);
414 return seastar::do_with(
415 connected_socket
->input(),
416 RequestWriter
{connected_socket
->output()},
417 [&, this](auto &input
, auto &output
) {
418 return send_negotiation(
422 return handle_commands(backend
, input
, output
);
424 std::cout
<< "closing input and output" << std::endl
;
425 return seastar::when_all(input
.close(),
427 }).discard_result().handle_exception([](auto e
) {
428 logger().error("NBDHandler::run saw exception {}", e
);
431 }).handle_exception_type([] (const std::system_error
&e
) {
432 // an ECONNABORTED is expected when we are being stopped.
433 if (e
.code() != std::errc::connection_aborted
) {
434 logger().error("accept failed: {}", e
);
438 }).handle_exception_type([](const seastar::gate_closed_exception
&) {});
441 seastar::future
<> NBDHandler::stop()
444 server_socket
->abort_accept();
446 if (connected_socket
) {
447 connected_socket
->shutdown_input();
448 connected_socket
->shutdown_output();
450 return gate
.close().then([this] {
451 if (!server_socket
.has_value()) {
452 return seastar::now();
454 return seastar::remove_file(uds_path
);