]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/osd/ops_executer.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / crimson / osd / ops_executer.cc
index 17388d119a069951133a61ebb7908f644df56146..040870203bd950ea0d4785a0eeb127a6224d0c09 100644 (file)
@@ -19,6 +19,7 @@
 #include "crimson/osd/pg.h"
 #include "crimson/osd/watch.h"
 #include "osd/ClassHandler.h"
+#include "osd/SnapMapper.h"
 
 namespace {
   seastar::logger& logger() {
@@ -137,7 +138,8 @@ OpsExecuter::call_ierrorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op)
 }
 
 static watch_info_t create_watch_info(const OSDOp& osd_op,
-                                      const OpsExecuter::ExecutableMessage& msg)
+                                      const OpsExecuter::ExecutableMessage& msg,
+                                      entity_addr_t peer_addr)
 {
   using crimson::common::local_conf;
   const uint32_t timeout =
@@ -146,7 +148,7 @@ static watch_info_t create_watch_info(const OSDOp& osd_op,
   return {
     osd_op.op.watch.cookie,
     timeout,
-    msg.get_connection()->get_peer_addr()
+    peer_addr
   };
 }
 
@@ -161,14 +163,19 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_watch(
     crimson::net::ConnectionRef conn;
     watch_info_t info;
 
-    connect_ctx_t(const OSDOp& osd_op, const ExecutableMessage& msg)
+    connect_ctx_t(
+      const OSDOp& osd_op,
+      const ExecutableMessage& msg,
+      crimson::net::ConnectionRef conn)
       : key(osd_op.op.watch.cookie, msg.get_reqid().name),
-        conn(msg.get_connection()),
-        info(create_watch_info(osd_op, msg)) {
+        conn(conn),
+        info(create_watch_info(osd_op, msg, conn->get_peer_addr())) {
     }
   };
-  return with_effect_on_obc(connect_ctx_t{ osd_op, get_message() },
-    [&] (auto& ctx) {
+
+  return with_effect_on_obc(
+    connect_ctx_t{ osd_op, get_message(), conn },
+    [&](auto& ctx) {
       const auto& entity = ctx.key.second;
       auto [it, emplaced] =
         os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info));
@@ -180,7 +187,7 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_watch(
       }
       return seastar::now();
     },
-    [] (auto&& ctx, ObjectContextRef obc, Ref<PG> pg) {
+    [](auto&& ctx, ObjectContextRef obc, Ref<PG> pg) {
       assert(pg);
       auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
       if (emplaced) {
@@ -192,7 +199,8 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_watch(
         logger().info("op_effect: found existing watcher: {}", ctx.key);
       }
       return it->second->connect(std::move(ctx.conn), true /* will_ping */);
-    });
+    }
+  );
 }
 
 OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_reconnect(
@@ -320,14 +328,16 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify(
     const uint64_t client_gid;
     const epoch_t epoch;
 
-    notify_ctx_t(const ExecutableMessage& msg)
-      : conn(msg.get_connection()),
+    notify_ctx_t(const ExecutableMessage& msg,
+                 crimson::net::ConnectionRef conn)
+      : conn(conn),
         client_gid(msg.get_reqid().name.num()),
         epoch(msg.get_map_epoch()) {
     }
   };
-  return with_effect_on_obc(notify_ctx_t{ get_message() },
-    [&] (auto& ctx) {
+  return with_effect_on_obc(
+    notify_ctx_t{ get_message(), conn },
+    [&](auto& ctx) {
       try {
         auto bp = osd_op.indata.cbegin();
         uint32_t ver; // obsolete
@@ -347,13 +357,13 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify(
       ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
       return seastar::now();
     },
-    [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
+    [](auto&& ctx, ObjectContextRef obc, Ref<PG>) {
       auto alive_watchers = obc->watchers | boost::adaptors::map_values
-                                          | boost::adaptors::filtered(
-        [] (const auto& w) {
-          // FIXME: filter as for the `is_ping` in `Watch::start_notify`
-          return w->is_alive();
-        });
+        | boost::adaptors::filtered(
+          [] (const auto& w) {
+            // FIXME: filter as for the `is_ping` in `Watch::start_notify`
+            return w->is_alive();
+          });
       return crimson::osd::Notify::create_n_propagate(
         std::begin(alive_watchers),
         std::end(alive_watchers),
@@ -361,7 +371,8 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify(
         ctx.ninfo,
         ctx.client_gid,
         obc->obs.oi.user_version);
-  });
+    }
+  );
 }
 
 OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_list_watchers(
@@ -378,8 +389,8 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_list_watchers(
     assert(key.second.is_client());
     response.entries.emplace_back(watch_item_t{
       key.second, info.cookie, info.timeout_seconds, info.addr});
-    response.encode(osd_op.outdata, get_message().get_features());
   }
+  response.encode(osd_op.outdata, get_message().get_features());
   return watch_ierrorator::now();
 }
 
@@ -451,17 +462,120 @@ auto OpsExecuter::do_const_op(Func&& f) {
 
 // Defined here because there is a circular dependency between OpsExecuter and PG
 template <class Func>
-auto OpsExecuter::do_write_op(Func&& f, bool um) {
+auto OpsExecuter::do_write_op(Func&& f, OpsExecuter::modified_by m) {
   ++num_write;
   if (!osd_op_params) {
     osd_op_params.emplace();
+    fill_op_params_bump_pg_version();
   }
-  user_modify = um;
+  user_modify = (m == modified_by::user);
   return std::forward<Func>(f)(pg->get_backend(), obc->obs, txn);
 }
+OpsExecuter::call_errorator::future<> OpsExecuter::do_assert_ver(
+  OSDOp& osd_op,
+  const ObjectState& os)
+{
+  if (!osd_op.op.assert_ver.ver) {
+    return crimson::ct_error::invarg::make();
+  } else if (osd_op.op.assert_ver.ver < os.oi.user_version) {
+    return crimson::ct_error::erange::make();
+  } else if (osd_op.op.assert_ver.ver > os.oi.user_version) {
+    return crimson::ct_error::value_too_large::make();
+  }
+  return seastar::now();
+}
+
+OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps(
+  OSDOp& osd_op,
+  const ObjectState& os,
+  const SnapSet& ss)
+{
+  obj_list_snap_response_t resp;
+  resp.clones.reserve(ss.clones.size() + 1);
+  for (auto &clone: ss.clones) {
+    clone_info ci;
+    ci.cloneid = clone;
+
+    {
+      auto p = ss.clone_snaps.find(clone);
+      if (p == ss.clone_snaps.end()) {
+       logger().error(
+         "OpsExecutor::do_list_snaps: {} has inconsistent "
+         "clone_snaps, missing clone {}",
+         os.oi.soid,
+         clone);
+       return crimson::ct_error::invarg::make();
+      }
+      ci.snaps.reserve(p->second.size());
+      ci.snaps.insert(ci.snaps.end(), p->second.rbegin(), p->second.rend());
+    }
+
+    {
+      auto p = ss.clone_overlap.find(clone);
+      if (p == ss.clone_overlap.end()) {
+       logger().error(
+         "OpsExecutor::do_list_snaps: {} has inconsistent "
+         "clone_overlap, missing clone {}",
+         os.oi.soid,
+         clone);
+       return crimson::ct_error::invarg::make();
+      }
+      ci.overlap.reserve(p->second.num_intervals());
+      ci.overlap.insert(ci.overlap.end(), p->second.begin(), p->second.end());
+    }
+
+    {
+      auto p = ss.clone_size.find(clone);
+      if (p == ss.clone_size.end()) {
+       logger().error(
+         "OpsExecutor::do_list_snaps: {} has inconsistent "
+         "clone_size, missing clone {}",
+         os.oi.soid,
+         clone);
+       return crimson::ct_error::invarg::make();
+      }
+      ci.size = p->second;
+    }
+    resp.clones.push_back(std::move(ci));
+  }
+
+  if (!os.oi.is_whiteout()) {
+    clone_info ci;
+    ci.cloneid = CEPH_NOSNAP;
+    ci.size = os.oi.size;
+    resp.clones.push_back(std::move(ci));
+  }
+  resp.seq = ss.seq;
+  logger().error(
+    "OpsExecutor::do_list_snaps: {}, resp.clones.size(): {}",
+    os.oi.soid,
+    resp.clones.size());
+  resp.encode(osd_op.outdata);
+  return read_ierrorator::now();
+}
 
 OpsExecuter::interruptible_errorated_future<OpsExecuter::osd_op_errorator>
 OpsExecuter::execute_op(OSDOp& osd_op)
+{
+  return do_execute_op(osd_op).handle_error_interruptible(
+    osd_op_errorator::all_same_way([&osd_op](auto e, auto&& e_raw)
+      -> OpsExecuter::osd_op_errorator::future<> {
+        // All ops except for CMPEXT should have rval set to -e.value(),
+        // CMPEXT sets rval itself and shouldn't be overridden.
+        if (e.value() != ct_error::cmp_fail_error_value) {
+          osd_op.rval = -e.value();
+        }
+        if ((osd_op.op.flags & CEPH_OSD_OP_FLAG_FAILOK) &&
+         e.value() != EAGAIN && e.value() != EINPROGRESS) {
+          return osd_op_errorator::now();
+        } else {
+          return std::move(e_raw);
+       }
+      }));
+}
+
+OpsExecuter::interruptible_errorated_future<OpsExecuter::osd_op_errorator>
+OpsExecuter::do_execute_op(OSDOp& osd_op)
 {
   // TODO: dispatch via call table?
   // TODO: we might want to find a way to unify both input and output
@@ -474,78 +588,95 @@ OpsExecuter::execute_op(OSDOp& osd_op)
   case CEPH_OSD_OP_SYNC_READ:
     [[fallthrough]];
   case CEPH_OSD_OP_READ:
-    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
       return backend.read(os, osd_op, delta_stats);
     });
   case CEPH_OSD_OP_SPARSE_READ:
-    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
       return backend.sparse_read(os, osd_op, delta_stats);
     });
   case CEPH_OSD_OP_CHECKSUM:
-    return do_read_op([&osd_op] (auto& backend, const auto& os) {
+    return do_read_op([&osd_op](auto& backend, const auto& os) {
       return backend.checksum(os, osd_op);
     });
   case CEPH_OSD_OP_CMPEXT:
-    return do_read_op([&osd_op] (auto& backend, const auto& os) {
+    return do_read_op([&osd_op](auto& backend, const auto& os) {
       return backend.cmp_ext(os, osd_op);
     });
   case CEPH_OSD_OP_GETXATTR:
-    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
       return backend.getxattr(os, osd_op, delta_stats);
     });
   case CEPH_OSD_OP_GETXATTRS:
-    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
       return backend.get_xattrs(os, osd_op, delta_stats);
     });
   case CEPH_OSD_OP_CMPXATTR:
-    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
       return backend.cmp_xattr(os, osd_op, delta_stats);
     });
   case CEPH_OSD_OP_RMXATTR:
-    return do_write_op(
-      [&osd_op] (auto& backend, auto& os, auto& txn) {
+    return do_write_op([&osd_op](auto& backend, auto& os, auto& txn) {
       return backend.rm_xattr(os, osd_op, txn);
-    }, true);
+    });
   case CEPH_OSD_OP_CREATE:
-    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
       return backend.create(os, osd_op, txn, delta_stats);
-    }, true);
+    });
   case CEPH_OSD_OP_WRITE:
-    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
       return backend.write(os, osd_op, txn, *osd_op_params, delta_stats);
-    }, true);
+    });
   case CEPH_OSD_OP_WRITESAME:
-    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
       return backend.write_same(os, osd_op, txn, *osd_op_params, delta_stats);
-    }, true);
+    });
   case CEPH_OSD_OP_WRITEFULL:
-    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
       return backend.writefull(os, osd_op, txn, *osd_op_params, delta_stats);
-    }, true);
+    });
+  case CEPH_OSD_OP_ROLLBACK:
+    return do_write_op([this, &head=obc,
+                        &osd_op](auto& backend, auto& os, auto& txn) {
+      return backend.rollback(os, osd_op, txn, *osd_op_params, delta_stats,
+                              head, pg->obc_loader);
+    });
   case CEPH_OSD_OP_APPEND:
-    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
       return backend.append(os, osd_op, txn, *osd_op_params, delta_stats);
-    }, true);
+    });
   case CEPH_OSD_OP_TRUNCATE:
-    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
       // FIXME: rework needed. Move this out to do_write_op(), introduce
       // do_write_op_no_user_modify()...
       return backend.truncate(os, osd_op, txn, *osd_op_params, delta_stats);
-    }, true);
+    });
   case CEPH_OSD_OP_ZERO:
-    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
       return backend.zero(os, osd_op, txn, *osd_op_params, delta_stats);
-    }, true);
+    });
   case CEPH_OSD_OP_SETALLOCHINT:
-    return osd_op_errorator::now();
+    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
+      return backend.set_allochint(os, osd_op, txn, delta_stats);
+    });
   case CEPH_OSD_OP_SETXATTR:
-    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
       return backend.setxattr(os, osd_op, txn, delta_stats);
-    }, true);
+    });
   case CEPH_OSD_OP_DELETE:
-    return do_write_op([this] (auto& backend, auto& os, auto& txn) {
-      return backend.remove(os, txn, delta_stats);
-    }, true);
+  {
+    bool whiteout = false;
+    if (!obc->ssc->snapset.clones.empty() ||
+        (snapc.snaps.size() &&                      // there are snaps
+        snapc.snaps[0] > obc->ssc->snapset.seq)) {  // existing obj is old
+      logger().debug("{} has or will have clones, will whiteout {}",
+                     __func__, obc->obs.oi.soid);
+      whiteout = true;
+    }
+    return do_write_op([this, whiteout](auto& backend, auto& os, auto& txn) {
+      return backend.remove(os, txn, delta_stats, whiteout);
+    });
+  }
   case CEPH_OSD_OP_CALL:
     return this->do_op_call(osd_op);
   case CEPH_OSD_OP_STAT:
@@ -553,87 +684,107 @@ OpsExecuter::execute_op(OSDOp& osd_op)
     return do_const_op([this, &osd_op] (/* const */auto& backend, const auto& os) {
       return backend.stat(os, osd_op, delta_stats);
     });
+
+  case CEPH_OSD_OP_TMAPPUT:
+    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
+      return backend.tmapput(os, osd_op, txn, delta_stats, *osd_op_params);
+    });
   case CEPH_OSD_OP_TMAPUP:
-    // TODO: there was an effort to kill TMAP in ceph-osd. According to
-    // @dzafman this isn't possible yet. Maybe it could be accomplished
-    // before crimson's readiness and we'd luckily don't need to carry.
-    return dont_do_legacy_op();
+    return do_write_op([this, &osd_op](auto& backend, auto& os, auto &txn) {
+      return backend.tmapup(os, osd_op, txn, delta_stats, *osd_op_params);
+    });
+  case CEPH_OSD_OP_TMAPGET:
+    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
+      return backend.tmapget(os, osd_op, delta_stats);
+    });
 
   // OMAP
   case CEPH_OSD_OP_OMAPGETKEYS:
-    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
       return backend.omap_get_keys(os, osd_op, delta_stats);
     });
   case CEPH_OSD_OP_OMAPGETVALS:
-    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
       return backend.omap_get_vals(os, osd_op, delta_stats);
     });
+  case CEPH_OSD_OP_OMAP_CMP:
+    return  do_read_op([this, &osd_op](auto& backend, const auto& os) {
+      return backend.omap_cmp(os, osd_op, delta_stats);
+    });
   case CEPH_OSD_OP_OMAPGETHEADER:
-    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
       return backend.omap_get_header(os, osd_op, delta_stats);
     });
   case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
-    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
       return backend.omap_get_vals_by_keys(os, osd_op, delta_stats);
     });
   case CEPH_OSD_OP_OMAPSETVALS:
 #if 0
-    if (!pg.get_pool().info.supports_omap()) {
+    if (!pg.get_pgpool().info.supports_omap()) {
       return crimson::ct_error::operation_not_supported::make();
     }
 #endif
-    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
       return backend.omap_set_vals(os, osd_op, txn, *osd_op_params, delta_stats);
-    }, true);
+    });
   case CEPH_OSD_OP_OMAPSETHEADER:
 #if 0
-    if (!pg.get_pool().info.supports_omap()) {
+    if (!pg.get_pgpool().info.supports_omap()) {
       return crimson::ct_error::operation_not_supported::make();
     }
 #endif
-    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
       return backend.omap_set_header(os, osd_op, txn, *osd_op_params,
         delta_stats);
-    }, true);
+    });
   case CEPH_OSD_OP_OMAPRMKEYRANGE:
 #if 0
-    if (!pg.get_pool().info.supports_omap()) {
+    if (!pg.get_pgpool().info.supports_omap()) {
       return crimson::ct_error::operation_not_supported::make();
     }
 #endif
-    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
       return backend.omap_remove_range(os, osd_op, txn, delta_stats);
-    }, true);
+    });
   case CEPH_OSD_OP_OMAPRMKEYS:
     /** TODO: Implement supports_omap()
-    if (!pg.get_pool().info.supports_omap()) {
+    if (!pg.get_pgpool().info.supports_omap()) {
       return crimson::ct_error::operation_not_supported::make();
     }*/
-    return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
+    return do_write_op([&osd_op](auto& backend, auto& os, auto& txn) {
       return backend.omap_remove_key(os, osd_op, txn);
-    }, true);
+    });
   case CEPH_OSD_OP_OMAPCLEAR:
-    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
       return backend.omap_clear(os, osd_op, txn, *osd_op_params, delta_stats);
-    }, true);
+    });
 
   // watch/notify
   case CEPH_OSD_OP_WATCH:
-    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
       return do_op_watch(osd_op, os, txn);
-    }, false);
+    }, modified_by::sys);
   case CEPH_OSD_OP_LIST_WATCHERS:
-    return do_read_op([this, &osd_op] (auto&, const auto& os) {
+    return do_read_op([this, &osd_op](auto&, const auto& os) {
       return do_op_list_watchers(osd_op, os);
     });
   case CEPH_OSD_OP_NOTIFY:
-    return do_read_op([this, &osd_op] (auto&, const auto& os) {
+    return do_read_op([this, &osd_op](auto&, const auto& os) {
       return do_op_notify(osd_op, os);
     });
   case CEPH_OSD_OP_NOTIFY_ACK:
-    return do_read_op([this, &osd_op] (auto&, const auto& os) {
+    return do_read_op([this, &osd_op](auto&, const auto& os) {
       return do_op_notify_ack(osd_op, os);
     });
+  case CEPH_OSD_OP_ASSERT_VER:
+    return do_read_op([this, &osd_op](auto&, const auto& os) {
+      return do_assert_ver(osd_op, os);
+    });
+  case CEPH_OSD_OP_LIST_SNAPS:
+    return do_snapset_op([this, &osd_op](const auto &os, const auto &ss) {
+      return do_list_snaps(osd_op, os, ss);
+    });
 
   default:
     logger().warn("unknown op {}", ceph_osd_op_name(op.op));
@@ -642,9 +793,97 @@ OpsExecuter::execute_op(OSDOp& osd_op)
   }
 }
 
+void OpsExecuter::fill_op_params_bump_pg_version()
+{
+  osd_op_params->req_id = msg->get_reqid();
+  osd_op_params->mtime = msg->get_mtime();
+  osd_op_params->at_version = pg->next_version();
+  osd_op_params->pg_trim_to = pg->get_pg_trim_to();
+  osd_op_params->min_last_complete_ondisk = pg->get_min_last_complete_ondisk();
+  osd_op_params->last_complete = pg->get_info().last_complete;
+}
+
+std::vector<pg_log_entry_t> OpsExecuter::prepare_transaction(
+  const std::vector<OSDOp>& ops)
+{
+  // let's ensure we don't need to inform SnapMapper about this particular
+  // entry.
+  assert(obc->obs.oi.soid.snap >= CEPH_MAXSNAP);
+  std::vector<pg_log_entry_t> log_entries;
+  log_entries.emplace_back(
+    obc->obs.exists ?
+      pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
+    obc->obs.oi.soid,
+    osd_op_params->at_version,
+    obc->obs.oi.version,
+    osd_op_params->user_modify ? osd_op_params->at_version.version : 0,
+    osd_op_params->req_id,
+    osd_op_params->mtime,
+    op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
+  if (op_info.allows_returnvec()) {
+    // also the per-op values are recorded in the pg log
+    log_entries.back().set_op_returns(ops);
+    logger().debug("{} op_returns: {}",
+                   __func__, log_entries.back().op_returns);
+  }
+  log_entries.back().clean_regions = std::move(osd_op_params->clean_regions);
+  return log_entries;
+}
+
+OpsExecuter::interruptible_future<> OpsExecuter::snap_map_remove(
+  const hobject_t& soid,
+  SnapMapper& snap_mapper,
+  OSDriver& osdriver,
+  ceph::os::Transaction& txn)
+{
+  logger().debug("{}: soid {}", __func__, soid);
+  return interruptor::async([soid, &snap_mapper,
+                             _t=osdriver.get_transaction(&txn)]() mutable {
+    const auto r = snap_mapper.remove_oid(soid, &_t);
+    if (r) {
+      logger().error("{}: remove_oid {} failed with {}",
+                     __func__, soid, r);
+    }
+    // On removal tolerate missing key corruption
+    assert(r == 0 || r == -ENOENT);
+  });
+}
+
+OpsExecuter::interruptible_future<> OpsExecuter::snap_map_modify(
+  const hobject_t& soid,
+  const std::set<snapid_t>& snaps,
+  SnapMapper& snap_mapper,
+  OSDriver& osdriver,
+  ceph::os::Transaction& txn)
+{
+  logger().debug("{}: soid {}, snaps {}", __func__, soid, snaps);
+  return interruptor::async([soid, snaps, &snap_mapper,
+                             _t=osdriver.get_transaction(&txn)]() mutable {
+    assert(std::size(snaps) > 0);
+    [[maybe_unused]] const auto r = snap_mapper.update_snaps(
+      soid, snaps, 0, &_t);
+    assert(r == 0);
+  });
+}
+
+OpsExecuter::interruptible_future<> OpsExecuter::snap_map_clone(
+  const hobject_t& soid,
+  const std::set<snapid_t>& snaps,
+  SnapMapper& snap_mapper,
+  OSDriver& osdriver,
+  ceph::os::Transaction& txn)
+{
+  logger().debug("{}: soid {}, snaps {}", __func__, soid, snaps);
+  return interruptor::async([soid, snaps, &snap_mapper,
+                             _t=osdriver.get_transaction(&txn)]() mutable {
+    assert(std::size(snaps) > 0);
+    snap_mapper.add_oid(soid, snaps, &_t);
+  });
+}
+
 // Defined here because there is a circular dependency between OpsExecuter and PG
 uint32_t OpsExecuter::get_pool_stripe_width() const {
-  return pg->get_pool().info.get_stripe_width();
+  return pg->get_pgpool().info.get_stripe_width();
 }
 
 // Defined here because there is a circular dependency between OpsExecuter and PG
@@ -653,6 +892,179 @@ version_t OpsExecuter::get_last_user_version() const
   return pg->get_last_user_version();
 }
 
+std::unique_ptr<OpsExecuter::CloningContext> OpsExecuter::execute_clone(
+  const SnapContext& snapc,
+  const ObjectState& initial_obs,
+  const SnapSet& initial_snapset,
+  PGBackend& backend,
+  ceph::os::Transaction& txn)
+{
+  const hobject_t& soid = initial_obs.oi.soid;
+  logger().debug("{} {} snapset={} snapc={}",
+                 __func__, soid,
+                 initial_snapset, snapc);
+
+  auto cloning_ctx = std::make_unique<CloningContext>();
+  cloning_ctx->new_snapset = initial_snapset;
+
+  // clone object, the snap field is set to the seq of the SnapContext
+  // at its creation.
+  hobject_t coid = soid;
+  coid.snap = snapc.seq;
+
+  // existing snaps are stored in descending order in snapc,
+  // cloned_snaps vector will hold all the snaps stored until snapset.seq
+  const std::vector<snapid_t> cloned_snaps = [&] {
+    auto last = std::find_if(
+      std::begin(snapc.snaps), std::end(snapc.snaps),
+      [&](snapid_t snap_id) { return snap_id <= initial_snapset.seq; });
+    return std::vector<snapid_t>{std::begin(snapc.snaps), last};
+  }();
+
+  auto [snap_oi, clone_obc] = prepare_clone(coid);
+  // make clone
+  backend.clone(snap_oi, initial_obs, clone_obc->obs, txn);
+
+  delta_stats.num_objects++;
+  if (snap_oi.is_omap()) {
+    delta_stats.num_objects_omap++;
+  }
+  delta_stats.num_object_clones++;
+  // newsnapset is obc's ssc
+  cloning_ctx->new_snapset.clones.push_back(coid.snap);
+  cloning_ctx->new_snapset.clone_size[coid.snap] = initial_obs.oi.size;
+  cloning_ctx->new_snapset.clone_snaps[coid.snap] = cloned_snaps;
+
+  // clone_overlap should contain an entry for each clone
+  // (an empty interval_set if there is no overlap)
+  auto &overlap = cloning_ctx->new_snapset.clone_overlap[coid.snap];
+  if (initial_obs.oi.size) {
+    overlap.insert(0, initial_obs.oi.size);
+  }
+
+  // log clone
+  logger().debug("cloning v {} to {} v {} snaps={} snapset={}",
+                 initial_obs.oi.version, coid,
+                 osd_op_params->at_version, cloned_snaps, cloning_ctx->new_snapset);
+
+  cloning_ctx->log_entry = {
+    pg_log_entry_t::CLONE,
+    coid,
+    snap_oi.version,
+    initial_obs.oi.version,
+    initial_obs.oi.user_version,
+    osd_reqid_t(),
+    initial_obs.oi.mtime, // will be replaced in `apply_to()`
+    0
+  };
+  encode(cloned_snaps, cloning_ctx->log_entry.snaps);
+
+  // TODO: update most recent clone_overlap and usage stats
+  return cloning_ctx;
+}
+
+void OpsExecuter::CloningContext::apply_to(
+  std::vector<pg_log_entry_t>& log_entries,
+  ObjectContext& processed_obc) &&
+{
+  log_entry.mtime = processed_obc.obs.oi.mtime;
+  log_entries.emplace_back(std::move(log_entry));
+  processed_obc.ssc->snapset = std::move(new_snapset);
+}
+
+OpsExecuter::interruptible_future<std::vector<pg_log_entry_t>>
+OpsExecuter::flush_clone_metadata(
+  std::vector<pg_log_entry_t>&& log_entries,
+  SnapMapper& snap_mapper,
+  OSDriver& osdriver,
+  ceph::os::Transaction& txn)
+{
+  assert(!txn.empty());
+  auto maybe_snap_mapped = interruptor::now();
+  if (cloning_ctx) {
+    std::move(*cloning_ctx).apply_to(log_entries, *obc);
+    const auto& coid = log_entries.back().soid;
+    const auto& cloned_snaps = obc->ssc->snapset.clone_snaps[coid.snap];
+    maybe_snap_mapped = snap_map_clone(
+      coid,
+      std::set<snapid_t>{std::begin(cloned_snaps), std::end(cloned_snaps)},
+      snap_mapper,
+      osdriver,
+      txn);
+  }
+  if (snapc.seq > obc->ssc->snapset.seq) {
+     // update snapset with latest snap context
+     obc->ssc->snapset.seq = snapc.seq;
+     obc->ssc->snapset.snaps.clear();
+  }
+  logger().debug("{} done, initial snapset={}, new snapset={}",
+    __func__, obc->obs.oi.soid, obc->ssc->snapset);
+  return std::move(
+    maybe_snap_mapped
+  ).then_interruptible([log_entries=std::move(log_entries)]() mutable {
+    return interruptor::make_ready_future<std::vector<pg_log_entry_t>>(
+      std::move(log_entries));
+  });
+}
+
+// TODO: make this static
+std::pair<object_info_t, ObjectContextRef> OpsExecuter::prepare_clone(
+  const hobject_t& coid)
+{
+  object_info_t static_snap_oi(coid);
+  static_snap_oi.version = pg->next_version();
+  static_snap_oi.prior_version = obc->obs.oi.version;
+  static_snap_oi.copy_user_bits(obc->obs.oi);
+  if (static_snap_oi.is_whiteout()) {
+    // clone shouldn't be marked as whiteout
+    static_snap_oi.clear_flag(object_info_t::FLAG_WHITEOUT);
+  }
+
+  ObjectContextRef clone_obc;
+  if (pg->is_primary()) {
+    // lookup_or_create
+    auto [c_obc, existed] =
+      pg->obc_registry.get_cached_obc(std::move(coid));
+    assert(!existed);
+    c_obc->obs.oi = static_snap_oi;
+    c_obc->obs.exists = true;
+    c_obc->ssc = obc->ssc;
+    logger().debug("clone_obc: {}", c_obc->obs.oi);
+    clone_obc = std::move(c_obc);
+  }
+  return std::make_pair(std::move(static_snap_oi), std::move(clone_obc));
+}
+
+void OpsExecuter::apply_stats()
+{
+  pg->get_peering_state().apply_op_stats(get_target(), delta_stats);
+  pg->publish_stats_to_osd();
+}
+
+OpsExecuter::OpsExecuter(Ref<PG> pg,
+                         ObjectContextRef _obc,
+                         const OpInfo& op_info,
+                         abstracted_msg_t&& msg,
+                         crimson::net::ConnectionRef conn,
+                         const SnapContext& _snapc)
+  : pg(std::move(pg)),
+    obc(std::move(_obc)),
+    op_info(op_info),
+    msg(std::move(msg)),
+    conn(conn),
+    snapc(_snapc)
+{
+  if (op_info.may_write() && should_clone(*obc, snapc)) {
+    do_write_op([this](auto& backend, auto& os, auto& txn) {
+      cloning_ctx = execute_clone(std::as_const(snapc),
+                                  std::as_const(obc->obs),
+                                  std::as_const(obc->ssc->snapset),
+                                  backend,
+                                  txn);
+    });
+  }
+}
+
 static inline std::unique_ptr<const PGLSFilter> get_pgls_filter(
   const std::string& type,
   bufferlist::const_iterator& iter)
@@ -718,7 +1130,7 @@ static PG::interruptible_future<hobject_t> pgls_filter(
   if (const auto xattr = filter.get_xattr(); !xattr.empty()) {
     logger().debug("pgls_filter: filter is interested in xattr={} for obj={}",
                    xattr, sobj);
-    return backend.getxattr(sobj, xattr).safe_then_interruptible(
+    return backend.getxattr(sobj, std::move(xattr)).safe_then_interruptible(
       [&filter, sobj] (ceph::bufferlist val) {
         logger().debug("pgls_filter: got xvalue for obj={}", sobj);
 
@@ -819,8 +1231,8 @@ static PG::interruptible_future<ceph::bufferlist> do_pgnls_common(
       response.handle = next.is_max() ? pg_end : next;
       ceph::bufferlist out;
       encode(response, out);
-      logger().debug("{}: response.entries.size()=",
-                     __func__, response.entries.size());
+      logger().debug("do_pgnls_common: response.entries.size()= {}",
+                     response.entries.size());
       return seastar::make_ready_future<ceph::bufferlist>(std::move(out));
   });
 }
@@ -838,7 +1250,7 @@ static PG::interruptible_future<> do_pgnls(
   }
   const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
   const auto pg_end = \
-    pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
+    pg.get_pgid().pgid.get_hobj_end(pg.get_pgpool().info.get_pg_num());
   return do_pgnls_common(pg_start,
                          pg_end,
                          pg.get_backend(),
@@ -882,7 +1294,7 @@ static PG::interruptible_future<> do_pgnls_filtered(
   return seastar::do_with(std::move(filter),
     [&, lower_bound=std::move(lower_bound)](auto&& filter) {
       const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
-      const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
+      const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pgpool().info.get_pg_num());
       return do_pgnls_common(pg_start,
                              pg_end,
                              pg.get_backend(),
@@ -967,7 +1379,7 @@ static PG::interruptible_future<> do_pgls(
   }
   const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
   const auto pg_end =
-    pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
+    pg.get_pgid().pgid.get_hobj_end(pg.get_pgpool().info.get_pg_num());
   return do_pgls_common(pg_start,
                        pg_end,
                        pg.get_backend(),
@@ -1011,7 +1423,7 @@ static PG::interruptible_future<> do_pgls_filtered(
   return seastar::do_with(std::move(filter),
     [&, lower_bound=std::move(lower_bound)](auto&& filter) {
       const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
-      const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
+      const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pgpool().info.get_pg_num());
       return do_pgls_common(pg_start,
                             pg_end,
                             pg.get_backend(),