]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/tools/store_nbd/store-nbd.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / tools / store_nbd / store-nbd.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2
3 /**
4 * crimson-store-nbd
5 *
6 * This tool exposes crimson object store internals as an nbd server
7 * for use with fio in basic benchmarking.
8 *
9 * Example usage:
10 *
11 * $ ./bin/crimson-store-nbd --device-path /dev/nvme1n1 -c 1 --mkfs true --uds-path /tmp/store_nbd_socket.sock
12 *
13 * $ cat nbd.fio
14 * [global]
15 * ioengine=nbd
16 * uri=nbd+unix:///?socket=/tmp/store_nbd_socket.sock
17 * rw=randrw
18 * time_based
19 * runtime=120
20 * group_reporting
21 * iodepth=1
22 * size=500G
23 *
24 * [job0]
25 * offset=0
26 *
27 * $ fio nbd.fio
28 */
29
30 #include <random>
31
32 #include <boost/program_options/variables_map.hpp>
33 #include <boost/program_options/parsers.hpp>
34
35 #include <linux/nbd.h>
36 #include <linux/fs.h>
37
38 #include <seastar/apps/lib/stop_signal.hh>
39 #include <seastar/core/app-template.hh>
40 #include <seastar/core/byteorder.hh>
41 #include <seastar/core/future-util.hh>
42 #include <seastar/core/gate.hh>
43 #include <seastar/core/reactor.hh>
44 #include <seastar/core/rwlock.hh>
45 #include <seastar/core/thread.hh>
46 #include <seastar/util/defer.hh>
47
48 #include "crimson/common/config_proxy.h"
49 #include "crimson/common/log.h"
50
51 #include "block_driver.h"
52
53 namespace po = boost::program_options;
54
55 using namespace ceph;
56
57 namespace {
58 seastar::logger& logger() {
59 return crimson::get_logger(ceph_subsys_test);
60 }
61 }
62
63 struct request_context_t {
64 uint32_t magic = 0;
65 uint32_t type = 0;
66
67 char handle[8] = {0};
68
69 uint64_t from = 0;
70 uint32_t len = 0;
71
72 unsigned err = 0;
73 std::optional<bufferptr> in_buffer;
74 std::optional<bufferlist> out_buffer;
75
76 using ref = std::unique_ptr<request_context_t>;
77 static ref make_ref() {
78 return std::make_unique<request_context_t>();
79 }
80
81 bool check_magic() const {
82 auto ret = magic == NBD_REQUEST_MAGIC;
83 if (!ret) {
84 logger().error(
85 "Invalid magic {} should be {}",
86 magic,
87 NBD_REQUEST_MAGIC);
88 }
89 return ret;
90 }
91
92 uint32_t get_command() const {
93 return type & 0xff;
94 }
95
96 bool has_input_buffer() const {
97 return get_command() == NBD_CMD_WRITE;
98 }
99
100 seastar::future<> read_request(seastar::input_stream<char> &in) {
101 return in.read_exactly(sizeof(struct nbd_request)
102 ).then([this, &in](auto buf) {
103 if (buf.size() < sizeof(struct nbd_request)) {
104 throw std::system_error(
105 std::make_error_code(
106 std::errc::connection_reset));
107 }
108 auto p = buf.get();
109 magic = seastar::consume_be<uint32_t>(p);
110 type = seastar::consume_be<uint32_t>(p);
111 memcpy(handle, p, sizeof(handle));
112 p += sizeof(handle);
113 from = seastar::consume_be<uint64_t>(p);
114 len = seastar::consume_be<uint32_t>(p);
115 logger().debug(
116 "Got request, magic {}, type {}, from {}, len {}",
117 magic, type, from, len);
118
119 if (!check_magic()) {
120 throw std::system_error(
121 std::make_error_code(
122 std::errc::invalid_argument));
123 }
124
125 if (has_input_buffer()) {
126 return in.read_exactly(len).then([this](auto buf) {
127 in_buffer = ceph::buffer::create_page_aligned(len);
128 in_buffer->copy_in(0, len, buf.get());
129 return seastar::now();
130 });
131 } else {
132 return seastar::now();
133 }
134 });
135 }
136
137 seastar::future<> write_reply(seastar::output_stream<char> &out) {
138 seastar::temporary_buffer<char> buffer{sizeof(struct nbd_reply)};
139 auto p = buffer.get_write();
140 seastar::produce_be<uint32_t>(p, NBD_REPLY_MAGIC);
141 seastar::produce_be<uint32_t>(p, err);
142 logger().debug("write_reply writing err {}", err);
143 memcpy(p, handle, sizeof(handle));
144 return out.write(std::move(buffer)).then([this, &out] {
145 if (out_buffer) {
146 return seastar::do_for_each(
147 out_buffer->mut_buffers(),
148 [&out](bufferptr &ptr) {
149 logger().debug("write_reply writing {}", ptr.length());
150 return out.write(
151 seastar::temporary_buffer<char>(
152 ptr.c_str(),
153 ptr.length(),
154 seastar::make_deleter([ptr](){}))
155 );
156 });
157 } else {
158 return seastar::now();
159 }
160 }).then([&out] {
161 return out.flush();
162 });
163 }
164 };
165
166 struct RequestWriter {
167 seastar::rwlock lock;
168 seastar::output_stream<char> stream;
169 seastar::gate gate;
170
171 RequestWriter(
172 seastar::output_stream<char> &&stream) : stream(std::move(stream)) {}
173 RequestWriter(RequestWriter &&) = default;
174
175 seastar::future<> complete(request_context_t::ref &&req) {
176 auto &request = *req;
177 return lock.write_lock(
178 ).then([&request, this] {
179 return request.write_reply(stream);
180 }).finally([&, this, req=std::move(req)] {
181 lock.write_unlock();
182 logger().debug("complete");
183 return seastar::now();
184 });
185 }
186
187 seastar::future<> close() {
188 return gate.close().then([this] {
189 return stream.close();
190 });
191 }
192 };
193
194 /**
195 * NBDHandler
196 *
197 * Simple throughput test for concurrent, single threaded
198 * writes to an BlockDriver.
199 */
200 class NBDHandler {
201 BlockDriver &backend;
202 std::string uds_path;
203 std::optional<seastar::server_socket> server_socket;
204 std::optional<seastar::connected_socket> connected_socket;
205 seastar::gate gate;
206 public:
207 struct config_t {
208 std::string uds_path;
209
210 void populate_options(
211 po::options_description &desc)
212 {
213 desc.add_options()
214 ("uds-path",
215 po::value<std::string>()
216 ->default_value("/tmp/store_nbd_socket.sock")
217 ->notifier([this](auto s) {
218 uds_path = s;
219 }),
220 "Path to domain socket for nbd"
221 );
222 }
223 };
224
225 NBDHandler(
226 BlockDriver &backend,
227 config_t config) :
228 backend(backend),
229 uds_path(config.uds_path)
230 {}
231
232 void run();
233 seastar::future<> stop();
234 };
235
236 int main(int argc, char** argv)
237 {
238 po::options_description desc{"Allowed options"};
239 bool debug = false;
240 desc.add_options()
241 ("help,h", "show help message")
242 ("debug", po::value<bool>(&debug)->default_value(false),
243 "enable debugging");
244
245 po::options_description nbd_pattern_options{"NBD Pattern Options"};
246 NBDHandler::config_t nbd_config;
247 nbd_config.populate_options(nbd_pattern_options);
248 desc.add(nbd_pattern_options);
249
250 po::options_description backend_pattern_options{"Backend Options"};
251 BlockDriver::config_t backend_config;
252 backend_config.populate_options(backend_pattern_options);
253 desc.add(backend_pattern_options);
254
255 po::variables_map vm;
256 std::vector<std::string> unrecognized_options;
257 try {
258 auto parsed = po::command_line_parser(argc, argv)
259 .options(desc)
260 .allow_unregistered()
261 .run();
262 po::store(parsed, vm);
263 if (vm.count("help")) {
264 std::cout << desc << std::endl;
265 return 0;
266 }
267
268 po::notify(vm);
269 unrecognized_options =
270 po::collect_unrecognized(parsed.options, po::include_positional);
271 } catch(const po::error& e) {
272 std::cerr << "error: " << e.what() << std::endl;
273 return 1;
274 }
275 std::vector<const char*> args(argv, argv + argc);
276
277 seastar::app_template::config app_cfg;
278 app_cfg.name = "crimson-store-nbd";
279 app_cfg.auto_handle_sigint_sigterm = false;
280 seastar::app_template app(std::move(app_cfg));
281
282 std::vector<char*> av{argv[0]};
283 std::transform(begin(unrecognized_options),
284 end(unrecognized_options),
285 std::back_inserter(av),
286 [](auto& s) {
287 return const_cast<char*>(s.c_str());
288 });
289 return app.run(av.size(), av.data(), [&] {
290 if (debug) {
291 seastar::global_logger_registry().set_all_loggers_level(
292 seastar::log_level::debug
293 );
294 }
295 return seastar::async([&] {
296 seastar_apps_lib::stop_signal should_stop;
297 crimson::common::sharded_conf()
298 .start(EntityName{}, std::string_view{"ceph"}).get();
299 auto stop_conf = seastar::defer([] {
300 crimson::common::sharded_conf().stop().get();
301 });
302
303 auto backend = get_backend(backend_config);
304 NBDHandler nbd(*backend, nbd_config);
305 backend->mount().get();
306 auto close_backend = seastar::defer([&] {
307 backend->close().get();
308 });
309
310 logger().debug("Running nbd server...");
311 nbd.run();
312 auto stop_nbd = seastar::defer([&] {
313 nbd.stop().get();
314 });
315 should_stop.wait().get();
316 return 0;
317 });
318 });
319 }
320
321 class nbd_oldstyle_negotiation_t {
322 uint64_t magic = seastar::cpu_to_be(0x4e42444d41474943); // "NBDMAGIC"
323 uint64_t magic2 = seastar::cpu_to_be(0x00420281861253); // "IHAVEOPT"
324 uint64_t size = 0;
325 uint32_t flags = seastar::cpu_to_be(0);
326 char reserved[124] = {0};
327
328 public:
329 nbd_oldstyle_negotiation_t(uint64_t size, uint32_t flags)
330 : size(seastar::cpu_to_be(size)), flags(seastar::cpu_to_be(flags)) {}
331 } __attribute__((packed));
332
333 seastar::future<> send_negotiation(
334 size_t size,
335 seastar::output_stream<char>& out)
336 {
337 seastar::temporary_buffer<char> buf{sizeof(nbd_oldstyle_negotiation_t)};
338 new (buf.get_write()) nbd_oldstyle_negotiation_t(size, 1);
339 return out.write(std::move(buf)
340 ).then([&out] {
341 return out.flush();
342 });
343 }
344
345 seastar::future<> handle_command(
346 BlockDriver &backend,
347 request_context_t::ref request_ref,
348 RequestWriter &out)
349 {
350 auto &request = *request_ref;
351 logger().debug("got command {}", request.get_command());
352 return ([&] {
353 switch (request.get_command()) {
354 case NBD_CMD_WRITE:
355 return backend.write(
356 request.from,
357 *request.in_buffer);
358 case NBD_CMD_READ:
359 return backend.read(
360 request.from,
361 request.len).then([&] (auto buffer) {
362 logger().debug("read returned buffer len {}", buffer.length());
363 request.out_buffer = buffer;
364 });
365 case NBD_CMD_DISC:
366 throw std::system_error(std::make_error_code(std::errc::bad_message));
367 case NBD_CMD_TRIM:
368 throw std::system_error(std::make_error_code(std::errc::bad_message));
369 default:
370 throw std::system_error(std::make_error_code(std::errc::bad_message));
371 }
372 })().then([&, request_ref=std::move(request_ref)]() mutable {
373 logger().debug("handle_command complete");
374 return out.complete(std::move(request_ref));
375 });
376 }
377
378
379 seastar::future<> handle_commands(
380 BlockDriver &backend,
381 seastar::input_stream<char>& in,
382 RequestWriter &out)
383 {
384 logger().debug("handle_commands");
385 return seastar::keep_doing([&] {
386 logger().debug("waiting for command");
387 auto request_ref = request_context_t::make_ref();
388 auto &request = *request_ref;
389 return request.read_request(in).then(
390 [&, request_ref=std::move(request_ref)]() mutable {
391 // keep running in background
392 (void)seastar::try_with_gate(out.gate,
393 [&backend, &out, request_ref=std::move(request_ref)]() mutable {
394 return handle_command(backend, std::move(request_ref), out);
395 });
396 logger().debug("handle_commands after fork");
397 });
398 }).handle_exception_type([](const seastar::gate_closed_exception&) {});
399 }
400
401 void NBDHandler::run()
402 {
403 logger().debug("About to listen on {}", uds_path);
404 server_socket = seastar::engine().listen(
405 seastar::socket_address{
406 seastar::unix_domain_addr{uds_path}});
407
408 // keep running in background
409 (void)seastar::keep_doing([this] {
410 return seastar::try_with_gate(gate, [this] {
411 return server_socket->accept().then([this](auto acc) {
412 logger().debug("Accepted");
413 connected_socket = std::move(acc.connection);
414 return seastar::do_with(
415 connected_socket->input(),
416 RequestWriter{connected_socket->output()},
417 [&, this](auto &input, auto &output) {
418 return send_negotiation(
419 backend.get_size(),
420 output.stream
421 ).then([&, this] {
422 return handle_commands(backend, input, output);
423 }).finally([&] {
424 std::cout << "closing input and output" << std::endl;
425 return seastar::when_all(input.close(),
426 output.close());
427 }).discard_result().handle_exception([](auto e) {
428 logger().error("NBDHandler::run saw exception {}", e);
429 });
430 });
431 }).handle_exception_type([] (const std::system_error &e) {
432 // an ECONNABORTED is expected when we are being stopped.
433 if (e.code() != std::errc::connection_aborted) {
434 logger().error("accept failed: {}", e);
435 }
436 });
437 });
438 }).handle_exception_type([](const seastar::gate_closed_exception&) {});
439 }
440
441 seastar::future<> NBDHandler::stop()
442 {
443 if (server_socket) {
444 server_socket->abort_accept();
445 }
446 if (connected_socket) {
447 connected_socket->shutdown_input();
448 connected_socket->shutdown_output();
449 }
450 return gate.close().then([this] {
451 if (!server_socket.has_value()) {
452 return seastar::now();
453 }
454 return seastar::remove_file(uds_path);
455 });
456 }