#include <array>
#include <set>
#include <vector>
+#include <boost/core/demangle.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <seastar/core/future-util.hh>
#include "include/ceph_assert.h"
+#include "include/utime.h"
+#include "common/Clock.h"
+#include "common/Formatter.h"
#include "crimson/common/interruptible_future.h"
+#include "crimson/common/smp_helpers.h"
+#include "crimson/common/log.h"
namespace ceph {
class Formatter;
class Operation;
class Blocker;
+
+namespace detail {
+void dump_time_event(const char* name,
+ const utime_t& timestamp,
+ ceph::Formatter* f);
+void dump_blocking_event(const char* name,
+ const utime_t& timestamp,
+ const Blocker* blocker,
+ ceph::Formatter* f);
+} // namespace detail
+
/**
- * Provides an abstraction for registering and unregistering a blocker
- * for the duration of a future becoming available.
+ * Provides an interface for dumping diagnostic information about
+ * why a particular op is not making progress.
*/
-template <typename Fut>
-class blocking_future_detail {
- friend class Operation;
- friend class Blocker;
- Blocker *blocker;
- Fut fut;
- blocking_future_detail(Blocker *b, Fut &&f)
- : blocker(b), fut(std::move(f)) {}
+class Blocker {
+public:
+ void dump(ceph::Formatter *f) const;
+ virtual ~Blocker() = default;
- template <typename V, typename... U>
- friend blocking_future_detail<seastar::future<V>>
- make_ready_blocking_future(U&&... args);
+private:
+ virtual void dump_detail(ceph::Formatter *f) const = 0;
+ virtual const char *get_type_name() const = 0;
+};
- template <typename V, typename Exception>
- friend blocking_future_detail<seastar::future<V>>
- make_exception_blocking_future(Exception&& e);
+// the main template. by default an operation has no extenral
+// event handler (the empty tuple). specializing the template
+// allows to define backends on per-operation-type manner.
+// NOTE: basically this could be a function but C++ disallows
+// differentiating return type among specializations.
+template <class T>
+struct EventBackendRegistry {
+ template <typename...> static constexpr bool always_false = false;
+
+ static std::tuple<> get_backends() {
+ static_assert(always_false<T>, "Registry specialization not found");
+ return {};
+ }
+};
- template <typename U>
- friend blocking_future_detail<seastar::future<>> join_blocking_futures(U &&u);
+template <class T>
+struct Event {
+ T* that() {
+ return static_cast<T*>(this);
+ }
+ const T* that() const {
+ return static_cast<const T*>(this);
+ }
- template <typename InterruptCond, typename T>
- friend blocking_future_detail<
- ::crimson::interruptible::interruptible_future<InterruptCond>>
- join_blocking_interruptible_futures(T&& t);
+ template <class OpT, class... Args>
+ void trigger(OpT&& op, Args&&... args) {
+ that()->internal_backend.handle(*that(),
+ std::forward<OpT>(op),
+ std::forward<Args>(args)...);
+ // let's call `handle()` for concrete event type from each single
+ // of our backends. the order in the registry matters.
+ std::apply([&, //args=std::forward_as_tuple(std::forward<Args>(args)...),
+ this] (auto... backend) {
+ (..., backend.handle(*that(),
+ std::forward<OpT>(op),
+ std::forward<Args>(args)...));
+ }, EventBackendRegistry<std::decay_t<OpT>>::get_backends());
+ }
+};
- template <typename U>
- friend class blocking_future_detail;
-public:
- template <typename F>
- auto then(F &&f) && {
- using result = decltype(std::declval<Fut>().then(f));
- return blocking_future_detail<seastar::futurize_t<result>>(
- blocker,
- std::move(fut).then(std::forward<F>(f)));
+// simplest event type for recording things like beginning or end
+// of TrackableOperation's life.
+template <class T>
+struct TimeEvent : Event<T> {
+ struct Backend {
+ // `T` is passed solely to let implementations to discriminate
+ // basing on the type-of-event.
+ virtual void handle(T&, const Operation&) = 0;
+ };
+
+ // for the sake of dumping ops-in-flight.
+ struct InternalBackend final : Backend {
+ void handle(T&, const Operation&) override {
+ timestamp = ceph_clock_now();
+ }
+ utime_t timestamp;
+ } internal_backend;
+
+ void dump(ceph::Formatter *f) const {
+ auto demangled_name = boost::core::demangle(typeid(T).name());
+ detail::dump_time_event(
+ demangled_name.c_str(),
+ internal_backend.timestamp, f);
}
- template <typename InterruptCond, typename F>
- auto then_interruptible(F &&f) && {
- using result = decltype(std::declval<Fut>().then_interruptible(f));
- return blocking_future_detail<
- typename ::crimson::interruptible::interruptor<
- InterruptCond>::template futurize<result>::type>(
- blocker,
- std::move(fut).then_interruptible(std::forward<F>(f)));
+
+ auto get_timestamp() const {
+ return internal_backend.timestamp;
}
};
-template <typename T=void>
-using blocking_future = blocking_future_detail<seastar::future<T>>;
-template <typename InterruptCond, typename T = void>
-using blocking_interruptible_future = blocking_future_detail<
- ::crimson::interruptible::interruptible_future<InterruptCond, T>>;
-
-template <typename InterruptCond, typename V, typename U>
-blocking_interruptible_future<InterruptCond, V>
-make_ready_blocking_interruptible_future(U&& args) {
- return blocking_interruptible_future<InterruptCond, V>(
- nullptr,
- seastar::make_ready_future<V>(std::forward<U>(args)));
-}
-
-template <typename InterruptCond, typename V, typename Exception>
-blocking_interruptible_future<InterruptCond, V>
-make_exception_blocking_interruptible_future(Exception&& e) {
- return blocking_interruptible_future<InterruptCond, V>(
- nullptr,
- seastar::make_exception_future<InterruptCond, V>(e));
-}
+template <typename T>
+class BlockerT : public Blocker {
+public:
+ struct BlockingEvent : Event<typename T::BlockingEvent> {
+ using Blocker = std::decay_t<T>;
+
+ struct Backend {
+ // `T` is based solely to let implementations to discriminate
+ // basing on the type-of-event.
+ virtual void handle(typename T::BlockingEvent&, const Operation&, const T&) = 0;
+ };
+
+ struct InternalBackend : Backend {
+ void handle(typename T::BlockingEvent&,
+ const Operation&,
+ const T& blocker) override {
+ this->timestamp = ceph_clock_now();
+ this->blocker = &blocker;
+ }
-template <typename V=void, typename... U>
-blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&&... args) {
- return blocking_future<V>(
- nullptr,
- seastar::make_ready_future<V>(std::forward<U>(args)...));
-}
+ utime_t timestamp;
+ const T* blocker;
+ } internal_backend;
+
+ // we don't want to make any BlockerT to be aware and coupled with
+ // an operation. to not templatize an entire path from an op to
+ // a blocker, type erasuring is used.
+ struct TriggerI {
+ TriggerI(BlockingEvent& event) : event(event) {}
+
+ template <class FutureT>
+ auto maybe_record_blocking(FutureT&& fut, const T& blocker) {
+ if (!fut.available()) {
+ // a full blown call via vtable. that's the cost for templatization
+ // avoidance. anyway, most of the things actually have the type
+ // knowledge.
+ record_blocking(blocker);
+ return std::forward<FutureT>(fut).finally(
+ [&event=this->event, &blocker] () mutable {
+ // beware trigger instance may be already dead when this
+ // is executed!
+ record_unblocking(event, blocker);
+ });
+ }
+ return std::forward<FutureT>(fut);
+ }
+ virtual ~TriggerI() = default;
+ protected:
+ // it's for the sake of erasing the OpT type
+ virtual void record_blocking(const T& blocker) = 0;
+
+ static void record_unblocking(BlockingEvent& event, const T& blocker) {
+ assert(event.internal_backend.blocker == &blocker);
+ event.internal_backend.blocker = nullptr;
+ }
-template <typename V, typename Exception>
-blocking_future_detail<seastar::future<V>>
-make_exception_blocking_future(Exception&& e) {
- return blocking_future<V>(
- nullptr,
- seastar::make_exception_future<V>(e));
-}
+ BlockingEvent& event;
+ };
+
+ template <class OpT>
+ struct Trigger : TriggerI {
+ Trigger(BlockingEvent& event, const OpT& op) : TriggerI(event), op(op) {}
+
+ template <class FutureT>
+ auto maybe_record_blocking(FutureT&& fut, const T& blocker) {
+ if (!fut.available()) {
+ // no need for the dynamic dispatch! if we're lucky, a compiler
+ // should collapse all these abstractions into a bunch of movs.
+ this->Trigger::record_blocking(blocker);
+ return std::forward<FutureT>(fut).finally(
+ [&event=this->event, &blocker] () mutable {
+ Trigger::record_unblocking(event, blocker);
+ });
+ }
+ return std::forward<FutureT>(fut);
+ }
-/**
- * Provides an interface for dumping diagnostic information about
- * why a particular op is not making progress.
- */
-class Blocker {
-public:
- template <typename T>
- blocking_future<T> make_blocking_future(seastar::future<T> &&f) {
- return blocking_future<T>(this, std::move(f));
- }
+ const OpT &get_op() { return op; }
- template <typename InterruptCond, typename T>
- blocking_interruptible_future<InterruptCond, T>
- make_blocking_future(
- crimson::interruptible::interruptible_future<InterruptCond, T> &&f) {
- return blocking_interruptible_future<InterruptCond, T>(
- this, std::move(f));
- }
- template <typename InterruptCond, typename T = void>
- blocking_interruptible_future<InterruptCond, T>
- make_blocking_interruptible_future(seastar::future<T> &&f) {
- return blocking_interruptible_future<InterruptCond, T>(
- this,
- ::crimson::interruptible::interruptor<InterruptCond>::make_interruptible(
- std::move(f)));
- }
+ protected:
+ void record_blocking(const T& blocker) override {
+ this->event.trigger(op, blocker);
+ }
- void dump(ceph::Formatter *f) const;
- virtual ~Blocker() = default;
+ const OpT& op;
+ };
-private:
- virtual void dump_detail(ceph::Formatter *f) const = 0;
- virtual const char *get_type_name() const = 0;
-};
+ void dump(ceph::Formatter *f) const {
+ auto demangled_name = boost::core::demangle(typeid(T).name());
+ detail::dump_blocking_event(
+ demangled_name.c_str(),
+ internal_backend.timestamp,
+ internal_backend.blocker,
+ f);
+ }
+ };
-template <typename T>
-class BlockerT : public Blocker {
-public:
virtual ~BlockerT() = default;
+ template <class TriggerT, class... Args>
+ decltype(auto) track_blocking(TriggerT&& trigger, Args&&... args) {
+ return std::forward<TriggerT>(trigger).maybe_record_blocking(
+ std::forward<Args>(args)..., static_cast<const T&>(*this));
+ }
+
private:
const char *get_type_name() const final {
- return T::type_name;
+ return static_cast<const T*>(this)->type_name;
}
};
-class AggregateBlocker : public BlockerT<AggregateBlocker> {
- std::vector<Blocker*> parent_blockers;
-public:
- AggregateBlocker(std::vector<Blocker*> &&parent_blockers)
- : parent_blockers(std::move(parent_blockers)) {}
- static constexpr const char *type_name = "AggregateBlocker";
-private:
- void dump_detail(ceph::Formatter *f) const final;
-};
+template <class T>
+struct AggregateBlockingEvent {
+ struct TriggerI {
+ protected:
+ struct TriggerContainerI {
+ virtual typename T::TriggerI& get_trigger() = 0;
+ virtual ~TriggerContainerI() = default;
+ };
+ using TriggerContainerIRef = std::unique_ptr<TriggerContainerI>;
+ virtual TriggerContainerIRef create_part_trigger() = 0;
-template <typename T>
-blocking_future<> join_blocking_futures(T &&t) {
- std::vector<Blocker*> blockers;
- blockers.reserve(t.size());
- for (auto &&bf: t) {
- blockers.push_back(bf.blocker);
- bf.blocker = nullptr;
- }
- auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
- return agg->make_blocking_future(
- seastar::parallel_for_each(
- std::forward<T>(t),
- [](auto &&bf) {
- return std::move(bf.fut);
- }).then([agg=std::move(agg)] {
- return seastar::make_ready_future<>();
- }));
-}
+ public:
+ template <class FutureT>
+ auto maybe_record_blocking(FutureT&& fut,
+ const typename T::Blocker& blocker) {
+ // AggregateBlockingEvent is supposed to be used on relatively cold
+ // paths (recovery), so we don't need to worry about the dynamic
+ // polymothps / dynamic memory's overhead.
+ auto tcont = create_part_trigger();
+ return tcont->get_trigger().maybe_record_blocking(
+ std::move(fut), blocker
+ ).finally([tcont=std::move(tcont)] {});
+ }
-template <typename InterruptCond, typename T>
-blocking_interruptible_future<InterruptCond>
-join_blocking_interruptible_futures(T&& t) {
- std::vector<Blocker*> blockers;
- blockers.reserve(t.size());
- for (auto &&bf: t) {
- blockers.push_back(bf.blocker);
- bf.blocker = nullptr;
- }
- auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
- return agg->make_blocking_future(
- ::crimson::interruptible::interruptor<InterruptCond>::parallel_for_each(
- std::forward<T>(t),
- [](auto &&bf) {
- return std::move(bf.fut);
- }).then_interruptible([agg=std::move(agg)] {
- return seastar::make_ready_future<>();
- }));
-}
+ virtual ~TriggerI() = default;
+ };
+
+ template <class OpT>
+ struct Trigger final : TriggerI {
+ Trigger(AggregateBlockingEvent& event, const OpT& op)
+ : event(event), op(op) {}
+
+ class TriggerContainer final : public TriggerI::TriggerContainerI {
+ AggregateBlockingEvent& event;
+ typename decltype(event.events)::iterator iter;
+ typename T::template Trigger<OpT> trigger;
+
+ typename T::TriggerI &get_trigger() final {
+ return trigger;
+ }
+
+ public:
+ TriggerContainer(AggregateBlockingEvent& _event, const OpT& op) :
+ event(_event),
+ iter(event.events.emplace(event.events.end())),
+ trigger(*iter, op) {}
+
+ ~TriggerContainer() final {
+ event.events.erase(iter);
+ }
+ };
+
+ protected:
+ typename TriggerI::TriggerContainerIRef create_part_trigger() final {
+ return std::make_unique<TriggerContainer>(event, op);
+ }
+
+ private:
+ AggregateBlockingEvent& event;
+ const OpT& op;
+ };
+private:
+ std::list<T> events;
+ template <class OpT>
+ friend class Trigger;
+};
/**
* Common base for all crimson-osd operations. Mainly provides
class Operation : public boost::intrusive_ref_counter<
Operation, boost::thread_unsafe_counter> {
public:
- uint64_t get_id() const {
+ using id_t = uint64_t;
+ static constexpr id_t NULL_ID = std::numeric_limits<uint64_t>::max();
+ id_t get_id() const {
return id;
}
+ static constexpr bool is_trackable = false;
+
virtual unsigned get_type() const = 0;
virtual const char *get_type_name() const = 0;
virtual void print(std::ostream &) const = 0;
- template <typename T>
- seastar::future<T> with_blocking_future(blocking_future<T> &&f) {
- if (f.fut.available()) {
- return std::move(f.fut);
- }
- assert(f.blocker);
- add_blocker(f.blocker);
- return std::move(f.fut).then_wrapped([this, blocker=f.blocker](auto &&arg) {
- clear_blocker(blocker);
- return std::move(arg);
- });
- }
-
- template <typename InterruptCond, typename T>
- ::crimson::interruptible::interruptible_future<InterruptCond, T>
- with_blocking_future_interruptible(blocking_future<T> &&f) {
- if (f.fut.available()) {
- return std::move(f.fut);
- }
- assert(f.blocker);
- add_blocker(f.blocker);
- auto fut = std::move(f.fut).then_wrapped([this, blocker=f.blocker](auto &&arg) {
- clear_blocker(blocker);
- return std::move(arg);
- });
- return ::crimson::interruptible::interruptible_future<
- InterruptCond, T>(std::move(fut));
- }
-
- template <typename InterruptCond, typename T>
- ::crimson::interruptible::interruptible_future<InterruptCond, T>
- with_blocking_future_interruptible(
- blocking_interruptible_future<InterruptCond, T> &&f) {
- if (f.fut.available()) {
- return std::move(f.fut);
- }
- assert(f.blocker);
- add_blocker(f.blocker);
- return std::move(f.fut).template then_wrapped_interruptible(
- [this, blocker=f.blocker](auto &&arg) {
- clear_blocker(blocker);
- return std::move(arg);
- });
- }
-
void dump(ceph::Formatter *f) const;
void dump_brief(ceph::Formatter *f) const;
virtual ~Operation() = default;
registry_hook_t registry_hook;
- std::vector<Blocker*> blockers;
- uint64_t id = 0;
- void set_id(uint64_t in_id) {
+ id_t id = 0;
+ void set_id(id_t in_id) {
id = in_id;
}
- void add_blocker(Blocker *b) {
- blockers.push_back(b);
- }
-
- void clear_blocker(Blocker *b) {
- auto iter = std::find(blockers.begin(), blockers.end(), b);
- if (iter != blockers.end()) {
- blockers.erase(iter);
- }
- }
-
friend class OperationRegistryI;
template <size_t>
friend class OperationRegistryT;
protected:
virtual void do_register(Operation *op) = 0;
virtual bool registries_empty() const = 0;
+ virtual void do_stop() = 0;
public:
using op_list = boost::intrusive::list<
boost::intrusive::constant_time_size<false>>;
template <typename T, typename... Args>
- typename T::IRef create_operation(Args&&... args) {
- typename T::IRef op = new T(std::forward<Args>(args)...);
+ auto create_operation(Args&&... args) {
+ boost::intrusive_ptr<T> op = new T(std::forward<Args>(args)...);
do_register(&*op);
return op;
}
seastar::future<> stop() {
+ crimson::get_logger(ceph_subsys_osd).info("OperationRegistryI::{}", __func__);
+ do_stop();
shutdown_timer.set_callback([this] {
if (registries_empty()) {
shutdown.set_value();
template <size_t NUM_REGISTRIES>
class OperationRegistryT : public OperationRegistryI {
+ Operation::id_t next_id = 0;
std::array<
op_list,
NUM_REGISTRIES
> registries;
- std::array<
- uint64_t,
- NUM_REGISTRIES
- > op_id_counters = {};
-
protected:
void do_register(Operation *op) final {
- registries[op->get_type()].push_back(*op);
- op->set_id(++op_id_counters[op->get_type()]);
+ const auto op_type = op->get_type();
+ registries[op_type].push_back(*op);
+ op->set_id(++next_id);
}
bool registries_empty() const final {
return opl.empty();
});
}
-public:
+
+protected:
+ OperationRegistryT(core_id_t core)
+ // Use core to initialize upper 8 bits of counters to ensure that
+ // ids generated by different cores are disjoint
+ : next_id(static_cast<id_t>(core) <<
+ (std::numeric_limits<id_t>::digits - 8))
+ {}
+
template <size_t REGISTRY_INDEX>
const op_list& get_registry() const {
static_assert(
REGISTRY_INDEX < std::tuple_size<decltype(registries)>::value);
return registries[REGISTRY_INDEX];
}
+
+ template <size_t REGISTRY_INDEX>
+ op_list& get_registry() {
+ static_assert(
+ REGISTRY_INDEX < std::tuple_size<decltype(registries)>::value);
+ return registries[REGISTRY_INDEX];
+ }
+
+public:
+ /// Iterate over live ops
+ template <typename F>
+ void for_each_op(F &&f) const {
+ for (const auto ®istry: registries) {
+ for (const auto &op: registry) {
+ std::invoke(f, op);
+ }
+ }
+ }
+
+ /// Removes op from registry
+ void remove_from_registry(Operation &op) {
+ const auto op_type = op.get_type();
+ registries[op_type].erase(op_list::s_iterator_to(op));
+ }
+
+ /// Adds op to registry
+ void add_to_registry(Operation &op) {
+ const auto op_type = op.get_type();
+ registries[op_type].push_back(op);
+ }
};
class PipelineExitBarrierI {
virtual ~PipelineExitBarrierI() {}
};
-class PipelineStageI : public Blocker {
+template <class T>
+class PipelineStageIT : public BlockerT<T> {
+ const core_id_t core = seastar::this_shard_id();
public:
- virtual seastar::future<PipelineExitBarrierI::Ref> enter() = 0;
+ core_id_t get_core() const { return core; }
+
+ template <class... Args>
+ decltype(auto) enter(Args&&... args) {
+ return static_cast<T*>(this)->enter(std::forward<Args>(args)...);
+ }
};
class PipelineHandle {
PipelineHandle &operator=(PipelineHandle&&) = default;
/**
- * Returns a future which unblocks when the handle has entered the passed
- * OrderedPipelinePhase. If already in a phase, enter will also release
- * that phase after placing itself in the queue for the next one to preserve
- * ordering.
- */
- template <typename T>
- blocking_future<> enter(T &t) {
- /* Strictly speaking, we probably want the blocker to be registered on
- * the previous stage until wait_barrier() resolves and on the next
- * until enter() resolves, but blocking_future will need some refactoring
- * to permit that. TODO
- */
- return t.make_blocking_future(
- wait_barrier().then([this, &t] {
- auto fut = t.enter();
- exit();
- return std::move(fut).then([this](auto &&barrier_ref) {
- barrier = std::move(barrier_ref);
- return seastar::now();
- });
- })
- );
+ * Returns a future which unblocks when the handle has entered the passed
+ * OrderedPipelinePhase. If already in a phase, enter will also release
+ * that phase after placing itself in the queue for the next one to preserve
+ * ordering.
+ */
+ template <typename OpT, typename T>
+ seastar::future<>
+ enter(T &stage, typename T::BlockingEvent::template Trigger<OpT>&& t) {
+ ceph_assert(stage.get_core() == seastar::this_shard_id());
+ return wait_barrier().then([this, &stage, t=std::move(t)] () mutable {
+ auto fut = t.maybe_record_blocking(stage.enter(t), stage);
+ exit();
+ return std::move(fut).then(
+ [this, t=std::move(t)](auto &&barrier_ref) mutable {
+ barrier = std::move(barrier_ref);
+ return seastar::now();
+ });
+ });
}
/**
* resolve) a new phase prior to exiting the previous one will ensure that
* the op ordering is preserved.
*/
-class OrderedExclusivePhase : public PipelineStageI {
- void dump_detail(ceph::Formatter *f) const final;
- const char *get_type_name() const final {
- return name;
+template <class T>
+class OrderedExclusivePhaseT : public PipelineStageIT<T> {
+ void dump_detail(ceph::Formatter *f) const final {
+ f->dump_unsigned("waiting", waiting);
+ if (held_by != Operation::NULL_ID) {
+ f->dump_unsigned("held_by_operation_id", held_by);
+ }
}
class ExitBarrier final : public PipelineExitBarrierI {
- OrderedExclusivePhase *phase;
+ OrderedExclusivePhaseT *phase;
+ Operation::id_t op_id;
public:
- ExitBarrier(OrderedExclusivePhase *phase) : phase(phase) {}
+ ExitBarrier(OrderedExclusivePhaseT *phase, Operation::id_t id)
+ : phase(phase), op_id(id) {}
seastar::future<> wait() final {
return seastar::now();
void exit() final {
if (phase) {
- phase->exit();
+ auto *p = phase;
+ auto id = op_id;
phase = nullptr;
+ std::ignore = seastar::smp::submit_to(
+ p->get_core(),
+ [p, id] {
+ p->exit(id);
+ });
}
}
}
};
- void exit() {
+ void exit(Operation::id_t op_id) {
+ clear_held_by(op_id);
mutex.unlock();
}
public:
- seastar::future<PipelineExitBarrierI::Ref> enter() final {
- return mutex.lock().then([this] {
- return PipelineExitBarrierI::Ref(new ExitBarrier{this});
+ template <class TriggerT>
+ seastar::future<PipelineExitBarrierI::Ref> enter(TriggerT& t) {
+ waiting++;
+ return mutex.lock().then([this, op_id=t.get_op().get_id()] {
+ ceph_assert_always(waiting > 0);
+ --waiting;
+ set_held_by(op_id);
+ return PipelineExitBarrierI::Ref(new ExitBarrier{this, op_id});
});
}
- OrderedExclusivePhase(const char *name) : name(name) {}
-
private:
- const char * name;
+ void set_held_by(Operation::id_t id) {
+ ceph_assert_always(held_by == Operation::NULL_ID);
+ held_by = id;
+ }
+
+ void clear_held_by(Operation::id_t id) {
+ ceph_assert_always(held_by == id);
+ held_by = Operation::NULL_ID;
+ }
+
+ unsigned waiting = 0;
seastar::shared_mutex mutex;
+ Operation::id_t held_by = Operation::NULL_ID;
};
/**
* they will proceed to the next stage in the order in which they called
* enter.
*/
-class OrderedConcurrentPhase : public PipelineStageI {
- void dump_detail(ceph::Formatter *f) const final;
- const char *get_type_name() const final {
- return name;
- }
+template <class T>
+class OrderedConcurrentPhaseT : public PipelineStageIT<T> {
+ using base_t = PipelineStageIT<T>;
+public:
+ struct BlockingEvent : base_t::BlockingEvent {
+ using base_t::BlockingEvent::BlockingEvent;
+
+ struct ExitBarrierEvent : TimeEvent<ExitBarrierEvent> {};
+
+ template <class OpT>
+ struct Trigger : base_t::BlockingEvent::template Trigger<OpT> {
+ using base_t::BlockingEvent::template Trigger<OpT>::Trigger;
+
+ template <class FutureT>
+ decltype(auto) maybe_record_exit_barrier(FutureT&& fut) {
+ if (!fut.available()) {
+ exit_barrier_event.trigger(this->op);
+ }
+ return std::forward<FutureT>(fut);
+ }
+
+ ExitBarrierEvent exit_barrier_event;
+ };
+ };
+
+private:
+ void dump_detail(ceph::Formatter *f) const final {}
+ template <class TriggerT>
class ExitBarrier final : public PipelineExitBarrierI {
- OrderedConcurrentPhase *phase;
+ OrderedConcurrentPhaseT *phase;
std::optional<seastar::future<>> barrier;
+ TriggerT trigger;
public:
ExitBarrier(
- OrderedConcurrentPhase *phase,
- seastar::future<> &&barrier) : phase(phase), barrier(std::move(barrier)) {}
+ OrderedConcurrentPhaseT *phase,
+ seastar::future<> &&barrier,
+ TriggerT& trigger) : phase(phase), barrier(std::move(barrier)), trigger(trigger) {}
seastar::future<> wait() final {
assert(phase);
assert(barrier);
auto ret = std::move(*barrier);
barrier = std::nullopt;
- return ret;
+ return trigger.maybe_record_exit_barrier(std::move(ret));
}
void exit() final {
phase = nullptr;
}
if (phase) {
- phase->mutex.unlock();
- phase = nullptr;
+ std::ignore = seastar::smp::submit_to(
+ phase->get_core(),
+ [this] {
+ phase->mutex.unlock();
+ phase = nullptr;
+ });
}
}
};
public:
- seastar::future<PipelineExitBarrierI::Ref> enter() final {
+ template <class TriggerT>
+ seastar::future<PipelineExitBarrierI::Ref> enter(TriggerT& t) {
return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
- new ExitBarrier{this, mutex.lock()});
+ new ExitBarrier<TriggerT>{this, mutex.lock(), t});
}
- OrderedConcurrentPhase(const char *name) : name(name) {}
-
private:
- const char * name;
seastar::shared_mutex mutex;
};
* may exit in any order. Useful mainly for informational purposes between
* stages with constraints.
*/
-class UnorderedStage : public PipelineStageI {
+template <class T>
+class UnorderedStageT : public PipelineStageIT<T> {
void dump_detail(ceph::Formatter *f) const final {}
- const char *get_type_name() const final {
- return name;
- }
class ExitBarrier final : public PipelineExitBarrierI {
public:
};
public:
- seastar::future<PipelineExitBarrierI::Ref> enter() final {
+ template <class... IgnoreArgs>
+ seastar::future<PipelineExitBarrierI::Ref> enter(IgnoreArgs&&...) {
return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
new ExitBarrier);
}
-
- UnorderedStage(const char *name) : name(name) {}
-
-private:
- const char * name;
};
-
}
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::Operation> : fmt::ostream_formatter {};
+#endif