-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
#include <fmt/format.h>
#include <fmt/ostream.h>
}
}
-seastar::future<> ReplicatedRecoveryBackend::recover_object(
+using std::less;
+using std::map;
+using std::string;
+
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::recover_object(
const hobject_t& soid,
eversion_t need)
{
// always add_recovering(soid) before recover_object(soid)
assert(is_recovering(soid));
// start tracking the recovery of soid
- return maybe_pull_missing_obj(soid, need).then([this, soid, need] {
+ return maybe_pull_missing_obj(soid, need).then_interruptible([this, soid, need] {
logger().debug("recover_object: loading obc: {}", soid);
return pg.with_head_obc<RWState::RWREAD>(soid,
[this, soid, need](auto obc) {
recovery_waiter.obc = obc;
recovery_waiter.obc->wait_recovery_read();
return maybe_push_shards(soid, need);
- }).handle_error(
+ }).handle_error_interruptible(
crimson::osd::PG::load_obc_ertr::all_same_way([soid](auto& code) {
// TODO: may need eio handling?
logger().error("recover_object saw error code {}, ignoring object {}",
});
}
-seastar::future<>
+RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::maybe_push_shards(
const hobject_t& soid,
eversion_t need)
{
- return seastar::parallel_for_each(get_shards_to_push(soid),
+ return interruptor::parallel_for_each(get_shards_to_push(soid),
[this, need, soid](auto shard) {
- return prep_push(soid, need, shard).then([this, soid, shard](auto push) {
- auto msg = make_message<MOSDPGPush>();
+ return prep_push(soid, need, shard).then_interruptible([this, soid, shard](auto push) {
+ auto msg = crimson::make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
msg->map_epoch = pg.get_osdmap_epoch();
msg->min_epoch = pg.get_last_peering_reset();
msg->pushes.push_back(std::move(push));
msg->set_priority(pg.get_recovery_op_priority());
- return shard_services.send_to_osd(shard.osd,
- std::move(msg),
- pg.get_osdmap_epoch()).then(
+ return interruptor::make_interruptible(
+ shard_services.send_to_osd(shard.osd,
+ std::move(msg),
+ pg.get_osdmap_epoch()))
+ .then_interruptible(
[this, soid, shard] {
return recovering.at(soid).wait_for_pushes(shard);
});
});
- }).then([this, soid] {
+ }).then_interruptible([this, soid] {
auto &recovery = recovering.at(soid);
- auto push_info = recovery.pushing.begin();
- object_stat_sum_t stat = {};
- if (push_info != recovery.pushing.end()) {
- stat = push_info->second.stat;
+ if (auto push_info = recovery.pushing.begin();
+ push_info != recovery.pushing.end()) {
+ pg.get_recovery_handler()->on_global_recover(soid,
+ push_info->second.stat,
+ false);
+ } else if (recovery.pi) {
+ // no push happened (empty get_shards_to_push()) but pull actually did
+ pg.get_recovery_handler()->on_global_recover(soid,
+ recovery.pi->stat,
+ false);
} else {
- // no push happened, take pull_info's stat
- assert(recovery.pi);
- stat = recovery.pi->stat;
+ // no pulls, no pushes
}
- pg.get_recovery_handler()->on_global_recover(soid, stat, false);
return seastar::make_ready_future<>();
- }).handle_exception([this, soid](auto e) {
+ }).handle_exception_interruptible([this, soid](auto e) {
auto &recovery = recovering.at(soid);
if (recovery.obc) {
recovery.obc->drop_recovery_read();
});
}
-seastar::future<>
+RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::maybe_pull_missing_obj(
const hobject_t& soid,
eversion_t need)
recovery_waiter.pi = std::make_optional<RecoveryBackend::PullInfo>();
auto& pi = *recovery_waiter.pi;
prepare_pull(po, pi, soid, need);
- auto msg = make_message<MOSDPGPull>();
+ auto msg = crimson::make_message<MOSDPGPull>();
msg->from = pg.get_pg_whoami();
msg->set_priority(pg.get_recovery_op_priority());
msg->pgid = pg.get_pgid();
msg->map_epoch = pg.get_osdmap_epoch();
msg->min_epoch = pg.get_last_peering_reset();
msg->set_pulls({std::move(po)});
- return shard_services.send_to_osd(
- pi.from.osd,
- std::move(msg),
- pg.get_osdmap_epoch()
- ).then([&recovery_waiter] {
+ return interruptor::make_interruptible(
+ shard_services.send_to_osd(
+ pi.from.osd,
+ std::move(msg),
+ pg.get_osdmap_epoch()
+ )).then_interruptible([&recovery_waiter] {
return recovery_waiter.wait_for_pull();
});
}
-seastar::future<> ReplicatedRecoveryBackend::push_delete(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::push_delete(
const hobject_t& soid,
eversion_t need)
{
logger().debug("{}: {}, {}", __func__, soid, need);
- recovering[soid];
epoch_t min_epoch = pg.get_last_peering_reset();
assert(pg.get_acting_recovery_backfill().size() > 0);
- return seastar::parallel_for_each(pg.get_acting_recovery_backfill(),
- [this, soid, need, min_epoch](pg_shard_t shard) {
+ return interruptor::parallel_for_each(pg.get_acting_recovery_backfill(),
+ [this, soid, need, min_epoch](pg_shard_t shard)
+ -> interruptible_future<> {
if (shard == pg.get_pg_whoami())
return seastar::make_ready_future<>();
auto iter = pg.get_shard_missing().find(shard);
logger().debug("push_delete: will remove {} from {}", soid, shard);
pg.begin_peer_recover(shard, soid);
spg_t target_pg(pg.get_info().pgid.pgid, shard.shard);
- auto msg = make_message<MOSDPGRecoveryDelete>(
+ auto msg = crimson::make_message<MOSDPGRecoveryDelete>(
pg.get_pg_whoami(), target_pg, pg.get_osdmap_epoch(), min_epoch);
msg->set_priority(pg.get_recovery_op_priority());
msg->objects.push_back(std::make_pair(soid, need));
- return shard_services.send_to_osd(shard.osd, std::move(msg),
- pg.get_osdmap_epoch()).then(
+ return interruptor::make_interruptible(
+ shard_services.send_to_osd(shard.osd, std::move(msg),
+ pg.get_osdmap_epoch())).then_interruptible(
[this, soid, shard] {
return recovering.at(soid).wait_for_pushes(shard);
});
});
}
-seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_recovery_delete(
Ref<MOSDPGRecoveryDelete> m)
{
logger().debug("{}: {}", __func__, *m);
auto& p = m->objects.front(); //TODO: only one delete per message for now.
- return local_recover_delete(p.first, p.second, pg.get_osdmap_epoch()).then(
+ return local_recover_delete(p.first, p.second, pg.get_osdmap_epoch())
+ .then_interruptible(
[this, m] {
- auto reply = make_message<MOSDPGRecoveryDeleteReply>();
+ auto reply = crimson::make_message<MOSDPGRecoveryDeleteReply>();
reply->from = pg.get_pg_whoami();
reply->set_priority(m->get_priority());
reply->pgid = spg_t(pg.get_info().pgid.pgid, m->from.shard);
});
}
-seastar::future<> ReplicatedRecoveryBackend::on_local_recover_persist(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::on_local_recover_persist(
const hobject_t& soid,
const ObjectRecoveryInfo& _recovery_info,
bool is_delete,
logger().debug("{}", __func__);
ceph::os::Transaction t;
pg.get_recovery_handler()->on_local_recover(soid, _recovery_info, is_delete, t);
- return shard_services.get_store().do_transaction(coll, std::move(t)).then(
+ logger().debug("ReplicatedRecoveryBackend::on_local_recover_persist: do_transaction...");
+ return interruptor::make_interruptible(
+ shard_services.get_store().do_transaction(coll, std::move(t)))
+ .then_interruptible(
[this, epoch_frozen, last_complete = pg.get_info().last_complete] {
pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
return seastar::make_ready_future<>();
});
}
-seastar::future<> ReplicatedRecoveryBackend::local_recover_delete(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::local_recover_delete(
const hobject_t& soid,
eversion_t need,
epoch_t epoch_to_freeze)
{
logger().debug("{}: {}, {}", __func__, soid, need);
- return backend->load_metadata(soid).safe_then([this]
- (auto lomt) {
+ return backend->load_metadata(soid).safe_then_interruptible([this]
+ (auto lomt) -> interruptible_future<> {
if (lomt->os.exists) {
return seastar::do_with(ceph::os::Transaction(),
[this, lomt = std::move(lomt)](auto& txn) {
- return backend->remove(lomt->os, txn).then([this, &txn]() mutable {
+ return backend->remove(lomt->os, txn).then_interruptible(
+ [this, &txn]() mutable {
+ logger().debug("ReplicatedRecoveryBackend::local_recover_delete: do_transaction...");
return shard_services.get_store().do_transaction(coll,
std::move(txn));
});
});
}
return seastar::make_ready_future<>();
- }).safe_then([this, soid, epoch_to_freeze, need] {
+ }).safe_then_interruptible([this, soid, epoch_to_freeze, need] {
ObjectRecoveryInfo recovery_info;
recovery_info.soid = soid;
recovery_info.version = need;
);
}
-seastar::future<> ReplicatedRecoveryBackend::recover_delete(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::recover_delete(
const hobject_t &soid, eversion_t need)
{
logger().debug("{}: {}, {}", __func__, soid, need);
epoch_t cur_epoch = pg.get_osdmap_epoch();
return seastar::do_with(object_stat_sum_t(),
[this, soid, need, cur_epoch](auto& stat_diff) {
- return local_recover_delete(soid, need, cur_epoch).then(
- [this, &stat_diff, cur_epoch, soid, need] {
+ return local_recover_delete(soid, need, cur_epoch).then_interruptible(
+ [this, &stat_diff, cur_epoch, soid, need]()
+ -> interruptible_future<> {
if (!pg.has_reset_since(cur_epoch)) {
bool object_missing = false;
for (const auto& shard : pg.get_acting_recovery_backfill()) {
}
}
return seastar::make_ready_future<>();
- }).then([this, soid, &stat_diff] {
+ }).then_interruptible([this, soid, &stat_diff] {
pg.get_recovery_handler()->on_global_recover(soid, stat_diff, true);
return seastar::make_ready_future<>();
});
});
}
-seastar::future<PushOp>
+RecoveryBackend::interruptible_future<PushOp>
ReplicatedRecoveryBackend::prep_push(
const hobject_t& soid,
eversion_t need,
data_subset.insert(0, obc->obs.oi.size);
}
const auto& missing = pg.get_shard_missing().find(pg_shard)->second;
- if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) {
- const auto it = missing.get_items().find(soid);
- assert(it != missing.get_items().end());
- data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
- logger().debug("prep_push: {} data_subset {}", soid, data_subset);
- }
+ const auto it = missing.get_items().find(soid);
+ assert(it != missing.get_items().end());
+ data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
+ logger().debug("prep_push: {} data_subset {} to {}",
+ soid, data_subset, pg_shard);
- logger().debug("prep_push: {} to {}", soid, pg_shard);
auto& pi = recovery_waiter.pushing[pg_shard];
pg.begin_peer_recover(pg_shard, soid);
const auto pmissing_iter = pg.get_shard_missing().find(pg_shard);
pi.recovery_info.object_exist =
missing_iter->second.clean_regions.object_is_exist();
pi.recovery_progress.omap_complete =
- (!missing_iter->second.clean_regions.omap_is_dirty() &&
- HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS));
+ !missing_iter->second.clean_regions.omap_is_dirty();
- return build_push_op(pi.recovery_info, pi.recovery_progress, &pi.stat).then(
+ return build_push_op(pi.recovery_info, pi.recovery_progress, &pi.stat).then_interruptible(
[this, soid, pg_shard](auto pop) {
auto& recovery_waiter = recovering.at(soid);
auto& pi = recovery_waiter.pushing[pg_shard];
//TODO: skipped snap objects case for now
po.recovery_info.copy_subset.insert(0, (uint64_t) -1);
- if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS))
- po.recovery_info.copy_subset.intersection_of(
- missing_iter->second.clean_regions.get_dirty_regions());
+ po.recovery_info.copy_subset.intersection_of(
+ missing_iter->second.clean_regions.get_dirty_regions());
po.recovery_info.size = ((uint64_t) -1);
po.recovery_info.object_exist =
missing_iter->second.clean_regions.object_is_exist();
po.soid = soid;
po.recovery_progress.data_complete = false;
po.recovery_progress.omap_complete =
- !missing_iter->second.clean_regions.omap_is_dirty() &&
- HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS);
+ !missing_iter->second.clean_regions.omap_is_dirty();
po.recovery_progress.data_recovered_to = 0;
po.recovery_progress.first = true;
pi.recovery_progress = po.recovery_progress;
}
-seastar::future<PushOp> ReplicatedRecoveryBackend::build_push_op(
+RecoveryBackend::interruptible_future<PushOp>
+ReplicatedRecoveryBackend::build_push_op(
const ObjectRecoveryInfo& recovery_info,
const ObjectRecoveryProgress& progress,
object_stat_sum_t* stat)
recovery_info.version,
PushOp(),
[this, &recovery_info, &progress, stat]
- (auto new_progress, auto available, auto v, auto pop) {
+ (auto& new_progress, auto& available, auto& v, auto& pop) {
return read_metadata_for_push_op(recovery_info.soid,
progress, new_progress,
- v, &pop).then([&](eversion_t local_ver) mutable {
+ v, &pop).then_interruptible([&](eversion_t local_ver) mutable {
// If requestor didn't know the version, use ours
if (v == eversion_t()) {
v = local_ver;
progress,
new_progress,
&available, &pop);
- }).then([this, &recovery_info, &progress, &available, &pop]() mutable {
+ }).then_interruptible([this, &recovery_info, &progress, &available, &pop]() mutable {
logger().debug("build_push_op: available: {}, copy_subset: {}",
available, recovery_info.copy_subset);
return read_object_for_push_op(recovery_info.soid,
recovery_info.copy_subset,
progress.data_recovered_to,
available, &pop);
- }).then([&recovery_info, &v, &progress, &new_progress, stat, &pop]
+ }).then_interruptible([&recovery_info, &v, &progress, &new_progress, stat, &pop]
(uint64_t recovered_to) mutable {
new_progress.data_recovered_to = recovered_to;
if (new_progress.is_complete(recovery_info)) {
});
}
-seastar::future<eversion_t>
+RecoveryBackend::interruptible_future<eversion_t>
ReplicatedRecoveryBackend::read_metadata_for_push_op(
const hobject_t& oid,
const ObjectRecoveryProgress& progress,
eversion_t ver,
PushOp* push_op)
{
+ logger().debug("{}, {}", __func__, oid);
if (!progress.first) {
return seastar::make_ready_future<eversion_t>(ver);
}
- return seastar::when_all_succeed(
- backend->omap_get_header(coll, ghobject_t(oid)).handle_error(
- crimson::os::FuturizedStore::read_errorator::all_same_way(
- [] (const std::error_code& e) {
- return seastar::make_ready_future<bufferlist>();
- })),
- store->get_attrs(coll, ghobject_t(oid)).handle_error(
- crimson::os::FuturizedStore::get_attrs_ertr::all_same_way(
- [] (const std::error_code& e) {
- return seastar::make_ready_future<crimson::os::FuturizedStore::attrs_t>();
- }))
- ).then_unpack([&new_progress, push_op](auto bl, auto attrs) {
+ return interruptor::make_interruptible(interruptor::when_all_succeed(
+ backend->omap_get_header(coll, ghobject_t(oid)).handle_error_interruptible<false>(
+ crimson::os::FuturizedStore::read_errorator::all_same_way(
+ [oid] (const std::error_code& e) {
+ logger().debug("read_metadata_for_push_op, error {} when getting omap header: {}", e, oid);
+ return seastar::make_ready_future<bufferlist>();
+ })),
+ interruptor::make_interruptible(store->get_attrs(coll, ghobject_t(oid)))
+ .handle_error_interruptible<false>(
+ crimson::os::FuturizedStore::get_attrs_ertr::all_same_way(
+ [oid] (const std::error_code& e) {
+ logger().debug("read_metadata_for_push_op, error {} when getting attrs: {}", e, oid);
+ return seastar::make_ready_future<crimson::os::FuturizedStore::attrs_t>();
+ }))
+ )).then_unpack_interruptible([&new_progress, push_op](auto bl, auto attrs) {
if (bl.length() == 0) {
- logger().error("read_metadata_for_push_op: fail to read omap header");
- return eversion_t{};
+ logger().warn("read_metadata_for_push_op: fail to read omap header");
} else if (attrs.empty()) {
logger().error("read_metadata_for_push_op: fail to read attrs");
return eversion_t{};
}
push_op->omap_header.claim_append(std::move(bl));
- for (auto&& [key, val] : std::move(attrs)) {
- push_op->attrset[key].push_back(val);
+ for (auto&& [key, val] : attrs) {
+ push_op->attrset.emplace(std::move(key), std::move(val));
}
logger().debug("read_metadata_for_push_op: {}", push_op->attrset[OI_ATTR]);
object_info_t oi;
- oi.decode(push_op->attrset[OI_ATTR]);
+ oi.decode_no_oid(push_op->attrset[OI_ATTR]);
new_progress.first = false;
return oi.version;
});
}
-seastar::future<uint64_t>
+RecoveryBackend::interruptible_future<uint64_t>
ReplicatedRecoveryBackend::read_object_for_push_op(
const hobject_t& oid,
const interval_set<uint64_t>& copy_subset,
}
// 1. get the extents in the interested range
return backend->fiemap(coll, ghobject_t{oid},
- 0, copy_subset.range_end()).then_wrapped(
+ 0, copy_subset.range_end()).then_wrapped_interruptible(
[=](auto&& fiemap_included) mutable {
interval_set<uint64_t> extents;
try {
// 3. read the truncated extents
// TODO: check if the returned extents are pruned
return store->readv(coll, ghobject_t{oid}, push_op->data_included, 0);
- }).safe_then([push_op, range_end=copy_subset.range_end()](auto &&bl) {
+ }).safe_then_interruptible([push_op, range_end=copy_subset.range_end()](auto &&bl) {
push_op->data.claim_append(std::move(bl));
uint64_t recovered_to = 0;
if (push_op->data_included.empty()) {
}));
}
-seastar::future<>
+RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::read_omap_for_push_op(
const hobject_t& oid,
const ObjectRecoveryProgress& progress,
return shards;
}
-seastar::future<> ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
{
logger().debug("{}: {}", __func__, *m);
- return seastar::parallel_for_each(m->take_pulls(),
- [this, from=m->from](auto& pull_op) {
- const hobject_t& soid = pull_op.soid;
- logger().debug("handle_pull: {}", soid);
- return backend->stat(coll, ghobject_t(soid)).then(
- [this, &pull_op](auto st) {
- ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
- ObjectRecoveryProgress &progress = pull_op.recovery_progress;
- if (progress.first && recovery_info.size == ((uint64_t) -1)) {
- // Adjust size and copy_subset
- recovery_info.size = st.st_size;
- if (st.st_size) {
- interval_set<uint64_t> object_range;
- object_range.insert(0, st.st_size);
- recovery_info.copy_subset.intersection_of(object_range);
- } else {
- recovery_info.copy_subset.clear();
+ if (pg.can_discard_replica_op(*m)) {
+ logger().debug("{}: discarding {}", __func__, *m);
+ return seastar::now();
+ }
+ return seastar::do_with(m->take_pulls(), [this, from=m->from](auto& pulls) {
+ return interruptor::parallel_for_each(pulls,
+ [this, from](auto& pull_op) {
+ const hobject_t& soid = pull_op.soid;
+ logger().debug("handle_pull: {}", soid);
+ return backend->stat(coll, ghobject_t(soid)).then_interruptible(
+ [this, &pull_op](auto st) {
+ ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
+ ObjectRecoveryProgress &progress = pull_op.recovery_progress;
+ if (progress.first && recovery_info.size == ((uint64_t) -1)) {
+ // Adjust size and copy_subset
+ recovery_info.size = st.st_size;
+ if (st.st_size) {
+ interval_set<uint64_t> object_range;
+ object_range.insert(0, st.st_size);
+ recovery_info.copy_subset.intersection_of(object_range);
+ } else {
+ recovery_info.copy_subset.clear();
+ }
+ assert(recovery_info.clone_subset.empty());
}
- assert(recovery_info.clone_subset.empty());
- }
- return build_push_op(recovery_info, progress, 0);
- }).then([this, from](auto pop) {
- auto msg = make_message<MOSDPGPush>();
- msg->from = pg.get_pg_whoami();
- msg->pgid = pg.get_pgid();
- msg->map_epoch = pg.get_osdmap_epoch();
- msg->min_epoch = pg.get_last_peering_reset();
- msg->set_priority(pg.get_recovery_op_priority());
- msg->pushes.push_back(std::move(pop));
- return shard_services.send_to_osd(from.osd, std::move(msg),
- pg.get_osdmap_epoch());
+ return build_push_op(recovery_info, progress, 0);
+ }).then_interruptible([this, from](auto pop) {
+ auto msg = crimson::make_message<MOSDPGPush>();
+ msg->from = pg.get_pg_whoami();
+ msg->pgid = pg.get_pgid();
+ msg->map_epoch = pg.get_osdmap_epoch();
+ msg->min_epoch = pg.get_last_peering_reset();
+ msg->set_priority(pg.get_recovery_op_priority());
+ msg->pushes.push_back(std::move(pop));
+ return shard_services.send_to_osd(from.osd, std::move(msg),
+ pg.get_osdmap_epoch());
+ });
});
});
}
-seastar::future<bool> ReplicatedRecoveryBackend::_handle_pull_response(
+RecoveryBackend::interruptible_future<bool>
+ReplicatedRecoveryBackend::_handle_pull_response(
pg_shard_t from,
- const PushOp& pop,
+ PushOp& pop,
PullOp* response,
ceph::os::Transaction* t)
{
if (pi.recovery_info.version == eversion_t())
pi.recovery_info.version = pop.version;
- auto prepare_waiter = seastar::make_ready_future<>();
+ auto prepare_waiter = interruptor::make_interruptible(
+ seastar::make_ready_future<>());
if (pi.recovery_progress.first) {
prepare_waiter = pg.with_head_obc<RWState::RWNONE>(
pi.recovery_info.soid, [&pi, &recovery_waiter, &pop](auto obc) {
pi.obc = obc;
recovery_waiter.obc = obc;
- obc->obs.oi.decode(pop.attrset.at(OI_ATTR));
+ obc->obs.oi.decode_no_oid(pop.attrset.at(OI_ATTR), pop.soid);
pi.recovery_info.oi = obc->obs.oi;
return crimson::osd::PG::load_obc_ertr::now();
- }).handle_error(crimson::ct_error::assert_all{});
+ }).handle_error_interruptible(crimson::ct_error::assert_all{});
};
- return prepare_waiter.then([this, &pi, &pop, t, response]() mutable {
+ return prepare_waiter.then_interruptible([this, &pi, &pop, t, response]() mutable {
const bool first = pi.recovery_progress.first;
pi.recovery_progress = pop.after_progress;
logger().debug("new recovery_info {}, new progress {}",
bool complete = pi.is_complete();
bool clear_omap = !pop.before_progress.omap_complete;
return submit_push_data(pi.recovery_info, first, complete, clear_omap,
- std::move(data_zeros), usable_intervals, data, pop.omap_header,
- pop.attrset, pop.omap_entries, t).then(
+ std::move(data_zeros), std::move(usable_intervals),
+ std::move(data), std::move(pop.omap_header),
+ pop.attrset, std::move(pop.omap_entries), t)
+ .then_interruptible(
[this, response, &pi, &pop, complete, t, bytes_recovered=data.length()] {
pi.stat.num_keys_recovered += pop.omap_entries.size();
pi.stat.num_bytes_recovered += bytes_recovered;
});
}
-seastar::future<> ReplicatedRecoveryBackend::handle_pull_response(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_pull_response(
Ref<MOSDPGPush> m)
{
+ if (pg.can_discard_replica_op(*m)) {
+ logger().debug("{}: discarding {}", __func__, *m);
+ return seastar::now();
+ }
const PushOp& pop = m->pushes[0]; //TODO: only one push per message for now.
if (pop.version == eversion_t()) {
// replica doesn't have it!
[this, &response](auto& t, auto& m) {
pg_shard_t from = m->from;
PushOp& pop = m->pushes[0]; // only one push per message for now
- return _handle_pull_response(from, pop, &response, &t).then(
+ return _handle_pull_response(from, pop, &response, &t).then_interruptible(
[this, &t](bool complete) {
epoch_t epoch_frozen = pg.get_osdmap_epoch();
+ logger().debug("ReplicatedRecoveryBackend::handle_pull_response: do_transaction...");
return shard_services.get_store().do_transaction(coll, std::move(t))
.then([this, epoch_frozen, complete,
last_complete = pg.get_info().last_complete] {
return seastar::make_ready_future<bool>(complete);
});
});
- }).then([this, m, &response](bool complete) {
+ }).then_interruptible([this, m, &response](bool complete) {
if (complete) {
auto& pop = m->pushes[0];
recovering.at(pop.soid).set_pulled();
return seastar::make_ready_future<>();
} else {
- auto reply = make_message<MOSDPGPull>();
+ auto reply = crimson::make_message<MOSDPGPull>();
reply->from = pg.get_pg_whoami();
reply->set_priority(m->get_priority());
reply->pgid = pg.get_info().pgid;
});
}
-seastar::future<> ReplicatedRecoveryBackend::_handle_push(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::_handle_push(
pg_shard_t from,
- const PushOp &pop,
+ PushOp &pop,
PushReplyOp *response,
ceph::os::Transaction *t)
{
response->soid = pop.recovery_info.soid;
return submit_push_data(pop.recovery_info, first, complete, clear_omap,
- std::move(data_zeros), pop.data_included, pop.data, pop.omap_header,
- pop.attrset, pop.omap_entries, t).then([this, complete, &pop, t] {
+ std::move(data_zeros), std::move(pop.data_included),
+ std::move(pop.data), std::move(pop.omap_header),
+ pop.attrset, std::move(pop.omap_entries), t)
+ .then_interruptible(
+ [this, complete, &pop, t] {
if (complete) {
pg.get_recovery_handler()->on_local_recover(
pop.recovery_info.soid, pop.recovery_info,
});
}
-seastar::future<> ReplicatedRecoveryBackend::handle_push(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_push(
Ref<MOSDPGPush> m)
{
+ if (pg.can_discard_replica_op(*m)) {
+ logger().debug("{}: discarding {}", __func__, *m);
+ return seastar::now();
+ }
if (pg.is_primary()) {
return handle_pull_response(m);
}
logger().debug("{}: {}", __func__, *m);
return seastar::do_with(PushReplyOp(), [this, m](auto& response) {
- const PushOp& pop = m->pushes[0]; //TODO: only one push per message for now
+ PushOp& pop = m->pushes[0]; // TODO: only one push per message for now
return seastar::do_with(ceph::os::Transaction(),
[this, m, &pop, &response](auto& t) {
- return _handle_push(m->from, pop, &response, &t).then(
+ return _handle_push(m->from, pop, &response, &t).then_interruptible(
[this, &t] {
epoch_t epoch_frozen = pg.get_osdmap_epoch();
- return shard_services.get_store().do_transaction(coll, std::move(t)).then(
+ logger().debug("ReplicatedRecoveryBackend::handle_push: do_transaction...");
+ return interruptor::make_interruptible(
+ shard_services.get_store().do_transaction(coll, std::move(t))).then_interruptible(
[this, epoch_frozen, last_complete = pg.get_info().last_complete] {
//TODO: this should be grouped with pg.on_local_recover somehow.
pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
});
});
- }).then([this, m, &response]() mutable {
- auto reply = make_message<MOSDPGPushReply>();
+ }).then_interruptible([this, m, &response]() mutable {
+ auto reply = crimson::make_message<MOSDPGPushReply>();
reply->from = pg.get_pg_whoami();
reply->set_priority(m->get_priority());
reply->pgid = pg.get_info().pgid;
});
}
-seastar::future<std::optional<PushOp>>
+RecoveryBackend::interruptible_future<std::optional<PushOp>>
ReplicatedRecoveryBackend::_handle_push_reply(
pg_shard_t peer,
const PushReplyOp &op)
bool error = pi.recovery_progress.error;
if (!pi.recovery_progress.data_complete && !error) {
return build_push_op(pi.recovery_info, pi.recovery_progress,
- &pi.stat).then([&pi] (auto pop) {
+ &pi.stat).then_interruptible([&pi] (auto pop) {
pi.recovery_progress = pop.after_progress;
return seastar::make_ready_future<std::optional<PushOp>>(std::move(pop));
- }).handle_exception([recovering_iter, &pi, peer] (auto e) {
+ }).handle_exception_interruptible([recovering_iter, &pi, peer] (auto e) {
pi.recovery_progress.error = true;
recovering_iter->second.set_push_failed(peer, e);
return seastar::make_ready_future<std::optional<PushOp>>();
}
}
-seastar::future<> ReplicatedRecoveryBackend::handle_push_reply(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_push_reply(
Ref<MOSDPGPushReply> m)
{
logger().debug("{}: {}", __func__, *m);
auto from = m->from;
auto& push_reply = m->replies[0]; //TODO: only one reply per message
- return _handle_push_reply(from, push_reply).then(
+ return _handle_push_reply(from, push_reply).then_interruptible(
[this, from](std::optional<PushOp> push_op) {
if (push_op) {
- auto msg = make_message<MOSDPGPush>();
+ auto msg = crimson::make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
msg->map_epoch = pg.get_osdmap_epoch();
return {intervals_usable, data_usable};
}
-seastar::future<> ReplicatedRecoveryBackend::submit_push_data(
- const ObjectRecoveryInfo &recovery_info,
+RecoveryBackend::interruptible_future<hobject_t>
+ReplicatedRecoveryBackend::prep_push_target(
+ const ObjectRecoveryInfo& recovery_info,
bool first,
bool complete,
bool clear_omap,
- interval_set<uint64_t> data_zeros,
- const interval_set<uint64_t> &intervals_included,
- bufferlist data_included,
- bufferlist omap_header,
- const map<string, bufferlist> &attrs,
- const map<string, bufferlist> &omap_entries,
- ObjectStore::Transaction *t)
+ ObjectStore::Transaction* t,
+ const map<string, bufferlist, less<>>& attrs,
+ bufferlist&& omap_header)
{
- logger().debug("{}", __func__);
- hobject_t target_oid;
- if (first && complete) {
- target_oid = recovery_info.soid;
+ if (!first) {
+ return seastar::make_ready_future<hobject_t>(
+ get_temp_recovery_object(recovery_info.soid,
+ recovery_info.version));
+ }
+
+ ghobject_t target_oid;
+ if (complete) {
+ // overwrite the original object
+ target_oid = ghobject_t(recovery_info.soid);
} else {
- target_oid = get_temp_recovery_object(recovery_info.soid,
- recovery_info.version);
- if (first) {
- logger().debug("{}: Adding oid {} in the temp collection",
- __func__, target_oid);
- add_temp_obj(target_oid);
+ target_oid = ghobject_t(get_temp_recovery_object(recovery_info.soid,
+ recovery_info.version));
+ logger().debug("{}: Adding oid {} in the temp collection",
+ __func__, target_oid);
+ add_temp_obj(target_oid.hobj);
+ }
+ // create a new object
+ if (!complete || !recovery_info.object_exist) {
+ t->remove(coll->get_cid(), target_oid);
+ t->touch(coll->get_cid(), target_oid);
+ object_info_t oi;
+ oi.decode_no_oid(attrs.at(OI_ATTR));
+ t->set_alloc_hint(coll->get_cid(), target_oid,
+ oi.expected_object_size,
+ oi.expected_write_size,
+ oi.alloc_hint_flags);
+ }
+ if (complete) {
+ // remove xattr and update later if overwrite on original object
+ t->rmattrs(coll->get_cid(), target_oid);
+ // if need update omap, clear the previous content first
+ if (clear_omap) {
+ t->omap_clear(coll->get_cid(), target_oid);
}
}
-
- return [this, &recovery_info, first, complete, t,
- &omap_header, &attrs, target_oid, clear_omap] {
- if (first) {
- if (!complete) {
- t->remove(coll->get_cid(), ghobject_t(target_oid));
- t->touch(coll->get_cid(), ghobject_t(target_oid));
- bufferlist bv = attrs.at(OI_ATTR);
- object_info_t oi(bv);
- t->set_alloc_hint(coll->get_cid(), ghobject_t(target_oid),
- oi.expected_object_size,
- oi.expected_write_size,
- oi.alloc_hint_flags);
- } else {
- if (!recovery_info.object_exist) {
- t->remove(coll->get_cid(), ghobject_t(target_oid));
- t->touch(coll->get_cid(), ghobject_t(target_oid));
- bufferlist bv = attrs.at(OI_ATTR);
- object_info_t oi(bv);
- t->set_alloc_hint(coll->get_cid(), ghobject_t(target_oid),
- oi.expected_object_size,
- oi.expected_write_size,
- oi.alloc_hint_flags);
- }
- //remove xattr and update later if overwrite on original object
- t->rmattrs(coll->get_cid(), ghobject_t(target_oid));
- //if need update omap, clear the previous content first
- if (clear_omap)
- t->omap_clear(coll->get_cid(), ghobject_t(target_oid));
- }
-
- t->truncate(coll->get_cid(), ghobject_t(target_oid), recovery_info.size);
- if (omap_header.length())
- t->omap_setheader(coll->get_cid(), ghobject_t(target_oid), omap_header);
-
- return store->stat(coll, ghobject_t(recovery_info.soid)).then(
- [this, &recovery_info, complete, t, target_oid,
- omap_header = std::move(omap_header)] (auto st) {
- //TODO: pg num bytes counting
- if (!complete) {
- //clone overlap content in local object
- if (recovery_info.object_exist) {
- uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
- interval_set<uint64_t> local_intervals_included, local_intervals_excluded;
- if (local_size) {
- local_intervals_included.insert(0, local_size);
- local_intervals_excluded.intersection_of(local_intervals_included, recovery_info.copy_subset);
- local_intervals_included.subtract(local_intervals_excluded);
- }
- for (auto [off, len] : local_intervals_included) {
- logger().debug(" clone_range {} {}~{}",
- recovery_info.soid, off, len);
- t->clone_range(coll->get_cid(), ghobject_t(recovery_info.soid),
- ghobject_t(target_oid), off, len, off);
- }
- }
- }
- return seastar::make_ready_future<>();
- });
+ t->truncate(coll->get_cid(), target_oid, recovery_info.size);
+ if (omap_header.length()) {
+ t->omap_setheader(coll->get_cid(), target_oid, omap_header);
+ }
+ if (complete || !recovery_info.object_exist) {
+ return seastar::make_ready_future<hobject_t>(target_oid.hobj);
+ }
+ // clone overlap content in local object if using a new object
+ return interruptor::make_interruptible(store->stat(coll, ghobject_t(recovery_info.soid)))
+ .then_interruptible(
+ [this, &recovery_info, t, target_oid] (auto st) {
+ // TODO: pg num bytes counting
+ uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
+ interval_set<uint64_t> local_intervals_included, local_intervals_excluded;
+ if (local_size) {
+ local_intervals_included.insert(0, local_size);
+ local_intervals_excluded.intersection_of(local_intervals_included, recovery_info.copy_subset);
+ local_intervals_included.subtract(local_intervals_excluded);
}
- return seastar::make_ready_future<>();
- }().then([this, data_zeros=std::move(data_zeros),
- &recovery_info, &intervals_included, t, target_oid,
- &omap_entries, &attrs, data_included, complete, first]() mutable {
+ for (auto [off, len] : local_intervals_included) {
+ logger().debug(" clone_range {} {}~{}",
+ recovery_info.soid, off, len);
+ t->clone_range(coll->get_cid(), ghobject_t(recovery_info.soid),
+ target_oid, off, len, off);
+ }
+ return seastar::make_ready_future<hobject_t>(target_oid.hobj);
+ });
+}
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::submit_push_data(
+ const ObjectRecoveryInfo &recovery_info,
+ bool first,
+ bool complete,
+ bool clear_omap,
+ interval_set<uint64_t>&& data_zeros,
+ interval_set<uint64_t>&& intervals_included,
+ bufferlist&& data_included,
+ bufferlist&& omap_header,
+ const map<string, bufferlist, less<>> &attrs,
+ map<string, bufferlist>&& omap_entries,
+ ObjectStore::Transaction *t)
+{
+ logger().debug("{}", __func__);
+ return prep_push_target(recovery_info, first, complete,
+ clear_omap, t, attrs,
+ std::move(omap_header)).then_interruptible(
+ [this,
+ &recovery_info, t,
+ first, complete,
+ data_zeros=std::move(data_zeros),
+ intervals_included=std::move(intervals_included),
+ data_included=std::move(data_included),
+ omap_entries=std::move(omap_entries),
+ &attrs](auto target_oid) mutable {
+
uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
// Punch zeros for data, if fiemap indicates nothing but it is marked dirty
if (!data_zeros.empty()) {
}
}
-seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete_reply(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_recovery_delete_reply(
Ref<MOSDPGRecoveryDeleteReply> m)
{
auto& p = m->objects.front();
return seastar::now();
}
-seastar::future<> ReplicatedRecoveryBackend::handle_recovery_op(Ref<MOSDFastDispatchOp> m)
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_recovery_op(Ref<MOSDFastDispatchOp> m)
{
switch (m->get_header().type) {
case MSG_OSD_PG_PULL: