1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "test/crimson/gtest_seastar.h"
8 #include "crimson/common/log.h"
9 #include "crimson/os/seastore/async_cleaner.h"
10 #include "crimson/os/seastore/journal.h"
11 #include "crimson/os/seastore/journal/circular_bounded_journal.h"
12 #include "crimson/os/seastore/random_block_manager.h"
13 #include "crimson/os/seastore/random_block_manager/rbm_device.h"
14 #include "crimson/os/seastore/seastore_types.h"
15 #include "test/crimson/seastore/transaction_manager_test_state.h"
16 #include "crimson/os/seastore/random_block_manager/block_rb_manager.h"
18 using namespace crimson
;
19 using namespace crimson::os
;
20 using namespace crimson::os::seastore
;
21 using namespace crimson::os::seastore::journal
;
24 [[maybe_unused
]] seastar::logger
& logger() {
25 return crimson::get_logger(ceph_subsys_test
);
// Decode one on-disk record group (metadata header, then deltas, then
// extent payloads) back into a record_t so tests can compare it against
// what was originally submitted.
// NOTE(review): this span is corrupted by extraction -- the parameter list
// (a bufferlist, per the `bl` uses below), the `record_t record;`
// declaration, the loop closing braces and the final return are missing.
// Only comments were added here; the code bytes are unchanged.
29 std::optional
<record_t
> decode_record(
// Decode the group header first; `bliter` walks the raw buffer.
33 record_group_header_t r_header
;
34 auto bliter
= bl
.cbegin();
35 decode(r_header
, bliter
);
36 logger().debug(" decode_record mdlength {} records {}",
37 r_header
.mdlength
, r_header
.records
);
// Synthetic device id with only the most-significant bit set -- a
// placeholder block-device id for rebuilding paddrs during decode.
38 device_id_t d_id
= 1 << (std::numeric_limits
<device_id_t
>::digits
- 1);
// Re-extract the delta buffers described by the header and collect them.
40 auto del_infos
= try_decode_deltas(r_header
, bl
,
41 paddr_t::make_blk_paddr(d_id
, 0));
42 for (auto &iter
: *del_infos
) {
43 for (auto r
: iter
.deltas
) {
44 record
.deltas
.push_back(r
.second
);
// Extent payloads start right after the metadata region (mdlength bytes in).
47 auto ex_infos
= try_decode_extent_infos(r_header
, bl
);
48 auto bliter_ex
= bl
.cbegin();
49 bliter_ex
+= r_header
.mdlength
;
50 for (auto &iter
: *ex_infos
) {
51 for (auto e
: iter
.extent_infos
) {
// Copy each extent's bytes into a fresh page-aligned buffer.
53 auto bptr
= bufferptr(ceph::buffer::create_page_aligned(e
.len
));
54 logger().debug(" exten len {} remaining {} ", e
.len
, bliter_ex
.get_remaining());
55 bliter_ex
.copy(e
.len
, bptr
.c_str());
// NOTE(review): the line declaring `ex` and appending `bptr` to it was
// lost in extraction (between original lines 55 and 57).
57 record
.extents
.push_back(ex
);
// Remembers one submitted record together with where it landed in the
// journal, so the tests can later re-read the journal and verify both
// content (CRC of extents/deltas) and placement.
// NOTE(review): extraction dropped several original lines in this struct
// (the member declarations around lines 64-66 -- presumably the record,
// its start seq and an entry count used below -- plus ASSERT_EQ openers
// and closing braces). Only comments were added; code bytes unchanged.
63 struct entry_validator_t
{
// Journal nonce the record was stamped with (0 until filled in on submit).
67 segment_nonce_t magic
= 0;
// Forward any argument pack straight into the wrapped record_t.
70 template <typename
... T
>
71 entry_validator_t(T
&&... entry
) : record(std::forward
<T
>(entry
)...) {}
// Compare a freshly decoded record against the remembered one:
// matching extent CRCs first, then matching delta CRCs.
73 void validate(record_t read
) {
74 auto iter
= read
.extents
.begin();
75 for (auto &&block
: record
.extents
) {
80 iter
->bl
.begin().crc32c(iter
->bl
.length(), 1),
81 block
.bl
.begin().crc32c(block
.bl
.length(), 1));
84 auto iter_delta
= read
.deltas
.begin();
85 for (auto &&block
: record
.deltas
) {
87 iter_delta
->bl
.length(),
90 iter_delta
->bl
.begin().crc32c(iter_delta
->bl
.length(), 1),
91 block
.bl
.begin().crc32c(block
.bl
.length(), 1));
// Re-read this entry's records straight from the journal device, decode
// each group and validate it via validate(record_t) above.  `offset`
// walks forward by mdlength + dlength per group.
95 void validate(CircularBoundedJournal
&cbj
) {
96 rbm_abs_addr offset
= 0;
97 auto cursor
= scan_valid_records_cursor(seq
);
98 cbj
.test_initialize_cursor(cursor
);
99 for (int i
= 0; i
< entries
; i
++) {
100 paddr_t paddr
= seq
.offset
.add_offset(offset
);
101 cursor
.seq
.offset
= paddr
;
102 auto md
= cbj
.test_read_validate_record_metadata(
103 cursor
, magic
).unsafe_get0();
105 auto& [header
, md_bl
] = *md
;
106 auto dbuf
= cbj
.read(
107 paddr
.add_offset(header
.mdlength
),
108 header
.dlength
).unsafe_get0();
// NOTE(review): the lines assembling `bl` from md_bl + dbuf (original
// 109-112) are missing from this extraction.
113 auto record
= decode_record(bl
);
115 offset
+= header
.mdlength
+ header
.dlength
;
116 cursor
.last_committed
= header
.committed_to
;
// Absolute RBM address this entry was written at.
120 rbm_abs_addr
get_abs_addr() {
121 return convert_paddr_to_abs_addr(seq
.offset
);
// True when `bl` matches (by CRC) any delta remembered for this entry.
124 bool validate_delta(bufferlist bl
) {
125 for (auto &&block
: record
.deltas
) {
126 if (bl
.begin().crc32c(bl
.length(), 1) ==
127 block
.bl
.begin().crc32c(block
.bl
.length(), 1)) {
// Fixture for CircularBoundedJournal tests.  It doubles as the journal's
// trimmer (JournalTrimmer base) and owns the journal, the ephemeral
// backing device and the write pipeline.
// NOTE(review): some member declarations (e.g. the `block_size` and `seq`
// used later in this file, original lines ~136-144) were lost in
// extraction.  Only comments were added; code bytes unchanged.
135 struct cbjournal_test_t
: public seastar_test_suite_t
, JournalTrimmer
// One validator per submitted record, in submission order.
137 std::vector
<entry_validator_t
> entries
;
138 std::unique_ptr
<CircularBoundedJournal
> cbj
;
139 random_block_device::EphemeralRBMDeviceRef device
;
// Source of the random fill bytes in generate_extent()/generate_delta().
141 std::default_random_engine generator
;
143 WritePipeline pipeline
;
145 cbjournal_test_t() = default;
148 * JournalTrimmer interfaces
150 journal_seq_t
get_journal_head() const {
151 return JOURNAL_SEQ_NULL
;
154 journal_seq_t
get_dirty_tail() const final
{
155 return JOURNAL_SEQ_NULL
;
158 journal_seq_t
get_alloc_tail() const final
{
159 return JOURNAL_SEQ_NULL
;
162 void set_journal_head(journal_seq_t head
) final
{}
164 void update_journal_tails(
165 journal_seq_t dirty_tail
,
166 journal_seq_t alloc_tail
) final
{}
168 bool try_reserve_inline_usage(std::size_t) final
{ return true; }
170 void release_inline_usage(std::size_t) final
{}
172 std::size_t get_trim_size_per_cycle() const final
{
176 auto submit_record(record_t
&& record
) {
177 entries
.push_back(record
);
178 OrderingHandle handle
= get_dummy_ordering_handle();
179 auto [addr
, w_result
] = cbj
->submit_record(
181 handle
).unsafe_get0();
182 entries
.back().seq
= w_result
.start_seq
;
183 entries
.back().entries
= 1;
184 entries
.back().magic
= cbj
->get_cjs().get_cbj_header().magic
;
185 logger().debug("submit entry to addr {}", entries
.back().seq
);
186 return convert_paddr_to_abs_addr(entries
.back().seq
.offset
);
// Per-test teardown hook for the seastar test suite.
// NOTE(review): the body (original lines 190-192) was lost in extraction.
189 seastar::future
<> tear_down_fut() final
{
193 extent_t
generate_extent(size_t blocks
) {
194 std::uniform_int_distribution
<char> distribution(
195 std::numeric_limits
<char>::min(),
196 std::numeric_limits
<char>::max()
198 char contents
= distribution(generator
);
200 bl
.append(buffer::ptr(buffer::create(blocks
* block_size
, contents
)));
201 return extent_t
{extent_types_t::TEST_BLOCK
, L_ADDR_NULL
, bl
};
// Build a TEST_BLOCK delta of `bytes` bytes, filled with a single random
// byte, targeting the journal segment type.
// NOTE(review): the delta_info_t aggregate-initializer lines (original
// 212-224) are mostly missing from this extraction; only the type, the
// block-size field and the segment type survive.  Code bytes unchanged.
204 delta_info_t
generate_delta(size_t bytes
) {
205 std::uniform_int_distribution
<char> distribution(
206 std::numeric_limits
<char>::min(),
207 std::numeric_limits
<char>::max()
209 char contents
= distribution(generator
);
211 bl
.append(buffer::ptr(buffer::create(bytes
, contents
)));
213 extent_types_t::TEST_BLOCK
,
217 device
->get_block_size(),
220 segment_type_t::JOURNAL
,
225 auto replay_and_check() {
226 for (auto &i
: entries
) {
227 i
.validate(*(cbj
.get()));
// Delta-replay callback: for each replayed delta, find the remembered
// entry written at the same absolute address and CRC-check the delta
// payload against it; a replayed delta with no matching entry is fatal.
// NOTE(review): the enclosing `replay(` call / method signature (original
// lines ~229-232) and several lambda parameter lines (234-238) are missing
// from this extraction.  Only comments were added; code bytes unchanged.
233 [this](const auto &offsets
,
237 auto last_modified
) {
239 for (auto &i
: entries
) {
240 paddr_t base
= offsets
.write_result
.start_seq
.offset
;
241 rbm_abs_addr addr
= convert_paddr_to_abs_addr(base
);
242 if (addr
== i
.get_abs_addr()) {
243 logger().debug(" compare addr: {} and i.addr {} ", base
, i
.get_abs_addr());
244 found
= i
.validate_delta(e
.bl
);
// Every replayed delta must belong to a known entry.
248 assert(found
== true);
// Returning true keeps the replay going.
249 return Journal::replay_ertr::make_ready_future
<bool>(true);
// Device mkfs + mount + journal open_for_mkfs chain, followed by the
// mount-time open (open_for_mount) used when re-opening an existing
// journal.
// NOTE(review): the method signatures (presumably mkfs() and open(),
// original lines ~253 and ~266-268) and intermediate closers are missing
// from this extraction.  Only comments were added; code bytes unchanged.
254 device_config_t config
= get_rbm_ephemeral_device_config(0, 1);
255 return device
->mkfs(config
256 ).safe_then([this]() {
257 return device
->mount(
258 ).safe_then([this]() {
259 return cbj
->open_for_mkfs(
260 ).safe_then([](auto q
) {
261 return seastar::now();
264 }).safe_then([this] {
// Re-open an already-initialized journal.
269 return cbj
->open_for_mount(
270 ).safe_then([](auto q
) {
271 return seastar::now();
274 seastar::future
<> close() {
275 return cbj
->close().handle_error(crimson::ct_error::assert_all
{});
277 auto get_records_available_size() {
278 return cbj
->get_cjs().get_records_available_size();
280 auto get_records_total_size() {
281 return cbj
->get_cjs().get_records_total_size();
283 auto get_block_size() {
284 return device
->get_block_size();
286 auto get_written_to_rbm_addr() {
287 return cbj
->get_rbm_addr(cbj
->get_cjs().get_written_to());
289 auto get_written_to() {
290 return cbj
->get_cjs().get_written_to();
292 auto get_journal_tail() {
293 return cbj
->get_dirty_tail();
295 auto get_records_used_size() {
296 return cbj
->get_cjs().get_records_used_size();
298 bool is_available_size(uint64_t size
) {
299 return cbj
->get_cjs().is_available_size(size
);
301 void update_journal_tail(rbm_abs_addr addr
, uint32_t len
) {
303 convert_abs_addr_to_paddr(
305 cbj
->get_device_id());
306 journal_seq_t seq
= {0, paddr
};
307 cbj
->update_journal_tail(
312 void set_written_to(journal_seq_t seq
) {
313 cbj
->set_written_to(seq
);
// Per-test setup: create the ephemeral backing device, construct the
// journal on top of it, cache the block size and attach the write
// pipeline, then run the async init chain.
// NOTE(review): the init chain (original lines 322-330, presumably the
// mkfs/open sequence) is missing from this extraction; only its trailing
// handle_error survives.  Only comments were added; code bytes unchanged.
316 seastar::future
<> set_up_fut() final
{
317 device
= random_block_device::create_test_ephemeral(
318 random_block_device::DEFAULT_TEST_CBJOURNAL_SIZE
, 0);
319 cbj
.reset(new CircularBoundedJournal(*this, device
.get(), std::string()));
320 block_size
= device
->get_block_size();
321 cbj
->set_write_pipeline(&pipeline
);
331 }).handle_error(crimson::ct_error::assert_all
{});
// Submit a single two-extent / two-delta record, then replay and verify.
// NOTE(review): the run_async wrapper, record_t construction and closers
// (original lines 336-346) are missing from this extraction.
335 TEST_F(cbjournal_test_t
, submit_one_record
)
340 { generate_extent(1), generate_extent(2) },
341 { generate_delta(3), generate_delta(4) }
// Submit three records of increasing size, then replay and verify.
// NOTE(review): the run_async wrapper, record_t constructions and closers
// are missing from this extraction; only the payload lists survive.
347 TEST_F(cbjournal_test_t
, submit_three_records
)
352 { generate_extent(1), generate_extent(2) },
353 { generate_delta(3), generate_delta(4) }
357 { generate_extent(8), generate_extent(9) },
358 { generate_delta(20), generate_delta(21) }
362 { generate_extent(5), generate_extent(6) },
363 { generate_delta(200), generate_delta(210) }
// Fill the journal to capacity, free everything by advancing the tail,
// confirm the whole capacity is available again, then fill it a second
// time (wrapping to the start) and confirm it is full again.
// NOTE(review): run_async wrapper, record_t constructions and closers are
// missing from this extraction.  Only comments were changed.
369 TEST_F(cbjournal_test_t
, submit_full_records
)
373 { generate_extent(1), generate_extent(2) },
374 { generate_delta(20), generate_delta(21) }
// Size of one encoded record group, used to detect "journal full".
376 auto r_size
= record_group_size_t(rec
.size
, block_size
);
377 auto record_total_size
= r_size
.get_encoded_length();
379 submit_record(std::move(rec
));
380 while (is_available_size(record_total_size
)) {
383 { generate_extent(1), generate_extent(2) },
384 { generate_delta(20), generate_delta(21) }
// Move the tail past the last entry: everything becomes reclaimable.
388 update_journal_tail(entries
.back().get_abs_addr(), record_total_size
);
389 ASSERT_EQ(get_records_total_size(),
390 get_records_available_size());
392 // will be appended at the beginning of the log
395 { generate_extent(1), generate_extent(2) },
396 { generate_delta(20), generate_delta(21) }
399 while (is_available_size(record_total_size
)) {
402 { generate_extent(1), generate_extent(2) },
403 { generate_delta(20), generate_delta(21) }
// Full again: not even one more record fits.
406 ASSERT_TRUE(record_total_size
> get_records_available_size());
// Fill the journal, free exactly two records at the front, and verify the
// available size changes by exactly that amount across a wrap-around
// write at the journal boundary.
// NOTE(review): the test name "boudary" is a typo for "boundary" --
// renaming would change the registered gtest name, so it is only flagged.
// run_async wrapper, record_t constructions and closers are missing from
// this extraction.  Only comments were changed.
410 TEST_F(cbjournal_test_t
, boudary_check_verify
)
414 { generate_extent(1), generate_extent(2) },
415 { generate_delta(20), generate_delta(21) }
417 auto r_size
= record_group_size_t(rec
.size
, block_size
);
418 auto record_total_size
= r_size
.get_encoded_length();
419 submit_record(std::move(rec
));
420 while (is_available_size(record_total_size
)) {
423 { generate_extent(1), generate_extent(2) },
424 { generate_delta(20), generate_delta(21) }
428 uint64_t avail
= get_records_available_size();
429 // forward two record sizes here because 1 block is reserved between head and tail
430 update_journal_tail(entries
.front().get_abs_addr(), record_total_size
* 2);
// Drop the two entries we just reclaimed from the validation list.
431 entries
.erase(entries
.begin());
432 entries
.erase(entries
.begin());
433 ASSERT_EQ(avail
+ (record_total_size
* 2), get_records_available_size());
434 avail
= get_records_available_size();
435 // will be appended at the beginning of WAL
438 { generate_extent(1), generate_extent(2) },
439 { generate_delta(20), generate_delta(21) }
441 ASSERT_TRUE(avail
- record_total_size
>= get_records_available_size());
// Submit a record, advance the tail, rewrite and re-read the journal
// header, then close and replay to confirm the persisted header survives.
// NOTE(review): run_async wrapper, record_t construction and closers are
// missing from this extraction.  Only comments were added.
446 TEST_F(cbjournal_test_t
, update_header
)
449 auto [header
, _buf
] = *(cbj
->get_cjs().read_header().unsafe_get0());
451 { generate_extent(1), generate_extent(2) },
452 { generate_delta(20), generate_delta(21) }
454 auto r_size
= record_group_size_t(rec
.size
, block_size
);
455 auto record_total_size
= r_size
.get_encoded_length();
456 submit_record(std::move(rec
));
458 update_journal_tail(entries
.front().get_abs_addr(), record_total_size
);
// Persist the header, then re-read it to compare against the original.
459 cbj
->get_cjs().write_header().unsafe_get0();
460 auto [update_header
, update_buf2
] = *(cbj
->get_cjs().read_header().unsafe_get0());
461 cbj
->close().unsafe_get0();
462 replay().unsafe_get0();
// NOTE(review): this compares update_header.dirty_tail.offset against
// itself, which is always true -- the left-hand side was presumably meant
// to be the post-replay header's (or the original `header`'s) dirty_tail.
464 ASSERT_EQ(update_header
.dirty_tail
.offset
, update_header
.dirty_tail
.offset
);
// Fill the journal, reclaim two records at the front, wrap a new record
// past the boundary, then close and replay the whole journal to verify
// every surviving delta is found and CRC-checked.
// NOTE(review): run_async wrapper, record_t constructions and closers are
// missing from this extraction.  Only comments were changed.
468 TEST_F(cbjournal_test_t
, replay
)
472 { generate_extent(1), generate_extent(2) },
473 { generate_delta(20), generate_delta(21) }
475 auto r_size
= record_group_size_t(rec
.size
, block_size
);
476 auto record_total_size
= r_size
.get_encoded_length();
477 submit_record(std::move(rec
));
478 while (is_available_size(record_total_size
)) {
481 { generate_extent(1), generate_extent(2) },
482 { generate_delta(20), generate_delta(21) }
485 // will be appended at the beginning of WAL
486 uint64_t avail
= get_records_available_size();
487 update_journal_tail(entries
.front().get_abs_addr(), record_total_size
* 2);
// Drop the two reclaimed entries from the validation list.
488 entries
.erase(entries
.begin());
489 entries
.erase(entries
.begin());
490 ASSERT_EQ(avail
+ (record_total_size
* 2), get_records_available_size());
491 avail
= get_records_available_size();
494 { generate_extent(1), generate_extent(2) },
495 { generate_delta(20), generate_delta(21) }
497 ASSERT_TRUE(avail
- record_total_size
>= get_records_available_size());
// Close and replay: the delta callback validates every surviving entry.
498 cbj
->close().unsafe_get0();
499 replay().unsafe_get0();
// Fill the journal, rewind written_to back to the start of the records
// area, then close and replay: replay must restore the original
// written_to position and used size from the persisted records.
// NOTE(review): run_async wrapper, record_t constructions, the
// set_written_to({...}) opener and closers are missing from this
// extraction.  Only comments were added.
503 TEST_F(cbjournal_test_t
, replay_after_reset
)
507 { generate_extent(1), generate_extent(2) },
508 { generate_delta(20), generate_delta(21) }
510 auto r_size
= record_group_size_t(rec
.size
, block_size
);
511 auto record_total_size
= r_size
.get_encoded_length();
512 submit_record(std::move(rec
));
513 while (is_available_size(record_total_size
)) {
516 { generate_extent(1), generate_extent(2) },
517 { generate_delta(20), generate_delta(21) }
// Snapshot state before the reset so replay can be checked against it.
520 auto old_written_to
= get_written_to();
521 auto old_used_size
= get_records_used_size();
// Rewind written_to to the very start of the records area.
524 convert_abs_addr_to_paddr(
525 cbj
->get_records_start(),
526 cbj
->get_device_id())});
527 cbj
->close().unsafe_get0();
528 replay().unsafe_get0();
// Replay must have re-discovered the true head and used size.
529 ASSERT_EQ(old_written_to
, get_written_to());
530 ASSERT_EQ(old_used_size
,
531 get_records_used_size());
// Fill the journal, reclaim eight records, then submit from four
// concurrent fibers so several records land around the wrap point;
// finally close and replay, checking written_to is reproduced.
// NOTE(review): run_async wrapper, record_t constructions, the
// parallel_for_each lambda opener/closers and the test's trailing lines
// (original 582+) are missing from this extraction, which also ends here.
// Only comments were added; code bytes unchanged.
535 TEST_F(cbjournal_test_t
, multiple_submit_at_end
)
539 { generate_extent(1), generate_extent(2) },
540 { generate_delta(20), generate_delta(21) }
542 auto r_size
= record_group_size_t(rec
.size
, block_size
);
543 auto record_total_size
= r_size
.get_encoded_length();
544 submit_record(std::move(rec
));
545 while (is_available_size(record_total_size
)) {
548 { generate_extent(1), generate_extent(2) },
549 { generate_delta(20), generate_delta(21) }
// Reclaim eight records' worth of space at the front.
552 update_journal_tail(entries
.front().get_abs_addr(), record_total_size
* 8);
553 for (int i
= 0; i
< 8; i
++) {
554 entries
.erase(entries
.begin());
// Four concurrent submitters racing around the wrap point.
556 seastar::parallel_for_each(
557 boost::make_counting_iterator(0u),
558 boost::make_counting_iterator(4u),
560 return seastar::async([&] {
564 { generate_extent(1) },
565 { generate_delta(20) } };
566 submit_record(std::move(rec
));
571 auto old_written_to
= get_written_to();
572 cbj
->close().unsafe_get0();
// Replay with a no-op delta handler; only written_to is checked here.
574 [](const auto &offsets
,
578 auto last_modified
) {
579 return Journal::replay_ertr::make_ready_future
<bool>(true);
581 assert(get_written_to() == old_written_to
);