]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/tools/store-nbd.cc
buildsys: switch source download to quincy
[ceph.git] / ceph / src / crimson / tools / store-nbd.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2
3/**
4 * crimson-store-nbd
5 *
6 * This tool exposes crimson object store internals as an nbd server
7 * for use with fio in basic benchmarking.
8 *
9 * Example usage:
10 *
11 * $ ./bin/crimson-store-nbd --device-path /dev/nvme1n1 -c 1 --total-device-size=107374182400 --mkfs true --uds-path /tmp/store_nbd_socket.sock
12 *
13 * $ cat nbd.fio
14 * [global]
15 * ioengine=nbd
16 * uri=nbd+unix:///?socket=/tmp/store_nbd_socket.sock
17 * rw=randrw
18 * time_based
19 * runtime=120
20 * group_reporting
21 * iodepth=1
22 * size=500G
23 *
24 * [job0]
25 * offset=0
26 *
27 * $ fio nbd.fio
28 */
29
30#include <random>
31
32#include <boost/program_options/variables_map.hpp>
33#include <boost/program_options/parsers.hpp>
34
35#include <linux/nbd.h>
36#include <linux/fs.h>
37
38#include <seastar/core/byteorder.hh>
39
40#include "crimson/os/seastore/cache.h"
41#include "crimson/os/seastore/segment_cleaner.h"
42#include "crimson/os/seastore/segment_manager.h"
43#include "crimson/os/seastore/segment_manager/block.h"
44#include "crimson/os/seastore/transaction_manager.h"
45
46#include "test/crimson/seastar_runner.h"
47#include "test/crimson/seastore/test_block.h"
48
49namespace po = boost::program_options;
50
51using namespace ceph;
52using namespace crimson;
53using namespace crimson::os;
54using namespace crimson::os::seastore;
55using namespace crimson::os::seastore::segment_manager::block;
56
57namespace {
58 seastar::logger& logger() {
59 return crimson::get_logger(ceph_subsys_test);
60 }
61}
62
63/**
64 * BlockDriver
65 *
66 * Simple interface to enable throughput test to compare raw disk to
67 * transaction_manager, etc
68 */
69class BlockDriver {
70public:
71 struct config_t {
72 std::string type;
73 bool mkfs = false;
74 std::optional<std::string> path;
75 size_t segment_size;
76 size_t total_device_size;
77
78 void populate_options(
79 po::options_description &desc)
80 {
81 desc.add_options()
82 ("type",
83 po::value<std::string>()
84 ->default_value("transaction_manager")
85 ->notifier([this](auto s) { type = s; }),
86 "Backend to use, options are transaction_manager"
87 )
88 ("segment-size",
89 po::value<size_t>()
90 ->default_value(16ul << 20 /* 16MB */)
91 ->notifier([this](auto s) { segment_size = s; }),
92 "Total working set size"
93 )
94 ("total-device-size",
95 po::value<size_t>()
96 ->default_value(10ul << 30 /* 10G */)
97 ->notifier([this](auto s) { total_device_size = s; }),
98 "Size of writes"
99 )
100 ("device-path",
101 po::value<std::string>()
102 ->required()
103 ->notifier([this](auto s) { path = s; }),
104 "Number of writes outstanding"
105 )
106 ("mkfs",
107 po::value<bool>()
108 ->default_value(false)
109 ->notifier([this](auto s) { mkfs = s; }),
110 "Do mkfs first"
111 );
112 }
113 };
114
115 virtual bufferptr get_buffer(size_t size) = 0;
116
117 virtual seastar::future<> write(
118 off_t offset,
119 bufferptr ptr) = 0;
120
121 virtual seastar::future<bufferlist> read(
122 off_t offset,
123 size_t size) = 0;
124
125 virtual size_t get_size() const = 0;
126
127 virtual seastar::future<> mount() = 0;
128 virtual seastar::future<> close() = 0;
129
130 virtual ~BlockDriver() {}
131};
132using BlockDriverRef = std::unique_ptr<BlockDriver>;
133
134BlockDriverRef get_backend(BlockDriver::config_t config);
135
136struct request_context_t {
137 uint32_t magic = 0;
138 uint32_t type = 0;
139
140 char handle[8] = {0};
141
142 uint64_t from = 0;
143 uint32_t len = 0;
144
145 unsigned err = 0;
146 std::optional<bufferptr> in_buffer;
147 std::optional<bufferlist> out_buffer;
148
149 bool check_magic() const {
150 // todo
151 return true;
152 }
153
154 uint32_t get_command() const {
155 return type & 0xff;
156 }
157
158 bool has_input_buffer() const {
159 return get_command() == NBD_CMD_WRITE;
160 }
161
162 seastar::future<> read_request(seastar::input_stream<char> &in) {
163 return in.read_exactly(sizeof(struct nbd_request)
164 ).then([this, &in](auto buf) {
165 auto p = buf.get();
166 magic = seastar::consume_be<uint32_t>(p);
167 type = seastar::consume_be<uint32_t>(p);
168 memcpy(handle, p, sizeof(handle));
169 p += sizeof(handle);
170 from = seastar::consume_be<uint64_t>(p);
171 len = seastar::consume_be<uint32_t>(p);
172 logger().debug(
173 "Got request, magic {}, type {}, from {}, len {}",
174 magic, type, from, len);
175
176 if (has_input_buffer()) {
177 return in.read_exactly(len).then([this](auto buf) {
178 in_buffer = ceph::buffer::create_page_aligned(len);
179 in_buffer->copy_in(0, len, buf.get());
180 return seastar::now();
181 });
182 } else {
183 return seastar::now();
184 }
185 });
186 }
187
188 seastar::future<> write_reply(seastar::output_stream<char> &out) {
189 seastar::temporary_buffer<char> buffer{sizeof(struct nbd_reply)};
190 auto p = buffer.get_write();
191 seastar::produce_be<uint32_t>(p, NBD_REPLY_MAGIC);
192 seastar::produce_be<uint32_t>(p, err);
193 memcpy(p, handle, sizeof(handle));
194 return out.write(std::move(buffer)).then([this, &out] {
195 if (out_buffer) {
196 return seastar::do_for_each(
197 out_buffer->mut_buffers(),
198 [&out](bufferptr &ptr) {
199 return out.write(
200 seastar::temporary_buffer<char>(
201 ptr.c_str(),
202 ptr.length(),
203 seastar::make_deleter([ptr](){}))
204 );
205 });
206 } else {
207 return seastar::now();
208 }
209 }).then([&out] {
210 return out.flush();
211 });
212 }
213};
214
215/**
216 * NBDHandler
217 *
218 * Simple throughput test for concurrent, single threaded
219 * writes to an BlockDriver.
220 */
221class NBDHandler {
222 BlockDriver &backend;
223 std::string uds_path;
224public:
225 struct config_t {
226 std::string uds_path;
227
228 void populate_options(
229 po::options_description &desc)
230 {
231 desc.add_options()
232 ("uds-path",
233 po::value<std::string>()
234 ->default_value("/tmp/store_nbd_socket.sock")
235 ->notifier([this](auto s) {
236 uds_path = s;
237 }),
238 "Path to domain socket for nbd"
239 );
240 }
241 };
242
243 NBDHandler(
244 BlockDriver &backend,
245 config_t config) :
246 backend(backend),
247 uds_path(config.uds_path)
248 {}
249
250 seastar::future<> run();
251};
252
253int main(int argc, char** argv)
254{
255 po::options_description desc{"Allowed options"};
256 bool debug = false;
257 desc.add_options()
258 ("help,h", "show help message")
259 ("debug", po::value<bool>(&debug)->default_value(false),
260 "enable debugging");
261
262 po::options_description nbd_pattern_options{"NBD Pattern Options"};
263 NBDHandler::config_t nbd_config;
264 nbd_config.populate_options(nbd_pattern_options);
265 desc.add(nbd_pattern_options);
266
267 po::options_description backend_pattern_options{"Backend Options"};
268 BlockDriver::config_t backend_config;
269 backend_config.populate_options(backend_pattern_options);
270 desc.add(backend_pattern_options);
271
272 po::variables_map vm;
273 std::vector<std::string> unrecognized_options;
274 try {
275 auto parsed = po::command_line_parser(argc, argv)
276 .options(desc)
277 .allow_unregistered()
278 .run();
279 po::store(parsed, vm);
280 if (vm.count("help")) {
281 std::cout << desc << std::endl;
282 return 0;
283 }
284
285 po::notify(vm);
286 unrecognized_options =
287 po::collect_unrecognized(parsed.options, po::include_positional);
288 } catch(const po::error& e) {
289 std::cerr << "error: " << e.what() << std::endl;
290 return 1;
291 }
292 std::vector<const char*> args(argv, argv + argc);
293
294 seastar::app_template app;
295
296 std::vector<char*> av{argv[0]};
297 std::transform(begin(unrecognized_options),
298 end(unrecognized_options),
299 std::back_inserter(av),
300 [](auto& s) {
301 return const_cast<char*>(s.c_str());
302 });
303
304 SeastarRunner sc;
305 sc.init(av.size(), av.data());
306
307 if (debug) {
308 seastar::global_logger_registry().set_all_loggers_level(
309 seastar::log_level::debug
310 );
311 }
312
313 sc.run([=] {
314 auto backend = get_backend(backend_config);
315 return seastar::do_with(
316 NBDHandler(*backend, nbd_config),
317 std::move(backend),
318 [](auto &nbd, auto &backend) {
319 return backend->mount(
320 ).then([&] {
321 logger().debug("Running nbd server...");
322 return nbd.run();
323 }).then([&] {
324 return backend->close();
325 });
326 });
327 });
328 sc.stop();
329}
330
331class nbd_oldstyle_negotiation_t {
332 uint64_t magic = seastar::cpu_to_be(0x4e42444d41474943); // "NBDMAGIC"
333 uint64_t magic2 = seastar::cpu_to_be(0x00420281861253); // "IHAVEOPT"
334 uint64_t size = 0;
335 uint32_t flags = seastar::cpu_to_be(0);
336 char reserved[124] = {0};
337
338public:
339 nbd_oldstyle_negotiation_t(uint64_t size, uint32_t flags)
340 : size(seastar::cpu_to_be(size)), flags(seastar::cpu_to_be(flags)) {}
341} __attribute__((packed));
342
343seastar::future<> send_negotiation(
344 size_t size,
345 seastar::output_stream<char>& out)
346{
347 seastar::temporary_buffer<char> buf{sizeof(nbd_oldstyle_negotiation_t)};
348 new (buf.get_write()) nbd_oldstyle_negotiation_t(size, 1);
349 return out.write(std::move(buf)
350 ).then([&out] {
351 return out.flush();
352 });
353}
354
355seastar::future<> handle_command(
356 BlockDriver &backend,
357 request_context_t &context,
358 seastar::output_stream<char> &out)
359{
360 logger().debug("got command {}", context.get_command());
361 return ([&] {
362 switch (context.get_command()) {
363 case NBD_CMD_WRITE:
364 return backend.write(
365 context.from,
366 *context.in_buffer);
367 case NBD_CMD_READ:
368 return backend.read(
369 context.from,
370 context.len).then([&context] (auto buffer) {
371 context.out_buffer = buffer;
372 });
373 case NBD_CMD_DISC:
374 throw std::system_error(std::make_error_code(std::errc::bad_message));
375 case NBD_CMD_TRIM:
376 throw std::system_error(std::make_error_code(std::errc::bad_message));
377 default:
378 throw std::system_error(std::make_error_code(std::errc::bad_message));
379 }
380 })().then([&] {
381 logger().debug("Writing reply");
382 return context.write_reply(out);
383 });
384}
385
386
387seastar::future<> handle_commands(
388 BlockDriver &backend,
389 seastar::input_stream<char>& in,
390 seastar::output_stream<char>& out)
391{
392 logger().debug("handle_commands");
393 return seastar::keep_doing(
394 [&] {
395 logger().debug("waiting for command");
396 auto request_ref = std::make_unique<request_context_t>();
397 auto &request = *request_ref;
398 return request.read_request(in
399 ).then([&] {
400 return handle_command(backend, request, out);
401 }).then([req=std::move(request_ref)] {
402 logger().debug("complete");
403 });
404 });
405}
406
407seastar::future<> NBDHandler::run()
408{
409 logger().debug("About to listen on {}", uds_path);
410 return seastar::do_with(
411 seastar::engine().listen(
412 seastar::socket_address{
413 seastar::unix_domain_addr{uds_path}}),
414 [=](auto &socket) {
415 return seastar::keep_doing(
416 [this, &socket] {
417 return socket.accept().then([this](auto acc) {
418 logger().debug("Accepted");
419 return seastar::do_with(
420 std::move(acc.connection),
421 [this](auto &conn) {
422 return seastar::do_with(
423 conn.input(),
424 conn.output(),
425 [&, this](auto &input, auto &output) {
426 return send_negotiation(
427 backend.get_size(),
428 output
429 ).then([&, this] {
430 return handle_commands(backend, input, output);
431 }).finally([&] {
432 return input.close();
433 }).finally([&] {
434 return output.close();
435 }).handle_exception([](auto e) {
436 return seastar::now();
437 });
438 });
439 });
440 });
441 });
442 });
443}
444
445class TMDriver final : public BlockDriver {
446 const config_t config;
447 std::unique_ptr<segment_manager::block::BlockSegmentManager> segment_manager;
448 std::unique_ptr<SegmentCleaner> segment_cleaner;
449 std::unique_ptr<Journal> journal;
450 std::unique_ptr<Cache> cache;
451 LBAManagerRef lba_manager;
452 std::unique_ptr<TransactionManager> tm;
453
454public:
455 TMDriver(config_t config) : config(config) {}
456 ~TMDriver() final {}
457
458 bufferptr get_buffer(size_t size) final {
459 return ceph::buffer::create_page_aligned(size);
460 }
461
462 seastar::future<> write(
463 off_t offset,
464 bufferptr ptr) final {
465 logger().debug("Writing offset {}", offset);
466 assert(offset % segment_manager->get_block_size() == 0);
467 assert(ptr.length() == (size_t)segment_manager->get_block_size());
468 return seastar::do_with(
469 tm->create_transaction(),
470 std::move(ptr),
471 [this, offset](auto &t, auto &ptr) {
472 return tm->dec_ref(
473 *t,
474 offset
475 ).safe_then([](auto){}).handle_error(
476 crimson::ct_error::enoent::handle([](auto) { return seastar::now(); }),
477 crimson::ct_error::pass_further_all{}
478 ).safe_then([=, &t, &ptr] {
479 logger().debug("dec_ref complete");
480 return tm->alloc_extent<TestBlock>(
481 *t,
482 offset,
483 ptr.length());
484 }).safe_then([=, &t, &ptr](auto ext) mutable {
485 assert(ext->get_laddr() == (size_t)offset);
486 assert(ext->get_bptr().length() == ptr.length());
487 ext->get_bptr().swap(ptr);
488 logger().debug("submitting transaction");
489 return tm->submit_transaction(std::move(t));
490 });
491 }).handle_error(
492 crimson::ct_error::assert_all{}
493 );
494 }
495
496 seastar::future<bufferlist> read(
497 off_t offset,
498 size_t size) final {
499 logger().debug("Reading offset {}", offset);
500 assert(offset % segment_manager->get_block_size() == 0);
501 assert(size % (size_t)segment_manager->get_block_size() == 0);
502 return seastar::do_with(
503 tm->create_transaction(),
504 [this, offset, size](auto &t) {
505 return tm->read_extents<TestBlock>(*t, offset, size
506 ).safe_then([=](auto ext_list) mutable {
507 size_t cur = offset;
508 bufferlist bl;
509 for (auto &i: ext_list) {
510 if (cur != i.first) {
511 assert(cur < i.first);
512 bl.append_zero(i.first - cur);
513 cur = i.first;
514 }
515 bl.append(i.second->get_bptr());
516 cur += i.second->get_bptr().length();
517 }
518 if (bl.length() != size) {
519 assert(bl.length() < size);
520 bl.append_zero(size - bl.length());
521 }
522 return seastar::make_ready_future<bufferlist>(std::move(bl));
523 });
524 }).handle_error(
525 crimson::ct_error::assert_all{}
526 );
527 }
528
529 void init() {
530 segment_cleaner = std::make_unique<SegmentCleaner>(
531 SegmentCleaner::config_t::default_from_segment_manager(
532 *segment_manager),
533 true);
534 journal = std::make_unique<Journal>(*segment_manager);
535 cache = std::make_unique<Cache>(*segment_manager);
536 lba_manager = lba_manager::create_lba_manager(*segment_manager, *cache);
537 tm = std::make_unique<TransactionManager>(
538 *segment_manager, *segment_cleaner, *journal, *cache, *lba_manager);
539 journal->set_segment_provider(&*segment_cleaner);
540 segment_cleaner->set_extent_callback(&*tm);
541 }
542
543 void clear() {
544 tm.reset();
545 lba_manager.reset();
546 cache.reset();
547 journal.reset();
548 segment_cleaner.reset();
549 }
550
551 size_t get_size() const final {
552 return segment_manager->get_size() * .5;
553 }
554
555 seastar::future<> mkfs() {
556 assert(config.path);
557 segment_manager = std::make_unique<
558 segment_manager::block::BlockSegmentManager
559 >();
560 logger().debug("mkfs");
561 return segment_manager->mkfs(
562 { *config.path, config.segment_size, config.total_device_size }
563 ).safe_then([this] {
564 logger().debug("");
565 return segment_manager->mount({ *config.path });
566 }).safe_then([this] {
567 init();
568 logger().debug("tm mkfs");
569 return tm->mkfs();
570 }).safe_then([this] {
571 logger().debug("tm close");
572 return tm->close();
573 }).safe_then([this] {
574 logger().debug("sm close");
575 return segment_manager->close();
576 }).safe_then([this] {
577 clear();
578 logger().debug("mkfs complete");
579 return TransactionManager::mkfs_ertr::now();
580 }).handle_error(
581 crimson::ct_error::assert_all{}
582 );
583 }
584
585 seastar::future<> mount() final {
586 return (config.mkfs ? mkfs() : seastar::now()
587 ).then([this] {
588 segment_manager = std::make_unique<
589 segment_manager::block::BlockSegmentManager
590 >();
591 return segment_manager->mount({ *config.path });
592 }).safe_then([this] {
593 init();
594 return tm->mount();
595 }).handle_error(
596 crimson::ct_error::assert_all{}
597 );
598 };
599
600 seastar::future<> close() final {
601 return segment_manager->close(
602 ).safe_then([this] {
603 return tm->close();
604 }).safe_then([this] {
605 clear();
606 return seastar::now();
607 }).handle_error(
608 crimson::ct_error::assert_all{}
609 );
610 }
611};
612
613BlockDriverRef get_backend(BlockDriver::config_t config)
614{
615 if (config.type == "transaction_manager") {
616 return std::make_unique<TMDriver>(config);
617 } else {
618 ceph_assert(0 == "invalid option");
619 return BlockDriverRef();
620 }
621}