// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab expandtab
#include <boost/core/demangle.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <seastar/core/shared_mutex.hh>
#include <seastar/core/future.hh>
#include <seastar/core/timer.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/future-util.hh>

#include "include/ceph_assert.h"
#include "include/utime.h"
#include "common/Clock.h"
#include "common/Formatter.h"
#include "crimson/common/interruptible_future.h"
#include "crimson/common/smp_helpers.h"
#include "crimson/common/log.h"
using registry_hook_t = boost::intrusive::list_member_hook<
  boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
class Operation;
class Blocker;

namespace detail {
void dump_time_event(const char* name,
                     const utime_t& timestamp,
                     ceph::Formatter* f);
void dump_blocking_event(const char* name,
                         const utime_t& timestamp,
                         const Blocker* blocker,
                         ceph::Formatter* f);
} // namespace detail
/**
 * Provides an interface for dumping diagnostic information about
 * why a particular op is not making progress.
 */
class Blocker {
public:
  void dump(ceph::Formatter *f) const;
  virtual ~Blocker() = default;

private:
  virtual void dump_detail(ceph::Formatter *f) const = 0;
  virtual const char *get_type_name() const = 0;
};
// the main template. by default an operation has no external
// event handler (the empty tuple). specializing the template
// allows defining backends in a per-operation-type manner.
// NOTE: this could basically be a function, but C++ disallows
// differentiating the return type among specializations.
template <class T>
struct EventBackendRegistry {
  template <typename...> static constexpr bool always_false = false;

  static std::tuple<> get_backends() {
    static_assert(always_false<T>, "Registry specialization not found");
    return {};
  }
};
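// A minimal sketch of a specialization (hypothetical names, not part of
// this header): get_backends() returns a tuple of backend instances whose
// handle() will be invoked, in tuple order, every time an event for
// MyOperation is triggered.
//
//   struct LoggingBackend {
//     template <class EventT, class... Args>
//     void handle(EventT&, const Args&...) { /* e.g. log the event */ }
//   };
//   template <> struct EventBackendRegistry<MyOperation> {
//     static std::tuple<LoggingBackend> get_backends() { return {}; }
//   };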
template <class T>
struct Event {
  T* that() {
    return static_cast<T*>(this);
  }
  const T* that() const {
    return static_cast<const T*>(this);
  }
  template <class OpT, class... Args>
  void trigger(OpT&& op, Args&&... args) {
    that()->internal_backend.handle(*that(),
                                    std::forward<OpT>(op),
                                    std::forward<Args>(args)...);
    // call `handle()` for the concrete event type on each of our
    // backends; the order in the registry matters.
    std::apply([&, //args=std::forward_as_tuple(std::forward<Args>(args)...),
                this] (auto... backend) {
      (..., backend.handle(*that(),
                           std::forward<OpT>(op),
                           std::forward<Args>(args)...));
    }, EventBackendRegistry<std::decay_t<OpT>>::get_backends());
  }
};
// simplest event type for recording things like the beginning or end
// of a TrackableOperation's life.
template <class T>
struct TimeEvent : Event<T> {
protected:
  // `T` is passed solely to let implementations discriminate
  // based on the type of event.
  struct Backend {
    virtual void handle(T&, const Operation&) = 0;
  };

  // for the sake of dumping ops-in-flight.
  struct InternalBackend final : Backend {
    void handle(T&, const Operation&) override {
      timestamp = ceph_clock_now();
    }
    utime_t timestamp;
  } internal_backend;

public:
  void dump(ceph::Formatter *f) const {
    auto demangled_name = boost::core::demangle(typeid(T).name());
    detail::dump_time_event(
      demangled_name.c_str(),
      internal_backend.timestamp, f);
  }

  auto get_timestamp() const {
    return internal_backend.timestamp;
  }
};
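// A minimal usage sketch (hypothetical names; assumes an
// EventBackendRegistry specialization exists for the op's type and that
// `op` derives from Operation):
//
//   struct StartEvent : TimeEvent<StartEvent> {};
//
//   StartEvent ev;
//   ev.trigger(op);              // InternalBackend stamps ceph_clock_now()
//   auto started_at = ev.get_timestamp();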
template <typename T>
class BlockerT : public Blocker {
public:
  struct BlockingEvent : Event<typename T::BlockingEvent> {
    using Blocker = std::decay_t<T>;

    struct Backend {
      // `T` is passed solely to let implementations discriminate
      // based on the type of event.
      virtual void handle(typename T::BlockingEvent&, const Operation&, const T&) = 0;
    };
    struct InternalBackend : Backend {
      void handle(typename T::BlockingEvent&,
                  const Operation&,
                  const T& blocker) override {
        this->timestamp = ceph_clock_now();
        this->blocker = &blocker;
      }

      utime_t timestamp;
      const T* blocker = nullptr;
    } internal_backend;
    // we don't want any BlockerT to be aware of, and coupled with,
    // an operation. to avoid templatizing the entire path from an op
    // to a blocker, type erasure is used.
    struct TriggerI {
      TriggerI(BlockingEvent& event) : event(event) {}
      template <class FutureT>
      auto maybe_record_blocking(FutureT&& fut, const T& blocker) {
        if (!fut.available()) {
          // a full-blown call via vtable: that's the cost of avoiding
          // templatization. anyway, most call sites actually have the
          // type knowledge.
          record_blocking(blocker);
          return std::forward<FutureT>(fut).finally(
            [&event=this->event, &blocker] () mutable {
              // beware: the trigger instance may already be dead when
              // this is executed!
              record_unblocking(event, blocker);
            });
        }
        return std::forward<FutureT>(fut);
      }

      virtual ~TriggerI() = default;
    protected:
      // this exists for the sake of erasing the OpT type
      virtual void record_blocking(const T& blocker) = 0;

      static void record_unblocking(BlockingEvent& event, const T& blocker) {
        assert(event.internal_backend.blocker == &blocker);
        event.internal_backend.blocker = nullptr;
      }

      BlockingEvent& event;
    };
    template <class OpT>
    struct Trigger : TriggerI {
      Trigger(BlockingEvent& event, const OpT& op) : TriggerI(event), op(op) {}
      template <class FutureT>
      auto maybe_record_blocking(FutureT&& fut, const T& blocker) {
        if (!fut.available()) {
          // no need for the dynamic dispatch! if we're lucky, a compiler
          // should collapse all these abstractions into a bunch of movs.
          this->Trigger::record_blocking(blocker);
          return std::forward<FutureT>(fut).finally(
            [&event=this->event, &blocker] () mutable {
              Trigger::record_unblocking(event, blocker);
            });
        }
        return std::forward<FutureT>(fut);
      }
      const OpT &get_op() { return op; }
    protected:
      void record_blocking(const T& blocker) override {
        this->event.trigger(op, blocker);
      }

      const OpT& op;
    };
    void dump(ceph::Formatter *f) const {
      auto demangled_name = boost::core::demangle(typeid(T).name());
      detail::dump_blocking_event(
        demangled_name.c_str(),
        internal_backend.timestamp,
        internal_backend.blocker,
        f);
    }
  };
  virtual ~BlockerT() = default;

  template <class TriggerT, class... Args>
  decltype(auto) track_blocking(TriggerT&& trigger, Args&&... args) {
    return std::forward<TriggerT>(trigger).maybe_record_blocking(
      std::forward<Args>(args)..., static_cast<const T&>(*this));
  }
private:
  const char *get_type_name() const final {
    return static_cast<const T*>(this)->type_name;
  }
};
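// A minimal sketch of a concrete blocker and of tracking a blocked future
// (hypothetical names): deriving from BlockerT<MyStage> provides the nested
// BlockingEvent, and track_blocking() stamps that event whenever the future
// passed through it is not already available.
//
//   struct MyStage : BlockerT<MyStage> {
//     static constexpr const char* type_name = "MyStage";
//   };
//
//   // with `stage` a MyStage, `ev` a MyStage::BlockingEvent, `op` the
//   // waiting operation, and `fut` a seastar future:
//   MyStage::BlockingEvent::Trigger<MyOp> t{ev, op};
//   auto tracked = stage.track_blocking(std::move(t), std::move(fut));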
template <class T>
struct AggregateBlockingEvent {
  struct TriggerI {
  protected:
    struct TriggerContainerI {
      virtual typename T::TriggerI& get_trigger() = 0;
      virtual ~TriggerContainerI() = default;
    };
    using TriggerContainerIRef = std::unique_ptr<TriggerContainerI>;
    virtual TriggerContainerIRef create_part_trigger() = 0;
  public:
    template <class FutureT>
    auto maybe_record_blocking(FutureT&& fut,
                               const typename T::Blocker& blocker) {
      // AggregateBlockingEvent is supposed to be used on relatively cold
      // paths (recovery), so we don't need to worry about the overhead of
      // dynamic polymorphism / dynamic memory here.
      auto tcont = create_part_trigger();
      return tcont->get_trigger().maybe_record_blocking(
        std::move(fut), blocker
      ).finally([tcont=std::move(tcont)] {});
    }

    virtual ~TriggerI() = default;
  };
  template <class OpT>
  struct Trigger final : TriggerI {
    Trigger(AggregateBlockingEvent& event, const OpT& op)
      : event(event), op(op) {}
    class TriggerContainer final : public TriggerI::TriggerContainerI {
      AggregateBlockingEvent& event;
      typename decltype(event.events)::iterator iter;
      typename T::template Trigger<OpT> trigger;

      typename T::TriggerI& get_trigger() final {
        return trigger;
      }

    public:
      TriggerContainer(AggregateBlockingEvent& _event, const OpT& op) :
        event(_event),
        iter(event.events.emplace(event.events.end())),
        trigger(*iter, op) {}
      ~TriggerContainer() final {
        event.events.erase(iter);
      }
    };
  protected:
    typename TriggerI::TriggerContainerIRef create_part_trigger() final {
      return std::make_unique<TriggerContainer>(event, op);
    }
  private:
    AggregateBlockingEvent& event;
    const OpT& op;
  };

private:
  std::list<typename T::BlockingEvent> events;
  template <class OpT>
  friend class Trigger;
};
/**
 * Common base for all crimson-osd operations. Mainly provides
 * an interface for registering ops in flight and dumping
 * diagnostic information.
 */
class Operation : public boost::intrusive_ref_counter<
  Operation, boost::thread_unsafe_counter> {
public:
  using id_t = uint64_t;
  static constexpr id_t NULL_ID = std::numeric_limits<uint64_t>::max();
  id_t get_id() const {
    return id;
  }
  static constexpr bool is_trackable = false;

  virtual unsigned get_type() const = 0;
  virtual const char *get_type_name() const = 0;
  virtual void print(std::ostream &) const = 0;

  void dump(ceph::Formatter *f) const;
  void dump_brief(ceph::Formatter *f) const;
  virtual ~Operation() = default;
private:
  virtual void dump_detail(ceph::Formatter *f) const = 0;

  registry_hook_t registry_hook;

  id_t id = 0;
  void set_id(id_t in_id) {
    id = in_id;
  }

  friend class OperationRegistryI;
  template <size_t>
  friend class OperationRegistryT;
};
using OperationRef = boost::intrusive_ptr<Operation>;
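// A minimal sketch of a concrete operation (hypothetical; real crimson ops
// carry considerably more machinery): only the pure-virtual surface of
// Operation needs to be filled in.
//
//   class PingOp final : public Operation {
//   public:
//     static constexpr unsigned TYPE = 0;
//     unsigned get_type() const final { return TYPE; }
//     const char *get_type_name() const final { return "PingOp"; }
//     void print(std::ostream &os) const final { os << "PingOp"; }
//   private:
//     void dump_detail(ceph::Formatter *) const final {}
//   };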
std::ostream &operator<<(std::ostream &, const Operation &op);
/**
 * Maintains a set of lists of all active ops.
 */
class OperationRegistryI {
  using op_list_member_option = boost::intrusive::member_hook<
    Operation,
    registry_hook_t,
    &Operation::registry_hook>;

  friend class Operation;
  seastar::timer<seastar::lowres_clock> shutdown_timer;
  seastar::promise<> shutdown;
protected:
  virtual void do_register(Operation *op) = 0;
  virtual bool registries_empty() const = 0;
  virtual void do_stop() = 0;
public:
  using op_list = boost::intrusive::list<
    Operation,
    op_list_member_option,
    boost::intrusive::constant_time_size<false>>;
  template <typename T, typename... Args>
  auto create_operation(Args&&... args) {
    boost::intrusive_ptr<T> op = new T(std::forward<Args>(args)...);
    do_register(&*op);
    return op;
  }
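  // Usage sketch (hypothetical registry subclass and op type): the returned
  // boost::intrusive_ptr keeps the op alive, and registration has already
  // happened by the time the caller sees it.
  //
  //   auto op = registry.create_operation<PingOp>(/* ctor args */);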
  seastar::future<> stop() {
    crimson::get_logger(ceph_subsys_osd).info("OperationRegistryI::{}", __func__);
    do_stop();
    shutdown_timer.set_callback([this] {
      if (registries_empty()) {
        shutdown.set_value();
        shutdown_timer.cancel();
      }
    });
    shutdown_timer.arm_periodic(
      std::chrono::milliseconds(100/*TODO: use option instead*/));
    return shutdown.get_future();
  }
};
template <size_t NUM_REGISTRIES>
class OperationRegistryT : public OperationRegistryI {
  using id_t = Operation::id_t;
  Operation::id_t next_id = 0;
  std::array<op_list, NUM_REGISTRIES> registries;
protected:
  void do_register(Operation *op) final {
    const auto op_type = op->get_type();
    registries[op_type].push_back(*op);
    op->set_id(++next_id);
  }
  bool registries_empty() const final {
    return std::all_of(registries.begin(),
                       registries.end(),
                       [](const auto& opl) {
                         return opl.empty();
                       });
  }
  OperationRegistryT(core_id_t core)
    // Use core to initialize the upper 8 bits of counters to ensure that
    // ids generated by different cores are disjoint
    : next_id(static_cast<id_t>(core) <<
              (std::numeric_limits<id_t>::digits - 8))
  {}
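  // For example, with id_t being uint64_t (digits == 64), core 3 starts its
  // counter at 3 << 56 == 0x0300'0000'0000'0000, so two cores' id ranges can
  // only collide after a single core has issued 2^56 ids.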
public:
  template <size_t REGISTRY_INDEX>
  const op_list& get_registry() const {
    static_assert(
      REGISTRY_INDEX < std::tuple_size<decltype(registries)>::value);
    return registries[REGISTRY_INDEX];
  }
  template <size_t REGISTRY_INDEX>
  op_list& get_registry() {
    static_assert(
      REGISTRY_INDEX < std::tuple_size<decltype(registries)>::value);
    return registries[REGISTRY_INDEX];
  }
  /// Iterate over live ops
  template <typename F>
  void for_each_op(F &&f) const {
    for (const auto &registry : registries) {
      for (const auto &op : registry) {
        std::invoke(f, op);
      }
    }
  }
  /// Removes op from registry
  void remove_from_registry(Operation &op) {
    const auto op_type = op.get_type();
    registries[op_type].erase(op_list::s_iterator_to(op));
  }
  /// Adds op to registry
  void add_to_registry(Operation &op) {
    const auto op_type = op.get_type();
    registries[op_type].push_back(op);
  }
};
class PipelineExitBarrierI {
public:
  using Ref = std::unique_ptr<PipelineExitBarrierI>;

  /// Waits for exit barrier
  virtual std::optional<seastar::future<>> wait() = 0;

  /// Releases pipeline stage, can only be called after wait
  virtual void exit() = 0;

  /// Releases pipeline resources without waiting on barrier
  virtual void cancel() = 0;

  /// Must ensure that resources are released, likely by calling cancel()
  virtual ~PipelineExitBarrierI() {}
};
template <class T>
class PipelineStageIT : public BlockerT<T> {
  const core_id_t core = seastar::this_shard_id();
public:
  core_id_t get_core() const { return core; }

  template <class... Args>
  decltype(auto) enter(Args&&... args) {
    return static_cast<T*>(this)->enter(std::forward<Args>(args)...);
  }
};
class PipelineHandle {
  PipelineExitBarrierI::Ref barrier;

  std::optional<seastar::future<>> wait_barrier() {
    return barrier ? barrier->wait() : std::nullopt;
  }
public:
  PipelineHandle() = default;

  PipelineHandle(const PipelineHandle&) = delete;
  PipelineHandle(PipelineHandle&&) = default;
  PipelineHandle &operator=(const PipelineHandle&) = delete;
  PipelineHandle &operator=(PipelineHandle&&) = default;
  /**
   * Returns a future which unblocks when the handle has entered the passed
   * OrderedPipelinePhase. If already in a phase, enter will also release
   * that phase after placing itself in the queue for the next one, to
   * preserve ordering.
   */
  template <typename OpT, typename T>
  seastar::future<>
  enter(T &stage, typename T::BlockingEvent::template Trigger<OpT>&& t) {
    ceph_assert(stage.get_core() == seastar::this_shard_id());
    auto wait_fut = wait_barrier();
    if (wait_fut.has_value()) {
      return wait_fut.value().then([this, &stage, t=std::move(t)] () mutable {
        auto fut = t.maybe_record_blocking(stage.enter(t), stage);
        exit();
        return std::move(fut).then(
          [this, t=std::move(t)](auto &&barrier_ref) mutable {
            barrier = std::move(barrier_ref);
            return seastar::now();
          });
      });
    } else {
      auto fut = t.maybe_record_blocking(stage.enter(t), stage);
      exit();
      return std::move(fut).then(
        [this, t=std::move(t)](auto &&barrier_ref) mutable {
          barrier = std::move(barrier_ref);
          return seastar::now();
        });
    }
  }
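  // Usage sketch (hypothetical stage/op/trigger names): an op owns one
  // handle and walks it through successive stages; each enter() queues
  // behind the previous stage before releasing it, which is what preserves
  // ordering.
  //
  //   return handle.enter<MyOp>(stage_a, std::move(trigger_a)
  //   ).then([&] {
  //     /* work belonging to stage_a */
  //     return handle.enter<MyOp>(stage_b, std::move(trigger_b));
  //   }).then([&] {
  //     return handle.complete();
  //   });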
  /**
   * Completes pending exit barrier without entering a new one.
   */
  seastar::future<> complete() {
    auto ret = wait_barrier();
    barrier.reset();
    return ret ? std::move(ret.value()) : seastar::now();
  }
  /**
   * Exits current phase, skips exit barrier; should only be used for op
   * failure. Permitting the handle to be destructed has the same effect.
   */
  void exit() {
    barrier.reset();
  }
};
/**
 * Ensures that at most one op may consider itself in the phase at a time.
 * Ops will see enter() unblock in the order in which they tried to enter
 * the phase. Entering (though not necessarily waiting for the future to
 * resolve) a new phase prior to exiting the previous one will ensure that
 * the op ordering is preserved.
 */
template <class T>
class OrderedExclusivePhaseT : public PipelineStageIT<T> {
  void dump_detail(ceph::Formatter *f) const final {
    f->dump_unsigned("waiting", waiting);
    if (held_by != Operation::NULL_ID) {
      f->dump_unsigned("held_by_operation_id", held_by);
    }
  }
  class ExitBarrier final : public PipelineExitBarrierI {
    OrderedExclusivePhaseT *phase;
    Operation::id_t op_id;
  public:
    ExitBarrier(OrderedExclusivePhaseT *phase, Operation::id_t id)
      : phase(phase), op_id(id) {}
    std::optional<seastar::future<>> wait() final {
      return std::nullopt;
    }

    void exit() final {
      if (phase) {
        auto *p = phase;
        auto id = op_id;
        phase = nullptr;
        std::ignore = seastar::smp::submit_to(
          p->get_core(), [p, id] { p->exit(id); });
      }
    }

    void cancel() final { exit(); }

    ~ExitBarrier() final { cancel(); }
  };
  void exit(Operation::id_t op_id) {
    clear_held_by(op_id);
    mutex.unlock();
  }
public:
  template <class TriggerT>
  seastar::future<PipelineExitBarrierI::Ref> enter(TriggerT& t) {
    waiting++;
    return mutex.lock().then([this, op_id=t.get_op().get_id()] {
      ceph_assert_always(waiting > 0);
      --waiting;
      set_held_by(op_id);
      return PipelineExitBarrierI::Ref(new ExitBarrier{this, op_id});
    });
  }
private:
  void set_held_by(Operation::id_t id) {
    ceph_assert_always(held_by == Operation::NULL_ID);
    held_by = id;
  }

  void clear_held_by(Operation::id_t id) {
    ceph_assert_always(held_by == id);
    held_by = Operation::NULL_ID;
  }
  unsigned waiting = 0;
  seastar::shared_mutex mutex;
  Operation::id_t held_by = Operation::NULL_ID;
};
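// A minimal sketch of declaring a stage (hypothetical names; real crimson
// pipelines aggregate several such stages into a pipeline struct): the CRTP
// parameter is the concrete stage type itself, which supplies the type_name
// used by BlockerT's diagnostics.
//
//   struct ProcessStage : OrderedExclusivePhaseT<ProcessStage> {
//     static constexpr auto type_name = "MyPipeline::process";
//   };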
/**
 * Permits multiple ops to inhabit the stage concurrently, but ensures that
 * they will proceed to the next stage in the order in which they called
 * enter().
 */
template <class T>
class OrderedConcurrentPhaseT : public PipelineStageIT<T> {
  using base_t = PipelineStageIT<T>;
public:
  struct BlockingEvent : base_t::BlockingEvent {
    using base_t::BlockingEvent::BlockingEvent;

    struct ExitBarrierEvent : TimeEvent<ExitBarrierEvent> {};
    template <class OpT>
    struct Trigger : base_t::BlockingEvent::template Trigger<OpT> {
      using base_t::BlockingEvent::template Trigger<OpT>::Trigger;

      template <class FutureT>
      decltype(auto) maybe_record_exit_barrier(FutureT&& fut) {
        if (!fut.available()) {
          exit_barrier_event.trigger(this->op);
        }
        return std::forward<FutureT>(fut);
      }

      ExitBarrierEvent exit_barrier_event;
    };
  };
private:
  void dump_detail(ceph::Formatter *f) const final {}

  template <class TriggerT>
  class ExitBarrier final : public PipelineExitBarrierI {
    OrderedConcurrentPhaseT *phase;
    std::optional<seastar::future<>> barrier;
    TriggerT trigger;
  public:
    ExitBarrier(
      OrderedConcurrentPhaseT *phase,
      seastar::future<> &&barrier,
      TriggerT& trigger)
      : phase(phase), barrier(std::move(barrier)), trigger(trigger) {}
    std::optional<seastar::future<>> wait() final {
      assert(phase);
      assert(barrier);
      auto ret = std::move(*barrier);
      barrier = std::nullopt;
      return trigger.maybe_record_exit_barrier(std::move(ret));
    }
    void exit() final {
      if (barrier) {
        static_cast<void>(
          std::move(*barrier).then([phase=this->phase] { phase->mutex.unlock(); }));
        barrier = std::nullopt;
        phase = nullptr;
      }
      if (phase) {
        std::ignore = seastar::smp::submit_to(
          phase->get_core(),
          [this] {
            phase->mutex.unlock();
            phase = nullptr;
          });
      }
    }
    void cancel() final {
      exit();
    }

    ~ExitBarrier() final {
      cancel();
    }
  };
public:
  template <class TriggerT>
  seastar::future<PipelineExitBarrierI::Ref> enter(TriggerT& t) {
    return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
      new ExitBarrier<TriggerT>{this, mutex.lock(), t});
  }
private:
  seastar::shared_mutex mutex;
};
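// Note the contrast with OrderedExclusivePhaseT: enter() here resolves
// immediately (any number of ops may run the stage's work concurrently),
// while the per-op mutex.lock() future is deferred into the ExitBarrier, so
// ops still leave the stage in enter() order. Declaration looks the same,
// e.g. (hypothetical name):
//
//   struct WaitStage : OrderedConcurrentPhaseT<WaitStage> {
//     static constexpr auto type_name = "MyPipeline::wait";
//   };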
/**
 * Imposes no ordering or exclusivity at all. Ops enter without constraint
 * and may exit in any order. Useful mainly for informational purposes
 * between stages with constraints.
 */
template <class T>
class UnorderedStageT : public PipelineStageIT<T> {
  void dump_detail(ceph::Formatter *f) const final {}

  class ExitBarrier final : public PipelineExitBarrierI {
  public:
    ExitBarrier() = default;

    std::optional<seastar::future<>> wait() final {
      return std::nullopt;
    }

    void exit() final {}

    void cancel() final {}

    ~ExitBarrier() final {}
  };
public:
  template <class... IgnoreArgs>
  seastar::future<PipelineExitBarrierI::Ref> enter(IgnoreArgs&&...) {
    return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
      new ExitBarrier);
  }
};
#if FMT_VERSION >= 90000
template <> struct fmt::formatter<crimson::Operation> : fmt::ostream_formatter {};
#endif