// vim: ts=8 sw=2 smarttab
#include <seastar/core/future.hh>
+#include <seastar/core/sleep.hh>
#include "messages/MOSDOp.h"
Ref<PG> pg,
ShardServices &ss,
epoch_t epoch_started,
- crimson::osd::scheduler::scheduler_class_t scheduler_class)
+ crimson::osd::scheduler::scheduler_class_t scheduler_class,
+ float delay)
: pg(pg),
epoch_started(epoch_started),
+ delay(delay),
ss(ss),
scheduler_class(scheduler_class)
{}
logger().debug("{}: start", *this);
IRef ref = this;
- return ss.throttler.with_throttle_while(
- this, get_scheduler_params(), [this] {
- return do_recovery();
- }).handle_exception_type([ref, this](const std::system_error& err) {
- if (err.code() == std::make_error_code(std::errc::interrupted)) {
- logger().debug("{} recovery interruped: {}", *pg, err.what());
- return seastar::now();
- }
- return seastar::make_exception_future<>(err);
- });
+ auto maybe_delay = seastar::now();
+ if (delay) {
+ maybe_delay = seastar::sleep(
+ std::chrono::milliseconds(std::lround(delay * 1000)));
+ }
+ return maybe_delay.then([ref, this] {
+ return ss.throttler.with_throttle_while(
+ this, get_scheduler_params(), [this] {
+ return interruptor::with_interruption([this] {
+ return do_recovery();
+ }, [](std::exception_ptr) {
+ return seastar::make_ready_future<bool>(false);
+ }, pg);
+ }).handle_exception_type([ref, this](const std::system_error& err) {
+ if (err.code() == std::make_error_code(std::errc::interrupted)) {
+ logger().debug("{} recovery interruped: {}", *pg, err.what());
+ return seastar::now();
+ }
+ return seastar::make_exception_future<>(err);
+ });
+ });
}
-seastar::future<bool> UrgentRecovery::do_recovery()
+UrgentRecovery::interruptible_future<bool>
+UrgentRecovery::do_recovery()
{
+ logger().debug("{}: {}", __func__, *this);
if (!pg->has_reset_since(epoch_started)) {
- return with_blocking_future(
+ return with_blocking_future_interruptible<interruptor::condition>(
pg->get_recovery_handler()->recover_missing(soid, need)
- ).then([] {
+ ).then_interruptible([] {
return seastar::make_ready_future<bool>(false);
});
}
void UrgentRecovery::print(std::ostream &lhs) const
{
lhs << "UrgentRecovery(" << pg->get_pgid() << ", "
- << soid << ", v" << need << ")";
+ << soid << ", v" << need << ", epoch_started: "
+ << epoch_started << ")";
}
void UrgentRecovery::dump_detail(Formatter *f) const
PglogBasedRecovery::PglogBasedRecovery(
Ref<PG> pg,
ShardServices &ss,
- const epoch_t epoch_started)
+ const epoch_t epoch_started,
+ float delay)
: BackgroundRecovery(
std::move(pg),
ss,
epoch_started,
- crimson::osd::scheduler::scheduler_class_t::background_recovery)
+ crimson::osd::scheduler::scheduler_class_t::background_recovery,
+ delay)
{}
-seastar::future<bool> PglogBasedRecovery::do_recovery()
+PglogBasedRecovery::interruptible_future<bool>
+PglogBasedRecovery::do_recovery()
{
- if (pg->has_reset_since(epoch_started))
+ if (pg->has_reset_since(epoch_started)) {
return seastar::make_ready_future<bool>(false);
- return with_blocking_future(
+ }
+ return with_blocking_future_interruptible<interruptor::condition>(
pg->get_recovery_handler()->start_recovery_ops(
crimson::common::local_conf()->osd_recovery_max_single_start));
}
return pg.backfill_pipeline;
}
-seastar::future<bool> BackfillRecovery::do_recovery()
+BackfillRecovery::interruptible_future<bool>
+BackfillRecovery::do_recovery()
{
logger().debug("{}", __func__);
return seastar::make_ready_future<bool>(false);
}
// TODO: limits
- return with_blocking_future(
+ return with_blocking_future_interruptible<interruptor::condition>(
// process_event() of our boost::statechart machine is non-reentrant.
// with the backfill_pipeline we protect it from a second entry from
// the implementation of BackfillListener.
// additionally, this stage serves to synchronize with PeeringEvent.
handle.enter(bp(*pg).process)
- ).then([this] {
+ ).then_interruptible([this] {
pg->get_recovery_handler()->dispatch_backfill_event(std::move(evt));
return seastar::make_ready_future<bool>(false);
});