]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/common/operation.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / crimson / common / operation.h
index 22fefd005d8bf68eb30453a67a2aadfe9d4e67b1..f26a3e860bc2b950a8ab9cd79bd165d3dec031b9 100644 (file)
@@ -7,6 +7,7 @@
 #include <array>
 #include <set>
 #include <vector>
+#include <boost/core/demangle.hpp>
 #include <boost/intrusive/list.hpp>
 #include <boost/intrusive_ptr.hpp>
 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
 #include <seastar/core/future-util.hh>
 
 #include "include/ceph_assert.h"
+#include "include/utime.h"
+#include "common/Clock.h"
+#include "common/Formatter.h"
 #include "crimson/common/interruptible_future.h"
+#include "crimson/common/smp_helpers.h"
+#include "crimson/common/log.h"
 
 namespace ceph {
   class Formatter;
@@ -31,189 +37,280 @@ using registry_hook_t = boost::intrusive::list_member_hook<
 class Operation;
 class Blocker;
 
+
+namespace detail {
+void dump_time_event(const char* name,
+                    const utime_t& timestamp,
+                    ceph::Formatter* f);
+void dump_blocking_event(const char* name,
+                        const utime_t& timestamp,
+                        const Blocker* blocker,
+                        ceph::Formatter* f);
+} // namespace detail
+
 /**
- * Provides an abstraction for registering and unregistering a blocker
- * for the duration of a future becoming available.
+ * Provides an interface for dumping diagnostic information about
+ * why a particular op is not making progress.
  */
-template <typename Fut>
-class blocking_future_detail {
-  friend class Operation;
-  friend class Blocker;
-  Blocker *blocker;
-  Fut fut;
-  blocking_future_detail(Blocker *b, Fut &&f)
-    : blocker(b), fut(std::move(f)) {}
+class Blocker {
+public:
+  void dump(ceph::Formatter *f) const;
+  virtual ~Blocker() = default;
 
-  template <typename V, typename... U>
-  friend blocking_future_detail<seastar::future<V>>
-  make_ready_blocking_future(U&&... args);
+private:
+  virtual void dump_detail(ceph::Formatter *f) const = 0;
+  virtual const char *get_type_name() const = 0;
+};
 
-  template <typename V, typename Exception>
-  friend blocking_future_detail<seastar::future<V>>
-  make_exception_blocking_future(Exception&& e);
+// the main template. by default an operation has no extenral
+// event handler (the empty tuple). specializing the template
+// allows to define backends on per-operation-type manner.
+// NOTE: basically this could be a function but C++ disallows
+// differentiating return type among specializations.
+template <class T>
+struct EventBackendRegistry {
+  template <typename...> static constexpr bool always_false = false;
+
+  static std::tuple<> get_backends() {
+    static_assert(always_false<T>, "Registry specialization not found");
+    return {};
+  }
+};
 
-  template <typename U>
-  friend blocking_future_detail<seastar::future<>> join_blocking_futures(U &&u);
+template <class T>
+struct Event {
+  T* that() {
+    return static_cast<T*>(this);
+  }
+  const T* that() const {
+    return static_cast<const T*>(this);
+  }
 
-  template <typename InterruptCond, typename T>
-  friend blocking_future_detail<
-    ::crimson::interruptible::interruptible_future<InterruptCond>>
-  join_blocking_interruptible_futures(T&& t);
+  template <class OpT, class... Args>
+  void trigger(OpT&& op, Args&&... args) {
+    that()->internal_backend.handle(*that(),
+                                    std::forward<OpT>(op),
+                                    std::forward<Args>(args)...);
+    // let's call `handle()` for concrete event type from each single
+    // of our backends. the order in the registry matters.
+    std::apply([&, //args=std::forward_as_tuple(std::forward<Args>(args)...),
+               this] (auto... backend) {
+      (..., backend.handle(*that(),
+                           std::forward<OpT>(op),
+                           std::forward<Args>(args)...));
+    }, EventBackendRegistry<std::decay_t<OpT>>::get_backends());
+  }
+};
 
-  template <typename U>
-  friend class blocking_future_detail;
 
-public:
-  template <typename F>
-  auto then(F &&f) && {
-    using result = decltype(std::declval<Fut>().then(f));
-    return blocking_future_detail<seastar::futurize_t<result>>(
-      blocker,
-      std::move(fut).then(std::forward<F>(f)));
+// simplest event type for recording things like beginning or end
+// of TrackableOperation's life.
+template <class T>
+struct TimeEvent : Event<T> {
+  struct Backend {
+    // `T` is passed solely to let implementations to discriminate
+    // basing on the type-of-event.
+    virtual void handle(T&, const Operation&) = 0;
+  };
+
+  // for the sake of dumping ops-in-flight.
+  struct InternalBackend final : Backend {
+    void handle(T&, const Operation&) override {
+      timestamp = ceph_clock_now();
+    }
+    utime_t timestamp;
+  } internal_backend;
+
+  void dump(ceph::Formatter *f) const {
+    auto demangled_name = boost::core::demangle(typeid(T).name());
+    detail::dump_time_event(
+      demangled_name.c_str(),
+      internal_backend.timestamp, f);
   }
-  template <typename InterruptCond, typename F>
-  auto then_interruptible(F &&f) && {
-    using result = decltype(std::declval<Fut>().then_interruptible(f));
-    return blocking_future_detail<
-      typename ::crimson::interruptible::interruptor<
-       InterruptCond>::template futurize<result>::type>(
-      blocker,
-      std::move(fut).then_interruptible(std::forward<F>(f)));
+
+  auto get_timestamp() const {
+    return internal_backend.timestamp;
   }
 };
 
-template <typename T=void>
-using blocking_future = blocking_future_detail<seastar::future<T>>;
 
-template <typename InterruptCond, typename T = void>
-using blocking_interruptible_future = blocking_future_detail<
-  ::crimson::interruptible::interruptible_future<InterruptCond, T>>;
-
-template <typename InterruptCond, typename V, typename U>
-blocking_interruptible_future<InterruptCond, V>
-make_ready_blocking_interruptible_future(U&& args) {
-  return blocking_interruptible_future<InterruptCond, V>(
-    nullptr,
-    seastar::make_ready_future<V>(std::forward<U>(args)));
-}
-
-template <typename InterruptCond, typename V, typename Exception>
-blocking_interruptible_future<InterruptCond, V>
-make_exception_blocking_interruptible_future(Exception&& e) {
-  return blocking_interruptible_future<InterruptCond, V>(
-    nullptr,
-    seastar::make_exception_future<InterruptCond, V>(e));
-}
+template <typename T>
+class BlockerT : public Blocker {
+public:
+  struct BlockingEvent : Event<typename T::BlockingEvent> {
+    using Blocker = std::decay_t<T>;
+
+    struct Backend {
+      // `T` is based solely to let implementations to discriminate
+      // basing on the type-of-event.
+      virtual void handle(typename T::BlockingEvent&, const Operation&, const T&) = 0;
+    };
+
+    struct InternalBackend : Backend {
+      void handle(typename T::BlockingEvent&,
+                  const Operation&,
+                  const T& blocker) override {
+        this->timestamp = ceph_clock_now();
+        this->blocker = &blocker;
+      }
 
-template <typename V=void, typename... U>
-blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&&... args) {
-  return blocking_future<V>(
-    nullptr,
-    seastar::make_ready_future<V>(std::forward<U>(args)...));
-}
+      utime_t timestamp;
+      const T* blocker;
+    } internal_backend;
+
+    // we don't want to make any BlockerT to be aware and coupled with
+    // an operation. to not templatize an entire path from an op to
+    // a blocker, type erasuring is used.
+    struct TriggerI {
+      TriggerI(BlockingEvent& event) : event(event) {}
+
+      template <class FutureT>
+      auto maybe_record_blocking(FutureT&& fut, const T& blocker) {
+        if (!fut.available()) {
+         // a full blown call via vtable. that's the cost for templatization
+         // avoidance. anyway, most of the things actually have the type
+         // knowledge.
+         record_blocking(blocker);
+         return std::forward<FutureT>(fut).finally(
+           [&event=this->event, &blocker] () mutable {
+           // beware trigger instance may be already dead when this
+           // is executed!
+           record_unblocking(event, blocker);
+         });
+       }
+       return std::forward<FutureT>(fut);
+      }
+      virtual ~TriggerI() = default;
+    protected:
+      // it's for the sake of erasing the OpT type
+      virtual void record_blocking(const T& blocker) = 0;
+
+      static void record_unblocking(BlockingEvent& event, const T& blocker) {
+       assert(event.internal_backend.blocker == &blocker);
+       event.internal_backend.blocker = nullptr;
+      }
 
-template <typename V, typename Exception>
-blocking_future_detail<seastar::future<V>>
-make_exception_blocking_future(Exception&& e) {
-  return blocking_future<V>(
-    nullptr,
-    seastar::make_exception_future<V>(e));
-}
+      BlockingEvent& event;
+    };
+
+    template <class OpT>
+    struct Trigger : TriggerI {
+      Trigger(BlockingEvent& event, const OpT& op) : TriggerI(event), op(op) {}
+
+      template <class FutureT>
+      auto maybe_record_blocking(FutureT&& fut, const T& blocker) {
+        if (!fut.available()) {
+         // no need for the dynamic dispatch! if we're lucky, a compiler
+         // should collapse all these abstractions into a bunch of movs.
+         this->Trigger::record_blocking(blocker);
+         return std::forward<FutureT>(fut).finally(
+           [&event=this->event, &blocker] () mutable {
+           Trigger::record_unblocking(event, blocker);
+         });
+       }
+       return std::forward<FutureT>(fut);
+      }
 
-/**
- * Provides an interface for dumping diagnostic information about
- * why a particular op is not making progress.
- */
-class Blocker {
-public:
-  template <typename T>
-  blocking_future<T> make_blocking_future(seastar::future<T> &&f) {
-    return blocking_future<T>(this, std::move(f));
-  }
+      const OpT &get_op() { return op; }
 
-  template <typename InterruptCond, typename T>
-  blocking_interruptible_future<InterruptCond, T>
-  make_blocking_future(
-      crimson::interruptible::interruptible_future<InterruptCond, T> &&f) {
-    return blocking_interruptible_future<InterruptCond, T>(
-      this, std::move(f));
-  }
-  template <typename InterruptCond, typename T = void>
-  blocking_interruptible_future<InterruptCond, T>
-  make_blocking_interruptible_future(seastar::future<T> &&f) {
-    return blocking_interruptible_future<InterruptCond, T>(
-       this,
-       ::crimson::interruptible::interruptor<InterruptCond>::make_interruptible(
-         std::move(f)));
-  }
+    protected:
+      void record_blocking(const T& blocker) override {
+       this->event.trigger(op, blocker);
+      }
 
-  void dump(ceph::Formatter *f) const;
-  virtual ~Blocker() = default;
+      const OpT& op;
+    };
 
-private:
-  virtual void dump_detail(ceph::Formatter *f) const = 0;
-  virtual const char *get_type_name() const = 0;
-};
+    void dump(ceph::Formatter *f) const {
+      auto demangled_name = boost::core::demangle(typeid(T).name());
+      detail::dump_blocking_event(
+       demangled_name.c_str(),
+       internal_backend.timestamp,
+       internal_backend.blocker,
+       f);
+    }
+  };
 
-template <typename T>
-class BlockerT : public Blocker {
-public:
   virtual ~BlockerT() = default;
+  template <class TriggerT, class... Args>
+  decltype(auto) track_blocking(TriggerT&& trigger, Args&&... args) {
+    return std::forward<TriggerT>(trigger).maybe_record_blocking(
+      std::forward<Args>(args)..., static_cast<const T&>(*this));
+  }
+
 private:
   const char *get_type_name() const final {
-    return T::type_name;
+    return static_cast<const T*>(this)->type_name;
   }
 };
 
-class AggregateBlocker : public BlockerT<AggregateBlocker> {
-  std::vector<Blocker*> parent_blockers;
-public:
-  AggregateBlocker(std::vector<Blocker*> &&parent_blockers)
-    : parent_blockers(std::move(parent_blockers)) {}
-  static constexpr const char *type_name = "AggregateBlocker";
-private:
-  void dump_detail(ceph::Formatter *f) const final;
-};
+template <class T>
+struct AggregateBlockingEvent {
+  struct TriggerI {
+  protected:
+    struct TriggerContainerI {
+      virtual typename T::TriggerI& get_trigger() = 0;
+      virtual ~TriggerContainerI() = default;
+    };
+    using TriggerContainerIRef = std::unique_ptr<TriggerContainerI>;
+    virtual TriggerContainerIRef create_part_trigger() = 0;
 
-template <typename T>
-blocking_future<> join_blocking_futures(T &&t) {
-  std::vector<Blocker*> blockers;
-  blockers.reserve(t.size());
-  for (auto &&bf: t) {
-    blockers.push_back(bf.blocker);
-    bf.blocker = nullptr;
-  }
-  auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
-  return agg->make_blocking_future(
-    seastar::parallel_for_each(
-      std::forward<T>(t),
-      [](auto &&bf) {
-       return std::move(bf.fut);
-      }).then([agg=std::move(agg)] {
-       return seastar::make_ready_future<>();
-      }));
-}
+  public:
+    template <class FutureT>
+    auto maybe_record_blocking(FutureT&& fut,
+                              const typename T::Blocker& blocker) {
+      // AggregateBlockingEvent is supposed to be used on relatively cold
+      // paths (recovery), so we don't need to worry about the dynamic
+      // polymothps / dynamic memory's overhead.
+      auto tcont = create_part_trigger();
+      return tcont->get_trigger().maybe_record_blocking(
+       std::move(fut), blocker
+      ).finally([tcont=std::move(tcont)] {});
+    }
 
-template <typename InterruptCond, typename T>
-blocking_interruptible_future<InterruptCond>
-join_blocking_interruptible_futures(T&& t) {
-  std::vector<Blocker*> blockers;
-  blockers.reserve(t.size());
-  for (auto &&bf: t) {
-    blockers.push_back(bf.blocker);
-    bf.blocker = nullptr;
-  }
-  auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
-  return agg->make_blocking_future(
-    ::crimson::interruptible::interruptor<InterruptCond>::parallel_for_each(
-      std::forward<T>(t),
-      [](auto &&bf) {
-       return std::move(bf.fut);
-      }).then_interruptible([agg=std::move(agg)] {
-       return seastar::make_ready_future<>();
-      }));
-}
+    virtual ~TriggerI() = default;
+  };
+
+  template <class OpT>
+  struct Trigger final : TriggerI {
+    Trigger(AggregateBlockingEvent& event, const OpT& op)
+      : event(event), op(op) {}
+
+    class TriggerContainer final : public TriggerI::TriggerContainerI {
+      AggregateBlockingEvent& event;
+      typename decltype(event.events)::iterator iter;
+      typename T::template Trigger<OpT> trigger;
+
+      typename T::TriggerI &get_trigger() final {
+       return trigger;
+      }
+
+    public:
+      TriggerContainer(AggregateBlockingEvent& _event, const OpT& op) :
+       event(_event),
+       iter(event.events.emplace(event.events.end())),
+       trigger(*iter, op) {}
+
+      ~TriggerContainer() final {
+       event.events.erase(iter);
+      }
+    };
+
+  protected:
+    typename TriggerI::TriggerContainerIRef create_part_trigger() final {
+      return std::make_unique<TriggerContainer>(event, op);
+    }
+
+  private:
+    AggregateBlockingEvent& event;
+    const OpT& op;
+  };
 
+private:
+  std::list<T> events;
+  template <class OpT>
+  friend class Trigger;
+};
 
 /**
  * Common base for all crimson-osd operations.  Mainly provides
@@ -223,59 +320,18 @@ join_blocking_interruptible_futures(T&& t) {
 class Operation : public boost::intrusive_ref_counter<
   Operation, boost::thread_unsafe_counter> {
  public:
-  uint64_t get_id() const {
+  using id_t = uint64_t;
+  static constexpr id_t NULL_ID = std::numeric_limits<uint64_t>::max();
+  id_t get_id() const {
     return id;
   }
 
+  static constexpr bool is_trackable = false;
+
   virtual unsigned get_type() const = 0;
   virtual const char *get_type_name() const = 0;
   virtual void print(std::ostream &) const = 0;
 
-  template <typename T>
-  seastar::future<T> with_blocking_future(blocking_future<T> &&f) {
-    if (f.fut.available()) {
-      return std::move(f.fut);
-    }
-    assert(f.blocker);
-    add_blocker(f.blocker);
-    return std::move(f.fut).then_wrapped([this, blocker=f.blocker](auto &&arg) {
-      clear_blocker(blocker);
-      return std::move(arg);
-    });
-  }
-
-  template <typename InterruptCond, typename T>
-  ::crimson::interruptible::interruptible_future<InterruptCond, T>
-  with_blocking_future_interruptible(blocking_future<T> &&f) {
-    if (f.fut.available()) {
-      return std::move(f.fut);
-    }
-    assert(f.blocker);
-    add_blocker(f.blocker);
-    auto fut = std::move(f.fut).then_wrapped([this, blocker=f.blocker](auto &&arg) {
-      clear_blocker(blocker);
-      return std::move(arg);
-    });
-    return ::crimson::interruptible::interruptible_future<
-      InterruptCond, T>(std::move(fut));
-  }
-
-  template <typename InterruptCond, typename T>
-  ::crimson::interruptible::interruptible_future<InterruptCond, T>
-  with_blocking_future_interruptible(
-    blocking_interruptible_future<InterruptCond, T> &&f) {
-    if (f.fut.available()) {
-      return std::move(f.fut);
-    }
-    assert(f.blocker);
-    add_blocker(f.blocker);
-    return std::move(f.fut).template then_wrapped_interruptible(
-      [this, blocker=f.blocker](auto &&arg) {
-      clear_blocker(blocker);
-      return std::move(arg);
-    });
-  }
-
   void dump(ceph::Formatter *f) const;
   void dump_brief(ceph::Formatter *f) const;
   virtual ~Operation() = default;
@@ -285,23 +341,11 @@ class Operation : public boost::intrusive_ref_counter<
 
   registry_hook_t registry_hook;
 
-  std::vector<Blocker*> blockers;
-  uint64_t id = 0;
-  void set_id(uint64_t in_id) {
+  id_t id = 0;
+  void set_id(id_t in_id) {
     id = in_id;
   }
 
-  void add_blocker(Blocker *b) {
-    blockers.push_back(b);
-  }
-
-  void clear_blocker(Blocker *b) {
-    auto iter = std::find(blockers.begin(), blockers.end(), b);
-    if (iter != blockers.end()) {
-      blockers.erase(iter);
-    }
-  }
-
   friend class OperationRegistryI;
   template <size_t>
   friend class OperationRegistryT;
@@ -327,6 +371,7 @@ class OperationRegistryI {
 protected:
   virtual void do_register(Operation *op) = 0;
   virtual bool registries_empty() const = 0;
+  virtual void do_stop() = 0;
 
 public:
   using op_list = boost::intrusive::list<
@@ -335,13 +380,15 @@ public:
     boost::intrusive::constant_time_size<false>>;
 
   template <typename T, typename... Args>
-  typename T::IRef create_operation(Args&&... args) {
-    typename T::IRef op = new T(std::forward<Args>(args)...);
+  auto create_operation(Args&&... args) {
+    boost::intrusive_ptr<T> op = new T(std::forward<Args>(args)...);
     do_register(&*op);
     return op;
   }
 
   seastar::future<> stop() {
+    crimson::get_logger(ceph_subsys_osd).info("OperationRegistryI::{}", __func__);
+    do_stop();
     shutdown_timer.set_callback([this] {
       if (registries_empty()) {
        shutdown.set_value();
@@ -357,20 +404,17 @@ public:
 
 template <size_t NUM_REGISTRIES>
 class OperationRegistryT : public OperationRegistryI {
+  Operation::id_t next_id = 0;
   std::array<
     op_list,
     NUM_REGISTRIES
   > registries;
 
-  std::array<
-    uint64_t,
-    NUM_REGISTRIES
-  > op_id_counters = {};
-
 protected:
   void do_register(Operation *op) final {
-    registries[op->get_type()].push_back(*op);
-    op->set_id(++op_id_counters[op->get_type()]);
+    const auto op_type = op->get_type();
+    registries[op_type].push_back(*op);
+    op->set_id(++next_id);
   }
 
   bool registries_empty() const final {
@@ -380,13 +424,51 @@ protected:
                         return opl.empty();
                       });
   }
-public:
+
+protected:
+  OperationRegistryT(core_id_t core)
+    // Use core to initialize upper 8 bits of counters to ensure that
+    // ids generated by different cores are disjoint
+    : next_id(static_cast<id_t>(core) <<
+             (std::numeric_limits<id_t>::digits - 8))
+  {}
+
   template <size_t REGISTRY_INDEX>
   const op_list& get_registry() const {
     static_assert(
       REGISTRY_INDEX < std::tuple_size<decltype(registries)>::value);
     return registries[REGISTRY_INDEX];
   }
+
+  template <size_t REGISTRY_INDEX>
+  op_list& get_registry() {
+    static_assert(
+      REGISTRY_INDEX < std::tuple_size<decltype(registries)>::value);
+    return registries[REGISTRY_INDEX];
+  }
+
+public:
+  /// Iterate over live ops
+  template <typename F>
+  void for_each_op(F &&f) const {
+    for (const auto &registry: registries) {
+      for (const auto &op: registry) {
+       std::invoke(f, op);
+      }
+    }
+  }
+
+  /// Removes op from registry
+  void remove_from_registry(Operation &op) {
+    const auto op_type = op.get_type();
+    registries[op_type].erase(op_list::s_iterator_to(op));
+  }
+
+  /// Adds op to registry
+  void add_to_registry(Operation &op) {
+    const auto op_type = op.get_type();
+    registries[op_type].push_back(op);
+  }
 };
 
 class PipelineExitBarrierI {
@@ -406,9 +488,16 @@ public:
   virtual ~PipelineExitBarrierI() {}
 };
 
-class PipelineStageI : public Blocker {
+template <class T>
+class PipelineStageIT : public BlockerT<T> {
+  const core_id_t core = seastar::this_shard_id();
 public:
-  virtual seastar::future<PipelineExitBarrierI::Ref> enter() = 0;
+  core_id_t get_core() const { return core; }
+
+  template <class... Args>
+  decltype(auto) enter(Args&&... args) {
+    return static_cast<T*>(this)->enter(std::forward<Args>(args)...);
+  }
 };
 
 class PipelineHandle {
@@ -427,28 +516,24 @@ public:
   PipelineHandle &operator=(PipelineHandle&&) = default;
 
   /**
-     * Returns a future which unblocks when the handle has entered the passed
-     * OrderedPipelinePhase.  If already in a phase, enter will also release
-     * that phase after placing itself in the queue for the next one to preserve
-     * ordering.
-     */
-  template <typename T>
-  blocking_future<> enter(T &t) {
-    /* Strictly speaking, we probably want the blocker to be registered on
-     * the previous stage until wait_barrier() resolves and on the next
-     * until enter() resolves, but blocking_future will need some refactoring
-     * to permit that.  TODO
-     */
-    return t.make_blocking_future(
-      wait_barrier().then([this, &t] {
-       auto fut = t.enter();
-       exit();
-       return std::move(fut).then([this](auto &&barrier_ref) {
-         barrier = std::move(barrier_ref);
-         return seastar::now();
-       });
-      })
-    );
+   * Returns a future which unblocks when the handle has entered the passed
+   * OrderedPipelinePhase.  If already in a phase, enter will also release
+   * that phase after placing itself in the queue for the next one to preserve
+   * ordering.
+   */
+  template <typename OpT, typename T>
+  seastar::future<>
+  enter(T &stage, typename T::BlockingEvent::template Trigger<OpT>&& t) {
+    ceph_assert(stage.get_core() == seastar::this_shard_id());
+    return wait_barrier().then([this, &stage, t=std::move(t)] () mutable {
+      auto fut = t.maybe_record_blocking(stage.enter(t), stage);
+      exit();
+      return std::move(fut).then(
+        [this, t=std::move(t)](auto &&barrier_ref) mutable {
+        barrier = std::move(barrier_ref);
+        return seastar::now();
+      });
+    });
   }
 
   /**
@@ -477,16 +562,21 @@ public:
  * resolve) a new phase prior to exiting the previous one will ensure that
  * the op ordering is preserved.
  */
-class OrderedExclusivePhase : public PipelineStageI {
-  void dump_detail(ceph::Formatter *f) const final;
-  const char *get_type_name() const final {
-    return name;
+template <class T>
+class OrderedExclusivePhaseT : public PipelineStageIT<T> {
+  void dump_detail(ceph::Formatter *f) const final {
+    f->dump_unsigned("waiting", waiting);
+    if (held_by != Operation::NULL_ID) {
+      f->dump_unsigned("held_by_operation_id", held_by);
+    }
   }
 
   class ExitBarrier final : public PipelineExitBarrierI {
-    OrderedExclusivePhase *phase;
+    OrderedExclusivePhaseT *phase;
+    Operation::id_t op_id;
   public:
-    ExitBarrier(OrderedExclusivePhase *phase) : phase(phase) {}
+    ExitBarrier(OrderedExclusivePhaseT *phase, Operation::id_t id)
+      : phase(phase), op_id(id) {}
 
     seastar::future<> wait() final {
       return seastar::now();
@@ -494,8 +584,14 @@ class OrderedExclusivePhase : public PipelineStageI {
 
     void exit() final {
       if (phase) {
-       phase->exit();
+       auto *p = phase;
+       auto id = op_id;
        phase = nullptr;
+       std::ignore = seastar::smp::submit_to(
+         p->get_core(),
+         [p, id] {
+           p->exit(id);
+         });
       }
     }
 
@@ -508,22 +604,37 @@ class OrderedExclusivePhase : public PipelineStageI {
     }
   };
 
-  void exit() {
+  void exit(Operation::id_t op_id) {
+    clear_held_by(op_id);
     mutex.unlock();
   }
 
 public:
-  seastar::future<PipelineExitBarrierI::Ref> enter() final {
-    return mutex.lock().then([this] {
-      return PipelineExitBarrierI::Ref(new ExitBarrier{this});
+  template <class TriggerT>
+  seastar::future<PipelineExitBarrierI::Ref> enter(TriggerT& t) {
+    waiting++;
+    return mutex.lock().then([this, op_id=t.get_op().get_id()] {
+      ceph_assert_always(waiting > 0);
+      --waiting;
+      set_held_by(op_id);
+      return PipelineExitBarrierI::Ref(new ExitBarrier{this, op_id});
     });
   }
 
-  OrderedExclusivePhase(const char *name) : name(name) {}
-
 private:
-  const char * name;
+  void set_held_by(Operation::id_t id) {
+    ceph_assert_always(held_by == Operation::NULL_ID);
+    held_by = id;
+  }
+
+  void clear_held_by(Operation::id_t id) {
+    ceph_assert_always(held_by == id);
+    held_by = Operation::NULL_ID;
+  }
+
+  unsigned waiting = 0;
   seastar::shared_mutex mutex;
+  Operation::id_t held_by = Operation::NULL_ID;
 };
 
 /**
@@ -531,26 +642,51 @@ private:
  * they will proceed to the next stage in the order in which they called
  * enter.
  */
-class OrderedConcurrentPhase : public PipelineStageI {
-  void dump_detail(ceph::Formatter *f) const final;
-  const char *get_type_name() const final {
-    return name;
-  }
+template <class T>
+class OrderedConcurrentPhaseT : public PipelineStageIT<T> {
+  using base_t = PipelineStageIT<T>;
+public:
+  struct BlockingEvent : base_t::BlockingEvent {
+    using base_t::BlockingEvent::BlockingEvent;
+
+    struct ExitBarrierEvent : TimeEvent<ExitBarrierEvent> {};
+
+    template <class OpT>
+    struct Trigger : base_t::BlockingEvent::template Trigger<OpT> {
+      using base_t::BlockingEvent::template Trigger<OpT>::Trigger;
+
+      template <class FutureT>
+      decltype(auto) maybe_record_exit_barrier(FutureT&& fut) {
+        if (!fut.available()) {
+         exit_barrier_event.trigger(this->op);
+       }
+       return std::forward<FutureT>(fut);
+      }
+
+      ExitBarrierEvent exit_barrier_event;
+    };
+  };
+
+private:
+  void dump_detail(ceph::Formatter *f) const final {}
 
+  template <class TriggerT>
   class ExitBarrier final : public PipelineExitBarrierI {
-    OrderedConcurrentPhase *phase;
+    OrderedConcurrentPhaseT *phase;
     std::optional<seastar::future<>> barrier;
+    TriggerT trigger;
   public:
     ExitBarrier(
-      OrderedConcurrentPhase *phase,
-      seastar::future<> &&barrier) : phase(phase), barrier(std::move(barrier)) {}
+      OrderedConcurrentPhaseT *phase,
+      seastar::future<> &&barrier,
+      TriggerT& trigger) : phase(phase), barrier(std::move(barrier)), trigger(trigger) {}
 
     seastar::future<> wait() final {
       assert(phase);
       assert(barrier);
       auto ret = std::move(*barrier);
       barrier = std::nullopt;
-      return ret;
+      return trigger.maybe_record_exit_barrier(std::move(ret));
     }
 
     void exit() final {
@@ -561,8 +697,12 @@ class OrderedConcurrentPhase : public PipelineStageI {
        phase = nullptr;
       }
       if (phase) {
-       phase->mutex.unlock();
-       phase = nullptr;
+       std::ignore = seastar::smp::submit_to(
+         phase->get_core(),
+         [this] {
+           phase->mutex.unlock();
+           phase = nullptr;
+         });
       }
     }
 
@@ -576,15 +716,13 @@ class OrderedConcurrentPhase : public PipelineStageI {
   };
 
 public:
-  seastar::future<PipelineExitBarrierI::Ref> enter() final {
+  template <class TriggerT>
+  seastar::future<PipelineExitBarrierI::Ref> enter(TriggerT& t) {
     return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
-      new ExitBarrier{this, mutex.lock()});
+      new ExitBarrier<TriggerT>{this, mutex.lock(), t});
   }
 
-  OrderedConcurrentPhase(const char *name) : name(name) {}
-
 private:
-  const char * name;
   seastar::shared_mutex mutex;
 };
 
@@ -593,11 +731,9 @@ private:
  * may exit in any order.  Useful mainly for informational purposes between
  * stages with constraints.
  */
-class UnorderedStage : public PipelineStageI {
+template <class T>
+class UnorderedStageT : public PipelineStageIT<T> {
   void dump_detail(ceph::Formatter *f) const final {}
-  const char *get_type_name() const final {
-    return name;
-  }
 
   class ExitBarrier final : public PipelineExitBarrierI {
   public:
@@ -615,16 +751,15 @@ class UnorderedStage : public PipelineStageI {
   };
 
 public:
-  seastar::future<PipelineExitBarrierI::Ref> enter() final {
+  template <class... IgnoreArgs>
+  seastar::future<PipelineExitBarrierI::Ref> enter(IgnoreArgs&&...) {
     return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
       new ExitBarrier);
   }
-
-  UnorderedStage(const char *name) : name(name) {}
-
-private:
-  const char * name;
 };
 
-
 }
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::Operation> : fmt::ostream_formatter {};
+#endif