namespace crimson::osd {
+PG::interruptible_future<>
+PG::SnapTrimMutex::lock(SnapTrimEvent &st_event) noexcept
+{
+ return st_event.enter_stage<interruptor>(wait_pg
+ ).then_interruptible([this] {
+ return mutex.lock();
+ });
+}
+
void SnapTrimEvent::SubOpBlocker::dump_detail(Formatter *f) const
{
f->open_array_section("dependent_operations");
});
}
-CommonPGPipeline& SnapTrimEvent::pp()
+CommonPGPipeline& SnapTrimEvent::client_pp()
{
return pg->request_pg_pipeline;
}
{
return interruptor::with_interruption([&shard_services, this] {
return enter_stage<interruptor>(
- pp().wait_for_active
+ client_pp().wait_for_active
).then_interruptible([this] {
return with_blocking_event<PGActivationBlocker::BlockingEvent,
interruptor>([this] (auto&& trigger) {
});
}).then_interruptible([this] {
return enter_stage<interruptor>(
- pp().recover_missing);
+ client_pp().recover_missing);
}).then_interruptible([] {
//return do_recover_missing(pg, get_target_oid());
return seastar::now();
}).then_interruptible([this] {
return enter_stage<interruptor>(
- pp().get_obc);
+ client_pp().get_obc);
+ }).then_interruptible([this] {
+ return pg->snaptrim_mutex.lock(*this);
}).then_interruptible([this] {
return enter_stage<interruptor>(
- pp().process);
+ client_pp().process);
}).then_interruptible([&shard_services, this] {
return interruptor::async([this] {
std::vector<hobject_t> to_trim;
if (to_trim.empty()) {
// the legit ENOENT -> done
logger().debug("{}: to_trim is empty! Stopping iteration", *this);
+ pg->snaptrim_mutex.unlock();
return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(
seastar::stop_iteration::yes);
}
- for (const auto& object : to_trim) {
- logger().debug("{}: trimming {}", *this, object);
- auto [op, fut] = shard_services.start_operation_may_interrupt<
- interruptor, SnapTrimObjSubEvent>(
- pg,
- object,
- snapid);
- subop_blocker.emplace_back(
- op->get_id(),
- std::move(fut)
- );
- }
- return enter_stage<interruptor>(
- wait_subop
- ).then_interruptible([this] {
+ return [&shard_services, this](const auto &to_trim) {
+ for (const auto& object : to_trim) {
+ logger().debug("{}: trimming {}", *this, object);
+ auto [op, fut] = shard_services.start_operation_may_interrupt<
+ interruptor, SnapTrimObjSubEvent>(
+ pg,
+ object,
+ snapid);
+ subop_blocker.emplace_back(
+ op->get_id(),
+ std::move(fut)
+ );
+ }
+ return interruptor::now();
+ }(to_trim).then_interruptible([this] {
+ return enter_stage<interruptor>(wait_subop);
+ }).then_interruptible([this] {
logger().debug("{}: awaiting completion", *this);
return subop_blocker.wait_completion();
- }).safe_then_interruptible([this] {
+ }).finally([this] {
+ pg->snaptrim_mutex.unlock();
+ }).safe_then_interruptible([this] {
if (!needs_pause) {
return interruptor::now();
}
}
-CommonPGPipeline& SnapTrimObjSubEvent::pp()
+CommonPGPipeline& SnapTrimObjSubEvent::client_pp()
{
return pg->request_pg_pipeline;
}
ShardServices &shard_services, Ref<PG> _pg)
{
return enter_stage<interruptor>(
- pp().wait_for_active
+ client_pp().wait_for_active
).then_interruptible([this] {
return with_blocking_event<PGActivationBlocker::BlockingEvent,
interruptor>([this] (auto&& trigger) {
});
}).then_interruptible([this] {
return enter_stage<interruptor>(
- pp().recover_missing);
+ client_pp().recover_missing);
}).then_interruptible([] {
//return do_recover_missing(pg, get_target_oid());
return seastar::now();
}).then_interruptible([this] {
return enter_stage<interruptor>(
- pp().get_obc);
+ client_pp().get_obc);
}).then_interruptible([this] {
logger().debug("{}: getting obc for {}", *this, coid);
// end of commonality
- // with_head_and_clone_obc lock both clone's and head's obcs
- return pg->obc_loader.with_head_and_clone_obc<RWState::RWWRITE>(
+ // with_clone_obc_direct lock both clone's and head's obcs
+ return pg->obc_loader.with_clone_obc_direct<RWState::RWWRITE>(
coid,
[this](auto head_obc, auto clone_obc) {
logger().debug("{}: got clone_obc={}", *this, clone_obc->get_oid());
return enter_stage<interruptor>(
- pp().process
+ client_pp().process
).then_interruptible(
[this,clone_obc=std::move(clone_obc), head_obc=std::move(head_obc)]() mutable {
logger().debug("{}: processing clone_obc={}", *this, clone_obc->get_oid());