// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "test/crimson/gtest_seastar.h"

#include <random>

#include "crimson/common/log.h"
#include "crimson/os/seastore/async_cleaner.h"
#include "crimson/os/seastore/journal.h"
#include "crimson/os/seastore/journal/circular_bounded_journal.h"
#include "crimson/os/seastore/random_block_manager.h"
#include "crimson/os/seastore/random_block_manager/rbm_device.h"
#include "crimson/os/seastore/seastore_types.h"
#include "test/crimson/seastore/transaction_manager_test_state.h"
#include "crimson/os/seastore/random_block_manager/block_rb_manager.h"

using namespace crimson;
using namespace crimson::os;
using namespace crimson::os::seastore;
using namespace crimson::os::seastore::journal;

namespace {
[[maybe_unused]] seastar::logger& logger() {
  return crimson::get_logger(ceph_subsys_test);
}
}

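// Decode a raw record group read back from the journal: parse the
// record_group_header_t, collect the deltas from the metadata area, then copy
// the extent payloads out of the data area that starts at header.mdlength.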
std::optional<record_t> decode_record(
  bufferlist& bl)
{
  record_t record;
  record_group_header_t r_header;
  auto bliter = bl.cbegin();
  decode(r_header, bliter);
  logger().debug(" decode_record mdlength {} records {}",
                 r_header.mdlength, r_header.records);
  device_id_t d_id = 1 << (std::numeric_limits<device_id_t>::digits - 1);

  auto del_infos = try_decode_deltas(r_header, bl,
                                     paddr_t::make_blk_paddr(d_id, 0));
  for (auto &iter : *del_infos) {
    for (auto r : iter.deltas) {
      record.deltas.push_back(r.second);
    }
  }
  auto ex_infos = try_decode_extent_infos(r_header, bl);
  auto bliter_ex = bl.cbegin();
  bliter_ex += r_header.mdlength;
  for (auto &iter: *ex_infos) {
    for (auto e : iter.extent_infos) {
      extent_t ex;
      auto bptr = bufferptr(ceph::buffer::create_page_aligned(e.len));
      logger().debug(" extent len {} remaining {} ", e.len, bliter_ex.get_remaining());
      bliter_ex.copy(e.len, bptr.c_str());
      ex.bl.append(bptr);
      record.extents.push_back(ex);
    }
  }
  return record;
}

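// Holds one submitted record together with the journal sequence it was
// written at, and validates it against data read back from the journal by
// comparing per-extent and per-delta lengths and crc32c checksums.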
struct entry_validator_t {
  bufferlist bl;
  int entries;
  record_t record;
  segment_nonce_t magic = 0;
  journal_seq_t seq;

  template <typename... T>
  entry_validator_t(T&&... entry) : record(std::forward<T>(entry)...) {}

  void validate(record_t read) {
    auto iter = read.extents.begin();
    for (auto &&block : record.extents) {
      ASSERT_EQ(
        iter->bl.length(),
        block.bl.length());
      ASSERT_EQ(
        iter->bl.begin().crc32c(iter->bl.length(), 1),
        block.bl.begin().crc32c(block.bl.length(), 1));
      ++iter;
    }
    auto iter_delta = read.deltas.begin();
    for (auto &&block : record.deltas) {
      ASSERT_EQ(
        iter_delta->bl.length(),
        block.bl.length());
      ASSERT_EQ(
        iter_delta->bl.begin().crc32c(iter_delta->bl.length(), 1),
        block.bl.begin().crc32c(block.bl.length(), 1));
      ++iter_delta;
    }
  }
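  // Walk this validator's records directly on the journal device starting at
  // 'seq': read and validate each record's metadata, read its data area, and
  // compare the decoded record against the one that was submitted.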
  void validate(CircularBoundedJournal &cbj) {
    rbm_abs_addr offset = 0;
    auto cursor = scan_valid_records_cursor(seq);
    cbj.test_initialize_cursor(cursor);
    for (int i = 0; i < entries; i++) {
      paddr_t paddr = seq.offset.add_offset(offset);
      cursor.seq.offset = paddr;
      auto md = cbj.test_read_validate_record_metadata(
        cursor, magic).unsafe_get0();
      assert(md);
      auto& [header, md_bl] = *md;
      auto dbuf = cbj.read(
        paddr.add_offset(header.mdlength),
        header.dlength).unsafe_get0();

      bufferlist bl;
      bl.append(md_bl);
      bl.append(dbuf);
      auto record = decode_record(bl);
      validate(*record);
      offset += header.mdlength + header.dlength;
      cursor.last_committed = header.committed_to;
    }
  }

  rbm_abs_addr get_abs_addr() {
    return convert_paddr_to_abs_addr(seq.offset);
  }

  bool validate_delta(bufferlist bl) {
    for (auto &&block : record.deltas) {
      if (bl.begin().crc32c(bl.length(), 1) ==
          block.bl.begin().crc32c(block.bl.length(), 1)) {
        return true;
      }
    }
    return false;
  }
};

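// Test fixture driving a CircularBoundedJournal on an ephemeral random-block
// device. It implements the JournalTrimmer interface with no-op stubs; the
// tests advance the journal tail explicitly via update_journal_tail().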
struct cbjournal_test_t : public seastar_test_suite_t, JournalTrimmer
{
  std::vector<entry_validator_t> entries;
  std::unique_ptr<CircularBoundedJournal> cbj;
  random_block_device::EphemeralRBMDeviceRef device;

  std::default_random_engine generator;
  uint64_t block_size;
  WritePipeline pipeline;

  cbjournal_test_t() = default;

  /*
   * JournalTrimmer interfaces
   */
  journal_seq_t get_journal_head() const {
    return JOURNAL_SEQ_NULL;
  }

  journal_seq_t get_dirty_tail() const final {
    return JOURNAL_SEQ_NULL;
  }

  journal_seq_t get_alloc_tail() const final {
    return JOURNAL_SEQ_NULL;
  }

  void set_journal_head(journal_seq_t head) final {}

  void update_journal_tails(
    journal_seq_t dirty_tail,
    journal_seq_t alloc_tail) final {}

  bool try_reserve_inline_usage(std::size_t) final { return true; }

  void release_inline_usage(std::size_t) final {}

  std::size_t get_trim_size_per_cycle() const final {
    return 0;
  }

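  // Submit a record through the journal, remember its start sequence and the
  // journal header magic for later validation, and return the record's
  // absolute on-device address.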
  auto submit_record(record_t&& record) {
    entries.push_back(record);
    OrderingHandle handle = get_dummy_ordering_handle();
    auto [addr, w_result] = cbj->submit_record(
      std::move(record),
      handle).unsafe_get0();
    entries.back().seq = w_result.start_seq;
    entries.back().entries = 1;
    entries.back().magic = cbj->get_cjs().get_cbj_header().magic;
    logger().debug("submit entry to addr {}", entries.back().seq);
    return convert_paddr_to_abs_addr(entries.back().seq.offset);
  }

  seastar::future<> tear_down_fut() final {
    return close();
  }

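  // Build a test extent (blocks * block_size bytes) or delta (bytes) whose
  // buffer is filled with a single random byte value.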
  extent_t generate_extent(size_t blocks) {
    std::uniform_int_distribution<char> distribution(
      std::numeric_limits<char>::min(),
      std::numeric_limits<char>::max()
    );
    char contents = distribution(generator);
    bufferlist bl;
    bl.append(buffer::ptr(buffer::create(blocks * block_size, contents)));
    return extent_t{extent_types_t::TEST_BLOCK, L_ADDR_NULL, bl};
  }

  delta_info_t generate_delta(size_t bytes) {
    std::uniform_int_distribution<char> distribution(
      std::numeric_limits<char>::min(),
      std::numeric_limits<char>::max()
    );
    char contents = distribution(generator);
    bufferlist bl;
    bl.append(buffer::ptr(buffer::create(bytes, contents)));
    return delta_info_t{
      extent_types_t::TEST_BLOCK,
      paddr_t{},
      L_ADDR_NULL,
      0, 0,
      device->get_block_size(),
      1,
      0,
      segment_type_t::JOURNAL,
      bl
    };
  }

  auto replay_and_check() {
    for (auto &i : entries) {
      i.validate(*(cbj.get()));
    }
  }

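  // Replay the journal and check that every replayed delta matches one of the
  // submitted entries at the same absolute address.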
  auto replay() {
    return cbj->replay(
      [this](const auto &offsets,
             const auto &e,
             auto &dirty_seq,
             auto &alloc_seq,
             auto last_modified) {
      bool found = false;
      for (auto &i : entries) {
        paddr_t base = offsets.write_result.start_seq.offset;
        rbm_abs_addr addr = convert_paddr_to_abs_addr(base);
        if (addr == i.get_abs_addr()) {
          logger().debug(" compare addr: {} and i.addr {} ", base, i.get_abs_addr());
          found = i.validate_delta(e.bl);
          break;
        }
      }
      assert(found == true);
      return Journal::replay_ertr::make_ready_future<bool>(true);
    });
  }

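  // Lifecycle helpers: mkfs() formats the ephemeral device, mounts it, runs
  // the journal's open_for_mkfs path and closes it again; open()/close()
  // handle a normal mount of the journal.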
  auto mkfs() {
    device_config_t config = get_rbm_ephemeral_device_config(0, 1);
    return device->mkfs(config
    ).safe_then([this]() {
      return device->mount(
      ).safe_then([this]() {
        return cbj->open_for_mkfs(
        ).safe_then([](auto q) {
          return seastar::now();
        });
      });
    }).safe_then([this] {
      return cbj->close();
    });
  }
  auto open() {
    return cbj->open_for_mount(
    ).safe_then([](auto q) {
      return seastar::now();
    });
  }
  seastar::future<> close() {
    return cbj->close().handle_error(crimson::ct_error::assert_all{});
  }
  auto get_records_available_size() {
    return cbj->get_cjs().get_records_available_size();
  }
  auto get_records_total_size() {
    return cbj->get_cjs().get_records_total_size();
  }
  auto get_block_size() {
    return device->get_block_size();
  }
  auto get_written_to_rbm_addr() {
    return cbj->get_rbm_addr(cbj->get_cjs().get_written_to());
  }
  auto get_written_to() {
    return cbj->get_cjs().get_written_to();
  }
  auto get_journal_tail() {
    return cbj->get_dirty_tail();
  }
  auto get_records_used_size() {
    return cbj->get_cjs().get_records_used_size();
  }
  bool is_available_size(uint64_t size) {
    return cbj->get_cjs().is_available_size(size);
  }
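  // Move both the dirty and alloc tails to the paddr at (addr + len), making
  // the journal space before that point available again.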
  void update_journal_tail(rbm_abs_addr addr, uint32_t len) {
    paddr_t paddr =
      convert_abs_addr_to_paddr(
        addr + len,
        cbj->get_device_id());
    journal_seq_t seq = {0, paddr};
    cbj->update_journal_tail(
      seq,
      seq
    ).get0();
  }
  void set_written_to(journal_seq_t seq) {
    cbj->set_written_to(seq);
  }

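  // Per-test setup: create the ephemeral device and the journal, mkfs, replay
  // the (still empty) journal, open it for mount, and replay once more.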
  seastar::future<> set_up_fut() final {
    device = random_block_device::create_test_ephemeral(
      random_block_device::DEFAULT_TEST_CBJOURNAL_SIZE, 0);
    cbj.reset(new CircularBoundedJournal(*this, device.get(), std::string()));
    block_size = device->get_block_size();
    cbj->set_write_pipeline(&pipeline);
    return mkfs(
    ).safe_then([this] {
      return replay(
      ).safe_then([this] {
        return open(
        ).safe_then([this] {
          return replay();
        });
      });
    }).handle_error(crimson::ct_error::assert_all{});
  }
};

TEST_F(cbjournal_test_t, submit_one_record)
{
  run_async([this] {
    submit_record(
      record_t{
        { generate_extent(1), generate_extent(2) },
        { generate_delta(3), generate_delta(4) }
      });
    replay_and_check();
  });
}

TEST_F(cbjournal_test_t, submit_three_records)
{
  run_async([this] {
    submit_record(
      record_t{
        { generate_extent(1), generate_extent(2) },
        { generate_delta(3), generate_delta(4) }
      });
    submit_record(
      record_t{
        { generate_extent(8), generate_extent(9) },
        { generate_delta(20), generate_delta(21) }
      });
    submit_record(
      record_t{
        { generate_extent(5), generate_extent(6) },
        { generate_delta(200), generate_delta(210) }
      });
    replay_and_check();
  });
}

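// Fill the journal until no further record fits, advance the tail past the
// last entry so all space is reclaimed, then fill it again starting from the
// beginning and check that less than one record of space remains.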
TEST_F(cbjournal_test_t, submit_full_records)
{
  run_async([this] {
    record_t rec {
      { generate_extent(1), generate_extent(2) },
      { generate_delta(20), generate_delta(21) }
    };
    auto r_size = record_group_size_t(rec.size, block_size);
    auto record_total_size = r_size.get_encoded_length();

    submit_record(std::move(rec));
    while (is_available_size(record_total_size)) {
      submit_record(
        record_t {
          { generate_extent(1), generate_extent(2) },
          { generate_delta(20), generate_delta(21) }
        });
    }

    update_journal_tail(entries.back().get_abs_addr(), record_total_size);
    ASSERT_EQ(get_records_total_size(),
              get_records_available_size());

    // will be appended at the beginning of the log
    submit_record(
      record_t {
        { generate_extent(1), generate_extent(2) },
        { generate_delta(20), generate_delta(21) }
      });

    while (is_available_size(record_total_size)) {
      submit_record(
        record_t {
          { generate_extent(1), generate_extent(2) },
          { generate_delta(20), generate_delta(21) }
        });
    }
    ASSERT_TRUE(record_total_size > get_records_available_size());
  });
}

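// Fill the journal, free the two oldest entries by advancing the tail, then
// submit one more record (which is placed back at the beginning of the
// journal) and verify everything still replays and reads back correctly.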
TEST_F(cbjournal_test_t, boudary_check_verify)
{
  run_async([this] {
    record_t rec {
      { generate_extent(1), generate_extent(2) },
      { generate_delta(20), generate_delta(21) }
    };
    auto r_size = record_group_size_t(rec.size, block_size);
    auto record_total_size = r_size.get_encoded_length();
    submit_record(std::move(rec));
    while (is_available_size(record_total_size)) {
      submit_record(
        record_t {
          { generate_extent(1), generate_extent(2) },
          { generate_delta(20), generate_delta(21) }
        });
    }

    uint64_t avail = get_records_available_size();
    // forward the tail by 2 record sizes here because 1 block is reserved
    // between the head and the tail
    update_journal_tail(entries.front().get_abs_addr(), record_total_size * 2);
    entries.erase(entries.begin());
    entries.erase(entries.begin());
    ASSERT_EQ(avail + (record_total_size * 2), get_records_available_size());
    avail = get_records_available_size();
    // will be appended at the beginning of the WAL
    submit_record(
      record_t {
        { generate_extent(1), generate_extent(2) },
        { generate_delta(20), generate_delta(21) }
      });
    ASSERT_TRUE(avail - record_total_size >= get_records_available_size());
    replay_and_check();
  });
}

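// Exercise the header update path: move the journal tail, rewrite the on-disk
// header, read it back, then close and replay the journal.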
TEST_F(cbjournal_test_t, update_header)
{
  run_async([this] {
    auto [header, _buf] = *(cbj->get_cjs().read_header().unsafe_get0());
    record_t rec {
      { generate_extent(1), generate_extent(2) },
      { generate_delta(20), generate_delta(21) }
    };
    auto r_size = record_group_size_t(rec.size, block_size);
    auto record_total_size = r_size.get_encoded_length();
    submit_record(std::move(rec));

    update_journal_tail(entries.front().get_abs_addr(), record_total_size);
    cbj->get_cjs().write_header().unsafe_get0();
    auto [update_header, update_buf2] = *(cbj->get_cjs().read_header().unsafe_get0());
    cbj->close().unsafe_get0();
    replay().unsafe_get0();

    // the dirty tail persisted in the header should match the journal's
    // dirty tail restored by replay
    ASSERT_EQ(update_header.dirty_tail.offset, get_journal_tail().offset);
  });
}

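// Fill the journal, reclaim the two oldest entries, write one more record at
// the beginning, then close and replay to make sure every surviving record is
// found and validated by the replay handler.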
TEST_F(cbjournal_test_t, replay)
{
  run_async([this] {
    record_t rec {
      { generate_extent(1), generate_extent(2) },
      { generate_delta(20), generate_delta(21) }
    };
    auto r_size = record_group_size_t(rec.size, block_size);
    auto record_total_size = r_size.get_encoded_length();
    submit_record(std::move(rec));
    while (is_available_size(record_total_size)) {
      submit_record(
        record_t {
          { generate_extent(1), generate_extent(2) },
          { generate_delta(20), generate_delta(21) }
        });
    }
    // will be appended at the beginning of the WAL
    uint64_t avail = get_records_available_size();
    update_journal_tail(entries.front().get_abs_addr(), record_total_size * 2);
    entries.erase(entries.begin());
    entries.erase(entries.begin());
    ASSERT_EQ(avail + (record_total_size * 2), get_records_available_size());
    avail = get_records_available_size();
    submit_record(
      record_t {
        { generate_extent(1), generate_extent(2) },
        { generate_delta(20), generate_delta(21) }
      });
    ASSERT_TRUE(avail - record_total_size >= get_records_available_size());
    cbj->close().unsafe_get0();
    replay().unsafe_get0();
  });
}

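// Fill the journal, reset written_to back to the start of the records area,
// then close and replay; replay should recover the original written_to
// position and used size from the records on disk.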
TEST_F(cbjournal_test_t, replay_after_reset)
{
  run_async([this] {
    record_t rec {
      { generate_extent(1), generate_extent(2) },
      { generate_delta(20), generate_delta(21) }
    };
    auto r_size = record_group_size_t(rec.size, block_size);
    auto record_total_size = r_size.get_encoded_length();
    submit_record(std::move(rec));
    while (is_available_size(record_total_size)) {
      submit_record(
        record_t {
          { generate_extent(1), generate_extent(2) },
          { generate_delta(20), generate_delta(21) }
        });
    }
    auto old_written_to = get_written_to();
    auto old_used_size = get_records_used_size();
    set_written_to(
      journal_seq_t{0,
        convert_abs_addr_to_paddr(
          cbj->get_records_start(),
          cbj->get_device_id())});
    cbj->close().unsafe_get0();
    replay().unsafe_get0();
    ASSERT_EQ(old_written_to, get_written_to());
    ASSERT_EQ(old_used_size,
              get_records_used_size());
  });
}

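// Fill the journal, free the eight oldest entries, then issue concurrent
// submissions from four seastar::async tasks near the end of the journal and
// make sure replay recovers the same written_to position.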
TEST_F(cbjournal_test_t, multiple_submit_at_end)
{
  run_async([this] {
    record_t rec {
      { generate_extent(1), generate_extent(2) },
      { generate_delta(20), generate_delta(21) }
    };
    auto r_size = record_group_size_t(rec.size, block_size);
    auto record_total_size = r_size.get_encoded_length();
    submit_record(std::move(rec));
    while (is_available_size(record_total_size)) {
      submit_record(
        record_t {
          { generate_extent(1), generate_extent(2) },
          { generate_delta(20), generate_delta(21) }
        });
    }
    update_journal_tail(entries.front().get_abs_addr(), record_total_size * 8);
    for (int i = 0; i < 8; i++) {
      entries.erase(entries.begin());
    }
    seastar::parallel_for_each(
      boost::make_counting_iterator(0u),
      boost::make_counting_iterator(4u),
      [&](auto) {
        return seastar::async([&] {
          auto writes = 0;
          while (writes < 2) {
            record_t rec {
              { generate_extent(1) },
              { generate_delta(20) } };
            submit_record(std::move(rec));
            writes++;
          }
        });
    }).get0();
    auto old_written_to = get_written_to();
    cbj->close().unsafe_get0();
    cbj->replay(
      [](const auto &offsets,
         const auto &e,
         auto &dirty_seq,
         auto &alloc_seq,
         auto last_modified) {
      return Journal::replay_ertr::make_ready_future<bool>(true);
    }).unsafe_get0();
    assert(get_written_to() == old_written_to);
  });
}