struct entry_validator_t {
bufferlist bl;
int entries;
- journal_seq_t last_seq;
record_t record;
- rbm_abs_addr addr = 0;
+ segment_nonce_t magic = 0;
+ journal_seq_t seq;
template <typename... T>
entry_validator_t(T&&... entry) : record(std::forward<T>(entry)...) {}
++iter_delta;
}
}
-
void validate(CircularBoundedJournal &cbj) {
rbm_abs_addr offset = 0;
+ auto cursor = scan_valid_records_cursor(seq);
+ cbj.test_initialize_cursor(cursor);
for (int i = 0; i < entries; i++) {
- paddr_t paddr = convert_abs_addr_to_paddr(
- addr + offset,
- cbj.get_device_id());
- auto [header, buf] = *(cbj.read_record(paddr, NULL_SEG_SEQ).unsafe_get0());
- auto record = decode_record(buf);
+ paddr_t paddr = seq.offset.add_offset(offset);
+ cursor.seq.offset = paddr;
+ auto md = cbj.test_read_validate_record_metadata(
+ cursor, magic).unsafe_get0();
+ assert(md);
+ auto& [header, md_bl] = *md;
+ auto dbuf = cbj.read(
+ paddr.add_offset(header.mdlength),
+ header.dlength).unsafe_get0();
+
+ bufferlist bl;
+ bl.append(md_bl);
+ bl.append(dbuf);
+ auto record = decode_record(bl);
validate(*record);
offset += header.mdlength + header.dlength;
+ cursor.last_committed = header.committed_to;
}
}
+ rbm_abs_addr get_abs_addr() {
+ return convert_paddr_to_abs_addr(seq.offset);
+ }
+
bool validate_delta(bufferlist bl) {
for (auto &&block : record.deltas) {
if (bl.begin().crc32c(bl.length(), 1) ==
auto [addr, w_result] = cbj->submit_record(
std::move(record),
handle).unsafe_get0();
- entries.back().addr =
- convert_paddr_to_abs_addr(w_result.start_seq.offset);
+ entries.back().seq = w_result.start_seq;
entries.back().entries = 1;
- logger().debug("submit entry to addr {}", entries.back().addr);
- return entries.back().addr;
+ entries.back().magic = cbj->get_cjs().get_cbj_header().magic;
+ logger().debug("submit entry to addr {}", entries.back().seq);
+ return convert_paddr_to_abs_addr(entries.back().seq.offset);
}
seastar::future<> tear_down_fut() final {
for (auto &i : entries) {
paddr_t base = offsets.write_result.start_seq.offset;
rbm_abs_addr addr = convert_paddr_to_abs_addr(base);
- if (addr == i.addr) {
- logger().debug(" compare addr: {} and i.addr {} ", base, i.addr);
+ if (addr == i.get_abs_addr()) {
+ logger().debug(" compare addr: {} and i.addr {} ", base, i.get_abs_addr());
found = i.validate_delta(e.bl);
break;
}
).safe_then([this] {
return replay(
).safe_then([this] {
- return open();
+ return open(
+ ).safe_then([this] {
+ return replay();
+ });
});
}).handle_error(crimson::ct_error::assert_all{});
}
});
}
- update_journal_tail(entries.back().addr, record_total_size);
+ update_journal_tail(entries.back().get_abs_addr(), record_total_size);
ASSERT_EQ(get_records_total_size(),
get_records_available_size());
uint64_t avail = get_records_available_size();
// forward 2 recod size here because 1 block is reserved between head and tail
- update_journal_tail(entries.front().addr, record_total_size * 2);
+ update_journal_tail(entries.front().get_abs_addr(), record_total_size * 2);
entries.erase(entries.begin());
entries.erase(entries.begin());
ASSERT_EQ(avail + (record_total_size * 2), get_records_available_size());
auto record_total_size = r_size.get_encoded_length();
submit_record(std::move(rec));
- update_journal_tail(entries.front().addr, record_total_size);
+ update_journal_tail(entries.front().get_abs_addr(), record_total_size);
cbj->get_cjs().write_header().unsafe_get0();
auto [update_header, update_buf2] = *(cbj->get_cjs().read_header().unsafe_get0());
cbj->close().unsafe_get0();
}
// will be appended at the begining of WAL
uint64_t avail = get_records_available_size();
- update_journal_tail(entries.front().addr, record_total_size * 2);
+ update_journal_tail(entries.front().get_abs_addr(), record_total_size * 2);
entries.erase(entries.begin());
entries.erase(entries.begin());
ASSERT_EQ(avail + (record_total_size * 2), get_records_available_size());
get_records_used_size());
});
}
+
+TEST_F(cbjournal_test_t, multiple_submit_at_end)
+{
+ run_async([this] {
+ record_t rec {
+ { generate_extent(1), generate_extent(2) },
+ { generate_delta(20), generate_delta(21) }
+ };
+ auto r_size = record_group_size_t(rec.size, block_size);
+ auto record_total_size = r_size.get_encoded_length();
+ submit_record(std::move(rec));
+ while (is_available_size(record_total_size)) {
+ submit_record(
+ record_t {
+ { generate_extent(1), generate_extent(2) },
+ { generate_delta(20), generate_delta(21) }
+ });
+ }
+ update_journal_tail(entries.front().get_abs_addr(), record_total_size * 8);
+ for (int i = 0; i < 8; i++) {
+ entries.erase(entries.begin());
+ }
+ seastar::parallel_for_each(
+ boost::make_counting_iterator(0u),
+ boost::make_counting_iterator(4u),
+ [&](auto) {
+ return seastar::async([&] {
+ auto writes = 0;
+ while (writes < 2) {
+ record_t rec {
+ { generate_extent(1) },
+ { generate_delta(20) } };
+ submit_record(std::move(rec));
+ writes++;
+ }
+ });
+ }).get0();
+ auto old_written_to = get_written_to();
+ cbj->close().unsafe_get0();
+ cbj->replay(
+ [](const auto &offsets,
+ const auto &e,
+ auto &dirty_seq,
+ auto &alloc_seq,
+ auto last_modified) {
+ return Journal::replay_ertr::make_ready_future<bool>(true);
+ }).unsafe_get0();
+ assert(get_written_to() == old_written_to);
+ });
+}