#include "crimson/osd/pg_backend.h"
#include "crimson/osd/pg_recovery.h"
-#include "messages/MOSDPGPull.h"
-#include "messages/MOSDPGPush.h"
-#include "messages/MOSDPGPushReply.h"
-#include "messages/MOSDPGRecoveryDelete.h"
-#include "messages/MOSDPGRecoveryDeleteReply.h"
-
#include "osd/osd_types.h"
#include "osd/PeeringState.h"
}
}
+using std::map;
+using std::set;
+
// Kick off log-based (non-backfill) recovery by scheduling a
// PglogBasedRecovery background operation via the shard services.
// NOTE(review): diff hunk — the '+' lines add a new trailing argument
// (float(0.001)) to start_operation<>; presumably a delay/backoff or
// cost parameter — TODO confirm against the PglogBasedRecovery ctor.
void PGRecovery::start_pglogbased_recovery()
{
using PglogBasedRecovery = crimson::osd::PglogBasedRecovery;
(void) pg->get_shard_services().start_operation<PglogBasedRecovery>(
static_cast<crimson::osd::PG*>(pg),
pg->get_shard_services(),
- pg->get_osdmap_epoch());
+ pg->get_osdmap_epoch(),
+ float(0.001));
}
// Start up to max_to_start primary- and replica-side recovery ops and
// join their completion futures; the continuation checks whether the PG
// still needs recovery. This hunk migrates the return/collection types
// from crimson::osd::blocking_future to the interruptible-future
// framework (join_blocking_interruptible_futures / then_interruptible
// parameterized on IOInterruptCondition).
// NOTE(review): hunk is truncated after "if (done) {" — the remainder
// of the continuation is elided by the diff context.
-crimson::osd::blocking_future<bool>
+PGRecovery::blocking_interruptible_future<bool>
PGRecovery::start_recovery_ops(size_t max_to_start)
{
assert(pg->is_primary());
assert(!pg->is_backfilling());
assert(!pg->get_peering_state().is_deleting());
- std::vector<crimson::osd::blocking_future<>> started;
+ std::vector<blocking_interruptible_future<>> started;
started.reserve(max_to_start);
max_to_start -= start_primary_recovery_ops(max_to_start, &started);
if (max_to_start > 0) {
max_to_start -= start_replica_recovery_ops(max_to_start, &started);
}
- return crimson::osd::join_blocking_futures(std::move(started)).then(
+ return crimson::join_blocking_interruptible_futures<
+ ::crimson::osd::IOInterruptCondition>(std::move(started)).then_interruptible<
+ ::crimson::osd::IOInterruptCondition>(
[this] {
bool done = !pg->get_peering_state().needs_recovery();
if (done) {
// Queue recovery of objects missing on the primary; for objects already
// being recovered, park a wait-future in *out instead of starting a new
// op. The hunk switches the out-vector element type and instantiates
// wait_for_recovered_blocking with IOInterruptCondition.
// NOTE(review): interior context is elided (the loop over missing
// objects and the declarations of started/skipped/soid/head are not
// visible in this hunk).
size_t PGRecovery::start_primary_recovery_ops(
size_t max_to_start,
- std::vector<crimson::osd::blocking_future<>> *out)
+ std::vector<PGRecovery::blocking_interruptible_future<>> *out)
{
if (!pg->is_recovering()) {
return 0;
// TODO: handle lost/unfound
if (pg->get_recovery_backend()->is_recovering(soid)) {
auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid);
- out->push_back(recovery_waiter.wait_for_recovered_blocking());
+ out->push_back(recovery_waiter.wait_for_recovered_blocking<
+ ::crimson::osd::IOInterruptCondition>());
++started;
} else if (pg->get_recovery_backend()->is_recovering(head)) {
++skipped;
// Replica-side counterpart of start_primary_recovery_ops: queue pushes
// for objects missing on replicas, parking wait-futures for objects
// already in flight. Same type migration as the primary variant
// (interruptible futures + IOInterruptCondition template argument).
// NOTE(review): interior context is elided between "return 0;" and the
// is_recovering() check — the enclosing loop is not visible here.
size_t PGRecovery::start_replica_recovery_ops(
size_t max_to_start,
- std::vector<crimson::osd::blocking_future<>> *out)
+ std::vector<PGRecovery::blocking_interruptible_future<>> *out)
{
if (!pg->is_recovering()) {
return 0;
if (pg->get_recovery_backend()->is_recovering(soid)) {
logger().debug("{}: already recovering object {}", __func__, soid);
auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid);
- out->push_back(recovery_waiter.wait_for_recovered_blocking());
+ out->push_back(recovery_waiter.wait_for_recovered_blocking<
+ ::crimson::osd::IOInterruptCondition>());
started++;
continue;
}
return started;
}
// Recover a single missing object: deleted objects go through
// recover_delete, otherwise recover_object is wrapped in the
// recovering-tracker's blocking future; on exception the primary is
// marked failed for this object via on_failed_recover. The hunk renames
// handle_exception -> handle_exception_interruptible and changes the
// declared return type to the interruptible future alias.
// NOTE(review): lines are elided between the is_deleted branch header
// and the recover_delete(...) call (the dangling "));" shows the
// wrapping expression is not fully visible).
-crimson::osd::blocking_future<> PGRecovery::recover_missing(
+PGRecovery::blocking_interruptible_future<>
+PGRecovery::recover_missing(
const hobject_t &soid, eversion_t need)
{
if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) {
pg->get_recovery_backend()->recover_delete(soid, need));
} else {
return pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
- pg->get_recovery_backend()->recover_object(soid, need).handle_exception(
+ pg->get_recovery_backend()->recover_object(soid, need)
+ .handle_exception_interruptible(
[=, soid = std::move(soid)] (auto e) {
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
return seastar::make_ready_future<>();
// Queue a delete-push to replicas for soid and record a completion
// future in *in_progress; on completion the object counts as recovered
// (stat_diff) via on_global_recover. The hunk changes the vector
// element type and then -> then_interruptible.
// NOTE(review): hunk truncated after on_global_recover(...) — the
// closing of the lambda/expression and the return are elided.
size_t PGRecovery::prep_object_replica_deletes(
const hobject_t& soid,
eversion_t need,
- std::vector<crimson::osd::blocking_future<>> *in_progress)
+ std::vector<PGRecovery::blocking_interruptible_future<>> *in_progress)
{
in_progress->push_back(
pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
- pg->get_recovery_backend()->push_delete(soid, need).then([=] {
+ pg->get_recovery_backend()->push_delete(soid, need).then_interruptible(
+ [=] {
object_stat_sum_t stat_diff;
stat_diff.num_objects_recovered = 1;
on_global_recover(soid, stat_diff, true);
// Queue a push of soid to replicas and record the completion future in
// *in_progress; on exception the primary is reported via
// on_failed_recover. Same interruptible-future migration as the
// deletes variant (handle_exception -> handle_exception_interruptible).
// NOTE(review): hunk truncated after make_ready_future<>() — lambda
// close and return value are elided.
size_t PGRecovery::prep_object_replica_pushes(
const hobject_t& soid,
eversion_t need,
- std::vector<crimson::osd::blocking_future<>> *in_progress)
+ std::vector<PGRecovery::blocking_interruptible_future<>> *in_progress)
{
in_progress->push_back(
pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
- pg->get_recovery_backend()->recover_object(soid, need).handle_exception(
+ pg->get_recovery_backend()->recover_object(soid, need)
+ .handle_exception_interruptible(
[=, soid = std::move(soid)] (auto e) {
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
return seastar::make_ready_future<>();
// Fragment of a recover-completion handler (signature begins before
// this hunk; presumably on_local_recover — TODO confirm). The added
// block guards the not-fully-implemented LOST_REVERT path: if a
// non-delete recovery lands an object version older than what the
// missing-set still needs, and the newest log entry for that object is
// a LOST_REVERT, abort with an explicit "not implemented" message
// instead of silently mis-recovering.
// NOTE(review): the added code dereferences find(...)->second without
// checking the iterator — it relies on is_missing()/the log invariant
// guaranteeing presence; confirm that invariant holds.
const bool is_delete,
ceph::os::Transaction& t)
{
+ if (const auto &log = pg->get_peering_state().get_pg_log();
+ !is_delete &&
+ log.get_missing().is_missing(recovery_info.soid) &&
+ log.get_missing().get_items().find(recovery_info.soid)->second.need > recovery_info.version) {
+ assert(pg->is_primary());
+ if (const auto* latest = log.get_log().objects.find(recovery_info.soid)->second;
+ latest->op == pg_log_entry_t::LOST_REVERT) {
+ ceph_abort("mark_unfound_lost (LOST_REVERT) is not implemented yet");
+ }
+ }
pg->get_peering_state().recover_got(soid,
recovery_info.version, is_delete, t);
// NOTE(review): this span appears to fuse hunks from TWO adjacent
// functions — the first builds an MOSDPGScan (OP_SCAN_GET_DIGEST) for a
// backfill target, the second (from the osd_backfill_scan_min/max
// arguments onward) is a local scan whose continuation feeds
// BackfillState::PrimaryScanned into the backfill state machine.
// Changes: make_message -> crimson::make_message and
// then -> then_interruptible.
const hobject_t& end)
{
logger().debug("{}: target.osd={}", __func__, target.osd);
- auto msg = make_message<MOSDPGScan>(
+ auto msg = crimson::make_message<MOSDPGScan>(
MOSDPGScan::OP_SCAN_GET_DIGEST,
pg->get_pg_whoami(),
pg->get_osdmap_epoch(),
begin,
local_conf()->osd_backfill_scan_min,
local_conf()->osd_backfill_scan_max
- ).then([this] (BackfillInterval bi) {
+ ).then_interruptible([this] (BackfillInterval bi) {
logger().debug("request_primary_scan:{}", __func__);
using BackfillState = crimson::osd::BackfillState;
start_backfill_recovery(BackfillState::PrimaryScanned{ std::move(bi) });
// Fragment of the backfill push path (enqueue_push, per the debug
// message): fire-and-forget recovery of obj@v whose completion feeds
// BackfillState::ObjectPushed into the backfill state machine; any
// exception aborts the process (ceph_abort_msg). Hunk migrates
// handle_exception/then to their *_interruptible counterparts.
__func__, obj, v);
pg->get_recovery_backend()->add_recovering(obj);
std::ignore = pg->get_recovery_backend()->recover_object(obj, v).\
- handle_exception([] (auto) {
+ handle_exception_interruptible([] (auto) {
ceph_abort_msg("got exception on backfill's push");
return seastar::make_ready_future<>();
- }).then([this, obj] {
+ }).then_interruptible([this, obj] {
logger().debug("enqueue_push:{}", __func__);
using BackfillState = crimson::osd::BackfillState;
start_backfill_recovery(BackfillState::ObjectPushed(std::move(obj)));
// Fragment of the backfill drop path: lazily create one batched
// MOSDPGBackfillRemove request per target shard, then append (obj, v)
// to its removal list. Hunk changes the message factory from
// ceph::make_message to crimson::make_message.
// allocate a pair if target is seen for the first time
auto& req = backfill_drop_requests[target];
if (!req) {
- req = ceph::make_message<MOSDPGBackfillRemove>(
+ req = crimson::make_message<MOSDPGBackfillRemove>(
spg_t(pg->get_pgid().pgid, target.shard), pg->get_osdmap_epoch());
}
req->ls.emplace_back(obj, v);
// Fragment of the last-backfill update path: only when the new
// last_backfill advances past the peer's recorded value does it update
// peering state and build an MOSDPGBackfill (FINISH when the peer's
// last_backfill is max, PROGRESS otherwise). Hunk changes make_message
// to crimson::make_message.
// NOTE(review): hunk truncated after pg->get_osdmap_epoch() — the rest
// of the message construction/send is elided.
if (const pg_info_t& pinfo = pg->get_peering_state().get_peer_info(bt);
new_last_backfill > pinfo.last_backfill) {
pg->get_peering_state().update_peer_last_backfill(bt, new_last_backfill);
- auto m = crimson::make_message<MOSDPGBackfill>(
+ auto m = crimson::make_message<MOSDPGBackfill>(
pinfo.last_backfill.is_max() ? MOSDPGBackfill::OP_BACKFILL_FINISH
: MOSDPGBackfill::OP_BACKFILL_PROGRESS,
pg->get_osdmap_epoch(),