// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab expandtab

#pragma once

#include <algorithm>
#include <array>
#include <limits>
#include <list>
#include <memory>
#include <optional>
#include <set>
#include <tuple>
#include <vector>
#include <boost/core/demangle.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <seastar/core/shared_mutex.hh>
#include <seastar/core/future.hh>
#include <seastar/core/timer.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/future-util.hh>

#include "include/ceph_assert.h"
#include "include/utime.h"
#include "common/Clock.h"
#include "common/Formatter.h"
#include "crimson/common/interruptible_future.h"
#include "crimson/common/smp_helpers.h"
#include "crimson/common/log.h"

namespace ceph {
  class Formatter;
}

namespace crimson {

using registry_hook_t = boost::intrusive::list_member_hook<
  boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;

class Operation;
class Blocker;


namespace detail {
void dump_time_event(const char* name,
                     const utime_t& timestamp,
                     ceph::Formatter* f);
void dump_blocking_event(const char* name,
                         const utime_t& timestamp,
                         const Blocker* blocker,
                         ceph::Formatter* f);
} // namespace detail

/**
 * Provides an interface for dumping diagnostic information about
 * why a particular op is not making progress.
 */
class Blocker {
public:
  void dump(ceph::Formatter *f) const;
  virtual ~Blocker() = default;

private:
  virtual void dump_detail(ceph::Formatter *f) const = 0;
  virtual const char *get_type_name() const = 0;
};
// The main template. By default an operation has no external
// event handler (the empty tuple). Specializing the template
// allows defining backends in a per-operation-type manner.
// NOTE: basically this could be a function, but C++ disallows
// differentiating the return type among specializations.
template <class T>
struct EventBackendRegistry {
  template <typename...> static constexpr bool always_false = false;

  static std::tuple<> get_backends() {
    static_assert(always_false<T>, "Registry specialization not found");
    return {};
  }
};
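
// Illustrative sketch (hypothetical names, not part of this header): a
// concrete operation type supplies its own specialization, typically next
// to the operation's definition. `MyOp` and `MyTracingBackend` are made up.
//
//   struct MyOp;
//   struct MyTracingBackend;  // provides handle() overloads per event type
//
//   template <>
//   struct EventBackendRegistry<MyOp> {
//     static std::tuple<MyTracingBackend> get_backends() {
//       return {{}};
//     }
//   };
//
// Every backend listed here gets its `handle()` invoked, in order, each
// time an event is triggered for `MyOp` (see Event<T>::trigger below).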

template <class T>
struct Event {
  T* that() {
    return static_cast<T*>(this);
  }
  const T* that() const {
    return static_cast<const T*>(this);
  }

  template <class OpT, class... Args>
  void trigger(OpT&& op, Args&&... args) {
    that()->internal_backend.handle(*that(),
                                    std::forward<OpT>(op),
                                    std::forward<Args>(args)...);
    // call `handle()` for the concrete event type on each and every
    // one of our backends. the order in the registry matters.
    std::apply([&, //args=std::forward_as_tuple(std::forward<Args>(args)...),
                this] (auto... backend) {
      (..., backend.handle(*that(),
                           std::forward<OpT>(op),
                           std::forward<Args>(args)...));
    }, EventBackendRegistry<std::decay_t<OpT>>::get_backends());
  }
};


// The simplest event type, for recording things like the beginning or
// end of a TrackableOperation's life.
template <class T>
struct TimeEvent : Event<T> {
  struct Backend {
    // `T` is passed solely to let implementations discriminate
    // based on the type of event.
    virtual void handle(T&, const Operation&) = 0;
  };

  // for the sake of dumping ops-in-flight.
  struct InternalBackend final : Backend {
    void handle(T&, const Operation&) override {
      timestamp = ceph_clock_now();
    }
    utime_t timestamp;
  } internal_backend;

  void dump(ceph::Formatter *f) const {
    auto demangled_name = boost::core::demangle(typeid(T).name());
    detail::dump_time_event(
      demangled_name.c_str(),
      internal_backend.timestamp, f);
  }

  auto get_timestamp() const {
    return internal_backend.timestamp;
  }
};
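
// Illustrative sketch (hypothetical names): an operation records a
// milestone by declaring a TimeEvent subtype and triggering it on itself;
// InternalBackend stamps the time, which later shows up in the
// ops-in-flight dump. This assumes an EventBackendRegistry<MyOp>
// specialization exists, as sketched above.
//
//   struct MyOp : Operation {
//     struct StartEvent : TimeEvent<StartEvent> {};
//     StartEvent start_event;
//
//     void start() {
//       start_event.trigger(*this);  // records ceph_clock_now()
//     }
//   };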


template <typename T>
class BlockerT : public Blocker {
public:
  struct BlockingEvent : Event<typename T::BlockingEvent> {
    using Blocker = std::decay_t<T>;

    struct Backend {
      // `T` is passed solely to let implementations discriminate
      // based on the type of event.
      virtual void handle(typename T::BlockingEvent&, const Operation&, const T&) = 0;
    };

    struct InternalBackend : Backend {
      void handle(typename T::BlockingEvent&,
                  const Operation&,
                  const T& blocker) override {
        this->timestamp = ceph_clock_now();
        this->blocker = &blocker;
      }

      utime_t timestamp;
      const T* blocker;
    } internal_backend;

    // we don't want any BlockerT to be aware of, and coupled with,
    // an operation. to avoid templatizing the entire path from an op
    // to a blocker, type erasure is used.
    struct TriggerI {
      TriggerI(BlockingEvent& event) : event(event) {}

      template <class FutureT>
      auto maybe_record_blocking(FutureT&& fut, const T& blocker) {
        if (!fut.available()) {
          // a full-blown call via vtable. that's the cost of avoiding
          // templatization. anyway, most of the code actually has the
          // type knowledge.
          record_blocking(blocker);
          return std::forward<FutureT>(fut).finally(
            [&event=this->event, &blocker] () mutable {
              // beware, the trigger instance may already be dead when
              // this is executed!
              record_unblocking(event, blocker);
            });
        }
        return std::forward<FutureT>(fut);
      }
      virtual ~TriggerI() = default;
    protected:
      // it's for the sake of erasing the OpT type
      virtual void record_blocking(const T& blocker) = 0;

      static void record_unblocking(BlockingEvent& event, const T& blocker) {
        assert(event.internal_backend.blocker == &blocker);
        event.internal_backend.blocker = nullptr;
      }

      BlockingEvent& event;
    };

    template <class OpT>
    struct Trigger : TriggerI {
      Trigger(BlockingEvent& event, const OpT& op) : TriggerI(event), op(op) {}

      template <class FutureT>
      auto maybe_record_blocking(FutureT&& fut, const T& blocker) {
        if (!fut.available()) {
          // no need for dynamic dispatch! if we're lucky, the compiler
          // should collapse all these abstractions into a bunch of movs.
          this->Trigger::record_blocking(blocker);
          return std::forward<FutureT>(fut).finally(
            [&event=this->event, &blocker] () mutable {
              Trigger::record_unblocking(event, blocker);
            });
        }
        return std::forward<FutureT>(fut);
      }

      const OpT &get_op() { return op; }

    protected:
      void record_blocking(const T& blocker) override {
        this->event.trigger(op, blocker);
      }

      const OpT& op;
    };

    void dump(ceph::Formatter *f) const {
      auto demangled_name = boost::core::demangle(typeid(T).name());
      detail::dump_blocking_event(
        demangled_name.c_str(),
        internal_backend.timestamp,
        internal_backend.blocker,
        f);
    }
  };

  virtual ~BlockerT() = default;
  template <class TriggerT, class... Args>
  decltype(auto) track_blocking(TriggerT&& trigger, Args&&... args) {
    return std::forward<TriggerT>(trigger).maybe_record_blocking(
      std::forward<Args>(args)..., static_cast<const T&>(*this));
  }

private:
  const char *get_type_name() const final {
    return static_cast<const T*>(this)->type_name;
  }
};
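
// Illustrative sketch (hypothetical names): a blocker wraps a pending
// future so that ops-in-flight dumps can report what the op is stuck on.
// `MyBlocker` supplies the `type_name` that get_type_name() expects, and
// inherits its nested BlockingEvent from BlockerT via CRTP.
//
//   struct MyBlocker : BlockerT<MyBlocker> {
//     static constexpr const char* type_name = "MyBlocker";
//     void dump_detail(ceph::Formatter*) const final {}
//   };
//
//   // given some op, its stored event, and a pending future `fut`:
//   //   MyBlocker::BlockingEvent::Trigger<MyOp> t{op.my_event, op};
//   //   auto tracked = blocker.track_blocking(t, std::move(fut));
//
// While `tracked` is unresolved, the event's InternalBackend points at
// the blocker; the finally() continuation clears it on unblocking.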

template <class T>
struct AggregateBlockingEvent {
  struct TriggerI {
  protected:
    struct TriggerContainerI {
      virtual typename T::TriggerI& get_trigger() = 0;
      virtual ~TriggerContainerI() = default;
    };
    using TriggerContainerIRef = std::unique_ptr<TriggerContainerI>;
    virtual TriggerContainerIRef create_part_trigger() = 0;

  public:
    template <class FutureT>
    auto maybe_record_blocking(FutureT&& fut,
                               const typename T::Blocker& blocker) {
      // AggregateBlockingEvent is supposed to be used on relatively cold
      // paths (recovery), so we don't need to worry about the overhead
      // of dynamic polymorphism / dynamic memory.
      auto tcont = create_part_trigger();
      return tcont->get_trigger().maybe_record_blocking(
        std::move(fut), blocker
      ).finally([tcont=std::move(tcont)] {});
    }

    virtual ~TriggerI() = default;
  };

  template <class OpT>
  struct Trigger final : TriggerI {
    Trigger(AggregateBlockingEvent& event, const OpT& op)
      : event(event), op(op) {}

    class TriggerContainer final : public TriggerI::TriggerContainerI {
      AggregateBlockingEvent& event;
      typename decltype(event.events)::iterator iter;
      typename T::template Trigger<OpT> trigger;

      typename T::TriggerI &get_trigger() final {
        return trigger;
      }

    public:
      TriggerContainer(AggregateBlockingEvent& _event, const OpT& op) :
        event(_event),
        iter(event.events.emplace(event.events.end())),
        trigger(*iter, op) {}

      ~TriggerContainer() final {
        event.events.erase(iter);
      }
    };

  protected:
    typename TriggerI::TriggerContainerIRef create_part_trigger() final {
      return std::make_unique<TriggerContainer>(event, op);
    }

  private:
    AggregateBlockingEvent& event;
    const OpT& op;
  };

private:
  std::list<T> events;
  template <class OpT>
  friend class Trigger;
};

/**
 * Common base for all crimson-osd operations. Mainly provides
 * an interface for registering ops in flight and dumping
 * diagnostic information.
 */
class Operation : public boost::intrusive_ref_counter<
  Operation, boost::thread_unsafe_counter> {
public:
  using id_t = uint64_t;
  static constexpr id_t NULL_ID = std::numeric_limits<uint64_t>::max();
  id_t get_id() const {
    return id;
  }

  static constexpr bool is_trackable = false;

  virtual unsigned get_type() const = 0;
  virtual const char *get_type_name() const = 0;
  virtual void print(std::ostream &) const = 0;

  void dump(ceph::Formatter *f) const;
  void dump_brief(ceph::Formatter *f) const;
  virtual ~Operation() = default;

private:
  virtual void dump_detail(ceph::Formatter *f) const = 0;

  registry_hook_t registry_hook;

  id_t id = 0;
  void set_id(id_t in_id) {
    id = in_id;
  }

  friend class OperationRegistryI;
  template <size_t>
  friend class OperationRegistryT;
};
using OperationRef = boost::intrusive_ptr<Operation>;

std::ostream &operator<<(std::ostream &, const Operation &op);
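
// Illustrative sketch (hypothetical): the minimum a concrete operation
// must provide. `MyOp` and its type constant are made up; real crimson
// ops additionally define events, pipelines, and an enum of op types.
//
//   class MyOp final : public Operation {
//   public:
//     static constexpr unsigned TYPE = 0;  // index into the registry array
//     unsigned get_type() const final { return TYPE; }
//     const char *get_type_name() const final { return "MyOp"; }
//     void print(std::ostream &os) const final {
//       os << "MyOp(" << get_id() << ")";
//     }
//   private:
//     void dump_detail(ceph::Formatter*) const final {}
//   };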

/**
 * Maintains a set of lists of all active ops.
 */
class OperationRegistryI {
  using op_list_member_option = boost::intrusive::member_hook<
    Operation,
    registry_hook_t,
    &Operation::registry_hook
    >;

  friend class Operation;
  seastar::timer<seastar::lowres_clock> shutdown_timer;
  seastar::promise<> shutdown;

protected:
  virtual void do_register(Operation *op) = 0;
  virtual bool registries_empty() const = 0;
  virtual void do_stop() = 0;

public:
  using op_list = boost::intrusive::list<
    Operation,
    op_list_member_option,
    boost::intrusive::constant_time_size<false>>;

  template <typename T, typename... Args>
  auto create_operation(Args&&... args) {
    boost::intrusive_ptr<T> op = new T(std::forward<Args>(args)...);
    do_register(&*op);
    return op;
  }

  seastar::future<> stop() {
    crimson::get_logger(ceph_subsys_osd).info("OperationRegistryI::{}", __func__);
    do_stop();
    shutdown_timer.set_callback([this] {
      if (registries_empty()) {
        shutdown.set_value();
        shutdown_timer.cancel();
      }
    });
    shutdown_timer.arm_periodic(
      std::chrono::milliseconds(100/*TODO: use option instead*/));
    return shutdown.get_future();
  }
};
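
// Illustrative usage sketch (hypothetical `registry` and `MyOp`): ops are
// always created through the registry so they are tracked from birth, and
// stop() resolves only once every tracked op has been destroyed (the
// auto_unlink hook removes an op from its list on destruction).
//
//   OperationRef op = registry.create_operation<MyOp>(/* ctor args */);
//   ...
//   co_await registry.stop();  // polls every 100ms until the lists drain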


template <size_t NUM_REGISTRIES>
class OperationRegistryT : public OperationRegistryI {
  Operation::id_t next_id = 0;
  std::array<
    op_list,
    NUM_REGISTRIES
  > registries;

protected:
  void do_register(Operation *op) final {
    const auto op_type = op->get_type();
    registries[op_type].push_back(*op);
    op->set_id(++next_id);
  }

  bool registries_empty() const final {
    return std::all_of(registries.begin(),
                       registries.end(),
                       [](auto& opl) {
                         return opl.empty();
                       });
  }

protected:
  OperationRegistryT(core_id_t core)
    // Use the core to initialize the upper 8 bits of the counter to
    // ensure that ids generated by different cores are disjoint
    : next_id(static_cast<Operation::id_t>(core) <<
              (std::numeric_limits<Operation::id_t>::digits - 8))
  {}

  template <size_t REGISTRY_INDEX>
  const op_list& get_registry() const {
    static_assert(
      REGISTRY_INDEX < std::tuple_size<decltype(registries)>::value);
    return registries[REGISTRY_INDEX];
  }

  template <size_t REGISTRY_INDEX>
  op_list& get_registry() {
    static_assert(
      REGISTRY_INDEX < std::tuple_size<decltype(registries)>::value);
    return registries[REGISTRY_INDEX];
  }

public:
  /// Iterate over live ops
  template <typename F>
  void for_each_op(F &&f) const {
    for (const auto &registry: registries) {
      for (const auto &op: registry) {
        std::invoke(f, op);
      }
    }
  }

  /// Removes op from registry
  void remove_from_registry(Operation &op) {
    const auto op_type = op.get_type();
    registries[op_type].erase(op_list::s_iterator_to(op));
  }

  /// Adds op to registry
  void add_to_registry(Operation &op) {
    const auto op_type = op.get_type();
    registries[op_type].push_back(op);
  }
};
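
// Illustrative sketch (hypothetical): a concrete registry fixes the number
// of per-op-type lists and pins id generation to its core; crimson's real
// registries follow this pattern with an enum of op types supplying the
// array size. do_stop() is pure virtual and must be provided.
//
//   static constexpr size_t NUM_OP_TYPES = 1;  // made up
//   class MyRegistry : public OperationRegistryT<NUM_OP_TYPES> {
//   public:
//     MyRegistry() : OperationRegistryT(seastar::this_shard_id()) {}
//     void do_stop() final {}  // nothing to abort in this sketch
//   };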

class PipelineExitBarrierI {
public:
  using Ref = std::unique_ptr<PipelineExitBarrierI>;

  /// Waits for exit barrier
  virtual std::optional<seastar::future<>> wait() = 0;

  /// Releases pipeline stage, can only be called after wait
  virtual void exit() = 0;

  /// Releases pipeline resources without waiting on barrier
  virtual void cancel() = 0;

  /// Must ensure that resources are released, likely by calling cancel()
  virtual ~PipelineExitBarrierI() {}
};

template <class T>
class PipelineStageIT : public BlockerT<T> {
  const core_id_t core = seastar::this_shard_id();
public:
  core_id_t get_core() const { return core; }

  template <class... Args>
  decltype(auto) enter(Args&&... args) {
    return static_cast<T*>(this)->enter(std::forward<Args>(args)...);
  }
};

class PipelineHandle {
  PipelineExitBarrierI::Ref barrier;

  std::optional<seastar::future<>> wait_barrier() {
    return barrier ? barrier->wait() : std::nullopt;
  }

public:
  PipelineHandle() = default;

  PipelineHandle(const PipelineHandle&) = delete;
  PipelineHandle(PipelineHandle&&) = default;
  PipelineHandle &operator=(const PipelineHandle&) = delete;
  PipelineHandle &operator=(PipelineHandle&&) = default;

  /**
   * Returns a future which unblocks when the handle has entered the passed
   * pipeline stage. If already in a stage, enter will also release that
   * stage after placing itself in the queue for the next one, to preserve
   * ordering.
   */
  template <typename OpT, typename T>
  seastar::future<>
  enter(T &stage, typename T::BlockingEvent::template Trigger<OpT>&& t) {
    ceph_assert(stage.get_core() == seastar::this_shard_id());
    auto wait_fut = wait_barrier();
    if (wait_fut.has_value()) {
      return wait_fut.value().then([this, &stage, t=std::move(t)] () mutable {
        auto fut = t.maybe_record_blocking(stage.enter(t), stage);
        exit();
        return std::move(fut).then(
          [this, t=std::move(t)](auto &&barrier_ref) mutable {
            barrier = std::move(barrier_ref);
            return seastar::now();
          });
      });
    } else {
      auto fut = t.maybe_record_blocking(stage.enter(t), stage);
      exit();
      return std::move(fut).then(
        [this, t=std::move(t)](auto &&barrier_ref) mutable {
          barrier = std::move(barrier_ref);
          return seastar::now();
        });
    }
  }

  /**
   * Completes pending exit barrier without entering a new one.
   */
  seastar::future<> complete() {
    auto ret = wait_barrier();
    barrier.reset();
    return ret ? std::move(ret.value()) : seastar::now();
  }

  /**
   * Exits the current stage and skips the exit barrier; should only be
   * used for op failure. Permitting the handle to be destructed has the
   * same effect.
   */
  void exit() {
    barrier.reset();
  }

};
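
// Illustrative usage sketch (hypothetical stage/op/event names): an op
// threads one PipelineHandle through consecutive stages; each enter()
// queues behind the previous stage's exit barrier, so cross-stage
// ordering is preserved.
//
//   seastar::future<> process(MyOp &op, PipelineHandle &handle,
//                             MyStageA &a, MyStageB &b) {
//     return handle.enter<MyOp>(
//       a, MyStageA::BlockingEvent::Trigger<MyOp>{op.a_event, op}
//     ).then([&] {
//       // ... work belonging to stage A ...
//       return handle.enter<MyOp>(
//         b, MyStageB::BlockingEvent::Trigger<MyOp>{op.b_event, op});
//     }).then([&] {
//       return handle.complete();  // drain the final exit barrier
//     });
//   }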

/**
 * Ensures that at most one op may consider itself in the phase at a time.
 * Ops will see enter() unblock in the order in which they tried to enter
 * the phase. Entering a new phase (though not necessarily waiting for its
 * future to resolve) prior to exiting the previous one will ensure that
 * op ordering is preserved.
 */
template <class T>
class OrderedExclusivePhaseT : public PipelineStageIT<T> {
  void dump_detail(ceph::Formatter *f) const final {
    f->dump_unsigned("waiting", waiting);
    if (held_by != Operation::NULL_ID) {
      f->dump_unsigned("held_by_operation_id", held_by);
    }
  }

  class ExitBarrier final : public PipelineExitBarrierI {
    OrderedExclusivePhaseT *phase;
    Operation::id_t op_id;
  public:
    ExitBarrier(OrderedExclusivePhaseT *phase, Operation::id_t id)
      : phase(phase), op_id(id) {}

    std::optional<seastar::future<>> wait() final {
      return std::nullopt;
    }

    void exit() final {
      if (phase) {
        auto *p = phase;
        auto id = op_id;
        phase = nullptr;
        std::ignore = seastar::smp::submit_to(
          p->get_core(),
          [p, id] {
            p->exit(id);
          });
      }
    }

    void cancel() final {
      exit();
    }

    ~ExitBarrier() final {
      cancel();
    }
  };

  void exit(Operation::id_t op_id) {
    clear_held_by(op_id);
    mutex.unlock();
  }

public:
  template <class TriggerT>
  seastar::future<PipelineExitBarrierI::Ref> enter(TriggerT& t) {
    waiting++;
    return mutex.lock().then([this, op_id=t.get_op().get_id()] {
      ceph_assert_always(waiting > 0);
      --waiting;
      set_held_by(op_id);
      return PipelineExitBarrierI::Ref(new ExitBarrier{this, op_id});
    });
  }

private:
  void set_held_by(Operation::id_t id) {
    ceph_assert_always(held_by == Operation::NULL_ID);
    held_by = id;
  }

  void clear_held_by(Operation::id_t id) {
    ceph_assert_always(held_by == id);
    held_by = Operation::NULL_ID;
  }

  unsigned waiting = 0;
  seastar::shared_mutex mutex;
  Operation::id_t held_by = Operation::NULL_ID;
};
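
// Illustrative sketch (hypothetical name): declaring a concrete exclusive
// stage. The CRTP parameter gives the stage its own BlockingEvent type,
// and `type_name` satisfies BlockerT::get_type_name().
//
//   struct CommitPhase : OrderedExclusivePhaseT<CommitPhase> {
//     static constexpr auto type_name = "MyPipeline::commit";
//   };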

/**
 * Permits multiple ops to inhabit the stage concurrently, but ensures that
 * they will proceed to the next stage in the order in which they called
 * enter.
 */
template <class T>
class OrderedConcurrentPhaseT : public PipelineStageIT<T> {
  using base_t = PipelineStageIT<T>;
public:
  struct BlockingEvent : base_t::BlockingEvent {
    using base_t::BlockingEvent::BlockingEvent;

    struct ExitBarrierEvent : TimeEvent<ExitBarrierEvent> {};

    template <class OpT>
    struct Trigger : base_t::BlockingEvent::template Trigger<OpT> {
      using base_t::BlockingEvent::template Trigger<OpT>::Trigger;

      template <class FutureT>
      decltype(auto) maybe_record_exit_barrier(FutureT&& fut) {
        if (!fut.available()) {
          exit_barrier_event.trigger(this->op);
        }
        return std::forward<FutureT>(fut);
      }

      ExitBarrierEvent exit_barrier_event;
    };
  };

private:
  void dump_detail(ceph::Formatter *f) const final {}

  template <class TriggerT>
  class ExitBarrier final : public PipelineExitBarrierI {
    OrderedConcurrentPhaseT *phase;
    std::optional<seastar::future<>> barrier;
    TriggerT trigger;
  public:
    ExitBarrier(
      OrderedConcurrentPhaseT *phase,
      seastar::future<> &&barrier,
      TriggerT& trigger) : phase(phase), barrier(std::move(barrier)), trigger(trigger) {}

    std::optional<seastar::future<>> wait() final {
      assert(phase);
      assert(barrier);
      auto ret = std::move(*barrier);
      barrier = std::nullopt;
      return trigger.maybe_record_exit_barrier(std::move(ret));
    }

    void exit() final {
      if (barrier) {
        static_cast<void>(
          std::move(*barrier).then([phase=this->phase] { phase->mutex.unlock(); }));
        barrier = std::nullopt;
        phase = nullptr;
      }
      if (phase) {
        std::ignore = seastar::smp::submit_to(
          phase->get_core(),
          [this] {
            phase->mutex.unlock();
            phase = nullptr;
          });
      }
    }

    void cancel() final {
      exit();
    }

    ~ExitBarrier() final {
      cancel();
    }
  };

public:
  template <class TriggerT>
  seastar::future<PipelineExitBarrierI::Ref> enter(TriggerT& t) {
    return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
      new ExitBarrier<TriggerT>{this, mutex.lock(), t});
  }

private:
  seastar::shared_mutex mutex;
};
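
// Illustrative sketch (hypothetical name): a concurrent stage is declared
// the same way as an exclusive one; the difference is purely behavioral.
// enter() resolves immediately while mutex.lock() is folded into the exit
// barrier, so many ops can occupy the stage but must leave in entry order.
//
//   struct ReplicationPhase : OrderedConcurrentPhaseT<ReplicationPhase> {
//     static constexpr auto type_name = "MyPipeline::replication";
//   };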

/**
 * Imposes no ordering or exclusivity at all. Ops enter without constraint and
 * may exit in any order. Useful mainly for informational purposes between
 * stages with constraints.
 */
template <class T>
class UnorderedStageT : public PipelineStageIT<T> {
  void dump_detail(ceph::Formatter *f) const final {}

  class ExitBarrier final : public PipelineExitBarrierI {
  public:
    ExitBarrier() = default;

    std::optional<seastar::future<>> wait() final {
      return std::nullopt;
    }

    void exit() final {}

    void cancel() final {}

    ~ExitBarrier() final {}
  };

public:
  template <class... IgnoreArgs>
  seastar::future<PipelineExitBarrierI::Ref> enter(IgnoreArgs&&...) {
    return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
      new ExitBarrier);
  }
};

} // namespace crimson

#if FMT_VERSION >= 90000
template <> struct fmt::formatter<crimson::Operation> : fmt::ostream_formatter {};
#endif