]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/os/seastore/object_data_handler.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / os / seastore / object_data_handler.cc
index 76e179e2414e4dd456536cb9cf62a6fa7edde02c..0d852696b7144a6a5e3068d1f45e991159409b1f 100644 (file)
@@ -25,12 +25,10 @@ using get_iertr = ObjectDataHandler::write_iertr;
 /**
  * extent_to_write_t
  *
- * Encapsulates extents to be written out using do_insertions.
+ * Encapsulates smallest write operations in overwrite.
  * Indicates a zero/existing extent or a data extent based on whether
  * to_write is populate.
- * The meaning of existing_paddr is that the new extent to be
- * written is the part of exising extent on the disk. existing_paddr
- * must be absolute.
+ * Should be handled by prepare_ops_list.
  */
 struct extent_to_write_t {
   enum class type_t {
@@ -38,16 +36,18 @@ struct extent_to_write_t {
     ZERO,
     EXISTING,
   };
-
   type_t type;
+
+  /// pin of original extent, not nullptr if type == EXISTING
+  LBAMappingRef pin;
+
   laddr_t addr;
   extent_len_t len;
+
   /// non-nullopt if and only if type == DATA
   std::optional<bufferlist> to_write;
-  /// non-nullopt if and only if type == EXISTING
-  std::optional<paddr_t> existing_paddr;
 
-  extent_to_write_t(const extent_to_write_t &) = default;
+  extent_to_write_t(const extent_to_write_t &) = delete;
   extent_to_write_t(extent_to_write_t &&) = default;
 
   bool is_data() const {
@@ -72,13 +72,14 @@ struct extent_to_write_t {
   }
 
   static extent_to_write_t create_zero(
-      laddr_t addr, extent_len_t len) {
+    laddr_t addr, extent_len_t len) {
     return extent_to_write_t(addr, len);
   }
 
   static extent_to_write_t create_existing(
-      laddr_t addr, paddr_t existing_paddr, extent_len_t len) {
-    return extent_to_write_t(addr, existing_paddr, len);
+    LBAMappingRef &&pin, laddr_t addr, extent_len_t len) {
+    assert(pin);
+    return extent_to_write_t(std::move(pin), addr, len);
   }
 
 private:
@@ -89,12 +90,211 @@ private:
   extent_to_write_t(laddr_t addr, extent_len_t len)
     : type(type_t::ZERO), addr(addr), len(len) {}
 
-  extent_to_write_t(laddr_t addr, paddr_t existing_paddr, extent_len_t len)
-    : type(type_t::EXISTING), addr(addr), len(len),
-      to_write(std::nullopt), existing_paddr(existing_paddr) {}
+  extent_to_write_t(LBAMappingRef &&pin, laddr_t addr, extent_len_t len)
+    : type(type_t::EXISTING), pin(std::move(pin)), addr(addr), len(len) {}
 };
 using extent_to_write_list_t = std::list<extent_to_write_t>;
 
+// Encapsulates extents to be written out using do_remappings.
+struct extent_to_remap_t {
+  enum class type_t {
+    REMAP,
+    OVERWRITE
+  };
+  type_t type;
+  /// pin of original extent
+  LBAMappingRef pin;
+  /// offset of remapped extent or overwrite part of overwrite extent.
+  /// overwrite part of overwrite extent might correspond to mutiple 
+  /// fresh write extent.
+  extent_len_t new_offset;
+  /// length of remapped extent or overwrite part of overwrite extent
+  extent_len_t new_len;
+
+  extent_to_remap_t(const extent_to_remap_t &) = delete;
+  extent_to_remap_t(extent_to_remap_t &&) = default;
+
+  bool is_remap() const {
+    return type == type_t::REMAP;
+  }
+
+  bool is_overwrite() const {
+    assert((new_offset != 0) && (pin->get_length() != new_offset + new_len));
+    return type == type_t::OVERWRITE;
+  }
+
+  using remap_entry = TransactionManager::remap_entry;
+  remap_entry create_remap_entry() {
+    assert(is_remap());
+    return remap_entry(
+      new_offset,
+      new_len);
+  }
+
+  remap_entry create_left_remap_entry() {
+    assert(is_overwrite());
+    return remap_entry(
+      0,
+      new_offset);
+  }
+
+  remap_entry create_right_remap_entry() {
+    assert(is_overwrite());
+    return remap_entry(
+      new_offset + new_len,
+      pin->get_length() - new_offset - new_len);
+  }
+
+  static extent_to_remap_t create_remap(
+    LBAMappingRef &&pin, extent_len_t new_offset, extent_len_t new_len) {
+    return extent_to_remap_t(type_t::REMAP,
+      std::move(pin), new_offset, new_len);
+  }
+
+  static extent_to_remap_t create_overwrite(
+    LBAMappingRef &&pin, extent_len_t new_offset, extent_len_t new_len) {
+    return extent_to_remap_t(type_t::OVERWRITE,
+      std::move(pin), new_offset, new_len);
+  }
+
+private:
+  extent_to_remap_t(type_t type,
+    LBAMappingRef &&pin, extent_len_t new_offset, extent_len_t new_len)
+    : type(type),
+      pin(std::move(pin)), new_offset(new_offset), new_len(new_len) {}
+};
+using extent_to_remap_list_t = std::list<extent_to_remap_t>;
+
+// Encapsulates extents to be written out using do_insertions.
+struct extent_to_insert_t {
+  enum class type_t {
+    DATA,
+    ZERO
+  };
+  type_t type;
+  /// laddr of new extent
+  laddr_t addr;
+  /// length of new extent
+  extent_len_t len;
+  /// non-nullopt if type == DATA
+  std::optional<bufferlist> bl;
+
+  extent_to_insert_t(const extent_to_insert_t &) = default;
+  extent_to_insert_t(extent_to_insert_t &&) = default;
+
+  bool is_data() const {
+    return type == type_t::DATA;
+  }
+
+  bool is_zero() const {
+    return type == type_t::ZERO;
+  }
+
+  static extent_to_insert_t create_data(
+    laddr_t addr, extent_len_t len, std::optional<bufferlist> bl) {
+    return extent_to_insert_t(addr, len, bl);
+  }
+
+  static extent_to_insert_t create_zero(
+    laddr_t addr, extent_len_t len) {
+    return extent_to_insert_t(addr, len);
+  }
+
+private:
+  extent_to_insert_t(laddr_t addr, extent_len_t len,
+    std::optional<bufferlist> bl)
+    :type(type_t::DATA), addr(addr), len(len), bl(bl) {}
+
+  extent_to_insert_t(laddr_t addr, extent_len_t len)
+    :type(type_t::ZERO), addr(addr), len(len) {}
+};
+using extent_to_insert_list_t = std::list<extent_to_insert_t>;
+
+// Encapsulates extents to be retired in do_removals.
+using extent_to_remove_list_t = std::list<LBAMappingRef>;
+
+struct overwrite_ops_t {
+  extent_to_remap_list_t to_remap;
+  extent_to_insert_list_t to_insert;
+  extent_to_remove_list_t to_remove;
+};
+
+// prepare to_remap, to_retire, to_insert list
+overwrite_ops_t prepare_ops_list(
+  lba_pin_list_t &pins_to_remove,
+  extent_to_write_list_t &to_write) {
+  assert(pins_to_remove.size() != 0);
+  overwrite_ops_t ops;
+  ops.to_remove.swap(pins_to_remove);
+  if (to_write.empty()) {
+    logger().debug("empty to_write");
+    return ops;
+  }
+  long unsigned int visitted = 0;
+  auto& front = to_write.front();
+  auto& back = to_write.back();
+
+  // prepare overwrite, happens in one original extent.
+  if (ops.to_remove.size() == 1 &&
+      front.is_existing() && back.is_existing()) {
+      visitted += 2;
+      assert(to_write.size() > 2);
+      assert(front.addr == front.pin->get_key());
+      assert(back.addr > back.pin->get_key());
+      ops.to_remap.push_back(extent_to_remap_t::create_overwrite(
+       std::move(front.pin),
+       front.len,
+       back.addr - front.addr - front.len));
+      ops.to_remove.pop_front();
+  } else {
+    // prepare to_remap, happens in one or multiple extents
+    if (front.is_existing()) {
+      visitted++;
+      assert(to_write.size() > 1);
+      assert(front.addr == front.pin->get_key());
+      ops.to_remap.push_back(extent_to_remap_t::create_remap(
+       std::move(front.pin),
+       0,
+       front.len));
+      ops.to_remove.pop_front();
+    }
+    if (back.is_existing()) {
+      visitted++;
+      assert(to_write.size() > 1);
+      assert(back.addr + back.len ==
+       back.pin->get_key() + back.pin->get_length());
+      ops.to_remap.push_back(extent_to_remap_t::create_remap(
+       std::move(back.pin),
+       back.addr - back.pin->get_key(),
+       back.len));
+      ops.to_remove.pop_back();
+    }
+  }
+
+  // prepare to_insert
+  for (auto &region : to_write) {
+    if (region.is_data()) {
+      visitted++;
+      assert(region.to_write.has_value());
+      ops.to_insert.push_back(extent_to_insert_t::create_data(
+       region.addr, region.len, region.to_write));
+    } else if (region.is_zero()) {
+      visitted++;
+      assert(!(region.to_write.has_value()));
+      ops.to_insert.push_back(extent_to_insert_t::create_zero(
+       region.addr, region.len));
+    }
+  }
+
+  logger().debug(
+    "to_remap list size: {}"
+    " to_insert list size: {}"
+    " to_remove list size: {}",
+    ops.to_remap.size(), ops.to_insert.size(), ops.to_remove.size());
+  assert(visitted == to_write.size());
+  return ops;
+}
+
 /**
  * append_extent_to_write
  *
@@ -134,13 +334,54 @@ void splice_extent_to_write(
   }
 }
 
-/// Removes extents/mappings in pins
+/// Creates remap extents in to_remap
+ObjectDataHandler::write_ret do_remappings(
+  context_t ctx,
+  extent_to_remap_list_t &to_remap)
+{
+  return trans_intr::do_for_each(
+    to_remap,
+    [ctx](auto &region) {
+      if (region.is_remap()) {
+        return ctx.tm.remap_pin<ObjectDataBlock, 1>(
+          ctx.t,
+          std::move(region.pin),
+          std::array{
+            region.create_remap_entry()
+          }
+        ).si_then([&region](auto pins) {
+          ceph_assert(pins.size() == 1);
+          ceph_assert(region.new_len == pins[0]->get_length());
+          return ObjectDataHandler::write_iertr::now();
+        });
+      } else if (region.is_overwrite()) {
+        return ctx.tm.remap_pin<ObjectDataBlock, 2>(
+          ctx.t,
+          std::move(region.pin),
+          std::array{
+            region.create_left_remap_entry(),
+            region.create_right_remap_entry()
+          }
+        ).si_then([&region](auto pins) {
+          ceph_assert(pins.size() == 2);
+          ceph_assert(region.pin->get_key() == pins[0]->get_key());
+          ceph_assert(region.pin->get_key() + pins[0]->get_length() +
+            region.new_len == pins[1]->get_key());
+          return ObjectDataHandler::write_iertr::now();
+        });
+      } else {
+        ceph_abort("impossible");
+        return ObjectDataHandler::write_iertr::now();
+      }
+  });
+}
+
 ObjectDataHandler::write_ret do_removals(
   context_t ctx,
-  lba_pin_list_t &pins)
+  lba_pin_list_t &to_remove)
 {
   return trans_intr::do_for_each(
-    pins,
+    to_remove,
     [ctx](auto &pin) {
       LOG_PREFIX(object_data_handler.cc::do_removals);
       DEBUGT("decreasing ref: {}",
@@ -159,19 +400,19 @@ ObjectDataHandler::write_ret do_removals(
     });
 }
 
-/// Creates zero/data extents in to_write
+/// Creates zero/data extents in to_insert
 ObjectDataHandler::write_ret do_insertions(
   context_t ctx,
-  extent_to_write_list_t &to_write)
+  extent_to_insert_list_t &to_insert)
 {
   return trans_intr::do_for_each(
-    to_write,
+    to_insert,
     [ctx](auto &region) {
       LOG_PREFIX(object_data_handler.cc::do_insertions);
       if (region.is_data()) {
        assert_aligned(region.addr);
        assert_aligned(region.len);
-       ceph_assert(region.len == region.to_write->length());
+       ceph_assert(region.len == region.bl->length());
        DEBUGT("allocating extent: {}~{}",
               ctx.t,
               region.addr,
@@ -190,7 +431,7 @@ ObjectDataHandler::write_ret do_insertions(
          }
          ceph_assert(extent->get_laddr() == region.addr);
          ceph_assert(extent->get_length() == region.len);
-         auto iter = region.to_write->cbegin();
+         auto iter = region.bl->cbegin();
          iter.copy(region.len, extent->get_bptr().c_str());
          return ObjectDataHandler::write_iertr::now();
        });
@@ -216,25 +457,8 @@ ObjectDataHandler::write_ret do_insertions(
          return ObjectDataHandler::write_iertr::now();
        });
       } else {
-       ceph_assert(region.is_existing());
-       DEBUGT("map existing extent: laddr {} len {} {}",
-              ctx.t, region.addr, region.len, *region.existing_paddr);
-       return ctx.tm.map_existing_extent<ObjectDataBlock>(
-         ctx.t, region.addr, *region.existing_paddr, region.len
-       ).handle_error_interruptible(
-         TransactionManager::alloc_extent_iertr::pass_further{},
-         Device::read_ertr::assert_all{"ignore read error"}
-       ).si_then([FNAME, ctx, &region](auto extent) {
-         if (extent->get_laddr() != region.addr) {
-           ERRORT(
-             "inconsistent laddr: extent: {} region {}",
-             ctx.t,
-             extent->get_laddr(),
-             region.addr);
-         }
-         ceph_assert(extent->get_laddr() == region.addr);
-         return ObjectDataHandler::write_iertr::now();
-       });
+       ceph_abort("impossible");
+       return ObjectDataHandler::write_iertr::now();
       }
     });
 }
@@ -352,7 +576,8 @@ public:
   overwrite_plan_t(laddr_t offset,
                   extent_len_t len,
                   const lba_pin_list_t& pins,
-                  extent_len_t block_size) :
+                  extent_len_t block_size,
+                  Transaction& t) :
       pin_begin(pins.front()->get_key()),
       pin_end(pins.back()->get_key() + pins.back()->get_length()),
       left_paddr(pins.front()->get_val()),
@@ -365,7 +590,7 @@ public:
       right_operation(overwrite_operation_t::UNKNOWN),
       block_size(block_size) {
     validate();
-    evaluate_operations();
+    evaluate_operations(t);
     assert(left_operation != overwrite_operation_t::UNKNOWN);
     assert(right_operation != overwrite_operation_t::UNKNOWN);
   }
@@ -393,19 +618,31 @@ private:
    * original extent into at most three parts: origin-left, part-to-be-modified
    * and origin-right.
    */
-  void evaluate_operations() {
+  void evaluate_operations(Transaction& t) {
     auto actual_write_size = get_pins_size();
     auto aligned_data_size = get_aligned_data_size();
     auto left_ext_size = get_left_extent_size();
     auto right_ext_size = get_right_extent_size();
 
+    auto can_merge = [](Transaction& t, paddr_t paddr) {
+      CachedExtentRef ext;
+      if (paddr.is_relative() || paddr.is_delayed()) {
+         return true;
+      } else if (t.get_extent(paddr, &ext) ==
+       Transaction::get_extent_ret::PRESENT) {
+       // FIXME: there is no need to lookup the cache if the pin can 
+       // be associated with the extent state
+       if (ext->is_mutable()) {
+         return true;
+       }
+      }
+      return false;
+    };
     if (left_paddr.is_zero()) {
       actual_write_size -= left_ext_size;
       left_ext_size = 0;
       left_operation = overwrite_operation_t::OVERWRITE_ZERO;
-    // FIXME: left_paddr can be absolute and pending
-    } else if (left_paddr.is_relative() ||
-              left_paddr.is_delayed()) {
+    } else if (can_merge(t, left_paddr)) {
       aligned_data_size += left_ext_size;
       left_ext_size = 0;
       left_operation = overwrite_operation_t::MERGE_EXISTING;
@@ -415,9 +652,7 @@ private:
       actual_write_size -= right_ext_size;
       right_ext_size = 0;
       right_operation = overwrite_operation_t::OVERWRITE_ZERO;
-    // FIXME: right_paddr can be absolute and pending
-    } else if (right_paddr.is_relative() ||
-              right_paddr.is_delayed()) {
+    } else if (can_merge(t, right_paddr)) {
       aligned_data_size += right_ext_size;
       right_ext_size = 0;
       right_operation = overwrite_operation_t::MERGE_EXISTING;
@@ -506,14 +741,15 @@ operate_ret operate_left(context_t ctx, LBAMappingRef &pin, const overwrite_plan
         std::nullopt,
         std::nullopt);
     } else {
+      extent_len_t off = pin->get_intermediate_offset();
       return ctx.tm.read_pin<ObjectDataBlock>(
        ctx.t, pin->duplicate()
-      ).si_then([prepend_len](auto left_extent) {
+      ).si_then([prepend_len, off](auto left_extent) {
         return get_iertr::make_ready_future<operate_ret_bare>(
           std::nullopt,
           std::make_optional(bufferptr(
             left_extent->get_bptr(),
-            0,
+            off,
             prepend_len)));
       });
     }
@@ -524,23 +760,24 @@ operate_ret operate_left(context_t ctx, LBAMappingRef &pin, const overwrite_plan
     assert(extent_len);
     std::optional<extent_to_write_t> left_to_write_extent =
       std::make_optional(extent_to_write_t::create_existing(
-        overwrite_plan.pin_begin,
-        overwrite_plan.left_paddr,
+        pin->duplicate(),
+        pin->get_key(),
         extent_len));
 
     auto prepend_len = overwrite_plan.get_left_alignment_size();
     if (prepend_len == 0) {
       return get_iertr::make_ready_future<operate_ret_bare>(
-        left_to_write_extent,
+        std::move(left_to_write_extent),
         std::nullopt);
     } else {
+      extent_len_t off = pin->get_intermediate_offset();
       return ctx.tm.read_pin<ObjectDataBlock>(
        ctx.t, pin->duplicate()
-      ).si_then([prepend_offset=extent_len, prepend_len,
+      ).si_then([prepend_offset=extent_len + off, prepend_len,
                  left_to_write_extent=std::move(left_to_write_extent)]
                 (auto left_extent) mutable {
         return get_iertr::make_ready_future<operate_ret_bare>(
-          left_to_write_extent,
+          std::move(left_to_write_extent),
           std::make_optional(bufferptr(
             left_extent->get_bptr(),
             prepend_offset,
@@ -587,7 +824,10 @@ operate_ret operate_right(context_t ctx, LBAMappingRef &pin, const overwrite_pla
         std::nullopt,
         std::nullopt);
     } else {
-      auto append_offset = overwrite_plan.data_end - right_pin_begin;
+      auto append_offset =
+       overwrite_plan.data_end
+       - right_pin_begin
+       + pin->get_intermediate_offset();
       return ctx.tm.read_pin<ObjectDataBlock>(
        ctx.t, pin->duplicate()
       ).si_then([append_offset, append_len](auto right_extent) {
@@ -606,24 +846,27 @@ operate_ret operate_right(context_t ctx, LBAMappingRef &pin, const overwrite_pla
     assert(extent_len);
     std::optional<extent_to_write_t> right_to_write_extent =
       std::make_optional(extent_to_write_t::create_existing(
+        pin->duplicate(),
         overwrite_plan.aligned_data_end,
-        overwrite_plan.right_paddr.add_offset(overwrite_plan.aligned_data_end - right_pin_begin),
         extent_len));
 
     auto append_len = overwrite_plan.get_right_alignment_size();
     if (append_len == 0) {
       return get_iertr::make_ready_future<operate_ret_bare>(
-        right_to_write_extent,
+        std::move(right_to_write_extent),
         std::nullopt);
     } else {
-      auto append_offset = overwrite_plan.data_end - right_pin_begin;
+      auto append_offset =
+       overwrite_plan.data_end
+       - right_pin_begin
+       + pin->get_intermediate_offset();
       return ctx.tm.read_pin<ObjectDataBlock>(
        ctx.t, pin->duplicate()
       ).si_then([append_offset, append_len,
                  right_to_write_extent=std::move(right_to_write_extent)]
                 (auto right_extent) mutable {
         return get_iertr::make_ready_future<operate_ret_bare>(
-          right_to_write_extent,
+          std::move(right_to_write_extent),
           std::make_optional(bufferptr(
             right_extent->get_bptr(),
             append_offset,
@@ -652,6 +895,31 @@ auto with_object_data(
     });
 }
 
+template <typename F>
+auto with_objects_data(
+  ObjectDataHandler::context_t ctx,
+  F &&f)
+{
+  ceph_assert(ctx.d_onode);
+  return seastar::do_with(
+    ctx.onode.get_layout().object_data.get(),
+    ctx.d_onode->get_layout().object_data.get(),
+    std::forward<F>(f),
+    [ctx](auto &object_data, auto &d_object_data, auto &f) {
+      return std::invoke(f, object_data, d_object_data
+      ).si_then([ctx, &object_data, &d_object_data] {
+       if (object_data.must_update()) {
+         ctx.onode.get_mutable_layout(ctx.t).object_data.update(object_data);
+       }
+       if (d_object_data.must_update()) {
+         ctx.d_onode->get_mutable_layout(
+           ctx.t).object_data.update(d_object_data);
+       }
+       return seastar::now();
+      });
+    });
+}
+
 ObjectDataHandler::write_ret ObjectDataHandler::prepare_data_reservation(
   context_t ctx,
   object_data_t &object_data,
@@ -706,6 +974,11 @@ ObjectDataHandler::clear_ret ObjectDataHandler::trim_data_reservation(
       ).si_then([ctx, size, &pins, &object_data, &to_write](auto _pins) {
        _pins.swap(pins);
        ceph_assert(pins.size());
+       if (!size) {
+         // no need to reserve region if we are truncating the object's
+         // size to 0
+         return clear_iertr::now();
+       }
        auto &pin = *pins.front();
        ceph_assert(pin.get_key() >= object_data.get_reserved_data_base());
        ceph_assert(
@@ -721,41 +994,67 @@ ObjectDataHandler::clear_ret ObjectDataHandler::trim_data_reservation(
            object_data.get_reserved_data_len() - pin_offset));
          return clear_iertr::now();
        } else {
-         /* First pin overlaps the boundary and has data, read in extent
-          * and rewrite portion prior to size */
-         return ctx.tm.read_pin<ObjectDataBlock>(
-           ctx.t,
-           pin.duplicate()
-         ).si_then([ctx, size, pin_offset, &pin, &object_data, &to_write](
-                    auto extent) {
-           bufferlist bl;
-           bl.append(
-             bufferptr(
-               extent->get_bptr(),
-               0,
-               size - pin_offset
-             ));
-           bl.append_zero(p2roundup(size, ctx.tm.get_block_size()) - size);
-           to_write.push_back(extent_to_write_t::create_data(
-             pin.get_key(),
-             bl));
+         /* First pin overlaps the boundary and has data, remap it
+          * if aligned or rewrite it if not aligned to size */
+          auto roundup_size = p2roundup(size, ctx.tm.get_block_size());
+          auto append_len = roundup_size - size;
+          if (append_len == 0) {
+            LOG_PREFIX(ObjectDataHandler::trim_data_reservation);
+            TRACET("First pin overlaps the boundary and has aligned data"
+              "create existing at addr:{}, len:{}",
+              ctx.t, pin.get_key(), size - pin_offset);
+            to_write.push_back(extent_to_write_t::create_existing(
+              pin.duplicate(),
+              pin.get_key(),
+              size - pin_offset));
            to_write.push_back(extent_to_write_t::create_zero(
-             object_data.get_reserved_data_base() +
-                p2roundup(size, ctx.tm.get_block_size()),
-             object_data.get_reserved_data_len() -
-                p2roundup(size, ctx.tm.get_block_size())));
-           return clear_iertr::now();
-         });
-       }
-      }).si_then([ctx, &pins] {
-       return do_removals(ctx, pins);
-      }).si_then([ctx, &to_write] {
-       return do_insertions(ctx, to_write);
-      }).si_then([size, &object_data] {
-       if (size == 0) {
-         object_data.clear();
+             object_data.get_reserved_data_base() + roundup_size,
+             object_data.get_reserved_data_len() - roundup_size));
+            return clear_iertr::now();
+          } else {
+            return ctx.tm.read_pin<ObjectDataBlock>(
+              ctx.t,
+              pin.duplicate()
+            ).si_then([ctx, size, pin_offset, append_len, roundup_size,
+                      &pin, &object_data, &to_write](auto extent) {
+              bufferlist bl;
+             bl.append(
+               bufferptr(
+                 extent->get_bptr(),
+                 pin.get_intermediate_offset(),
+                 size - pin_offset
+             ));
+              bl.append_zero(append_len);
+              LOG_PREFIX(ObjectDataHandler::trim_data_reservation);
+              TRACET("First pin overlaps the boundary and has unaligned data"
+                "create data at addr:{}, len:{}",
+                ctx.t, pin.get_key(), bl.length());
+             to_write.push_back(extent_to_write_t::create_data(
+               pin.get_key(),
+               bl));
+             to_write.push_back(extent_to_write_t::create_zero(
+               object_data.get_reserved_data_base() + roundup_size,
+               object_data.get_reserved_data_len() - roundup_size));
+              return clear_iertr::now();
+            });
+          }
        }
-       return ObjectDataHandler::clear_iertr::now();
+      }).si_then([ctx, size, &to_write, &object_data, &pins] {
+        return seastar::do_with(
+          prepare_ops_list(pins, to_write),
+          [ctx, size, &object_data](auto &ops) {
+            return do_remappings(ctx, ops.to_remap
+            ).si_then([ctx, &ops] {
+              return do_removals(ctx, ops.to_remove);
+            }).si_then([ctx, &ops] {
+              return do_insertions(ctx, ops.to_insert);
+            }).si_then([size, &object_data] {
+             if (size == 0) {
+               object_data.clear();
+             }
+             return ObjectDataHandler::clear_iertr::now();
+            });
+        });
       });
     });
 }
@@ -806,7 +1105,9 @@ extent_to_write_list_t get_to_writes_with_zero_buffer(
     }
     assert(bl.length() % block_size == 0);
     assert(bl.length() == (right - left));
-    return {extent_to_write_t::create_data(left, bl)};
+    extent_to_write_list_t ret;
+    ret.push_back(extent_to_write_t::create_data(left, bl));
+    return ret;
   } else {
     // reserved section between ends, headptr and tailptr in different extents
     extent_to_write_list_t ret;
@@ -858,7 +1159,7 @@ ObjectDataHandler::write_ret ObjectDataHandler::overwrite(
   if (bl.has_value()) {
     assert(bl->length() == len);
   }
-  overwrite_plan_t overwrite_plan(offset, len, _pins, ctx.tm.get_block_size());
+  overwrite_plan_t overwrite_plan(offset, len, _pins, ctx.tm.get_block_size(), ctx.t);
   return seastar::do_with(
     std::move(_pins),
     extent_to_write_list_t(),
@@ -931,9 +1232,16 @@ ObjectDataHandler::write_ret ObjectDataHandler::overwrite(
         assert(pin_begin == to_write.front().addr);
         assert(pin_end == to_write.back().get_end_addr());
 
-        return do_removals(ctx, pins);
-      }).si_then([ctx, &to_write] {
-        return do_insertions(ctx, to_write);
+        return seastar::do_with(
+          prepare_ops_list(pins, to_write),
+          [ctx](auto &ops) {
+            return do_remappings(ctx, ops.to_remap
+            ).si_then([ctx, &ops] {
+              return do_removals(ctx, ops.to_remove);
+            }).si_then([ctx, &ops] {
+              return do_insertions(ctx, ops.to_insert);
+            });
+        });
       });
     });
   });
@@ -1061,17 +1369,33 @@ ObjectDataHandler::read_ret ObjectDataHandler::read(
                      current = end;
                      return seastar::now();
                    } else {
+                     LOG_PREFIX(ObjectDataHandler::read);
+                     auto key = pin->get_key();
+                     bool is_indirect = pin->is_indirect();
+                      extent_len_t off = pin->get_intermediate_offset();
+                     DEBUGT("reading {}~{}, indirect: {}, "
+                       "intermediate offset: {}, current: {}, end: {}",
+                       ctx.t,
+                       key,
+                       pin->get_length(),
+                       is_indirect,
+                       off,
+                       current,
+                       end);
                      return ctx.tm.read_pin<ObjectDataBlock>(
                        ctx.t,
                        std::move(pin)
-                     ).si_then([&ret, &current, end](auto extent) {
+                     ).si_then([&ret, &current, end, key, off,
+                               is_indirect](auto extent) {
                        ceph_assert(
-                         (extent->get_laddr() + extent->get_length()) >= end);
+                         is_indirect
+                           ? (key - off + extent->get_length()) >= end
+                           : (extent->get_laddr() + extent->get_length()) >= end);
                        ceph_assert(end > current);
                        ret.append(
                          bufferptr(
                            extent->get_bptr(),
-                           current - extent->get_laddr(),
+                           off + current - (is_indirect ? key : extent->get_laddr()),
                            end - current));
                        current = end;
                        return seastar::now();
@@ -1189,4 +1513,126 @@ ObjectDataHandler::clear_ret ObjectDataHandler::clear(
     });
 }
 
+ObjectDataHandler::clone_ret ObjectDataHandler::clone_extents(
+  context_t ctx,
+  object_data_t &object_data,
+  lba_pin_list_t &pins,
+  laddr_t data_base)
+{
+  LOG_PREFIX(ObjectDataHandler::clone_extents);
+  TRACET(" object_data: {}~{}, data_base: {}",
+    ctx.t,
+    object_data.get_reserved_data_base(),
+    object_data.get_reserved_data_len(),
+    data_base);
+  return ctx.tm.dec_ref(
+    ctx.t,
+    object_data.get_reserved_data_base()
+  ).si_then(
+    [&pins, &object_data, ctx, data_base](auto) mutable {
+      return seastar::do_with(
+       (extent_len_t)0,
+       [&object_data, ctx, data_base, &pins](auto &last_pos) {
+       return trans_intr::do_for_each(
+         pins,
+         [&last_pos, &object_data, ctx, data_base](auto &pin) {
+         auto offset = pin->get_key() - data_base;
+         ceph_assert(offset == last_pos);
+         auto fut = TransactionManager::alloc_extent_iertr
+           ::make_ready_future<LBAMappingRef>();
+         auto addr = object_data.get_reserved_data_base() + offset;
+         if (pin->get_val().is_zero()) {
+           fut = ctx.tm.reserve_region(ctx.t, addr, pin->get_length());
+         } else {
+           fut = ctx.tm.clone_pin(ctx.t, addr, *pin);
+         }
+         return fut.si_then(
+           [&pin, &last_pos, offset](auto) {
+           last_pos = offset + pin->get_length();
+           return seastar::now();
+         }).handle_error_interruptible(
+           crimson::ct_error::input_output_error::pass_further(),
+           crimson::ct_error::assert_all("not possible")
+         );
+       }).si_then([&last_pos, &object_data, ctx] {
+         if (last_pos != object_data.get_reserved_data_len()) {
+           return ctx.tm.reserve_region(
+             ctx.t,
+             object_data.get_reserved_data_base() + last_pos,
+             object_data.get_reserved_data_len() - last_pos
+           ).si_then([](auto) {
+             return seastar::now();
+           });
+         }
+         return TransactionManager::reserve_extent_iertr::now();
+       });
+      });
+    },
+    ObjectDataHandler::write_iertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "object_data_handler::clone invalid error"
+    }
+  );
+}
+
+ObjectDataHandler::clone_ret ObjectDataHandler::clone(
+  context_t ctx)
+{
+  // the whole clone procedure can be seperated into the following steps:
+  //   1. let clone onode(d_object_data) take the head onode's
+  //      object data base;
+  //   2. reserve a new region in lba tree for the head onode;
+  //   3. clone all extents of the clone onode, see transaction_manager.h
+  //      for the details of clone_pin;
+  //   4. reserve the space between the head onode's size and its reservation
+  //      length.
+  return with_objects_data(
+    ctx,
+    [ctx, this](auto &object_data, auto &d_object_data) {
+    ceph_assert(d_object_data.is_null());
+    if (object_data.is_null()) {
+      return clone_iertr::now();
+    }
+    return prepare_data_reservation(
+      ctx,
+      d_object_data,
+      object_data.get_reserved_data_len()
+    ).si_then([&object_data, &d_object_data, ctx, this] {
+      assert(!object_data.is_null());
+      auto base = object_data.get_reserved_data_base();
+      auto len = object_data.get_reserved_data_len();
+      object_data.clear();
+      LOG_PREFIX(ObjectDataHandler::clone);
+      DEBUGT("cloned obj reserve_data_base: {}, len {}",
+       ctx.t,
+       d_object_data.get_reserved_data_base(),
+       d_object_data.get_reserved_data_len());
+      return prepare_data_reservation(
+       ctx,
+       object_data,
+       d_object_data.get_reserved_data_len()
+      ).si_then([&d_object_data, ctx, &object_data, base, len, this] {
+       LOG_PREFIX("ObjectDataHandler::clone");
+       DEBUGT("head obj reserve_data_base: {}, len {}",
+         ctx.t,
+         object_data.get_reserved_data_base(),
+         object_data.get_reserved_data_len());
+       return ctx.tm.get_pins(ctx.t, base, len
+       ).si_then([ctx, &object_data, &d_object_data, base, this](auto pins) {
+         return seastar::do_with(
+           std::move(pins),
+           [ctx, &object_data, &d_object_data, base, this](auto &pins) {
+           return clone_extents(ctx, object_data, pins, base
+           ).si_then([ctx, &d_object_data, base, &pins, this] {
+             return clone_extents(ctx, d_object_data, pins, base);
+           }).si_then([&pins, ctx] {
+             return do_removals(ctx, pins);
+           });
+         });
+       });
+      });
+    });
+  });
+}
+
 } // namespace crimson::os::seastore