]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- |
2 | ||
3 | /** | |
4 | * crimson-store-nbd | |
5 | * | |
6 | * This tool exposes crimson object store internals as an nbd server | |
7 | * for use with fio in basic benchmarking. | |
8 | * | |
9 | * Example usage: | |
10 | * | |
11 | * $ ./bin/crimson-store-nbd --device-path /dev/nvme1n1 -c 1 --total-device-size=107374182400 --mkfs true --uds-path /tmp/store_nbd_socket.sock | |
12 | * | |
13 | * $ cat nbd.fio | |
14 | * [global] | |
15 | * ioengine=nbd | |
16 | * uri=nbd+unix:///?socket=/tmp/store_nbd_socket.sock | |
17 | * rw=randrw | |
18 | * time_based | |
19 | * runtime=120 | |
20 | * group_reporting | |
21 | * iodepth=1 | |
22 | * size=500G | |
23 | * | |
24 | * [job0] | |
25 | * offset=0 | |
26 | * | |
27 | * $ fio nbd.fio | |
28 | */ | |
29 | ||
30 | #include <random> | |
31 | ||
32 | #include <boost/program_options/variables_map.hpp> | |
33 | #include <boost/program_options/parsers.hpp> | |
34 | ||
35 | #include <linux/nbd.h> | |
36 | #include <linux/fs.h> | |
37 | ||
38 | #include <seastar/core/byteorder.hh> | |
39 | ||
40 | #include "crimson/os/seastore/cache.h" | |
41 | #include "crimson/os/seastore/segment_cleaner.h" | |
42 | #include "crimson/os/seastore/segment_manager.h" | |
43 | #include "crimson/os/seastore/segment_manager/block.h" | |
44 | #include "crimson/os/seastore/transaction_manager.h" | |
45 | ||
46 | #include "test/crimson/seastar_runner.h" | |
47 | #include "test/crimson/seastore/test_block.h" | |
48 | ||
49 | namespace po = boost::program_options; | |
50 | ||
51 | using namespace ceph; | |
52 | using namespace crimson; | |
53 | using namespace crimson::os; | |
54 | using namespace crimson::os::seastore; | |
55 | using namespace crimson::os::seastore::segment_manager::block; | |
56 | ||
57 | namespace { | |
58 | seastar::logger& logger() { | |
59 | return crimson::get_logger(ceph_subsys_test); | |
60 | } | |
61 | } | |
62 | ||
63 | /** | |
64 | * BlockDriver | |
65 | * | |
66 | * Simple interface to enable throughput test to compare raw disk to | |
67 | * transaction_manager, etc | |
68 | */ | |
69 | class BlockDriver { | |
70 | public: | |
71 | struct config_t { | |
72 | std::string type; | |
73 | bool mkfs = false; | |
74 | std::optional<std::string> path; | |
75 | size_t segment_size; | |
76 | size_t total_device_size; | |
77 | ||
78 | void populate_options( | |
79 | po::options_description &desc) | |
80 | { | |
81 | desc.add_options() | |
82 | ("type", | |
83 | po::value<std::string>() | |
84 | ->default_value("transaction_manager") | |
85 | ->notifier([this](auto s) { type = s; }), | |
86 | "Backend to use, options are transaction_manager" | |
87 | ) | |
88 | ("segment-size", | |
89 | po::value<size_t>() | |
90 | ->default_value(16ul << 20 /* 16MB */) | |
91 | ->notifier([this](auto s) { segment_size = s; }), | |
92 | "Total working set size" | |
93 | ) | |
94 | ("total-device-size", | |
95 | po::value<size_t>() | |
96 | ->default_value(10ul << 30 /* 10G */) | |
97 | ->notifier([this](auto s) { total_device_size = s; }), | |
98 | "Size of writes" | |
99 | ) | |
100 | ("device-path", | |
101 | po::value<std::string>() | |
102 | ->required() | |
103 | ->notifier([this](auto s) { path = s; }), | |
104 | "Number of writes outstanding" | |
105 | ) | |
106 | ("mkfs", | |
107 | po::value<bool>() | |
108 | ->default_value(false) | |
109 | ->notifier([this](auto s) { mkfs = s; }), | |
110 | "Do mkfs first" | |
111 | ); | |
112 | } | |
113 | }; | |
114 | ||
115 | virtual bufferptr get_buffer(size_t size) = 0; | |
116 | ||
117 | virtual seastar::future<> write( | |
118 | off_t offset, | |
119 | bufferptr ptr) = 0; | |
120 | ||
121 | virtual seastar::future<bufferlist> read( | |
122 | off_t offset, | |
123 | size_t size) = 0; | |
124 | ||
125 | virtual size_t get_size() const = 0; | |
126 | ||
127 | virtual seastar::future<> mount() = 0; | |
128 | virtual seastar::future<> close() = 0; | |
129 | ||
130 | virtual ~BlockDriver() {} | |
131 | }; | |
132 | using BlockDriverRef = std::unique_ptr<BlockDriver>; | |
133 | ||
134 | BlockDriverRef get_backend(BlockDriver::config_t config); | |
135 | ||
136 | struct request_context_t { | |
137 | uint32_t magic = 0; | |
138 | uint32_t type = 0; | |
139 | ||
140 | char handle[8] = {0}; | |
141 | ||
142 | uint64_t from = 0; | |
143 | uint32_t len = 0; | |
144 | ||
145 | unsigned err = 0; | |
146 | std::optional<bufferptr> in_buffer; | |
147 | std::optional<bufferlist> out_buffer; | |
148 | ||
149 | bool check_magic() const { | |
150 | // todo | |
151 | return true; | |
152 | } | |
153 | ||
154 | uint32_t get_command() const { | |
155 | return type & 0xff; | |
156 | } | |
157 | ||
158 | bool has_input_buffer() const { | |
159 | return get_command() == NBD_CMD_WRITE; | |
160 | } | |
161 | ||
162 | seastar::future<> read_request(seastar::input_stream<char> &in) { | |
163 | return in.read_exactly(sizeof(struct nbd_request) | |
164 | ).then([this, &in](auto buf) { | |
165 | auto p = buf.get(); | |
166 | magic = seastar::consume_be<uint32_t>(p); | |
167 | type = seastar::consume_be<uint32_t>(p); | |
168 | memcpy(handle, p, sizeof(handle)); | |
169 | p += sizeof(handle); | |
170 | from = seastar::consume_be<uint64_t>(p); | |
171 | len = seastar::consume_be<uint32_t>(p); | |
172 | logger().debug( | |
173 | "Got request, magic {}, type {}, from {}, len {}", | |
174 | magic, type, from, len); | |
175 | ||
176 | if (has_input_buffer()) { | |
177 | return in.read_exactly(len).then([this](auto buf) { | |
178 | in_buffer = ceph::buffer::create_page_aligned(len); | |
179 | in_buffer->copy_in(0, len, buf.get()); | |
180 | return seastar::now(); | |
181 | }); | |
182 | } else { | |
183 | return seastar::now(); | |
184 | } | |
185 | }); | |
186 | } | |
187 | ||
188 | seastar::future<> write_reply(seastar::output_stream<char> &out) { | |
189 | seastar::temporary_buffer<char> buffer{sizeof(struct nbd_reply)}; | |
190 | auto p = buffer.get_write(); | |
191 | seastar::produce_be<uint32_t>(p, NBD_REPLY_MAGIC); | |
192 | seastar::produce_be<uint32_t>(p, err); | |
193 | memcpy(p, handle, sizeof(handle)); | |
194 | return out.write(std::move(buffer)).then([this, &out] { | |
195 | if (out_buffer) { | |
196 | return seastar::do_for_each( | |
197 | out_buffer->mut_buffers(), | |
198 | [&out](bufferptr &ptr) { | |
199 | return out.write( | |
200 | seastar::temporary_buffer<char>( | |
201 | ptr.c_str(), | |
202 | ptr.length(), | |
203 | seastar::make_deleter([ptr](){})) | |
204 | ); | |
205 | }); | |
206 | } else { | |
207 | return seastar::now(); | |
208 | } | |
209 | }).then([&out] { | |
210 | return out.flush(); | |
211 | }); | |
212 | } | |
213 | }; | |
214 | ||
215 | /** | |
216 | * NBDHandler | |
217 | * | |
218 | * Simple throughput test for concurrent, single threaded | |
219 | * writes to an BlockDriver. | |
220 | */ | |
221 | class NBDHandler { | |
222 | BlockDriver &backend; | |
223 | std::string uds_path; | |
224 | public: | |
225 | struct config_t { | |
226 | std::string uds_path; | |
227 | ||
228 | void populate_options( | |
229 | po::options_description &desc) | |
230 | { | |
231 | desc.add_options() | |
232 | ("uds-path", | |
233 | po::value<std::string>() | |
234 | ->default_value("/tmp/store_nbd_socket.sock") | |
235 | ->notifier([this](auto s) { | |
236 | uds_path = s; | |
237 | }), | |
238 | "Path to domain socket for nbd" | |
239 | ); | |
240 | } | |
241 | }; | |
242 | ||
243 | NBDHandler( | |
244 | BlockDriver &backend, | |
245 | config_t config) : | |
246 | backend(backend), | |
247 | uds_path(config.uds_path) | |
248 | {} | |
249 | ||
250 | seastar::future<> run(); | |
251 | }; | |
252 | ||
253 | int main(int argc, char** argv) | |
254 | { | |
255 | po::options_description desc{"Allowed options"}; | |
256 | bool debug = false; | |
257 | desc.add_options() | |
258 | ("help,h", "show help message") | |
259 | ("debug", po::value<bool>(&debug)->default_value(false), | |
260 | "enable debugging"); | |
261 | ||
262 | po::options_description nbd_pattern_options{"NBD Pattern Options"}; | |
263 | NBDHandler::config_t nbd_config; | |
264 | nbd_config.populate_options(nbd_pattern_options); | |
265 | desc.add(nbd_pattern_options); | |
266 | ||
267 | po::options_description backend_pattern_options{"Backend Options"}; | |
268 | BlockDriver::config_t backend_config; | |
269 | backend_config.populate_options(backend_pattern_options); | |
270 | desc.add(backend_pattern_options); | |
271 | ||
272 | po::variables_map vm; | |
273 | std::vector<std::string> unrecognized_options; | |
274 | try { | |
275 | auto parsed = po::command_line_parser(argc, argv) | |
276 | .options(desc) | |
277 | .allow_unregistered() | |
278 | .run(); | |
279 | po::store(parsed, vm); | |
280 | if (vm.count("help")) { | |
281 | std::cout << desc << std::endl; | |
282 | return 0; | |
283 | } | |
284 | ||
285 | po::notify(vm); | |
286 | unrecognized_options = | |
287 | po::collect_unrecognized(parsed.options, po::include_positional); | |
288 | } catch(const po::error& e) { | |
289 | std::cerr << "error: " << e.what() << std::endl; | |
290 | return 1; | |
291 | } | |
292 | std::vector<const char*> args(argv, argv + argc); | |
293 | ||
294 | seastar::app_template app; | |
295 | ||
296 | std::vector<char*> av{argv[0]}; | |
297 | std::transform(begin(unrecognized_options), | |
298 | end(unrecognized_options), | |
299 | std::back_inserter(av), | |
300 | [](auto& s) { | |
301 | return const_cast<char*>(s.c_str()); | |
302 | }); | |
303 | ||
304 | SeastarRunner sc; | |
305 | sc.init(av.size(), av.data()); | |
306 | ||
307 | if (debug) { | |
308 | seastar::global_logger_registry().set_all_loggers_level( | |
309 | seastar::log_level::debug | |
310 | ); | |
311 | } | |
312 | ||
313 | sc.run([=] { | |
314 | auto backend = get_backend(backend_config); | |
315 | return seastar::do_with( | |
316 | NBDHandler(*backend, nbd_config), | |
317 | std::move(backend), | |
318 | [](auto &nbd, auto &backend) { | |
319 | return backend->mount( | |
320 | ).then([&] { | |
321 | logger().debug("Running nbd server..."); | |
322 | return nbd.run(); | |
323 | }).then([&] { | |
324 | return backend->close(); | |
325 | }); | |
326 | }); | |
327 | }); | |
328 | sc.stop(); | |
329 | } | |
330 | ||
331 | class nbd_oldstyle_negotiation_t { | |
332 | uint64_t magic = seastar::cpu_to_be(0x4e42444d41474943); // "NBDMAGIC" | |
333 | uint64_t magic2 = seastar::cpu_to_be(0x00420281861253); // "IHAVEOPT" | |
334 | uint64_t size = 0; | |
335 | uint32_t flags = seastar::cpu_to_be(0); | |
336 | char reserved[124] = {0}; | |
337 | ||
338 | public: | |
339 | nbd_oldstyle_negotiation_t(uint64_t size, uint32_t flags) | |
340 | : size(seastar::cpu_to_be(size)), flags(seastar::cpu_to_be(flags)) {} | |
341 | } __attribute__((packed)); | |
342 | ||
343 | seastar::future<> send_negotiation( | |
344 | size_t size, | |
345 | seastar::output_stream<char>& out) | |
346 | { | |
347 | seastar::temporary_buffer<char> buf{sizeof(nbd_oldstyle_negotiation_t)}; | |
348 | new (buf.get_write()) nbd_oldstyle_negotiation_t(size, 1); | |
349 | return out.write(std::move(buf) | |
350 | ).then([&out] { | |
351 | return out.flush(); | |
352 | }); | |
353 | } | |
354 | ||
355 | seastar::future<> handle_command( | |
356 | BlockDriver &backend, | |
357 | request_context_t &context, | |
358 | seastar::output_stream<char> &out) | |
359 | { | |
360 | logger().debug("got command {}", context.get_command()); | |
361 | return ([&] { | |
362 | switch (context.get_command()) { | |
363 | case NBD_CMD_WRITE: | |
364 | return backend.write( | |
365 | context.from, | |
366 | *context.in_buffer); | |
367 | case NBD_CMD_READ: | |
368 | return backend.read( | |
369 | context.from, | |
370 | context.len).then([&context] (auto buffer) { | |
371 | context.out_buffer = buffer; | |
372 | }); | |
373 | case NBD_CMD_DISC: | |
374 | throw std::system_error(std::make_error_code(std::errc::bad_message)); | |
375 | case NBD_CMD_TRIM: | |
376 | throw std::system_error(std::make_error_code(std::errc::bad_message)); | |
377 | default: | |
378 | throw std::system_error(std::make_error_code(std::errc::bad_message)); | |
379 | } | |
380 | })().then([&] { | |
381 | logger().debug("Writing reply"); | |
382 | return context.write_reply(out); | |
383 | }); | |
384 | } | |
385 | ||
386 | ||
387 | seastar::future<> handle_commands( | |
388 | BlockDriver &backend, | |
389 | seastar::input_stream<char>& in, | |
390 | seastar::output_stream<char>& out) | |
391 | { | |
392 | logger().debug("handle_commands"); | |
393 | return seastar::keep_doing( | |
394 | [&] { | |
395 | logger().debug("waiting for command"); | |
396 | auto request_ref = std::make_unique<request_context_t>(); | |
397 | auto &request = *request_ref; | |
398 | return request.read_request(in | |
399 | ).then([&] { | |
400 | return handle_command(backend, request, out); | |
401 | }).then([req=std::move(request_ref)] { | |
402 | logger().debug("complete"); | |
403 | }); | |
404 | }); | |
405 | } | |
406 | ||
407 | seastar::future<> NBDHandler::run() | |
408 | { | |
409 | logger().debug("About to listen on {}", uds_path); | |
410 | return seastar::do_with( | |
411 | seastar::engine().listen( | |
412 | seastar::socket_address{ | |
413 | seastar::unix_domain_addr{uds_path}}), | |
414 | [=](auto &socket) { | |
415 | return seastar::keep_doing( | |
416 | [this, &socket] { | |
417 | return socket.accept().then([this](auto acc) { | |
418 | logger().debug("Accepted"); | |
419 | return seastar::do_with( | |
420 | std::move(acc.connection), | |
421 | [this](auto &conn) { | |
422 | return seastar::do_with( | |
423 | conn.input(), | |
424 | conn.output(), | |
425 | [&, this](auto &input, auto &output) { | |
426 | return send_negotiation( | |
427 | backend.get_size(), | |
428 | output | |
429 | ).then([&, this] { | |
430 | return handle_commands(backend, input, output); | |
431 | }).finally([&] { | |
432 | return input.close(); | |
433 | }).finally([&] { | |
434 | return output.close(); | |
435 | }).handle_exception([](auto e) { | |
436 | return seastar::now(); | |
437 | }); | |
438 | }); | |
439 | }); | |
440 | }); | |
441 | }); | |
442 | }); | |
443 | } | |
444 | ||
445 | class TMDriver final : public BlockDriver { | |
446 | const config_t config; | |
447 | std::unique_ptr<segment_manager::block::BlockSegmentManager> segment_manager; | |
448 | std::unique_ptr<SegmentCleaner> segment_cleaner; | |
449 | std::unique_ptr<Journal> journal; | |
450 | std::unique_ptr<Cache> cache; | |
451 | LBAManagerRef lba_manager; | |
452 | std::unique_ptr<TransactionManager> tm; | |
453 | ||
454 | public: | |
455 | TMDriver(config_t config) : config(config) {} | |
456 | ~TMDriver() final {} | |
457 | ||
458 | bufferptr get_buffer(size_t size) final { | |
459 | return ceph::buffer::create_page_aligned(size); | |
460 | } | |
461 | ||
462 | seastar::future<> write( | |
463 | off_t offset, | |
464 | bufferptr ptr) final { | |
465 | logger().debug("Writing offset {}", offset); | |
466 | assert(offset % segment_manager->get_block_size() == 0); | |
467 | assert(ptr.length() == (size_t)segment_manager->get_block_size()); | |
468 | return seastar::do_with( | |
469 | tm->create_transaction(), | |
470 | std::move(ptr), | |
471 | [this, offset](auto &t, auto &ptr) { | |
472 | return tm->dec_ref( | |
473 | *t, | |
474 | offset | |
475 | ).safe_then([](auto){}).handle_error( | |
476 | crimson::ct_error::enoent::handle([](auto) { return seastar::now(); }), | |
477 | crimson::ct_error::pass_further_all{} | |
478 | ).safe_then([=, &t, &ptr] { | |
479 | logger().debug("dec_ref complete"); | |
480 | return tm->alloc_extent<TestBlock>( | |
481 | *t, | |
482 | offset, | |
483 | ptr.length()); | |
484 | }).safe_then([=, &t, &ptr](auto ext) mutable { | |
485 | assert(ext->get_laddr() == (size_t)offset); | |
486 | assert(ext->get_bptr().length() == ptr.length()); | |
487 | ext->get_bptr().swap(ptr); | |
488 | logger().debug("submitting transaction"); | |
489 | return tm->submit_transaction(std::move(t)); | |
490 | }); | |
491 | }).handle_error( | |
492 | crimson::ct_error::assert_all{} | |
493 | ); | |
494 | } | |
495 | ||
496 | seastar::future<bufferlist> read( | |
497 | off_t offset, | |
498 | size_t size) final { | |
499 | logger().debug("Reading offset {}", offset); | |
500 | assert(offset % segment_manager->get_block_size() == 0); | |
501 | assert(size % (size_t)segment_manager->get_block_size() == 0); | |
502 | return seastar::do_with( | |
503 | tm->create_transaction(), | |
504 | [this, offset, size](auto &t) { | |
505 | return tm->read_extents<TestBlock>(*t, offset, size | |
506 | ).safe_then([=](auto ext_list) mutable { | |
507 | size_t cur = offset; | |
508 | bufferlist bl; | |
509 | for (auto &i: ext_list) { | |
510 | if (cur != i.first) { | |
511 | assert(cur < i.first); | |
512 | bl.append_zero(i.first - cur); | |
513 | cur = i.first; | |
514 | } | |
515 | bl.append(i.second->get_bptr()); | |
516 | cur += i.second->get_bptr().length(); | |
517 | } | |
518 | if (bl.length() != size) { | |
519 | assert(bl.length() < size); | |
520 | bl.append_zero(size - bl.length()); | |
521 | } | |
522 | return seastar::make_ready_future<bufferlist>(std::move(bl)); | |
523 | }); | |
524 | }).handle_error( | |
525 | crimson::ct_error::assert_all{} | |
526 | ); | |
527 | } | |
528 | ||
529 | void init() { | |
530 | segment_cleaner = std::make_unique<SegmentCleaner>( | |
531 | SegmentCleaner::config_t::default_from_segment_manager( | |
532 | *segment_manager), | |
533 | true); | |
534 | journal = std::make_unique<Journal>(*segment_manager); | |
535 | cache = std::make_unique<Cache>(*segment_manager); | |
536 | lba_manager = lba_manager::create_lba_manager(*segment_manager, *cache); | |
537 | tm = std::make_unique<TransactionManager>( | |
538 | *segment_manager, *segment_cleaner, *journal, *cache, *lba_manager); | |
539 | journal->set_segment_provider(&*segment_cleaner); | |
540 | segment_cleaner->set_extent_callback(&*tm); | |
541 | } | |
542 | ||
543 | void clear() { | |
544 | tm.reset(); | |
545 | lba_manager.reset(); | |
546 | cache.reset(); | |
547 | journal.reset(); | |
548 | segment_cleaner.reset(); | |
549 | } | |
550 | ||
551 | size_t get_size() const final { | |
552 | return segment_manager->get_size() * .5; | |
553 | } | |
554 | ||
555 | seastar::future<> mkfs() { | |
556 | assert(config.path); | |
557 | segment_manager = std::make_unique< | |
558 | segment_manager::block::BlockSegmentManager | |
559 | >(); | |
560 | logger().debug("mkfs"); | |
561 | return segment_manager->mkfs( | |
562 | { *config.path, config.segment_size, config.total_device_size } | |
563 | ).safe_then([this] { | |
564 | logger().debug(""); | |
565 | return segment_manager->mount({ *config.path }); | |
566 | }).safe_then([this] { | |
567 | init(); | |
568 | logger().debug("tm mkfs"); | |
569 | return tm->mkfs(); | |
570 | }).safe_then([this] { | |
571 | logger().debug("tm close"); | |
572 | return tm->close(); | |
573 | }).safe_then([this] { | |
574 | logger().debug("sm close"); | |
575 | return segment_manager->close(); | |
576 | }).safe_then([this] { | |
577 | clear(); | |
578 | logger().debug("mkfs complete"); | |
579 | return TransactionManager::mkfs_ertr::now(); | |
580 | }).handle_error( | |
581 | crimson::ct_error::assert_all{} | |
582 | ); | |
583 | } | |
584 | ||
585 | seastar::future<> mount() final { | |
586 | return (config.mkfs ? mkfs() : seastar::now() | |
587 | ).then([this] { | |
588 | segment_manager = std::make_unique< | |
589 | segment_manager::block::BlockSegmentManager | |
590 | >(); | |
591 | return segment_manager->mount({ *config.path }); | |
592 | }).safe_then([this] { | |
593 | init(); | |
594 | return tm->mount(); | |
595 | }).handle_error( | |
596 | crimson::ct_error::assert_all{} | |
597 | ); | |
598 | }; | |
599 | ||
600 | seastar::future<> close() final { | |
601 | return segment_manager->close( | |
602 | ).safe_then([this] { | |
603 | return tm->close(); | |
604 | }).safe_then([this] { | |
605 | clear(); | |
606 | return seastar::now(); | |
607 | }).handle_error( | |
608 | crimson::ct_error::assert_all{} | |
609 | ); | |
610 | } | |
611 | }; | |
612 | ||
613 | BlockDriverRef get_backend(BlockDriver::config_t config) | |
614 | { | |
615 | if (config.type == "transaction_manager") { | |
616 | return std::make_unique<TMDriver>(config); | |
617 | } else { | |
618 | ceph_assert(0 == "invalid option"); | |
619 | return BlockDriverRef(); | |
620 | } | |
621 | } |