#include "crimson/osd/pg.h"
#include "crimson/osd/watch.h"
#include "osd/ClassHandler.h"
+#include "osd/SnapMapper.h"
namespace {
seastar::logger& logger() {
}
static watch_info_t create_watch_info(const OSDOp& osd_op,
- const OpsExecuter::ExecutableMessage& msg)
+ const OpsExecuter::ExecutableMessage& msg,
+ entity_addr_t peer_addr)
{
using crimson::common::local_conf;
const uint32_t timeout =
return {
osd_op.op.watch.cookie,
timeout,
- msg.get_connection()->get_peer_addr()
+ peer_addr
};
}
crimson::net::ConnectionRef conn;
watch_info_t info;
- connect_ctx_t(const OSDOp& osd_op, const ExecutableMessage& msg)
+ connect_ctx_t(
+ const OSDOp& osd_op,
+ const ExecutableMessage& msg,
+ crimson::net::ConnectionRef conn)
: key(osd_op.op.watch.cookie, msg.get_reqid().name),
- conn(msg.get_connection()),
- info(create_watch_info(osd_op, msg)) {
+ conn(conn),
+ info(create_watch_info(osd_op, msg, conn->get_peer_addr())) {
}
};
- return with_effect_on_obc(connect_ctx_t{ osd_op, get_message() },
- [&] (auto& ctx) {
+
+ return with_effect_on_obc(
+ connect_ctx_t{ osd_op, get_message(), conn },
+ [&](auto& ctx) {
const auto& entity = ctx.key.second;
auto [it, emplaced] =
os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info));
}
return seastar::now();
},
- [] (auto&& ctx, ObjectContextRef obc, Ref<PG> pg) {
+ [](auto&& ctx, ObjectContextRef obc, Ref<PG> pg) {
assert(pg);
auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
if (emplaced) {
logger().info("op_effect: found existing watcher: {}", ctx.key);
}
return it->second->connect(std::move(ctx.conn), true /* will_ping */);
- });
+ }
+ );
}
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_reconnect(
const uint64_t client_gid;
const epoch_t epoch;
- notify_ctx_t(const ExecutableMessage& msg)
- : conn(msg.get_connection()),
+ notify_ctx_t(const ExecutableMessage& msg,
+ crimson::net::ConnectionRef conn)
+ : conn(conn),
client_gid(msg.get_reqid().name.num()),
epoch(msg.get_map_epoch()) {
}
};
- return with_effect_on_obc(notify_ctx_t{ get_message() },
- [&] (auto& ctx) {
+ return with_effect_on_obc(
+ notify_ctx_t{ get_message(), conn },
+ [&](auto& ctx) {
try {
auto bp = osd_op.indata.cbegin();
uint32_t ver; // obsolete
ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
return seastar::now();
},
- [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
+ [](auto&& ctx, ObjectContextRef obc, Ref<PG>) {
auto alive_watchers = obc->watchers | boost::adaptors::map_values
- | boost::adaptors::filtered(
- [] (const auto& w) {
- // FIXME: filter as for the `is_ping` in `Watch::start_notify`
- return w->is_alive();
- });
+ | boost::adaptors::filtered(
+ [] (const auto& w) {
+ // FIXME: filter as for the `is_ping` in `Watch::start_notify`
+ return w->is_alive();
+ });
return crimson::osd::Notify::create_n_propagate(
std::begin(alive_watchers),
std::end(alive_watchers),
ctx.ninfo,
ctx.client_gid,
obc->obs.oi.user_version);
- });
+ }
+ );
}
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_list_watchers(
assert(key.second.is_client());
response.entries.emplace_back(watch_item_t{
key.second, info.cookie, info.timeout_seconds, info.addr});
- response.encode(osd_op.outdata, get_message().get_features());
}
+ response.encode(osd_op.outdata, get_message().get_features());
return watch_ierrorator::now();
}
// Defined here because there is a circular dependency between OpsExecuter and PG
template <class Func>
-auto OpsExecuter::do_write_op(Func&& f, bool um) {
+// Common wrapper for every mutating sub-op: counts the write, lazily
+// materializes osd_op_params exactly once per request (bumping the PG
+// version at that point), records whether this op counts as a user
+// modification, and finally invokes `f` with the backend, the object
+// state and the pending transaction.
+auto OpsExecuter::do_write_op(Func&& f, OpsExecuter::modified_by m) {
++num_write;
if (!osd_op_params) {
osd_op_params.emplace();
+    fill_op_params_bump_pg_version();
}
- user_modify = um;
+ user_modify = (m == modified_by::user);
return std::forward<Func>(f)(pg->get_backend(), obc->obs, txn);
}
+// Implements CEPH_OSD_OP_ASSERT_VER: fail the op unless the version the
+// client asserts equals the object's current user_version.
+//  - asserting version 0 is invalid            -> invarg (EINVAL)
+//  - asserted version older than the object's  -> erange (ERANGE)
+//  - asserted version newer than the object's  -> value_too_large (EOVERFLOW)
+OpsExecuter::call_errorator::future<> OpsExecuter::do_assert_ver(
+  OSDOp& osd_op,
+  const ObjectState& os)
+{
+  const auto asserted_ver = osd_op.op.assert_ver.ver;
+  const auto current_ver = os.oi.user_version;
+  if (!asserted_ver) {
+    return crimson::ct_error::invarg::make();
+  }
+  if (asserted_ver < current_ver) {
+    return crimson::ct_error::erange::make();
+  }
+  if (asserted_ver > current_ver) {
+    return crimson::ct_error::value_too_large::make();
+  }
+  return seastar::now();
+}
+
+// Implements CEPH_OSD_OP_LIST_SNAPS: encodes into osd_op.outdata one
+// clone_info entry per clone in the SnapSet, plus a pseudo-clone entry
+// (id CEPH_NOSNAP) for HEAD when the object is not a whiteout.  A clone
+// missing from clone_snaps, clone_overlap or clone_size indicates
+// inconsistent on-disk snapshot metadata and fails the op with invarg.
+OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps(
+  OSDOp& osd_op,
+  const ObjectState& os,
+  const SnapSet& ss)
+{
+  obj_list_snap_response_t resp;
+  resp.clones.reserve(ss.clones.size() + 1);
+  for (auto &clone: ss.clones) {
+    clone_info ci;
+    ci.cloneid = clone;
+
+    {
+      auto p = ss.clone_snaps.find(clone);
+      if (p == ss.clone_snaps.end()) {
+        logger().error(
+          "OpsExecutor::do_list_snaps: {} has inconsistent "
+          "clone_snaps, missing clone {}",
+          os.oi.soid,
+          clone);
+        return crimson::ct_error::invarg::make();
+      }
+      ci.snaps.reserve(p->second.size());
+      // reverse the stored order for the wire response
+      ci.snaps.insert(ci.snaps.end(), p->second.rbegin(), p->second.rend());
+    }
+
+    {
+      auto p = ss.clone_overlap.find(clone);
+      if (p == ss.clone_overlap.end()) {
+        logger().error(
+          "OpsExecutor::do_list_snaps: {} has inconsistent "
+          "clone_overlap, missing clone {}",
+          os.oi.soid,
+          clone);
+        return crimson::ct_error::invarg::make();
+      }
+      ci.overlap.reserve(p->second.num_intervals());
+      ci.overlap.insert(ci.overlap.end(), p->second.begin(), p->second.end());
+    }
+
+    {
+      auto p = ss.clone_size.find(clone);
+      if (p == ss.clone_size.end()) {
+        logger().error(
+          "OpsExecutor::do_list_snaps: {} has inconsistent "
+          "clone_size, missing clone {}",
+          os.oi.soid,
+          clone);
+        return crimson::ct_error::invarg::make();
+      }
+      ci.size = p->second;
+    }
+    resp.clones.push_back(std::move(ci));
+  }
+
+  if (!os.oi.is_whiteout()) {
+    // HEAD is reported as a pseudo-clone with id CEPH_NOSNAP
+    clone_info ci;
+    ci.cloneid = CEPH_NOSNAP;
+    ci.size = os.oi.size;
+    resp.clones.push_back(std::move(ci));
+  }
+  resp.seq = ss.seq;
+  // purely informational; was logged at error level by mistake
+  logger().debug(
+    "OpsExecutor::do_list_snaps: {}, resp.clones.size(): {}",
+    os.oi.soid,
+    resp.clones.size());
+  resp.encode(osd_op.outdata);
+  return read_ierrorator::now();
+}
OpsExecuter::interruptible_errorated_future<OpsExecuter::osd_op_errorator>
OpsExecuter::execute_op(OSDOp& osd_op)
+{
+  // Thin error-translating shell around do_execute_op(): records the
+  // failure in the per-op rval and honours the client's FAILOK flag.
+  return do_execute_op(osd_op).handle_error_interruptible(
+    osd_op_errorator::all_same_way([&osd_op](auto e, auto&& e_raw)
+      -> OpsExecuter::osd_op_errorator::future<> {
+      const int err = e.value();
+      // Every op except CMPEXT gets rval = -errno; CMPEXT fills in its
+      // own rval and must not be overridden here.
+      if (err != ct_error::cmp_fail_error_value) {
+        osd_op.rval = -err;
+      }
+      // FAILOK swallows the error, but EAGAIN/EINPROGRESS must still
+      // propagate to the client.
+      const bool failok = osd_op.op.flags & CEPH_OSD_OP_FLAG_FAILOK;
+      if (failok && err != EAGAIN && err != EINPROGRESS) {
+        return osd_op_errorator::now();
+      }
+      return std::move(e_raw);
+    }));
+}
+
+OpsExecuter::interruptible_errorated_future<OpsExecuter::osd_op_errorator>
+OpsExecuter::do_execute_op(OSDOp& osd_op)
{
// TODO: dispatch via call table?
// TODO: we might want to find a way to unify both input and output
case CEPH_OSD_OP_SYNC_READ:
[[fallthrough]];
case CEPH_OSD_OP_READ:
- return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return do_read_op([this, &osd_op](auto& backend, const auto& os) {
return backend.read(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_SPARSE_READ:
- return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return do_read_op([this, &osd_op](auto& backend, const auto& os) {
return backend.sparse_read(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_CHECKSUM:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return do_read_op([&osd_op](auto& backend, const auto& os) {
return backend.checksum(os, osd_op);
});
case CEPH_OSD_OP_CMPEXT:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return do_read_op([&osd_op](auto& backend, const auto& os) {
return backend.cmp_ext(os, osd_op);
});
case CEPH_OSD_OP_GETXATTR:
- return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return do_read_op([this, &osd_op](auto& backend, const auto& os) {
return backend.getxattr(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_GETXATTRS:
- return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return do_read_op([this, &osd_op](auto& backend, const auto& os) {
return backend.get_xattrs(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_CMPXATTR:
- return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return do_read_op([this, &osd_op](auto& backend, const auto& os) {
return backend.cmp_xattr(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_RMXATTR:
- return do_write_op(
- [&osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_write_op([&osd_op](auto& backend, auto& os, auto& txn) {
return backend.rm_xattr(os, osd_op, txn);
- }, true);
+ });
case CEPH_OSD_OP_CREATE:
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
return backend.create(os, osd_op, txn, delta_stats);
- }, true);
+ });
case CEPH_OSD_OP_WRITE:
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
return backend.write(os, osd_op, txn, *osd_op_params, delta_stats);
- }, true);
+ });
case CEPH_OSD_OP_WRITESAME:
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
return backend.write_same(os, osd_op, txn, *osd_op_params, delta_stats);
- }, true);
+ });
case CEPH_OSD_OP_WRITEFULL:
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
return backend.writefull(os, osd_op, txn, *osd_op_params, delta_stats);
- }, true);
+ });
+ case CEPH_OSD_OP_ROLLBACK:
+ return do_write_op([this, &head=obc,
+ &osd_op](auto& backend, auto& os, auto& txn) {
+ return backend.rollback(os, osd_op, txn, *osd_op_params, delta_stats,
+ head, pg->obc_loader);
+ });
case CEPH_OSD_OP_APPEND:
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
return backend.append(os, osd_op, txn, *osd_op_params, delta_stats);
- }, true);
+ });
case CEPH_OSD_OP_TRUNCATE:
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
// FIXME: rework needed. Move this out to do_write_op(), introduce
// do_write_op_no_user_modify()...
return backend.truncate(os, osd_op, txn, *osd_op_params, delta_stats);
- }, true);
+ });
case CEPH_OSD_OP_ZERO:
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
return backend.zero(os, osd_op, txn, *osd_op_params, delta_stats);
- }, true);
+ });
case CEPH_OSD_OP_SETALLOCHINT:
- return osd_op_errorator::now();
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
+ return backend.set_allochint(os, osd_op, txn, delta_stats);
+ });
case CEPH_OSD_OP_SETXATTR:
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
return backend.setxattr(os, osd_op, txn, delta_stats);
- }, true);
+ });
case CEPH_OSD_OP_DELETE:
- return do_write_op([this] (auto& backend, auto& os, auto& txn) {
- return backend.remove(os, txn, delta_stats);
- }, true);
+ {
+ bool whiteout = false;
+ if (!obc->ssc->snapset.clones.empty() ||
+ (snapc.snaps.size() && // there are snaps
+ snapc.snaps[0] > obc->ssc->snapset.seq)) { // existing obj is old
+ logger().debug("{} has or will have clones, will whiteout {}",
+ __func__, obc->obs.oi.soid);
+ whiteout = true;
+ }
+ return do_write_op([this, whiteout](auto& backend, auto& os, auto& txn) {
+ return backend.remove(os, txn, delta_stats, whiteout);
+ });
+ }
case CEPH_OSD_OP_CALL:
return this->do_op_call(osd_op);
case CEPH_OSD_OP_STAT:
return do_const_op([this, &osd_op] (/* const */auto& backend, const auto& os) {
return backend.stat(os, osd_op, delta_stats);
});
+
+ case CEPH_OSD_OP_TMAPPUT:
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
+ return backend.tmapput(os, osd_op, txn, delta_stats, *osd_op_params);
+ });
case CEPH_OSD_OP_TMAPUP:
- // TODO: there was an effort to kill TMAP in ceph-osd. According to
- // @dzafman this isn't possible yet. Maybe it could be accomplished
- // before crimson's readiness and we'd luckily don't need to carry.
- return dont_do_legacy_op();
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto &txn) {
+ return backend.tmapup(os, osd_op, txn, delta_stats, *osd_op_params);
+ });
+ case CEPH_OSD_OP_TMAPGET:
+ return do_read_op([this, &osd_op](auto& backend, const auto& os) {
+ return backend.tmapget(os, osd_op, delta_stats);
+ });
// OMAP
case CEPH_OSD_OP_OMAPGETKEYS:
- return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return do_read_op([this, &osd_op](auto& backend, const auto& os) {
return backend.omap_get_keys(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_OMAPGETVALS:
- return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return do_read_op([this, &osd_op](auto& backend, const auto& os) {
return backend.omap_get_vals(os, osd_op, delta_stats);
});
+ case CEPH_OSD_OP_OMAP_CMP:
+ return do_read_op([this, &osd_op](auto& backend, const auto& os) {
+ return backend.omap_cmp(os, osd_op, delta_stats);
+ });
case CEPH_OSD_OP_OMAPGETHEADER:
- return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return do_read_op([this, &osd_op](auto& backend, const auto& os) {
return backend.omap_get_header(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
- return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return do_read_op([this, &osd_op](auto& backend, const auto& os) {
return backend.omap_get_vals_by_keys(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_OMAPSETVALS:
#if 0
- if (!pg.get_pool().info.supports_omap()) {
+ if (!pg.get_pgpool().info.supports_omap()) {
return crimson::ct_error::operation_not_supported::make();
}
#endif
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
return backend.omap_set_vals(os, osd_op, txn, *osd_op_params, delta_stats);
- }, true);
+ });
case CEPH_OSD_OP_OMAPSETHEADER:
#if 0
- if (!pg.get_pool().info.supports_omap()) {
+ if (!pg.get_pgpool().info.supports_omap()) {
return crimson::ct_error::operation_not_supported::make();
}
#endif
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
return backend.omap_set_header(os, osd_op, txn, *osd_op_params,
delta_stats);
- }, true);
+ });
case CEPH_OSD_OP_OMAPRMKEYRANGE:
#if 0
- if (!pg.get_pool().info.supports_omap()) {
+ if (!pg.get_pgpool().info.supports_omap()) {
return crimson::ct_error::operation_not_supported::make();
}
#endif
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
return backend.omap_remove_range(os, osd_op, txn, delta_stats);
- }, true);
+ });
case CEPH_OSD_OP_OMAPRMKEYS:
/** TODO: Implement supports_omap()
- if (!pg.get_pool().info.supports_omap()) {
+ if (!pg.get_pgpool().info.supports_omap()) {
return crimson::ct_error::operation_not_supported::make();
}*/
- return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_write_op([&osd_op](auto& backend, auto& os, auto& txn) {
return backend.omap_remove_key(os, osd_op, txn);
- }, true);
+ });
case CEPH_OSD_OP_OMAPCLEAR:
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
return backend.omap_clear(os, osd_op, txn, *osd_op_params, delta_stats);
- }, true);
+ });
// watch/notify
case CEPH_OSD_OP_WATCH:
- return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
return do_op_watch(osd_op, os, txn);
- }, false);
+ }, modified_by::sys);
case CEPH_OSD_OP_LIST_WATCHERS:
- return do_read_op([this, &osd_op] (auto&, const auto& os) {
+ return do_read_op([this, &osd_op](auto&, const auto& os) {
return do_op_list_watchers(osd_op, os);
});
case CEPH_OSD_OP_NOTIFY:
- return do_read_op([this, &osd_op] (auto&, const auto& os) {
+ return do_read_op([this, &osd_op](auto&, const auto& os) {
return do_op_notify(osd_op, os);
});
case CEPH_OSD_OP_NOTIFY_ACK:
- return do_read_op([this, &osd_op] (auto&, const auto& os) {
+ return do_read_op([this, &osd_op](auto&, const auto& os) {
return do_op_notify_ack(osd_op, os);
});
+ case CEPH_OSD_OP_ASSERT_VER:
+ return do_read_op([this, &osd_op](auto&, const auto& os) {
+ return do_assert_ver(osd_op, os);
+ });
+ case CEPH_OSD_OP_LIST_SNAPS:
+ return do_snapset_op([this, &osd_op](const auto &os, const auto &ss) {
+ return do_list_snaps(osd_op, os, ss);
+ });
default:
logger().warn("unknown op {}", ceph_osd_op_name(op.op));
}
}
+// Populate the shared per-request op parameters from the client message
+// and the PG.  Invoked exactly once per request, when the first write
+// sub-op materializes osd_op_params (see do_write_op()).
+void OpsExecuter::fill_op_params_bump_pg_version()
+{
+  auto& params = *osd_op_params;
+  params.req_id = msg->get_reqid();
+  params.mtime = msg->get_mtime();
+  // next_version() also advances the PG's version -- hence the name
+  params.at_version = pg->next_version();
+  params.pg_trim_to = pg->get_pg_trim_to();
+  params.min_last_complete_ondisk = pg->get_min_last_complete_ondisk();
+  params.last_complete = pg->get_info().last_complete;
+}
+
+// Build the pg-log entry (MODIFY or DELETE) describing this request's
+// mutation of the head object, carrying the per-op return values when
+// the op allows a returnvec, plus the accumulated clean_regions.
+std::vector<pg_log_entry_t> OpsExecuter::prepare_transaction(
+  const std::vector<OSDOp>& ops)
+{
+  // only head/snapdir objects are expected here; a clone would also
+  // require informing the SnapMapper about this entry
+  assert(obc->obs.oi.soid.snap >= CEPH_MAXSNAP);
+  const auto entry_type = obc->obs.exists ? pg_log_entry_t::MODIFY
+                                          : pg_log_entry_t::DELETE;
+  const auto user_version =
+    osd_op_params->user_modify ? osd_op_params->at_version.version : 0;
+  const auto return_code =
+    op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0;
+  std::vector<pg_log_entry_t> log_entries;
+  auto& entry = log_entries.emplace_back(
+    entry_type,
+    obc->obs.oi.soid,
+    osd_op_params->at_version,
+    obc->obs.oi.version,
+    user_version,
+    osd_op_params->req_id,
+    osd_op_params->mtime,
+    return_code);
+  if (op_info.allows_returnvec()) {
+    // the per-op return values are recorded in the pg log as well
+    entry.set_op_returns(ops);
+    logger().debug("{} op_returns: {}",
+                   __func__, entry.op_returns);
+  }
+  entry.clean_regions = std::move(osd_op_params->clean_regions);
+  return log_entries;
+}
+
+// Erase soid's records from the SnapMapper, staging the change in txn.
+// The (potentially blocking) mapper call runs on an interruptible
+// seastar thread.
+OpsExecuter::interruptible_future<> OpsExecuter::snap_map_remove(
+  const hobject_t& soid,
+  SnapMapper& snap_mapper,
+  OSDriver& osdriver,
+  ceph::os::Transaction& txn)
+{
+  logger().debug("{}: soid {}", __func__, soid);
+  return interruptor::async(
+    [soid, &snap_mapper, mapper_txn=osdriver.get_transaction(&txn)]() mutable {
+      const auto ret = snap_mapper.remove_oid(soid, &mapper_txn);
+      if (ret != 0) {
+        logger().error("{}: remove_oid {} failed with {}",
+                       __func__, soid, ret);
+      }
+      // On removal tolerate missing key corruption
+      assert(ret == 0 || ret == -ENOENT);
+    });
+}
+
+// Replace soid's snap set in the SnapMapper with `snaps` (must be
+// non-empty), staging the change in txn on an interruptible thread.
+OpsExecuter::interruptible_future<> OpsExecuter::snap_map_modify(
+  const hobject_t& soid,
+  const std::set<snapid_t>& snaps,
+  SnapMapper& snap_mapper,
+  OSDriver& osdriver,
+  ceph::os::Transaction& txn)
+{
+  logger().debug("{}: soid {}, snaps {}", __func__, soid, snaps);
+  return interruptor::async(
+    [soid, snaps, &snap_mapper,
+     mapper_txn=osdriver.get_transaction(&txn)]() mutable {
+      assert(!std::empty(snaps));
+      [[maybe_unused]] const auto ret = snap_mapper.update_snaps(
+        soid, snaps, 0, &mapper_txn);
+      assert(ret == 0);
+    });
+}
+
+// Register a freshly created clone `soid` with the SnapMapper under
+// `snaps` (must be non-empty), staging the change in txn on an
+// interruptible thread.
+OpsExecuter::interruptible_future<> OpsExecuter::snap_map_clone(
+  const hobject_t& soid,
+  const std::set<snapid_t>& snaps,
+  SnapMapper& snap_mapper,
+  OSDriver& osdriver,
+  ceph::os::Transaction& txn)
+{
+  logger().debug("{}: soid {}, snaps {}", __func__, soid, snaps);
+  return interruptor::async(
+    [soid, snaps, &snap_mapper,
+     mapper_txn=osdriver.get_transaction(&txn)]() mutable {
+      assert(!std::empty(snaps));
+      snap_mapper.add_oid(soid, snaps, &mapper_txn);
+    });
+}
+
// Defined here because there is a circular dependency between OpsExecuter and PG
uint32_t OpsExecuter::get_pool_stripe_width() const {
-  return pg->get_pool().info.get_stripe_width();
+  // get_pgpool() is the renamed pool accessor on PG
+  return pg->get_pgpool().info.get_stripe_width();
}
// Defined here because there is a circular dependency between OpsExecuter and PG
return pg->get_last_user_version();
}
+// Preserve the pre-write state of `initial_obs` as a clone object under
+// the current SnapContext: emits the clone into `txn` via the backend,
+// updates the in-flight stats, and returns a CloningContext carrying the
+// updated SnapSet and the pending CLONE pg-log entry.  The context is
+// applied to the processed object later, in CloningContext::apply_to().
+std::unique_ptr<OpsExecuter::CloningContext> OpsExecuter::execute_clone(
+  const SnapContext& snapc,
+  const ObjectState& initial_obs,
+  const SnapSet& initial_snapset,
+  PGBackend& backend,
+  ceph::os::Transaction& txn)
+{
+  const hobject_t& soid = initial_obs.oi.soid;
+  logger().debug("{} {} snapset={} snapc={}",
+                 __func__, soid,
+                 initial_snapset, snapc);
+
+  auto cloning_ctx = std::make_unique<CloningContext>();
+  cloning_ctx->new_snapset = initial_snapset;
+
+  // clone object, the snap field is set to the seq of the SnapContext
+  // at its creation.
+  hobject_t coid = soid;
+  coid.snap = snapc.seq;
+
+  // existing snaps are stored in descending order in snapc,
+  // cloned_snaps vector will hold all the snaps stored until snapset.seq
+  const std::vector<snapid_t> cloned_snaps = [&] {
+    auto last = std::find_if(
+      std::begin(snapc.snaps), std::end(snapc.snaps),
+      [&](snapid_t snap_id) { return snap_id <= initial_snapset.seq; });
+    return std::vector<snapid_t>{std::begin(snapc.snaps), last};
+  }();
+
+  auto [snap_oi, clone_obc] = prepare_clone(coid);
+  // make clone
+  backend.clone(snap_oi, initial_obs, clone_obc->obs, txn);
+
+  delta_stats.num_objects++;
+  if (snap_oi.is_omap()) {
+    delta_stats.num_objects_omap++;
+  }
+  delta_stats.num_object_clones++;
+  // newsnapset is obc's ssc
+  cloning_ctx->new_snapset.clones.push_back(coid.snap);
+  cloning_ctx->new_snapset.clone_size[coid.snap] = initial_obs.oi.size;
+  cloning_ctx->new_snapset.clone_snaps[coid.snap] = cloned_snaps;
+
+  // clone_overlap should contain an entry for each clone
+  // (an empty interval_set if there is no overlap)
+  auto &overlap = cloning_ctx->new_snapset.clone_overlap[coid.snap];
+  if (initial_obs.oi.size) {
+    overlap.insert(0, initial_obs.oi.size);
+  }
+
+  // log clone
+  logger().debug("cloning v {} to {} v {} snaps={} snapset={}",
+                 initial_obs.oi.version, coid,
+                 osd_op_params->at_version, cloned_snaps, cloning_ctx->new_snapset);
+
+  cloning_ctx->log_entry = {
+    pg_log_entry_t::CLONE,
+    coid,
+    snap_oi.version,
+    initial_obs.oi.version,
+    initial_obs.oi.user_version,
+    osd_reqid_t(),
+    initial_obs.oi.mtime, // will be replaced in `apply_to()`
+    0
+  };
+  // the snap list travels with the log entry in encoded form
+  encode(cloned_snaps, cloning_ctx->log_entry.snaps);
+
+  // TODO: update most recent clone_overlap and usage stats
+  return cloning_ctx;
+}
+
+// Consume this CloningContext: append the pending CLONE log entry to
+// log_entries (stamped with the processed object's final mtime) and
+// install the updated snapset on the processed object's context.
+void OpsExecuter::CloningContext::apply_to(
+  std::vector<pg_log_entry_t>& log_entries,
+  ObjectContext& processed_obc) &&
+{
+  auto& clone_entry = log_entries.emplace_back(std::move(log_entry));
+  clone_entry.mtime = processed_obc.obs.oi.mtime;
+  processed_obc.ssc->snapset = std::move(new_snapset);
+}
+
+// Finalize clone bookkeeping before the transaction is submitted: apply
+// any pending CloningContext (which appends its CLONE log entry and
+// installs the new snapset on obc) and register the clone with the
+// SnapMapper; then bring the snapset's seq up to the current
+// SnapContext.  Returns the possibly-extended log entries.
+OpsExecuter::interruptible_future<std::vector<pg_log_entry_t>>
+OpsExecuter::flush_clone_metadata(
+  std::vector<pg_log_entry_t>&& log_entries,
+  SnapMapper& snap_mapper,
+  OSDriver& osdriver,
+  ceph::os::Transaction& txn)
+{
+  assert(!txn.empty());
+  auto maybe_snap_mapped = interruptor::now();
+  if (cloning_ctx) {
+    // apply_to() must run first: it appends the CLONE entry and updates
+    // obc->ssc->snapset, both of which are read immediately below.
+    std::move(*cloning_ctx).apply_to(log_entries, *obc);
+    const auto& coid = log_entries.back().soid;
+    const auto& cloned_snaps = obc->ssc->snapset.clone_snaps[coid.snap];
+    maybe_snap_mapped = snap_map_clone(
+      coid,
+      std::set<snapid_t>{std::begin(cloned_snaps), std::end(cloned_snaps)},
+      snap_mapper,
+      osdriver,
+      txn);
+  }
+  if (snapc.seq > obc->ssc->snapset.seq) {
+    // update snapset with latest snap context
+    obc->ssc->snapset.seq = snapc.seq;
+    obc->ssc->snapset.snaps.clear();
+  }
+  logger().debug("{} done, initial snapset={}, new snapset={}",
+                 __func__, obc->obs.oi.soid, obc->ssc->snapset);
+  return std::move(
+    maybe_snap_mapped
+  ).then_interruptible([log_entries=std::move(log_entries)]() mutable {
+    return interruptor::make_ready_future<std::vector<pg_log_entry_t>>(
+      std::move(log_entries));
+  });
+}
+
+// TODO: make this static
+// Build the object_info_t for a new clone at `coid` and, on the
+// primary, materialize its ObjectContext in the obc registry; replicas
+// get a null ObjectContextRef.
+std::pair<object_info_t, ObjectContextRef> OpsExecuter::prepare_clone(
+  const hobject_t& coid)
+{
+  object_info_t clone_oi(coid);
+  clone_oi.version = pg->next_version();
+  clone_oi.prior_version = obc->obs.oi.version;
+  clone_oi.copy_user_bits(obc->obs.oi);
+  if (clone_oi.is_whiteout()) {
+    // clone shouldn't be marked as whiteout
+    clone_oi.clear_flag(object_info_t::FLAG_WHITEOUT);
+  }
+
+  ObjectContextRef clone_obc;
+  if (pg->is_primary()) {
+    // lookup_or_create
+    auto [cached_obc, existed] =
+      pg->obc_registry.get_cached_obc(std::move(coid));
+    assert(!existed);
+    cached_obc->obs.oi = clone_oi;
+    cached_obc->obs.exists = true;
+    cached_obc->ssc = obc->ssc;
+    logger().debug("clone_obc: {}", cached_obc->obs.oi);
+    clone_obc = std::move(cached_obc);
+  }
+  return std::make_pair(std::move(clone_oi), std::move(clone_obc));
+}
+
+// Fold the accumulated per-op delta_stats into the peering state's
+// stats for the target object and publish the updated PG stats.
+void OpsExecuter::apply_stats()
+{
+  pg->get_peering_state().apply_op_stats(get_target(), delta_stats);
+  pg->publish_stats_to_osd();
+}
+
+// One OpsExecuter is created per client request.  If the request may
+// write and the object needs a clone under the given SnapContext (see
+// should_clone()), the clone is prepared eagerly here so that every
+// subsequent write sub-op lands in a transaction that already contains
+// it.
+OpsExecuter::OpsExecuter(Ref<PG> pg,
+                         ObjectContextRef _obc,
+                         const OpInfo& op_info,
+                         abstracted_msg_t&& msg,
+                         crimson::net::ConnectionRef conn,
+                         const SnapContext& _snapc)
+  : pg(std::move(pg)),
+    obc(std::move(_obc)),
+    op_info(op_info),
+    msg(std::move(msg)),
+    conn(conn),
+    snapc(_snapc)
+{
+  if (op_info.may_write() && should_clone(*obc, snapc)) {
+    // note: do_write_op() also materializes osd_op_params and bumps the
+    // PG version as a side effect (see fill_op_params_bump_pg_version)
+    do_write_op([this](auto& backend, auto& os, auto& txn) {
+      cloning_ctx = execute_clone(std::as_const(snapc),
+                                  std::as_const(obc->obs),
+                                  std::as_const(obc->ssc->snapset),
+                                  backend,
+                                  txn);
+    });
+  }
+}
+
static inline std::unique_ptr<const PGLSFilter> get_pgls_filter(
const std::string& type,
bufferlist::const_iterator& iter)
if (const auto xattr = filter.get_xattr(); !xattr.empty()) {
logger().debug("pgls_filter: filter is interested in xattr={} for obj={}",
xattr, sobj);
- return backend.getxattr(sobj, xattr).safe_then_interruptible(
+ return backend.getxattr(sobj, std::move(xattr)).safe_then_interruptible(
[&filter, sobj] (ceph::bufferlist val) {
logger().debug("pgls_filter: got xvalue for obj={}", sobj);
response.handle = next.is_max() ? pg_end : next;
ceph::bufferlist out;
encode(response, out);
- logger().debug("{}: response.entries.size()=",
- __func__, response.entries.size());
+ logger().debug("do_pgnls_common: response.entries.size()= {}",
+ response.entries.size());
return seastar::make_ready_future<ceph::bufferlist>(std::move(out));
});
}
}
const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
const auto pg_end = \
- pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
+ pg.get_pgid().pgid.get_hobj_end(pg.get_pgpool().info.get_pg_num());
return do_pgnls_common(pg_start,
pg_end,
pg.get_backend(),
return seastar::do_with(std::move(filter),
[&, lower_bound=std::move(lower_bound)](auto&& filter) {
const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
- const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
+ const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pgpool().info.get_pg_num());
return do_pgnls_common(pg_start,
pg_end,
pg.get_backend(),
}
const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
const auto pg_end =
- pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
+ pg.get_pgid().pgid.get_hobj_end(pg.get_pgpool().info.get_pg_num());
return do_pgls_common(pg_start,
pg_end,
pg.get_backend(),
return seastar::do_with(std::move(filter),
[&, lower_bound=std::move(lower_bound)](auto&& filter) {
const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
- const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
+ const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pgpool().info.get_pg_num());
return do_pgls_common(pg_start,
pg_end,
pg.get_backend(),