]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/os/seastore/journal/circular_bounded_journal.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / os / seastore / journal / circular_bounded_journal.cc
index 8d0de4e18ecd4247e7887bb1ea457094856a6c4d..ec41bfab142649d8ce36678003b3ca86669064ce 100644 (file)
@@ -36,7 +36,6 @@ CircularBoundedJournal::open_for_mkfs()
 {
   return record_submitter.open(true
   ).safe_then([this](auto ret) {
-    record_submitter.update_committed_to(get_written_to());
     return open_for_mkfs_ret(
       open_for_mkfs_ertr::ready_future_marker{},
       get_written_to());
@@ -48,7 +47,6 @@ CircularBoundedJournal::open_for_mount()
 {
   return record_submitter.open(false
   ).safe_then([this](auto ret) {
-    record_submitter.update_committed_to(get_written_to());
     return open_for_mount_ret(
       open_for_mount_ertr::ready_future_marker{},
       get_written_to());
@@ -111,115 +109,192 @@ CircularBoundedJournal::do_submit_record(
   });
 }
 
+Journal::replay_ret CircularBoundedJournal::replay_segment(
+   cbj_delta_handler_t &handler, scan_valid_records_cursor& cursor)
+{
+  LOG_PREFIX(Journal::replay_segment);
+  return seastar::do_with(
+    RecordScanner::found_record_handler_t(
+      [this, &handler, FNAME](
+      record_locator_t locator,
+      const record_group_header_t& r_header,
+      const bufferlist& mdbuf)
+      -> RecordScanner::scan_valid_records_ertr::future<>
+    {
+      auto maybe_record_deltas_list = try_decode_deltas(
+        r_header, mdbuf, locator.record_block_base);
+      if (!maybe_record_deltas_list) {
+        // This should be impossible, we did check the crc on the mdbuf
+        ERROR("unable to decode deltas for record {} at {}",
+              r_header, locator.record_block_base);
+        return crimson::ct_error::input_output_error::make();
+      }
+      assert(locator.write_result.start_seq != JOURNAL_SEQ_NULL);
+      auto cursor_addr = convert_paddr_to_abs_addr(locator.write_result.start_seq.offset);
+      DEBUG("{} at {}", r_header, cursor_addr);
+      journal_seq_t start_seq = locator.write_result.start_seq;
+      auto write_result = write_result_t{
+       start_seq,
+        r_header.mdlength + r_header.dlength
+      };
+      auto expected_seq = locator.write_result.start_seq.segment_seq;
+      cursor_addr += (r_header.mdlength + r_header.dlength);
+      if (cursor_addr >= get_journal_end()) {
+        cursor_addr = get_records_start();
+        ++expected_seq;
+        paddr_t addr = convert_abs_addr_to_paddr(
+          cursor_addr,
+          get_device_id());
+        write_result.start_seq.offset = addr;
+        write_result.start_seq.segment_seq = expected_seq;
+      }
+      paddr_t addr = convert_abs_addr_to_paddr(
+        cursor_addr,
+        get_device_id());
+      set_written_to(
+        journal_seq_t{expected_seq, addr});
+      return seastar::do_with(
+        std::move(*maybe_record_deltas_list),
+        [write_result,
+        &handler,
+        FNAME](auto& record_deltas_list) {
+        return crimson::do_for_each(
+          record_deltas_list,
+          [write_result,
+          &handler, FNAME](record_deltas_t& record_deltas) {
+          auto locator = record_locator_t{
+            record_deltas.record_block_base,
+            write_result
+          };
+          DEBUG("processing {} deltas at block_base {}",
+              record_deltas.deltas.size(),
+              locator);
+          return crimson::do_for_each(
+            record_deltas.deltas,
+            [locator,
+            &handler](auto& p) {
+            auto& modify_time = p.first;
+            auto& delta = p.second;
+            return handler(
+              locator,
+              delta,
+              modify_time).discard_result();
+          });
+        });
+      });
+    }),
+    [=, this, &cursor](auto &dhandler) {
+      return scan_valid_records(
+        cursor,
+       cjs.get_cbj_header().magic,
+        std::numeric_limits<size_t>::max(),
+        dhandler).safe_then([](auto){}
+      ).handle_error(
+        replay_ertr::pass_further{},
+        crimson::ct_error::assert_all{
+          "shouldn't meet with any other error other replay_ertr"
+        }
+      );
+    }
+  );
+}
+
+
 Journal::replay_ret CircularBoundedJournal::scan_valid_record_delta(
-   cbj_delta_handler_t &&delta_handler, journal_seq_t tail)
+   cbj_delta_handler_t &&handler, journal_seq_t tail)
 {
-  LOG_PREFIX(CircularBoundedJournal::scan_valid_record_delta);
+  LOG_PREFIX(Journal::scan_valid_record_delta);
+  INFO("starting at {} ", tail);
   return seastar::do_with(
+    scan_valid_records_cursor(tail),
+    std::move(handler),
     bool(false),
-    rbm_abs_addr(get_rbm_addr(tail)),
-    std::move(delta_handler),
-    segment_seq_t(NULL_SEG_SEQ),
-    [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &expected_seq) {
-    return crimson::repeat(
-      [this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME]() mutable
-      -> replay_ertr::future<seastar::stop_iteration> {
-      paddr_t record_paddr = convert_abs_addr_to_paddr(
-       cursor_addr,
-       get_device_id());
-      return read_record(record_paddr, expected_seq
-      ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME](auto ret)
-         -> replay_ertr::future<seastar::stop_iteration> {
-       if (!ret.has_value()) {
-         if (expected_seq == NULL_SEG_SEQ || is_rolled) {
-           DEBUG("no more records, stop replaying");
-           return replay_ertr::make_ready_future<
-             seastar::stop_iteration>(seastar::stop_iteration::yes);
-         } else {
-           cursor_addr = get_records_start();
-           ++expected_seq;
-           is_rolled = true;
-           return replay_ertr::make_ready_future<
-             seastar::stop_iteration>(seastar::stop_iteration::no);
-         }
-       }
-       auto [r_header, bl] = *ret;
-       bufferlist mdbuf;
-       mdbuf.substr_of(bl, 0, r_header.mdlength);
-       paddr_t record_block_base = paddr_t::make_blk_paddr(
-         get_device_id(), cursor_addr + r_header.mdlength);
-       auto maybe_record_deltas_list = try_decode_deltas(
-         r_header, mdbuf, record_block_base);
-       if (!maybe_record_deltas_list) {
-         // This should be impossible, we did check the crc on the mdbuf
-         ERROR("unable to decode deltas for record {} at {}",
-               r_header, record_block_base);
-         return crimson::ct_error::input_output_error::make();
-       }
-       DEBUG("{} at {}", r_header, cursor_addr);
-       auto write_result = write_result_t{
-         r_header.committed_to,
-         bl.length()
-       };
-       if (expected_seq == NULL_SEG_SEQ) {
-         expected_seq = r_header.committed_to.segment_seq;
-       } else {
-         assert(expected_seq == r_header.committed_to.segment_seq);
-       }
-       cursor_addr += bl.length();
-       if (cursor_addr >= get_journal_end()) {
-         assert(cursor_addr == get_journal_end());
-         cursor_addr = get_records_start();
-         ++expected_seq;
-         paddr_t addr = convert_abs_addr_to_paddr(
-           cursor_addr,
-           get_device_id());
-         write_result.start_seq.offset = addr;
-         write_result.start_seq.segment_seq = expected_seq;
-         is_rolled = true;
-       }
-       paddr_t addr = convert_abs_addr_to_paddr(
-         cursor_addr,
-         get_device_id());
-       set_written_to(
-         journal_seq_t{expected_seq, addr});
-       return seastar::do_with(
-         std::move(*maybe_record_deltas_list),
-         [write_result,
-         &d_handler,
-         FNAME](auto& record_deltas_list) {
-         return crimson::do_for_each(
-           record_deltas_list,
-           [write_result,
-           &d_handler, FNAME](record_deltas_t& record_deltas) {
-           auto locator = record_locator_t{
-             record_deltas.record_block_base,
-             write_result
-           };
-           DEBUG("processing {} deltas at block_base {}",
-               record_deltas.deltas.size(),
-               locator);
-           return crimson::do_for_each(
-             record_deltas.deltas,
-             [locator,
-             &d_handler](auto& p) {
-             auto& modify_time = p.first;
-             auto& delta = p.second;
-             return d_handler(
-               locator,
-               delta,
-               modify_time).discard_result();
-           });
-         }).safe_then([]() {
-           return replay_ertr::make_ready_future<
-             seastar::stop_iteration>(seastar::stop_iteration::no);
-         });
-       });
+    [this] (auto &cursor, auto &handler, auto &rolled) {
+    return crimson::repeat([this, &handler, &cursor, &rolled]()
+    -> replay_ertr::future<seastar::stop_iteration>
+    {
+      return replay_segment(handler, cursor
+      ).safe_then([this, &cursor, &rolled] {
+        if (!rolled) {
+          cursor.last_valid_header_found = false;
+        }
+        if (!cursor.is_complete()) {
+          try_read_rolled_header(cursor);
+         rolled = true;
+          return replay_ertr::make_ready_future<
+            seastar::stop_iteration>(seastar::stop_iteration::no);
+        }
+        return replay_ertr::make_ready_future<
+          seastar::stop_iteration>(seastar::stop_iteration::yes);
       });
     });
   });
 }
 
+RecordScanner::read_ret CircularBoundedJournal::read(paddr_t start, size_t len) 
+{
+  LOG_PREFIX(CircularBoundedJournal::read);
+  rbm_abs_addr addr = convert_paddr_to_abs_addr(start);
+  DEBUG("reading data from addr {} read length {}", addr, len);
+  auto bptr = bufferptr(ceph::buffer::create_page_aligned(len));
+  return cjs.read(addr, bptr 
+  ).safe_then([bptr=std::move(bptr)]() {
+    return read_ret(
+      RecordScanner::read_ertr::ready_future_marker{},
+      std::move(bptr)
+    );
+  });
+}
+
+bool CircularBoundedJournal::is_record_segment_seq_invalid(
+  scan_valid_records_cursor &cursor,
+  record_group_header_t &r_header) 
+{
+  LOG_PREFIX(CircularBoundedJournal::is_record_segment_seq_invalid);
+  auto print_invalid = [FNAME](auto &r_header) {
+    DEBUG("invalid header: {}", r_header);
+    return true;
+  };
+  if (cursor.seq.offset == convert_abs_addr_to_paddr(
+      get_records_start(), get_device_id())) {
+    if ((r_header.committed_to.segment_seq == NULL_SEG_SEQ &&
+       cursor.seq.segment_seq != 0) ||
+       r_header.committed_to.segment_seq != cursor.seq.segment_seq - 1) {
+      return print_invalid(r_header);
+    }
+  } else if (r_header.committed_to.segment_seq != cursor.seq.segment_seq) {
+    /*
+     * Assuming that seastore issues several records using submit_record()
+     * as shown in the following example.
+     *
+     * Example )
+     *         a. submit_record(a);
+     *         b. submit_record(b);
+     *  c. submit_record(c);
+     *  d. roll to begin
+     *  e. submit_record(d);
+     *  f. submit_record(e);
+     *  g. submit_record(f);
+     *
+     * In this example, we need to consider the two cases.
+     * case 1)
+     *         records a - e were issued in a batch manner
+     * case 2)
+     *  When submit_record(e) starts at step f, submit_record(b) has already completed its finalize phase,
+     *  so committed_to in e's header points to the end of b.
+     *
+     * To handle these cases correctly, the following condition is added.
+     */
+    if ((r_header.committed_to.offset >= cursor.last_committed.offset &&
+       r_header.committed_to.segment_seq == cursor.last_committed.segment_seq) &&
+       r_header.committed_to.segment_seq == cursor.seq.segment_seq - 1) {
+      return false;
+    }
+    return print_invalid(r_header);
+  }
+  return false;
+}
+
 Journal::replay_ret CircularBoundedJournal::replay(
     delta_handler_t &&delta_handler)
 {
@@ -286,7 +361,13 @@ Journal::replay_ret CircularBoundedJournal::replay(
        return scan_valid_record_delta(std::move(call_d_handler_if_valid), tail);
       });
     }).safe_then([this]() {
-      record_submitter.update_committed_to(get_written_to());
+      // make sure that committed_to is JOURNAL_SEQ_NULL if the journal is in its initial state
+      if (get_written_to() != 
+         journal_seq_t{0,
+           convert_abs_addr_to_paddr(get_records_start(),
+           get_device_id())}) {
+       record_submitter.update_committed_to(get_written_to());
+      }
       trimmer.update_journal_tails(
        get_dirty_tail(),
        get_alloc_tail());
@@ -294,81 +375,6 @@ Journal::replay_ret CircularBoundedJournal::replay(
   });
 }
 
-CircularBoundedJournal::read_record_ret
-CircularBoundedJournal::return_record(record_group_header_t& header, bufferlist bl)
-{
-  LOG_PREFIX(CircularBoundedJournal::return_record);
-  DEBUG("record size {}", bl.length());
-  assert(bl.length() == header.mdlength + header.dlength);
-  bufferlist md_bl, data_bl;
-  md_bl.substr_of(bl, 0, header.mdlength);
-  data_bl.substr_of(bl, header.mdlength, header.dlength);
-  if (validate_records_metadata(md_bl) &&
-      validate_records_data(header, data_bl)) {
-    return read_record_ret(
-      read_record_ertr::ready_future_marker{},
-      std::make_pair(header, std::move(bl)));
-  } else {
-    DEBUG("invalid matadata");
-    return read_record_ret(
-      read_record_ertr::ready_future_marker{},
-      std::nullopt);
-  }
-}
-
-CircularBoundedJournal::read_record_ret
-CircularBoundedJournal::read_record(paddr_t off, segment_seq_t expected_seq)
-{
-  LOG_PREFIX(CircularBoundedJournal::read_record);
-  rbm_abs_addr addr = convert_paddr_to_abs_addr(off);
-  auto read_length = get_block_size();
-  assert(addr + read_length <= get_journal_end());
-  DEBUG("reading record from abs addr {} read length {}", addr, read_length);
-  auto bptr = bufferptr(ceph::buffer::create_page_aligned(read_length));
-  return cjs.read(addr, bptr
-  ).safe_then([this, addr, bptr, expected_seq, FNAME]() mutable
-    -> read_record_ret {
-    record_group_header_t h;
-    bufferlist bl;
-    bl.append(bptr);
-    auto bp = bl.cbegin();
-    try {
-      decode(h, bp);
-    } catch (ceph::buffer::error &e) {
-      return read_record_ret(
-       read_record_ertr::ready_future_marker{},
-       std::nullopt);
-    }
-    if (h.mdlength < get_block_size() ||
-        h.mdlength % get_block_size() != 0 ||
-        h.dlength % get_block_size() != 0 ||
-        addr + h.mdlength + h.dlength > get_journal_end() ||
-        h.committed_to.segment_seq == NULL_SEG_SEQ ||
-        (expected_seq != NULL_SEG_SEQ &&
-         h.committed_to.segment_seq != expected_seq)) {
-      return read_record_ret(
-        read_record_ertr::ready_future_marker{},
-        std::nullopt);
-    }
-    auto record_size = h.mdlength + h.dlength;
-    if (record_size > get_block_size()) {
-      auto next_addr = addr + get_block_size();
-      auto next_length = record_size - get_block_size();
-      auto next_bptr = bufferptr(ceph::buffer::create_page_aligned(next_length));
-      DEBUG("reading record part 2 from abs addr {} read length {}",
-            next_addr, next_length);
-      return cjs.read(next_addr, next_bptr
-      ).safe_then([this, h, next_bptr=std::move(next_bptr), bl=std::move(bl)]() mutable {
-        bl.append(next_bptr);
-        return return_record(h, bl);
-      });
-    } else {
-      assert(record_size == get_block_size());
-      return return_record(h, bl);
-    }
-  });
-}
-
 seastar::future<> CircularBoundedJournal::finish_commit(transaction_type_t type) {
   if (is_trim_transaction(type)) {
     return update_journal_tail(