1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2 // vim: ts=8 sw=2 smarttab expandtab
10 #include <boost/intrusive/list.hpp>
11 #include <boost/intrusive_ptr.hpp>
12 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
13 #include <seastar/core/shared_mutex.hh>
14 #include <seastar/core/future.hh>
15 #include <seastar/core/timer.hh>
16 #include <seastar/core/lowres_clock.hh>
17 #include <seastar/core/future-util.hh>
19 #include "include/ceph_assert.h"
20 #include "crimson/common/interruptible_future.h"
28 using registry_hook_t
= boost::intrusive::list_member_hook
<
29 boost::intrusive::link_mode
<boost::intrusive::auto_unlink
>>;
/**
 * Provides an abstraction for registering and unregistering a blocker
 * for the duration of a future becoming available.
 */
38 template <typename Fut
>
39 class blocking_future_detail
{
40 friend class Operation
;
44 blocking_future_detail(Blocker
*b
, Fut
&&f
)
45 : blocker(b
), fut(std::move(f
)) {}
47 template <typename V
, typename
... U
>
48 friend blocking_future_detail
<seastar::future
<V
>>
49 make_ready_blocking_future(U
&&... args
);
51 template <typename V
, typename Exception
>
52 friend blocking_future_detail
<seastar::future
<V
>>
53 make_exception_blocking_future(Exception
&& e
);
56 friend blocking_future_detail
<seastar::future
<>> join_blocking_futures(U
&&u
);
58 template <typename InterruptCond
, typename T
>
59 friend blocking_future_detail
<
60 ::crimson::interruptible::interruptible_future
<InterruptCond
>>
61 join_blocking_interruptible_futures(T
&& t
);
64 friend class blocking_future_detail
;
69 using result
= decltype(std::declval
<Fut
>().then(f
));
70 return blocking_future_detail
<seastar::futurize_t
<result
>>(
72 std::move(fut
).then(std::forward
<F
>(f
)));
74 template <typename InterruptCond
, typename F
>
75 auto then_interruptible(F
&&f
) && {
76 using result
= decltype(std::declval
<Fut
>().then_interruptible(f
));
77 return blocking_future_detail
<
78 typename ::crimson::interruptible::interruptor
<
79 InterruptCond
>::template futurize
<result
>::type
>(
81 std::move(fut
).then_interruptible(std::forward
<F
>(f
)));
85 template <typename T
=void>
86 using blocking_future
= blocking_future_detail
<seastar::future
<T
>>;
88 template <typename InterruptCond
, typename T
= void>
89 using blocking_interruptible_future
= blocking_future_detail
<
90 ::crimson::interruptible::interruptible_future
<InterruptCond
, T
>>;
92 template <typename InterruptCond
, typename V
, typename U
>
93 blocking_interruptible_future
<InterruptCond
, V
>
94 make_ready_blocking_interruptible_future(U
&& args
) {
95 return blocking_interruptible_future
<InterruptCond
, V
>(
97 seastar::make_ready_future
<V
>(std::forward
<U
>(args
)));
100 template <typename InterruptCond
, typename V
, typename Exception
>
101 blocking_interruptible_future
<InterruptCond
, V
>
102 make_exception_blocking_interruptible_future(Exception
&& e
) {
103 return blocking_interruptible_future
<InterruptCond
, V
>(
105 seastar::make_exception_future
<InterruptCond
, V
>(e
));
108 template <typename V
=void, typename
... U
>
109 blocking_future_detail
<seastar::future
<V
>> make_ready_blocking_future(U
&&... args
) {
110 return blocking_future
<V
>(
112 seastar::make_ready_future
<V
>(std::forward
<U
>(args
)...));
115 template <typename V
, typename Exception
>
116 blocking_future_detail
<seastar::future
<V
>>
117 make_exception_blocking_future(Exception
&& e
) {
118 return blocking_future
<V
>(
120 seastar::make_exception_future
<V
>(e
));
/**
 * Provides an interface for dumping diagnostic information about
 * why a particular op is not making progress.
 */
129 template <typename T
>
130 blocking_future
<T
> make_blocking_future(seastar::future
<T
> &&f
) {
131 return blocking_future
<T
>(this, std::move(f
));
134 template <typename InterruptCond
, typename T
>
135 blocking_interruptible_future
<InterruptCond
, T
>
136 make_blocking_future(
137 crimson::interruptible::interruptible_future
<InterruptCond
, T
> &&f
) {
138 return blocking_interruptible_future
<InterruptCond
, T
>(
141 template <typename InterruptCond
, typename T
= void>
142 blocking_interruptible_future
<InterruptCond
, T
>
143 make_blocking_interruptible_future(seastar::future
<T
> &&f
) {
144 return blocking_interruptible_future
<InterruptCond
, T
>(
146 ::crimson::interruptible::interruptor
<InterruptCond
>::make_interruptible(
150 void dump(ceph::Formatter
*f
) const;
151 virtual ~Blocker() = default;
154 virtual void dump_detail(ceph::Formatter
*f
) const = 0;
155 virtual const char *get_type_name() const = 0;
158 template <typename T
>
159 class BlockerT
: public Blocker
{
161 virtual ~BlockerT() = default;
163 const char *get_type_name() const final
{
168 class AggregateBlocker
: public BlockerT
<AggregateBlocker
> {
169 std::vector
<Blocker
*> parent_blockers
;
171 AggregateBlocker(std::vector
<Blocker
*> &&parent_blockers
)
172 : parent_blockers(std::move(parent_blockers
)) {}
173 static constexpr const char *type_name
= "AggregateBlocker";
175 void dump_detail(ceph::Formatter
*f
) const final
;
178 template <typename T
>
179 blocking_future
<> join_blocking_futures(T
&&t
) {
180 std::vector
<Blocker
*> blockers
;
181 blockers
.reserve(t
.size());
183 blockers
.push_back(bf
.blocker
);
184 bf
.blocker
= nullptr;
186 auto agg
= std::make_unique
<AggregateBlocker
>(std::move(blockers
));
187 return agg
->make_blocking_future(
188 seastar::parallel_for_each(
191 return std::move(bf
.fut
);
192 }).then([agg
=std::move(agg
)] {
193 return seastar::make_ready_future
<>();
197 template <typename InterruptCond
, typename T
>
198 blocking_interruptible_future
<InterruptCond
>
199 join_blocking_interruptible_futures(T
&& t
) {
200 std::vector
<Blocker
*> blockers
;
201 blockers
.reserve(t
.size());
203 blockers
.push_back(bf
.blocker
);
204 bf
.blocker
= nullptr;
206 auto agg
= std::make_unique
<AggregateBlocker
>(std::move(blockers
));
207 return agg
->make_blocking_future(
208 ::crimson::interruptible::interruptor
<InterruptCond
>::parallel_for_each(
211 return std::move(bf
.fut
);
212 }).then_interruptible([agg
=std::move(agg
)] {
213 return seastar::make_ready_future
<>();
/**
 * Common base for all crimson-osd operations.  Mainly provides
 * an interface for registering ops in flight and dumping
 * diagnostic information.
 */
223 class Operation
: public boost::intrusive_ref_counter
<
224 Operation
, boost::thread_unsafe_counter
> {
226 uint64_t get_id() const {
230 virtual unsigned get_type() const = 0;
231 virtual const char *get_type_name() const = 0;
232 virtual void print(std::ostream
&) const = 0;
234 template <typename T
>
235 seastar::future
<T
> with_blocking_future(blocking_future
<T
> &&f
) {
236 if (f
.fut
.available()) {
237 return std::move(f
.fut
);
240 add_blocker(f
.blocker
);
241 return std::move(f
.fut
).then_wrapped([this, blocker
=f
.blocker
](auto &&arg
) {
242 clear_blocker(blocker
);
243 return std::move(arg
);
247 template <typename InterruptCond
, typename T
>
248 ::crimson::interruptible::interruptible_future
<InterruptCond
, T
>
249 with_blocking_future_interruptible(blocking_future
<T
> &&f
) {
250 if (f
.fut
.available()) {
251 return std::move(f
.fut
);
254 add_blocker(f
.blocker
);
255 auto fut
= std::move(f
.fut
).then_wrapped([this, blocker
=f
.blocker
](auto &&arg
) {
256 clear_blocker(blocker
);
257 return std::move(arg
);
259 return ::crimson::interruptible::interruptible_future
<
260 InterruptCond
, T
>(std::move(fut
));
263 template <typename InterruptCond
, typename T
>
264 ::crimson::interruptible::interruptible_future
<InterruptCond
, T
>
265 with_blocking_future_interruptible(
266 blocking_interruptible_future
<InterruptCond
, T
> &&f
) {
267 if (f
.fut
.available()) {
268 return std::move(f
.fut
);
271 add_blocker(f
.blocker
);
272 return std::move(f
.fut
).template then_wrapped_interruptible(
273 [this, blocker
=f
.blocker
](auto &&arg
) {
274 clear_blocker(blocker
);
275 return std::move(arg
);
279 void dump(ceph::Formatter
*f
) const;
280 void dump_brief(ceph::Formatter
*f
) const;
281 virtual ~Operation() = default;
284 virtual void dump_detail(ceph::Formatter
*f
) const = 0;
286 registry_hook_t registry_hook
;
288 std::vector
<Blocker
*> blockers
;
290 void set_id(uint64_t in_id
) {
294 void add_blocker(Blocker
*b
) {
295 blockers
.push_back(b
);
298 void clear_blocker(Blocker
*b
) {
299 auto iter
= std::find(blockers
.begin(), blockers
.end(), b
);
300 if (iter
!= blockers
.end()) {
301 blockers
.erase(iter
);
305 friend class OperationRegistryI
;
307 friend class OperationRegistryT
;
309 using OperationRef
= boost::intrusive_ptr
<Operation
>;
311 std::ostream
&operator<<(std::ostream
&, const Operation
&op
);
/**
 * Maintains a set of lists of all active ops.
 */
316 class OperationRegistryI
{
317 using op_list_member_option
= boost::intrusive::member_hook
<
320 &Operation::registry_hook
323 friend class Operation
;
324 seastar::timer
<seastar::lowres_clock
> shutdown_timer
;
325 seastar::promise
<> shutdown
;
328 virtual void do_register(Operation
*op
) = 0;
329 virtual bool registries_empty() const = 0;
332 using op_list
= boost::intrusive::list
<
334 op_list_member_option
,
335 boost::intrusive::constant_time_size
<false>>;
337 template <typename T
, typename
... Args
>
338 typename
T::IRef
create_operation(Args
&&... args
) {
339 typename
T::IRef op
= new T(std::forward
<Args
>(args
)...);
344 seastar::future
<> stop() {
345 shutdown_timer
.set_callback([this] {
346 if (registries_empty()) {
347 shutdown
.set_value();
348 shutdown_timer
.cancel();
351 shutdown_timer
.arm_periodic(
352 std::chrono::milliseconds(100/*TODO: use option instead*/));
353 return shutdown
.get_future();
358 template <size_t NUM_REGISTRIES
>
359 class OperationRegistryT
: public OperationRegistryI
{
368 > op_id_counters
= {};
371 void do_register(Operation
*op
) final
{
372 registries
[op
->get_type()].push_back(*op
);
373 op
->set_id(++op_id_counters
[op
->get_type()]);
376 bool registries_empty() const final
{
377 return std::all_of(registries
.begin(),
384 template <size_t REGISTRY_INDEX
>
385 const op_list
& get_registry() const {
387 REGISTRY_INDEX
< std::tuple_size
<decltype(registries
)>::value
);
388 return registries
[REGISTRY_INDEX
];
392 class PipelineExitBarrierI
{
394 using Ref
= std::unique_ptr
<PipelineExitBarrierI
>;
396 /// Waits for exit barrier
397 virtual seastar::future
<> wait() = 0;
399 /// Releases pipeline stage, can only be called after wait
400 virtual void exit() = 0;
402 /// Releases pipeline resources without waiting on barrier
403 virtual void cancel() = 0;
405 /// Must ensure that resources are released, likely by calling cancel()
406 virtual ~PipelineExitBarrierI() {}
409 class PipelineStageI
: public Blocker
{
411 virtual seastar::future
<PipelineExitBarrierI::Ref
> enter() = 0;
414 class PipelineHandle
{
415 PipelineExitBarrierI::Ref barrier
;
417 auto wait_barrier() {
418 return barrier
? barrier
->wait() : seastar::now();
422 PipelineHandle() = default;
424 PipelineHandle(const PipelineHandle
&) = delete;
425 PipelineHandle(PipelineHandle
&&) = default;
426 PipelineHandle
&operator=(const PipelineHandle
&) = delete;
427 PipelineHandle
&operator=(PipelineHandle
&&) = default;
/**
 * Returns a future which unblocks when the handle has entered the passed
 * OrderedPipelinePhase.  If already in a phase, enter will also release
 * that phase after placing itself in the queue for the next one to preserve
 * ordering.
 */
435 template <typename T
>
436 blocking_future
<> enter(T
&t
) {
/* Strictly speaking, we probably want the blocker to be registered on
 * the previous stage until wait_barrier() resolves and on the next
 * until enter() resolves, but blocking_future will need some refactoring
 * to permit that.  TODO
 */
442 return t
.make_blocking_future(
443 wait_barrier().then([this, &t
] {
444 auto fut
= t
.enter();
446 return std::move(fut
).then([this](auto &&barrier_ref
) {
447 barrier
= std::move(barrier_ref
);
448 return seastar::now();
/**
 * Completes pending exit barrier without entering a new one.
 */
457 seastar::future
<> complete() {
458 auto ret
= wait_barrier();
/**
 * Exits current phase, skips exit barrier, should only be used for op
 * failure.  Permitting the handle to be destructed has the same effect.
 */
/**
 * Ensures that at most one op may consider itself in the phase at a time.
 * Ops will see enter() unblock in the order in which they tried to enter
 * the phase.  Entering (though not necessarily waiting for the future to
 * resolve) a new phase prior to exiting the previous one will ensure that
 * the op ordering is preserved.
 */
480 class OrderedExclusivePhase
: public PipelineStageI
{
481 void dump_detail(ceph::Formatter
*f
) const final
;
482 const char *get_type_name() const final
{
486 class ExitBarrier final
: public PipelineExitBarrierI
{
487 OrderedExclusivePhase
*phase
;
489 ExitBarrier(OrderedExclusivePhase
*phase
) : phase(phase
) {}
491 seastar::future
<> wait() final
{
492 return seastar::now();
502 void cancel() final
{
506 ~ExitBarrier() final
{
516 seastar::future
<PipelineExitBarrierI::Ref
> enter() final
{
517 return mutex
.lock().then([this] {
518 return PipelineExitBarrierI::Ref(new ExitBarrier
{this});
522 OrderedExclusivePhase(const char *name
) : name(name
) {}
526 seastar::shared_mutex mutex
;
/**
 * Permits multiple ops to inhabit the stage concurrently, but ensures that
 * they will proceed to the next stage in the order in which they called
 * enter().
 */
534 class OrderedConcurrentPhase
: public PipelineStageI
{
535 void dump_detail(ceph::Formatter
*f
) const final
;
536 const char *get_type_name() const final
{
540 class ExitBarrier final
: public PipelineExitBarrierI
{
541 OrderedConcurrentPhase
*phase
;
542 std::optional
<seastar::future
<>> barrier
;
545 OrderedConcurrentPhase
*phase
,
546 seastar::future
<> &&barrier
) : phase(phase
), barrier(std::move(barrier
)) {}
548 seastar::future
<> wait() final
{
551 auto ret
= std::move(*barrier
);
552 barrier
= std::nullopt
;
559 std::move(*barrier
).then([phase
=this->phase
] { phase
->mutex
.unlock(); }));
560 barrier
= std::nullopt
;
564 phase
->mutex
.unlock();
569 void cancel() final
{
573 ~ExitBarrier() final
{
579 seastar::future
<PipelineExitBarrierI::Ref
> enter() final
{
580 return seastar::make_ready_future
<PipelineExitBarrierI::Ref
>(
581 new ExitBarrier
{this, mutex
.lock()});
584 OrderedConcurrentPhase(const char *name
) : name(name
) {}
588 seastar::shared_mutex mutex
;
/**
 * Imposes no ordering or exclusivity at all.  Ops enter without constraint and
 * may exit in any order.  Useful mainly for informational purposes between
 * stages with constraints.
 */
596 class UnorderedStage
: public PipelineStageI
{
597 void dump_detail(ceph::Formatter
*f
) const final
{}
598 const char *get_type_name() const final
{
602 class ExitBarrier final
: public PipelineExitBarrierI
{
604 ExitBarrier() = default;
606 seastar::future
<> wait() final
{
607 return seastar::now();
612 void cancel() final
{}
614 ~ExitBarrier() final
{}
618 seastar::future
<PipelineExitBarrierI::Ref
> enter() final
{
619 return seastar::make_ready_future
<PipelineExitBarrierI::Ref
>(
623 UnorderedStage(const char *name
) : name(name
) {}