]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/osd/pg_backend.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / crimson / osd / pg_backend.cc
index e45b935bb323f2adf837f5091440839a9f4494f0..d69e5e2042ecdbb58c7f73a891ae429bec046890 100644 (file)
 #include "common/Clock.h"
 
 #include "crimson/common/exception.h"
+#include "crimson/common/tmap_helpers.h"
 #include "crimson/os/futurized_collection.h"
 #include "crimson/os/futurized_store.h"
 #include "crimson/osd/osd_operation.h"
+#include "crimson/osd/object_context_loader.h"
 #include "replicated_backend.h"
 #include "replicated_recovery_backend.h"
 #include "ec_backend.h"
@@ -43,16 +45,19 @@ PGBackend::create(pg_t pgid,
                  const pg_pool_t& pool,
                  crimson::os::CollectionRef coll,
                  crimson::osd::ShardServices& shard_services,
-                 const ec_profile_t& ec_profile)
+                 const ec_profile_t& ec_profile,
+                 DoutPrefixProvider &dpp)
 {
   switch (pool.type) {
   case pg_pool_t::TYPE_REPLICATED:
     return std::make_unique<ReplicatedBackend>(pgid, pg_shard,
-                                              coll, shard_services);
+                                              coll, shard_services,
+                                              dpp);
   case pg_pool_t::TYPE_ERASURE:
     return std::make_unique<ECBackend>(pg_shard.shard, coll, shard_services,
                                        std::move(ec_profile),
-                                       pool.stripe_width);
+                                       pool.stripe_width,
+                                      dpp);
   default:
     throw runtime_error(seastar::format("unsupported pool type '{}'",
                                         pool.type));
@@ -61,65 +66,85 @@ PGBackend::create(pg_t pgid,
 
 PGBackend::PGBackend(shard_id_t shard,
                      CollectionRef coll,
-                     crimson::os::FuturizedStore* store)
+                     crimson::osd::ShardServices &shard_services,
+                    DoutPrefixProvider &dpp)
   : shard{shard},
     coll{coll},
-    store{store}
+    shard_services{shard_services},
+    dpp{dpp},
+    store{&shard_services.get_store()}
 {}
 
 PGBackend::load_metadata_iertr::future
   <PGBackend::loaded_object_md_t::ref>
 PGBackend::load_metadata(const hobject_t& oid)
 {
-  if (__builtin_expect(stopping, false)) {
-    throw crimson::common::system_shutdown_exception();
-  }
-
   return interruptor::make_interruptible(store->get_attrs(
     coll,
     ghobject_t{oid, ghobject_t::NO_GEN, shard})).safe_then_interruptible(
       [oid](auto &&attrs) -> load_metadata_ertr::future<loaded_object_md_t::ref>{
-       loaded_object_md_t::ref ret(new loaded_object_md_t());
-       if (auto oiiter = attrs.find(OI_ATTR); oiiter != attrs.end()) {
-         bufferlist bl = std::move(oiiter->second);
-         ret->os = ObjectState(
-           object_info_t(bl, oid),
-           true);
-       } else {
-         logger().error(
-           "load_metadata: object {} present but missing object info",
-           oid);
-         return crimson::ct_error::object_corrupted::make();
-       }
-       
-       if (oid.is_head()) {
-         if (auto ssiter = attrs.find(SS_ATTR); ssiter != attrs.end()) {
-           bufferlist bl = std::move(ssiter->second);
-           ret->ss = SnapSet(bl);
-         } else {
-           /* TODO: add support for writing out snapsets
-           logger().error(
-             "load_metadata: object {} present but missing snapset",
-             oid);
-           //return crimson::ct_error::object_corrupted::make();
-           */
-           ret->ss = SnapSet();
-         }
-       }
+        loaded_object_md_t::ref ret(new loaded_object_md_t());
+        if (auto oiiter = attrs.find(OI_ATTR); oiiter != attrs.end()) {
+          bufferlist bl = std::move(oiiter->second);
+          try {
+            ret->os = ObjectState(
+              object_info_t(bl, oid),
+              true);
+          } catch (const buffer::error&) {
+            logger().warn("unable to decode ObjectState");
+            throw crimson::osd::invalid_argument();
+          }
+        } else {
+          logger().error(
+            "load_metadata: object {} present but missing object info",
+            oid);
+          return crimson::ct_error::object_corrupted::make();
+        }
 
-       return load_metadata_ertr::make_ready_future<loaded_object_md_t::ref>(
-         std::move(ret));
+        if (oid.is_head()) {
+          // Returning object_corrupted when the object exsits and the
+          // Snapset is either not found or empty.
+          bool object_corrupted = true;
+          if (auto ssiter = attrs.find(SS_ATTR); ssiter != attrs.end()) {
+            object_corrupted = false;
+            bufferlist bl = std::move(ssiter->second);
+            if (bl.length()) {
+              ret->ssc = new crimson::osd::SnapSetContext(oid.get_snapdir());
+              try {
+                ret->ssc->snapset = SnapSet(bl);
+                ret->ssc->exists = true;
+                logger().debug(
+                  "load_metadata: object {} and snapset {} present",
+                   oid, ret->ssc->snapset);
+              } catch (const buffer::error&) {
+                logger().warn("unable to decode SnapSet");
+                throw crimson::osd::invalid_argument();
+              }
+            } else {
+              object_corrupted = true;
+            }
+          }
+          if (object_corrupted) {
+            logger().error(
+              "load_metadata: object {} present but missing snapset",
+              oid);
+            return crimson::ct_error::object_corrupted::make();
+          }
+        }
+
+        return load_metadata_ertr::make_ready_future<loaded_object_md_t::ref>(
+          std::move(ret));
       }, crimson::ct_error::enoent::handle([oid] {
-       logger().debug(
-         "load_metadata: object {} doesn't exist, returning empty metadata",
-         oid);
-       return load_metadata_ertr::make_ready_future<loaded_object_md_t::ref>(
-         new loaded_object_md_t{
-           ObjectState(
-             object_info_t(oid),
-             false),
-           oid.is_head() ? std::optional<SnapSet>(SnapSet()) : std::nullopt
-         });
+        logger().debug(
+          "load_metadata: object {} doesn't exist, returning empty metadata",
+          oid);
+        return load_metadata_ertr::make_ready_future<loaded_object_md_t::ref>(
+          new loaded_object_md_t{
+            ObjectState(
+              object_info_t(oid),
+              false),
+            oid.is_head() ? (new crimson::osd::SnapSetContext(oid)) : nullptr
+          });
       }));
 }
 
@@ -155,6 +180,18 @@ PGBackend::mutate_object(
       // TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
       txn.setattr(coll->get_cid(), ghobject_t{obc->obs.oi.soid}, OI_ATTR, osv);
     }
+
+    // snapset
+    if (obc->obs.oi.soid.snap == CEPH_NOSNAP) {
+      logger().debug("final snapset {} in {}",
+        obc->ssc->snapset, obc->obs.oi.soid);
+      ceph::bufferlist bss;
+      encode(obc->ssc->snapset, bss);
+      txn.setattr(coll->get_cid(), ghobject_t{obc->obs.oi.soid}, SS_ATTR, bss);
+      obc->ssc->exists = true;
+    } else {
+      logger().debug("no snapset (this is a clone)");
+    }
   } else {
     // reset cached ObjectState without enforcing eviction
     obc->obs.oi = object_info_t(obc->obs.oi.soid);
@@ -216,7 +253,8 @@ PGBackend::read(const ObjectState& os, OSDOp& osd_op,
       return crimson::ct_error::object_corrupted::make();
     }
     logger().debug("read: data length: {}", bl.length());
-    osd_op.rval = bl.length();
+    osd_op.op.extent.length = bl.length();
+    osd_op.rval = 0;
     delta_stats.num_rd++;
     delta_stats.num_rd_kb += shift_round_up(bl.length(), 10);
     osd_op.outdata = std::move(bl);
@@ -231,12 +269,34 @@ PGBackend::read_ierrorator::future<>
 PGBackend::sparse_read(const ObjectState& os, OSDOp& osd_op,
                 object_stat_sum_t& delta_stats)
 {
+  if (!os.exists || os.oi.is_whiteout()) {
+    logger().debug("{}: {} DNE", __func__, os.oi.soid);
+    return crimson::ct_error::enoent::make();
+  }
+
   const auto& op = osd_op.op;
+  /* clients (particularly cephfs) may send truncate operations out of order
+   * w.r.t. reads.  op.extent.truncate_seq and op.extent.truncate_size allow
+   * the OSD to determine whether the client submitted read needs to be
+   * adjusted to compensate for a truncate the OSD hasn't seen yet.
+   */
+  uint64_t adjusted_size = os.oi.size;
+  const uint64_t offset = op.extent.offset;
+  uint64_t adjusted_length = op.extent.length;
+  if ((os.oi.truncate_seq < op.extent.truncate_seq) &&
+       (op.extent.offset + op.extent.length > op.extent.truncate_size) &&
+       (adjusted_size > op.extent.truncate_size)) {
+    adjusted_size = op.extent.truncate_size;
+  }
+  if (offset > adjusted_size) {
+    adjusted_length = 0;
+  } else if (offset + adjusted_length > adjusted_size) {
+    adjusted_length = adjusted_size - offset;
+  }
   logger().trace("sparse_read: {} {}~{}",
                  os.oi.soid, op.extent.offset, op.extent.length);
   return interruptor::make_interruptible(store->fiemap(coll, ghobject_t{os.oi.soid},
-                      op.extent.offset,
-                      op.extent.length)).then_interruptible(
+    offset, adjusted_length)).safe_then_interruptible(
     [&delta_stats, &os, &osd_op, this](auto&& m) {
     return seastar::do_with(interval_set<uint64_t>{std::move(m)},
                            [&delta_stats, &os, &osd_op, this](auto&& extents) {
@@ -365,11 +425,6 @@ PGBackend::cmp_ext_ierrorator::future<>
 PGBackend::cmp_ext(const ObjectState& os, OSDOp& osd_op)
 {
   const ceph_osd_op& op = osd_op.op;
-  // return the index of the first unmatched byte in the payload, hence the
-  // strange limit and check
-  if (op.extent.length > MAX_ERRNO) {
-    return crimson::ct_error::invarg::make();
-  }
   uint64_t obj_size = os.oi.size;
   if (os.oi.truncate_seq < op.extent.truncate_seq &&
       op.extent.offset + op.extent.length > op.extent.truncate_size) {
@@ -391,19 +446,21 @@ PGBackend::cmp_ext(const ObjectState& os, OSDOp& osd_op)
   } else {
     read_ext = _read(os.oi.soid, op.extent.offset, ext_len, 0);
   }
-  return read_ext.safe_then_interruptible([&osd_op](auto&& read_bl) {
-    int32_t retcode = 0;
+  return read_ext.safe_then_interruptible([&osd_op](auto&& read_bl)
+    -> cmp_ext_errorator::future<> {
     for (unsigned index = 0; index < osd_op.indata.length(); index++) {
       char byte_in_op = osd_op.indata[index];
       char byte_from_disk = (index < read_bl.length() ? read_bl[index] : 0);
       if (byte_in_op != byte_from_disk) {
         logger().debug("cmp_ext: mismatch at {}", index);
-        retcode = -MAX_ERRNO - index;
-       break;
+        // Unlike other ops, we set osd_op.rval here and return a different
+        // error code via ct_error::cmp_fail.
+        osd_op.rval = -MAX_ERRNO - index;
+        return crimson::ct_error::cmp_fail::make();
       }
     }
-    logger().debug("cmp_ext: {}", retcode);
-    osd_op.rval = retcode;
+    osd_op.rval = 0;
+    return cmp_ext_errorator::make_ready_future<>();
   });
 }
 
@@ -425,6 +482,84 @@ PGBackend::stat(
   return stat_errorator::now();
 }
 
+PGBackend::write_iertr::future<> PGBackend::_writefull(
+  ObjectState& os,
+  off_t truncate_size,
+  const bufferlist& bl,
+  ceph::os::Transaction& txn,
+  osd_op_params_t& osd_op_params,
+  object_stat_sum_t& delta_stats,
+  unsigned flags)
+{
+  const bool existing = maybe_create_new_object(os, txn, delta_stats);
+  if (existing && bl.length() < os.oi.size) {
+
+    txn.truncate(coll->get_cid(), ghobject_t{os.oi.soid}, bl.length());
+    truncate_update_size_and_usage(delta_stats, os.oi, truncate_size);
+
+    osd_op_params.clean_regions.mark_data_region_dirty(
+      bl.length(),
+      os.oi.size - bl.length());
+  }
+  if (bl.length()) {
+    txn.write(
+      coll->get_cid(), ghobject_t{os.oi.soid}, 0, bl.length(),
+      bl, flags);
+    update_size_and_usage(
+      delta_stats, os.oi, 0,
+      bl.length(), true);
+    osd_op_params.clean_regions.mark_data_region_dirty(
+      0,
+      std::max((uint64_t)bl.length(), os.oi.size));
+  }
+  return seastar::now();
+}
+
+PGBackend::write_iertr::future<> PGBackend::_truncate(
+  ObjectState& os,
+  ceph::os::Transaction& txn,
+  osd_op_params_t& osd_op_params,
+  object_stat_sum_t& delta_stats,
+  size_t offset,
+  size_t truncate_size,
+  uint32_t truncate_seq)
+{
+  if (truncate_seq) {
+    assert(offset == truncate_size);
+    if (truncate_seq <= os.oi.truncate_seq) {
+      logger().debug("{} truncate seq {} <= current {}, no-op",
+                     __func__, truncate_seq, os.oi.truncate_seq);
+      return write_ertr::make_ready_future<>();
+    } else {
+      logger().debug("{} truncate seq {} > current {}, truncating",
+                     __func__, truncate_seq, os.oi.truncate_seq);
+      os.oi.truncate_seq = truncate_seq;
+      os.oi.truncate_size = truncate_size;
+    }
+  }
+  maybe_create_new_object(os, txn, delta_stats);
+  if (os.oi.size != offset) {
+    txn.truncate(
+      coll->get_cid(),
+      ghobject_t{os.oi.soid}, offset);
+    if (os.oi.size > offset) {
+      // TODO: modified_ranges.union_of(trim);
+      osd_op_params.clean_regions.mark_data_region_dirty(
+        offset,
+       os.oi.size - offset);
+    } else {
+      // os.oi.size < offset
+      osd_op_params.clean_regions.mark_data_region_dirty(
+        os.oi.size,
+        offset - os.oi.size);
+    }
+    truncate_update_size_and_usage(delta_stats, os.oi, offset);
+    os.oi.clear_data_digest();
+  }
+  delta_stats.num_wr++;
+  return write_ertr::now();
+}
+
 bool PGBackend::maybe_create_new_object(
   ObjectState& os,
   ceph::os::Transaction& txn,
@@ -486,7 +621,26 @@ static bool is_offset_and_length_valid(
   }
 }
 
-PGBackend::interruptible_future<> PGBackend::write(
+PGBackend::interruptible_future<> PGBackend::set_allochint(
+  ObjectState& os,
+  const OSDOp& osd_op,
+  ceph::os::Transaction& txn,
+  object_stat_sum_t& delta_stats)
+{
+  maybe_create_new_object(os, txn, delta_stats);
+
+  os.oi.expected_object_size = osd_op.op.alloc_hint.expected_object_size;
+  os.oi.expected_write_size = osd_op.op.alloc_hint.expected_write_size;
+  os.oi.alloc_hint_flags = osd_op.op.alloc_hint.flags;
+  txn.set_alloc_hint(coll->get_cid(),
+                     ghobject_t{os.oi.soid},
+                     os.oi.expected_object_size,
+                     os.oi.expected_write_size,
+                     os.oi.alloc_hint_flags);
+  return seastar::now();
+}
+
+PGBackend::write_iertr::future<> PGBackend::write(
     ObjectState& os,
     const OSDOp& osd_op,
     ceph::os::Transaction& txn,
@@ -497,6 +651,14 @@ PGBackend::interruptible_future<> PGBackend::write(
   uint64_t offset = op.extent.offset;
   uint64_t length = op.extent.length;
   bufferlist buf = osd_op.indata;
+  if (op.extent.length != osd_op.indata.length()) {
+    return crimson::ct_error::invarg::make();
+  }
+
+  if (!is_offset_and_length_valid(op.extent.offset, op.extent.length)) {
+    return crimson::ct_error::file_too_large::make();
+  }
+
   if (auto seq = os.oi.truncate_seq;
       seq != 0 && op.extent.truncate_seq < seq) {
     // old write, arrived after trimtrunc
@@ -581,7 +743,7 @@ PGBackend::interruptible_future<> PGBackend::write_same(
   return seastar::now();
 }
 
-PGBackend::interruptible_future<> PGBackend::writefull(
+PGBackend::write_iertr::future<> PGBackend::writefull(
   ObjectState& os,
   const OSDOp& osd_op,
   ceph::os::Transaction& txn,
@@ -590,25 +752,97 @@ PGBackend::interruptible_future<> PGBackend::writefull(
 {
   const ceph_osd_op& op = osd_op.op;
   if (op.extent.length != osd_op.indata.length()) {
-    throw crimson::osd::invalid_argument();
+    return crimson::ct_error::invarg::make();
   }
-
-  const bool existing = maybe_create_new_object(os, txn, delta_stats);
-  if (existing && op.extent.length < os.oi.size) {
-    txn.truncate(coll->get_cid(), ghobject_t{os.oi.soid}, op.extent.length);
-    truncate_update_size_and_usage(delta_stats, os.oi, op.extent.truncate_size);
-    osd_op_params.clean_regions.mark_data_region_dirty(op.extent.length,
-       os.oi.size - op.extent.length);
+  if (!is_offset_and_length_valid(op.extent.offset, op.extent.length)) {
+    return crimson::ct_error::file_too_large::make();
   }
-  if (op.extent.length) {
-    txn.write(coll->get_cid(), ghobject_t{os.oi.soid}, 0, op.extent.length,
-              osd_op.indata, op.flags);
-    update_size_and_usage(delta_stats, os.oi, 0,
-      op.extent.length, true);
+
+  return _writefull(
+    os,
+    op.extent.truncate_size,
+    osd_op.indata,
+    txn,
+    osd_op_params,
+    delta_stats,
+    op.flags);
+}
+
+PGBackend::rollback_iertr::future<> PGBackend::rollback(
+  ObjectState& os,
+  const OSDOp& osd_op,
+  ceph::os::Transaction& txn,
+  osd_op_params_t& osd_op_params,
+  object_stat_sum_t& delta_stats,
+  crimson::osd::ObjectContextRef head,
+  crimson::osd::ObjectContextLoader& obc_loader)
+{
+  const ceph_osd_op& op = osd_op.op;
+  snapid_t snapid = (uint64_t)op.snap.snapid;
+  assert(os.oi.soid.is_head());
+  logger().debug("{} deleting {} and rolling back to old snap {}",
+                  __func__, os.oi.soid ,snapid);
+  hobject_t target_coid = os.oi.soid;
+  target_coid.snap = snapid;
+  return obc_loader.with_clone_obc_only<RWState::RWWRITE>(
+    head, target_coid,
+    [this, &os, &txn, &delta_stats, &osd_op_params]
+    (auto resolved_obc) {
+    if (resolved_obc->obs.oi.soid.is_head()) {
+      // no-op: The resolved oid returned the head object
+      logger().debug("PGBackend::rollback: loaded head_obc: {}"
+                     " do nothing",
+                     resolved_obc->obs.oi.soid);
+      return rollback_iertr::now();
+    }
+    /* TODO: https://tracker.ceph.com/issues/59114 This implementation will not
+     * behave correctly for a rados operation consisting of a mutation followed
+     * by a rollback to a snapshot since the last mutation of the object.
+     * The correct behavior would be for the rollback to undo the mutation
+     * earlier in the operation by resolving to the clone created at the start
+     * of the operation (see resolve_oid).
+     * Instead, it will select HEAD leaving that mutation intact since the SnapSet won't
+     * yet contain that clone. This behavior exists in classic as well.
+     */
+    logger().debug("PGBackend::rollback: loaded clone_obc: {}",
+                   resolved_obc->obs.oi.soid);
+    // 1) Delete current head
+    if (os.exists) {
+      txn.remove(coll->get_cid(), ghobject_t{os.oi.soid,
+                                  ghobject_t::NO_GEN, shard});
+    }
+    // 2) Clone correct snapshot into head
+    txn.clone(coll->get_cid(), ghobject_t{resolved_obc->obs.oi.soid},
+                               ghobject_t{os.oi.soid});
+    //    Copy clone obc.os.oi to os.oi
+    os.oi.clear_flag(object_info_t::FLAG_WHITEOUT);
+    os.oi.copy_user_bits(resolved_obc->obs.oi);
+    delta_stats.num_bytes -= os.oi.size;
+    delta_stats.num_bytes += resolved_obc->obs.oi.size;
     osd_op_params.clean_regions.mark_data_region_dirty(0,
-       std::max((uint64_t) op.extent.length, os.oi.size));
-  }
-  return seastar::now();
+      std::max(os.oi.size, resolved_obc->obs.oi.size));
+    osd_op_params.clean_regions.mark_omap_dirty();
+    // TODO: 3) Calculate clone_overlaps by following overlaps
+    //          forward from rollback snapshot
+    //          https://tracker.ceph.com/issues/58263
+    return rollback_iertr::now();
+  }).safe_then_interruptible([] {
+    logger().debug("PGBackend::rollback succefully");
+    return rollback_iertr::now();
+  },// there's no snapshot here, or there's no object.
+    // if there's no snapshot, we delete the object;
+    // otherwise, do nothing.
+    crimson::ct_error::enoent::handle(
+    [this, &os, &snapid, &txn, &delta_stats] {
+      logger().debug("PGBackend::rollback: deleting head on {}"
+                     " with snap_id of {}"
+                     " because got ENOENT|whiteout on obc lookup",
+                     os.oi.soid, snapid);
+      return remove(os, txn, delta_stats, false);
+    }),
+    rollback_ertr::pass_further{},
+    crimson::ct_error::assert_all{"unexpected error in rollback"}
+  );
 }
 
 PGBackend::append_ierrorator::future<> PGBackend::append(
@@ -650,41 +884,9 @@ PGBackend::write_iertr::future<> PGBackend::truncate(
   if (!is_offset_and_length_valid(op.extent.offset, op.extent.length)) {
     return crimson::ct_error::file_too_large::make();
   }
-  if (op.extent.truncate_seq) {
-    assert(op.extent.offset == op.extent.truncate_size);
-    if (op.extent.truncate_seq <= os.oi.truncate_seq) {
-      logger().debug("{} truncate seq {} <= current {}, no-op",
-                     __func__, op.extent.truncate_seq, os.oi.truncate_seq);
-      return write_ertr::make_ready_future<>();
-    } else {
-      logger().debug("{} truncate seq {} > current {}, truncating",
-                     __func__, op.extent.truncate_seq, os.oi.truncate_seq);
-      os.oi.truncate_seq = op.extent.truncate_seq;
-      os.oi.truncate_size = op.extent.truncate_size;
-    }
-  }
-  maybe_create_new_object(os, txn, delta_stats);
-  if (os.oi.size != op.extent.offset) {
-    txn.truncate(coll->get_cid(),
-                 ghobject_t{os.oi.soid}, op.extent.offset);
-    if (os.oi.size > op.extent.offset) {
-      // TODO: modified_ranges.union_of(trim);
-      osd_op_params.clean_regions.mark_data_region_dirty(
-        op.extent.offset,
-       os.oi.size - op.extent.offset);
-    } else {
-      // os.oi.size < op.extent.offset
-      osd_op_params.clean_regions.mark_data_region_dirty(
-        os.oi.size,
-        op.extent.offset - os.oi.size);
-    }
-    truncate_update_size_and_usage(delta_stats, os.oi, op.extent.offset);
-    os.oi.clear_data_digest();
-  }
-  delta_stats.num_wr++;
-  // ----
-  // do no set exists, or we will break above DELETE -> TRUNCATE munging.
-  return write_ertr::now();
+  return _truncate(
+    os, txn, osd_op_params, delta_stats,
+    op.extent.offset, op.extent.truncate_size, op.extent.truncate_seq);
 }
 
 PGBackend::write_iertr::future<> PGBackend::zero(
@@ -702,7 +904,17 @@ PGBackend::write_iertr::future<> PGBackend::zero(
   if (!is_offset_and_length_valid(op.extent.offset, op.extent.length)) {
     return crimson::ct_error::file_too_large::make();
   }
-  assert(op.extent.length);
+
+  if (op.extent.offset >= os.oi.size || op.extent.length == 0) {
+    return write_iertr::now(); // noop
+  }
+
+  if (op.extent.offset + op.extent.length >= os.oi.size) {
+    return _truncate(
+      os, txn, osd_op_params, delta_stats,
+      op.extent.offset, op.extent.truncate_size, op.extent.truncate_seq);
+  }
+
   txn.zero(coll->get_cid(),
            ghobject_t{os.oi.soid},
            op.extent.offset,
@@ -715,7 +927,7 @@ PGBackend::write_iertr::future<> PGBackend::zero(
   return write_ertr::now();
 }
 
-PGBackend::interruptible_future<> PGBackend::create(
+PGBackend::create_iertr::future<> PGBackend::create(
   ObjectState& os,
   const OSDOp& osd_op,
   ceph::os::Transaction& txn,
@@ -724,7 +936,7 @@ PGBackend::interruptible_future<> PGBackend::create(
   if (os.exists && !os.oi.is_whiteout() &&
       (osd_op.op.flags & CEPH_OSD_OP_FLAG_EXCL)) {
     // this is an exclusive create
-    throw crimson::osd::make_error(-EEXIST);
+    return crimson::ct_error::eexist::make();
   }
 
   if (osd_op.indata.length()) {
@@ -734,11 +946,12 @@ PGBackend::interruptible_future<> PGBackend::create(
       std::string category;
       decode(category, p);
     } catch (buffer::error&) {
-      throw crimson::osd::invalid_argument();
+      return crimson::ct_error::invarg::make();
     }
   }
   maybe_create_new_object(os, txn, delta_stats);
-  txn.nop();
+  txn.create(coll->get_cid(),
+             ghobject_t{os.oi.soid, ghobject_t::NO_GEN, shard});
   return seastar::now();
 }
 
@@ -758,33 +971,50 @@ PGBackend::remove(ObjectState& os, ceph::os::Transaction& txn)
   return seastar::now();
 }
 
-PGBackend::interruptible_future<>
+PGBackend::remove_iertr::future<>
 PGBackend::remove(ObjectState& os, ceph::os::Transaction& txn,
-  object_stat_sum_t& delta_stats)
+  object_stat_sum_t& delta_stats, bool whiteout)
 {
-  // todo: snapset
+  if (!os.exists) {
+    return crimson::ct_error::enoent::make();
+  }
+
+  if (!os.exists) {
+    logger().debug("{} {} does not exist",__func__, os.oi.soid);
+    return seastar::now();
+  }
+  if (whiteout && os.oi.is_whiteout()) {
+    logger().debug("{} whiteout set on {} ",__func__, os.oi.soid);
+    return seastar::now();
+  }
   txn.remove(coll->get_cid(),
             ghobject_t{os.oi.soid, ghobject_t::NO_GEN, shard});
   delta_stats.num_bytes -= os.oi.size;
   os.oi.size = 0;
   os.oi.new_object();
-  os.exists = false;
+
+  // todo: clone_overlap
+  if (whiteout) {
+    logger().debug("{} setting whiteout on {} ",__func__, os.oi.soid);
+    os.oi.set_flag(object_info_t::FLAG_WHITEOUT);
+    delta_stats.num_whiteouts++;
+    txn.create(coll->get_cid(),
+               ghobject_t{os.oi.soid, ghobject_t::NO_GEN, shard});
+    return seastar::now();
+  }
   // todo: update watchers
   if (os.oi.is_whiteout()) {
     os.oi.clear_flag(object_info_t::FLAG_WHITEOUT);
     delta_stats.num_whiteouts--;
   }
   delta_stats.num_objects--;
+  os.exists = false;
   return seastar::now();
 }
 
 PGBackend::interruptible_future<std::tuple<std::vector<hobject_t>, hobject_t>>
 PGBackend::list_objects(const hobject_t& start, uint64_t limit) const
 {
-  if (__builtin_expect(stopping, false)) {
-    throw crimson::common::system_shutdown_exception();
-  }
-
   auto gstart = start.is_min() ? ghobject_t{} : ghobject_t{start, 0, shard};
   return interruptor::make_interruptible(store->list_objects(coll,
                                         gstart,
@@ -812,7 +1042,7 @@ PGBackend::list_objects(const hobject_t& start, uint64_t limit) const
     });
 }
 
-PGBackend::interruptible_future<> PGBackend::setxattr(
+PGBackend::setxattr_ierrorator::future<> PGBackend::setxattr(
   ObjectState& os,
   const OSDOp& osd_op,
   ceph::os::Transaction& txn,
@@ -820,13 +1050,13 @@ PGBackend::interruptible_future<> PGBackend::setxattr(
 {
   if (local_conf()->osd_max_attr_size > 0 &&
       osd_op.op.xattr.value_len > local_conf()->osd_max_attr_size) {
-    throw crimson::osd::make_error(-EFBIG);
+    return crimson::ct_error::file_too_large::make();
   }
 
   const auto max_name_len = std::min<uint64_t>(
     store->get_max_attr_name_length(), local_conf()->osd_max_attr_name_len);
   if (osd_op.op.xattr.name_len > max_name_len) {
-    throw crimson::osd::make_error(-ENAMETOOLONG);
+    return crimson::ct_error::enametoolong::make();
   }
 
   maybe_create_new_object(os, txn, delta_stats);
@@ -858,7 +1088,7 @@ PGBackend::get_attr_ierrorator::future<> PGBackend::getxattr(
     name = "_" + aname;
   }
   logger().debug("getxattr on obj={} for attr={}", os.oi.soid, name);
-  return getxattr(os.oi.soid, name).safe_then_interruptible(
+  return getxattr(os.oi.soid, std::move(name)).safe_then_interruptible(
     [&delta_stats, &osd_op] (ceph::bufferlist&& val) {
     osd_op.outdata = std::move(val);
     osd_op.op.xattr.value_len = osd_op.outdata.length();
@@ -873,21 +1103,24 @@ PGBackend::getxattr(
   const hobject_t& soid,
   std::string_view key) const
 {
-  if (__builtin_expect(stopping, false)) {
-    throw crimson::common::system_shutdown_exception();
-  }
-
   return store->get_attr(coll, ghobject_t{soid}, key);
 }
 
+PGBackend::get_attr_ierrorator::future<ceph::bufferlist>
+PGBackend::getxattr(
+  const hobject_t& soid,
+  std::string&& key) const
+{
+  return seastar::do_with(key, [this, &soid](auto &key) {
+    return store->get_attr(coll, ghobject_t{soid}, key);
+  });
+}
+
 PGBackend::get_attr_ierrorator::future<> PGBackend::get_xattrs(
   const ObjectState& os,
   OSDOp& osd_op,
   object_stat_sum_t& delta_stats) const
 {
-  if (__builtin_expect(stopping, false)) {
-    throw crimson::common::system_shutdown_exception();
-  }
   return store->get_attrs(coll, ghobject_t{os.oi.soid}).safe_then(
     [&delta_stats, &osd_op](auto&& attrs) {
     std::vector<std::pair<std::string, bufferlist>> user_xattrs;
@@ -957,8 +1190,11 @@ PGBackend::cmp_xattr_ierrorator::future<> PGBackend::cmp_xattr(
   bp.copy(osd_op.op.xattr.name_len, name);
  
   logger().debug("cmpxattr on obj={} for attr={}", os.oi.soid, name);
-  return getxattr(os.oi.soid, name).safe_then_interruptible(
-    [&delta_stats, &osd_op] (auto &&xattr) {
+  return getxattr(os.oi.soid, std::move(name)).safe_then_interruptible(
+    [&delta_stats, &osd_op] (auto &&xattr) -> cmp_xattr_ierrorator::future<> {
+    delta_stats.num_rd++;
+    delta_stats.num_rd_kb += shift_round_up(osd_op.op.xattr.value_len, 10);
+
     int result = 0;
     auto bp = osd_op.indata.cbegin();
     bp += osd_op.op.xattr.name_len;
@@ -992,13 +1228,22 @@ PGBackend::cmp_xattr_ierrorator::future<> PGBackend::cmp_xattr(
     }
     if (result == 0) {
       logger().info("cmp_xattr: comparison returned false");
-      osd_op.rval = -ECANCELED;
+      return crimson::ct_error::ecanceled::make();
+    } else if (result == -EINVAL) {
+      return crimson::ct_error::invarg::make();
     } else {
-      osd_op.rval = result;
+      osd_op.rval = 1;
+      return cmp_xattr_ierrorator::now();
     }
-    delta_stats.num_rd++;
-    delta_stats.num_rd_kb += shift_round_up(osd_op.op.xattr.value_len, 10);
-  });
+  }).handle_error_interruptible(
+    crimson::ct_error::enodata::handle([&delta_stats, &osd_op] ()
+      ->cmp_xattr_errorator::future<> {
+      delta_stats.num_rd++;
+      delta_stats.num_rd_kb += shift_round_up(osd_op.op.xattr.value_len, 10);
+      return crimson::ct_error::ecanceled::make();
+    }),
+    cmp_xattr_errorator::pass_further{}
+  );
 }
 
 PGBackend::rm_xattr_iertr::future<>
@@ -1007,9 +1252,6 @@ PGBackend::rm_xattr(
   const OSDOp& osd_op,
   ceph::os::Transaction& txn)
 {
-  if (__builtin_expect(stopping, false)) {
-    throw crimson::common::system_shutdown_exception();
-  }
   if (!os.exists || os.oi.is_whiteout()) {
     logger().debug("{}: {} DNE", __func__, os.oi.soid);
     return crimson::ct_error::enoent::make();
@@ -1021,8 +1263,24 @@ PGBackend::rm_xattr(
   return rm_xattr_iertr::now();
 }
 
+void PGBackend::clone(
+  /* const */object_info_t& snap_oi,
+  const ObjectState& os,
+  const ObjectState& d_os,
+  ceph::os::Transaction& txn)
+{
+  // See OpsExecutor::execute_clone documentation
+  txn.clone(coll->get_cid(), ghobject_t{os.oi.soid}, ghobject_t{d_os.oi.soid});
+  {
+    ceph::bufferlist bv;
+    snap_oi.encode_no_oid(bv, CEPH_FEATURES_ALL);
+    txn.setattr(coll->get_cid(), ghobject_t{d_os.oi.soid}, OI_ATTR, bv);
+  }
+  txn.rmattr(coll->get_cid(), ghobject_t{d_os.oi.soid}, SS_ATTR);
+}
+
 using get_omap_ertr =
-  crimson::os::FuturizedStore::read_errorator::extend<
+  crimson::os::FuturizedStore::Shard::read_errorator::extend<
     crimson::ct_error::enodata>;
 using get_omap_iertr =
   ::crimson::interruptible::interruptible_errorator<
@@ -1030,9 +1288,9 @@ using get_omap_iertr =
     get_omap_ertr>;
 static
 get_omap_iertr::future<
-  crimson::os::FuturizedStore::omap_values_t>
+  crimson::os::FuturizedStore::Shard::omap_values_t>
 maybe_get_omap_vals_by_keys(
-  crimson::os::FuturizedStore* store,
+  crimson::os::FuturizedStore::Shard* store,
   const crimson::os::CollectionRef& coll,
   const object_info_t& oi,
   const std::set<std::string>& keys_to_get)
@@ -1046,9 +1304,9 @@ maybe_get_omap_vals_by_keys(
 
 static
 get_omap_iertr::future<
-  std::tuple<bool, crimson::os::FuturizedStore::omap_values_t>>
+  std::tuple<bool, crimson::os::FuturizedStore::Shard::omap_values_t>>
 maybe_get_omap_vals(
-  crimson::os::FuturizedStore* store,
+  crimson::os::FuturizedStore::Shard* store,
   const crimson::os::CollectionRef& coll,
   const object_info_t& oi,
   const std::string& start_after)
@@ -1065,7 +1323,13 @@ PGBackend::omap_get_header(
   const crimson::os::CollectionRef& c,
   const ghobject_t& oid) const
 {
-  return store->omap_get_header(c, oid);
+  return store->omap_get_header(c, oid)
+    .handle_error(
+      crimson::ct_error::enodata::handle([] {
+       return seastar::make_ready_future<bufferlist>();
+      }),
+      ll_read_errorator::pass_further{}
+    );
 }
 
 PGBackend::ll_read_ierrorator::future<>
@@ -1089,9 +1353,6 @@ PGBackend::omap_get_keys(
   OSDOp& osd_op,
   object_stat_sum_t& delta_stats) const
 {
-  if (__builtin_expect(stopping, false)) {
-    throw crimson::common::system_shutdown_exception();
-  }
   if (!os.exists || os.oi.is_whiteout()) {
     logger().debug("{}: object does not exist: {}", os.oi.soid);
     return crimson::ct_error::enoent::make();
@@ -1136,22 +1397,88 @@ PGBackend::omap_get_keys(
        bool truncated = false;
        encode(num, osd_op.outdata);
        encode(truncated, osd_op.outdata);
+       osd_op.rval = 0;
        return seastar::now();
       }),
       ll_read_errorator::pass_further{}
     );
 }
+static
+PGBackend::omap_cmp_ertr::future<> do_omap_val_cmp(
+  std::map<std::string, bufferlist, std::less<>> out,
+  std::map<std::string, std::pair<bufferlist, int>> assertions)
+{
+  bufferlist empty;
+  for (const auto &[akey, avalue] : assertions) {
+    const auto [abl, aflag] = avalue;
+    auto out_entry = out.find(akey);
+    bufferlist &bl = (out_entry != out.end()) ? out_entry->second : empty;
+    switch (aflag) {
+      case CEPH_OSD_CMPXATTR_OP_EQ:
+        if (!(bl == abl)) {
+          return crimson::ct_error::ecanceled::make();
+        }
+      break;
+      case CEPH_OSD_CMPXATTR_OP_LT:
+        if (!(bl < abl)) {
+          return crimson::ct_error::ecanceled::make();
+        }
+      break;
+      case CEPH_OSD_CMPXATTR_OP_GT:
+        if (!(bl > abl)) {
+          return crimson::ct_error::ecanceled::make();
+        }
+      break;
+      default:
+        return crimson::ct_error::invarg::make();
+    }
+  }
+  return PGBackend::omap_cmp_ertr::now();
+}
+PGBackend::omap_cmp_iertr::future<>
+PGBackend::omap_cmp(
+  const ObjectState& os,
+  OSDOp& osd_op,
+  object_stat_sum_t& delta_stats) const
+{
+  if (!os.exists || os.oi.is_whiteout()) {
+    logger().debug("{}: object does not exist: {}", os.oi.soid);
+    return crimson::ct_error::enoent::make();
+  }
 
+  auto bp = osd_op.indata.cbegin();
+  std::map<std::string, std::pair<bufferlist, int> > assertions;
+  try {
+    decode(assertions, bp);
+  } catch (buffer::error&) {
+    return crimson::ct_error::invarg::make();
+  }
+
+  delta_stats.num_rd++;
+  if (os.oi.is_omap()) {
+    std::set<std::string> to_get;
+    for (auto &i: assertions) {
+      to_get.insert(i.first);
+    }
+    return store->omap_get_values(coll, ghobject_t{os.oi.soid}, to_get)
+      .safe_then([=, &osd_op] (auto&& out) -> omap_cmp_iertr::future<> {
+      osd_op.rval = 0;
+      return  do_omap_val_cmp(out, assertions);
+    });
+  } else {
+    return crimson::ct_error::ecanceled::make();
+  }
+}
 PGBackend::ll_read_ierrorator::future<>
 PGBackend::omap_get_vals(
   const ObjectState& os,
   OSDOp& osd_op,
   object_stat_sum_t& delta_stats) const
 {
-  if (__builtin_expect(stopping, false)) {
-    throw crimson::common::system_shutdown_exception();
+  if (!os.exists || os.oi.is_whiteout()) {
+    logger().debug("{}: object does not exist: {}", os.oi.soid);
+    return crimson::ct_error::enoent::make();
   }
-
   std::string start_after;
   uint64_t max_return;
   std::string filter_prefix;
@@ -1201,6 +1528,7 @@ PGBackend::omap_get_vals(
       crimson::ct_error::enodata::handle([&osd_op] {
         encode(uint32_t{0} /* num */, osd_op.outdata);
         encode(bool{false} /* truncated */, osd_op.outdata);
+        osd_op.rval = 0;
         return ll_read_errorator::now();
       }),
       ll_read_errorator::pass_further{}
@@ -1213,11 +1541,8 @@ PGBackend::omap_get_vals_by_keys(
   OSDOp& osd_op,
   object_stat_sum_t& delta_stats) const
 {
-  if (__builtin_expect(stopping, false)) {
-    throw crimson::common::system_shutdown_exception();
-  }
   if (!os.exists || os.oi.is_whiteout()) {
-    logger().debug("{}: object does not exist: {}", os.oi.soid);
+    logger().debug("{}: object does not exist: {}", __func__, os.oi.soid);
     return crimson::ct_error::enoent::make();
   }
 
@@ -1232,13 +1557,14 @@ PGBackend::omap_get_vals_by_keys(
   delta_stats.num_rd++;
   return maybe_get_omap_vals_by_keys(store, coll, os.oi, keys_to_get)
   .safe_then_interruptible(
-    [&osd_op] (crimson::os::FuturizedStore::omap_values_t&& vals) {
+    [&osd_op] (crimson::os::FuturizedStore::Shard::omap_values_t&& vals) {
       encode(vals, osd_op.outdata);
       return ll_read_errorator::now();
     }).handle_error_interruptible(
       crimson::ct_error::enodata::handle([&osd_op] {
         uint32_t num = 0;
         encode(num, osd_op.outdata);
+        osd_op.rval = 0;
         return ll_read_errorator::now();
       }),
       ll_read_errorator::pass_further{}
@@ -1337,9 +1663,6 @@ PGBackend::omap_clear(
   osd_op_params_t& osd_op_params,
   object_stat_sum_t& delta_stats)
 {
-  if (__builtin_expect(stopping, false)) {
-    throw crimson::common::system_shutdown_exception();
-  }
   if (!os.exists || os.oi.is_whiteout()) {
     logger().debug("{}: object does not exist: {}", os.oi.soid);
     return crimson::ct_error::enoent::make();
@@ -1363,7 +1686,7 @@ PGBackend::stat(
   return store->stat(c, oid);
 }
 
-PGBackend::interruptible_future<std::map<uint64_t, uint64_t>>
+PGBackend::read_errorator::future<std::map<uint64_t, uint64_t>>
 PGBackend::fiemap(
   CollectionRef c,
   const ghobject_t& oid,
@@ -1373,7 +1696,110 @@ PGBackend::fiemap(
   return store->fiemap(c, oid, off, len);
 }
 
-void PGBackend::on_activate_complete() {
-  peering.reset();
+PGBackend::write_iertr::future<> PGBackend::tmapput(
+  ObjectState& os,
+  const OSDOp& osd_op,
+  ceph::os::Transaction& txn,
+  object_stat_sum_t& delta_stats,
+  osd_op_params_t& osd_op_params)
+{
+  logger().debug("PGBackend::tmapput: {}", os.oi.soid);
+  auto ret = crimson::common::do_tmap_put(osd_op.indata.cbegin());
+  if (!ret.has_value()) {
+    logger().debug("PGBackend::tmapup: {}, ret={}", os.oi.soid, ret.error());
+    ceph_assert(ret.error() == -EINVAL);
+    return crimson::ct_error::invarg::make();
+  } else {
+    auto bl = std::move(ret.value());
+    return _writefull(
+      os,
+      bl.length(),
+      std::move(bl),
+      txn,
+      osd_op_params,
+      delta_stats,
+      0);
+  }
+}
+
+PGBackend::tmapup_iertr::future<> PGBackend::tmapup(
+  ObjectState& os,
+  const OSDOp& osd_op,
+  ceph::os::Transaction& txn,
+  object_stat_sum_t& delta_stats,
+  osd_op_params_t& osd_op_params)
+{
+  logger().debug("PGBackend::tmapup: {}", os.oi.soid);
+  return PGBackend::write_iertr::now(
+  ).si_then([this, &os] {
+    return _read(os.oi.soid, 0, os.oi.size, 0);
+  }).handle_error_interruptible(
+    crimson::ct_error::enoent::handle([](auto &) {
+      return seastar::make_ready_future<bufferlist>();
+    }),
+    PGBackend::write_iertr::pass_further{},
+    crimson::ct_error::assert_all{"read error in mutate_object_contents"}
+  ).si_then([this, &os, &osd_op, &txn,
+            &delta_stats, &osd_op_params]
+           (auto &&bl) mutable -> PGBackend::tmapup_iertr::future<> {
+    auto result = crimson::common::do_tmap_up(
+      osd_op.indata.cbegin(),
+      std::move(bl));
+    if (!result.has_value()) {
+      int ret = result.error();
+      logger().debug("PGBackend::tmapup: {}, ret={}", os.oi.soid, ret);
+      switch (ret) {
+      case -EEXIST:
+       return crimson::ct_error::eexist::make();
+      case -ENOENT:
+       return crimson::ct_error::enoent::make();
+      case -EINVAL:
+       return crimson::ct_error::invarg::make();
+      default:
+       ceph_assert(0 == "impossible error");
+       return crimson::ct_error::invarg::make();
+      }
+    }
+
+    logger().debug(
+      "PGBackend::tmapup: {}, result.value.length()={}, ret=0",
+      os.oi.soid, result.value().length());
+    return _writefull(
+      os,
+      result.value().length(),
+      result.value(),
+      txn,
+      osd_op_params,
+      delta_stats,
+      0);
+  });
+}
+
+PGBackend::read_ierrorator::future<> PGBackend::tmapget(
+  const ObjectState& os,
+  OSDOp& osd_op,
+  object_stat_sum_t& delta_stats)
+{
+  logger().debug("PGBackend::tmapget: {}", os.oi.soid);
+  const auto& oi = os.oi;
+  logger().debug("PGBackend::tmapget: read {} 0~{}", oi.soid, oi.size);
+  if (!os.exists || os.oi.is_whiteout()) {
+    logger().debug("PGBackend::tmapget: {} DNE", os.oi.soid);
+    return crimson::ct_error::enoent::make();
+  }
+
+  return _read(oi.soid, 0, oi.size, 0).safe_then_interruptible_tuple(
+    [&delta_stats, &osd_op](auto&& bl) -> read_errorator::future<> {
+      logger().debug("PGBackend::tmapget: data length: {}", bl.length());
+      osd_op.op.extent.length = bl.length();
+      osd_op.rval = 0;
+      delta_stats.num_rd++;
+      delta_stats.num_rd_kb += shift_round_up(bl.length(), 10);
+      osd_op.outdata = std::move(bl);
+      return read_errorator::now();
+    }, crimson::ct_error::input_output_error::handle([] {
+      return read_errorator::future<>{crimson::ct_error::object_corrupted::make()};
+    }),
+    read_errorator::pass_further{});
 }