]>
Commit | Line | Data |
---|---|---|
20effc67 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- |
2 | // vim: ts=8 sw=2 smarttab expandtab | |
3 | ||
4 | #pragma once | |
5 | ||
6 | #include <algorithm> | |
7 | #include <array> | |
8 | #include <set> | |
9 | #include <vector> | |
1e59de90 | 10 | #include <boost/core/demangle.hpp> |
20effc67 TL |
11 | #include <boost/intrusive/list.hpp> |
12 | #include <boost/intrusive_ptr.hpp> | |
13 | #include <boost/smart_ptr/intrusive_ref_counter.hpp> | |
14 | #include <seastar/core/shared_mutex.hh> | |
15 | #include <seastar/core/future.hh> | |
16 | #include <seastar/core/timer.hh> | |
17 | #include <seastar/core/lowres_clock.hh> | |
18 | #include <seastar/core/future-util.hh> | |
19 | ||
20 | #include "include/ceph_assert.h" | |
1e59de90 TL |
21 | #include "include/utime.h" |
22 | #include "common/Clock.h" | |
23 | #include "common/Formatter.h" | |
20effc67 | 24 | #include "crimson/common/interruptible_future.h" |
1e59de90 TL |
25 | #include "crimson/common/smp_helpers.h" |
26 | #include "crimson/common/log.h" | |
20effc67 TL |
27 | |
namespace ceph {
  class Formatter;
}

namespace crimson {

// Intrusive-list hook with auto_unlink: an Operation unlinks itself from
// its registry list automatically on destruction (requires
// constant_time_size<false> on the owning list).
using registry_hook_t = boost::intrusive::list_member_hook<
  boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;

class Operation;
class Blocker;
1e59de90 TL |
40 | |
namespace detail {
// Formatter helpers shared by TimeEvent/BlockingEvent dump() below;
// declarations only — the definitions live out-of-line.
void dump_time_event(const char* name,
                     const utime_t& timestamp,
                     ceph::Formatter* f);
void dump_blocking_event(const char* name,
                         const utime_t& timestamp,
                         const Blocker* blocker,
                         ceph::Formatter* f);
} // namespace detail
50 | ||
/**
 * Provides an interface for dumping diagnostic information about
 * why a particular op is not making progress.
 */
class Blocker {
public:
  // Dumps the blocker's type name plus implementation-specific detail
  // (via the two private virtuals below).
  void dump(ceph::Formatter *f) const;
  virtual ~Blocker() = default;

private:
  virtual void dump_detail(ceph::Formatter *f) const = 0;
  virtual const char *get_type_name() const = 0;
};
20effc67 | 64 | |
1e59de90 TL |
// the main template. by default an operation has no external
// event handler (the empty tuple). specializing the template
// allows to define backends on per-operation-type manner.
// NOTE: basically this could be a function but C++ disallows
// differentiating return type among specializations.
template <class T>
struct EventBackendRegistry {
  // Dependent-false helper so the static_assert below fires only when this
  // primary template is actually instantiated, not at definition time.
  template <typename...> static constexpr bool always_false = false;

  static std::tuple<> get_backends() {
    static_assert(always_false<T>, "Registry specialization not found");
    return {};
  }
};
20effc67 | 79 | |
1e59de90 TL |
// CRTP base for events. trigger() first notifies the event's own
// internal_backend (used for ops-in-flight dumping), then every external
// backend registered for the operation type.
template <class T>
struct Event {
  // CRTP downcast helpers.
  T* that() {
    return static_cast<T*>(this);
  }
  const T* that() const {
    return static_cast<const T*>(this);
  }

  template <class OpT, class... Args>
  void trigger(OpT&& op, Args&&... args) {
    that()->internal_backend.handle(*that(),
                                    std::forward<OpT>(op),
                                    std::forward<Args>(args)...);
    // let's call `handle()` for concrete event type from each single
    // of our backends. the order in the registry matters.
    // NOTE(review): op/args are std::forward'ed once per backend (and once
    // above); safe only while callers pass lvalues — confirm no rvalue use.
    std::apply([&, //args=std::forward_as_tuple(std::forward<Args>(args)...),
                this] (auto... backend) {
      (..., backend.handle(*that(),
                           std::forward<OpT>(op),
                           std::forward<Args>(args)...));
    }, EventBackendRegistry<std::decay_t<OpT>>::get_backends());
  }
};
20effc67 | 104 | |
20effc67 | 105 | |
1e59de90 TL |
// simplest event type for recording things like beginning or end
// of TrackableOperation's life.
template <class T>
struct TimeEvent : Event<T> {
  struct Backend {
    // `T` is passed solely to let implementations to discriminate
    // basing on the type-of-event.
    virtual void handle(T&, const Operation&) = 0;
  };

  // for the sake of dumping ops-in-flight.
  struct InternalBackend final : Backend {
    // Records the wall-clock time at which the event fired.
    void handle(T&, const Operation&) override {
      timestamp = ceph_clock_now();
    }
    utime_t timestamp;
  } internal_backend;

  void dump(ceph::Formatter *f) const {
    // Demangled event type name doubles as the dump key.
    auto demangled_name = boost::core::demangle(typeid(T).name());
    detail::dump_time_event(
      demangled_name.c_str(),
      internal_backend.timestamp, f);
  }

  auto get_timestamp() const {
    return internal_backend.timestamp;
  }
};
135 | ||
20effc67 | 136 | |
1e59de90 TL |
137 | template <typename T> |
138 | class BlockerT : public Blocker { | |
139 | public: | |
140 | struct BlockingEvent : Event<typename T::BlockingEvent> { | |
141 | using Blocker = std::decay_t<T>; | |
142 | ||
143 | struct Backend { | |
144 | // `T` is based solely to let implementations to discriminate | |
145 | // basing on the type-of-event. | |
146 | virtual void handle(typename T::BlockingEvent&, const Operation&, const T&) = 0; | |
147 | }; | |
148 | ||
149 | struct InternalBackend : Backend { | |
150 | void handle(typename T::BlockingEvent&, | |
151 | const Operation&, | |
152 | const T& blocker) override { | |
153 | this->timestamp = ceph_clock_now(); | |
154 | this->blocker = &blocker; | |
155 | } | |
20effc67 | 156 | |
1e59de90 TL |
157 | utime_t timestamp; |
158 | const T* blocker; | |
159 | } internal_backend; | |
160 | ||
161 | // we don't want to make any BlockerT to be aware and coupled with | |
162 | // an operation. to not templatize an entire path from an op to | |
163 | // a blocker, type erasuring is used. | |
164 | struct TriggerI { | |
165 | TriggerI(BlockingEvent& event) : event(event) {} | |
166 | ||
167 | template <class FutureT> | |
168 | auto maybe_record_blocking(FutureT&& fut, const T& blocker) { | |
169 | if (!fut.available()) { | |
170 | // a full blown call via vtable. that's the cost for templatization | |
171 | // avoidance. anyway, most of the things actually have the type | |
172 | // knowledge. | |
173 | record_blocking(blocker); | |
174 | return std::forward<FutureT>(fut).finally( | |
175 | [&event=this->event, &blocker] () mutable { | |
176 | // beware trigger instance may be already dead when this | |
177 | // is executed! | |
178 | record_unblocking(event, blocker); | |
179 | }); | |
180 | } | |
181 | return std::forward<FutureT>(fut); | |
182 | } | |
183 | virtual ~TriggerI() = default; | |
184 | protected: | |
185 | // it's for the sake of erasing the OpT type | |
186 | virtual void record_blocking(const T& blocker) = 0; | |
187 | ||
188 | static void record_unblocking(BlockingEvent& event, const T& blocker) { | |
189 | assert(event.internal_backend.blocker == &blocker); | |
190 | event.internal_backend.blocker = nullptr; | |
191 | } | |
20effc67 | 192 | |
1e59de90 TL |
193 | BlockingEvent& event; |
194 | }; | |
195 | ||
196 | template <class OpT> | |
197 | struct Trigger : TriggerI { | |
198 | Trigger(BlockingEvent& event, const OpT& op) : TriggerI(event), op(op) {} | |
199 | ||
200 | template <class FutureT> | |
201 | auto maybe_record_blocking(FutureT&& fut, const T& blocker) { | |
202 | if (!fut.available()) { | |
203 | // no need for the dynamic dispatch! if we're lucky, a compiler | |
204 | // should collapse all these abstractions into a bunch of movs. | |
205 | this->Trigger::record_blocking(blocker); | |
206 | return std::forward<FutureT>(fut).finally( | |
207 | [&event=this->event, &blocker] () mutable { | |
208 | Trigger::record_unblocking(event, blocker); | |
209 | }); | |
210 | } | |
211 | return std::forward<FutureT>(fut); | |
212 | } | |
20effc67 | 213 | |
1e59de90 | 214 | const OpT &get_op() { return op; } |
20effc67 | 215 | |
1e59de90 TL |
216 | protected: |
217 | void record_blocking(const T& blocker) override { | |
218 | this->event.trigger(op, blocker); | |
219 | } | |
20effc67 | 220 | |
1e59de90 TL |
221 | const OpT& op; |
222 | }; | |
20effc67 | 223 | |
1e59de90 TL |
224 | void dump(ceph::Formatter *f) const { |
225 | auto demangled_name = boost::core::demangle(typeid(T).name()); | |
226 | detail::dump_blocking_event( | |
227 | demangled_name.c_str(), | |
228 | internal_backend.timestamp, | |
229 | internal_backend.blocker, | |
230 | f); | |
231 | } | |
232 | }; | |
20effc67 | 233 | |
20effc67 | 234 | virtual ~BlockerT() = default; |
1e59de90 TL |
235 | template <class TriggerT, class... Args> |
236 | decltype(auto) track_blocking(TriggerT&& trigger, Args&&... args) { | |
237 | return std::forward<TriggerT>(trigger).maybe_record_blocking( | |
238 | std::forward<Args>(args)..., static_cast<const T&>(*this)); | |
239 | } | |
240 | ||
20effc67 TL |
241 | private: |
242 | const char *get_type_name() const final { | |
1e59de90 | 243 | return static_cast<const T*>(this)->type_name; |
20effc67 TL |
244 | } |
245 | }; | |
246 | ||
1e59de90 TL |
// Aggregates many per-part blocking events (type T) behind a single
// TriggerI-compatible interface; each in-flight part lives in `events`
// for exactly as long as its TriggerContainer exists.
template <class T>
struct AggregateBlockingEvent {
  struct TriggerI {
  protected:
    struct TriggerContainerI {
      virtual typename T::TriggerI& get_trigger() = 0;
      virtual ~TriggerContainerI() = default;
    };
    using TriggerContainerIRef = std::unique_ptr<TriggerContainerI>;
    virtual TriggerContainerIRef create_part_trigger() = 0;

  public:
    template <class FutureT>
    auto maybe_record_blocking(FutureT&& fut,
                               const typename T::Blocker& blocker) {
      // AggregateBlockingEvent is supposed to be used on relatively cold
      // paths (recovery), so we don't need to worry about the dynamic
      // polymorphism / dynamic memory's overhead.
      auto tcont = create_part_trigger();
      // keep the container (and thus the per-part event) alive until the
      // wrapped future resolves.
      return tcont->get_trigger().maybe_record_blocking(
        std::move(fut), blocker
      ).finally([tcont=std::move(tcont)] {});
    }

    virtual ~TriggerI() = default;
  };

  template <class OpT>
  struct Trigger final : TriggerI {
    Trigger(AggregateBlockingEvent& event, const OpT& op)
      : event(event), op(op) {}

    class TriggerContainer final : public TriggerI::TriggerContainerI {
      AggregateBlockingEvent& event;
      typename decltype(event.events)::iterator iter;
      typename T::template Trigger<OpT> trigger;

      typename T::TriggerI &get_trigger() final {
        return trigger;
      }

    public:
      // Emplaces a fresh per-part event at the tail and binds a typed
      // trigger to it; the dtor removes it again (std::list iterators
      // stay valid across other insert/erase).
      TriggerContainer(AggregateBlockingEvent& _event, const OpT& op) :
        event(_event),
        iter(event.events.emplace(event.events.end())),
        trigger(*iter, op) {}

      ~TriggerContainer() final {
        event.events.erase(iter);
      }
    };

  protected:
    typename TriggerI::TriggerContainerIRef create_part_trigger() final {
      return std::make_unique<TriggerContainer>(event, op);
    }

  private:
    AggregateBlockingEvent& event;
    const OpT& op;
  };

private:
  std::list<T> events;
  template <class OpT>
  friend class Trigger;
};
20effc67 TL |
314 | |
/**
 * Common base for all crimson-osd operations. Mainly provides
 * an interface for registering ops in flight and dumping
 * diagnostic information.
 */
class Operation : public boost::intrusive_ref_counter<
  Operation, boost::thread_unsafe_counter> {
public:
  using id_t = uint64_t;
  // Sentinel for "no operation" (e.g. OrderedExclusivePhaseT::held_by).
  static constexpr id_t NULL_ID = std::numeric_limits<uint64_t>::max();
  id_t get_id() const {
    return id;
  }

  static constexpr bool is_trackable = false;

  // get_type() indexes the per-type list in OperationRegistryT.
  virtual unsigned get_type() const = 0;
  virtual const char *get_type_name() const = 0;
  virtual void print(std::ostream &) const = 0;

  void dump(ceph::Formatter *f) const;
  void dump_brief(ceph::Formatter *f) const;
  virtual ~Operation() = default;

private:
  virtual void dump_detail(ceph::Formatter *f) const = 0;

  // auto_unlink hook: destruction removes this op from its registry list.
  registry_hook_t registry_hook;

  // Assigned by the registry on do_register(); 0 until then.
  id_t id = 0;
  void set_id(id_t in_id) {
    id = in_id;
  }

  friend class OperationRegistryI;
  template <size_t>
  friend class OperationRegistryT;
};
// NOTE: thread_unsafe_counter above — refcount manipulation is non-atomic,
// so OperationRefs must not be shared across shards.
using OperationRef = boost::intrusive_ptr<Operation>;

std::ostream &operator<<(std::ostream &, const Operation &op);
356 | ||
/**
 * Maintains a set of lists of all active ops.
 */
class OperationRegistryI {
  using op_list_member_option = boost::intrusive::member_hook<
    Operation,
    registry_hook_t,
    &Operation::registry_hook
  >;

  friend class Operation;
  seastar::timer<seastar::lowres_clock> shutdown_timer;
  seastar::promise<> shutdown;

protected:
  virtual void do_register(Operation *op) = 0;
  virtual bool registries_empty() const = 0;
  virtual void do_stop() = 0;

public:
  // auto_unlink hooks require constant_time_size<false>.
  using op_list = boost::intrusive::list<
    Operation,
    op_list_member_option,
    boost::intrusive::constant_time_size<false>>;

  /// Constructs a T and registers it before handing back the ref.
  template <typename T, typename... Args>
  auto create_operation(Args&&... args) {
    boost::intrusive_ptr<T> op = new T(std::forward<Args>(args)...);
    do_register(&*op);
    return op;
  }

  // Stops accepting work (do_stop), then polls every 100ms until all
  // registries drain; the returned future resolves at that point.
  seastar::future<> stop() {
    crimson::get_logger(ceph_subsys_osd).info("OperationRegistryI::{}", __func__);
    do_stop();
    shutdown_timer.set_callback([this] {
      if (registries_empty()) {
        shutdown.set_value();
        shutdown_timer.cancel();
      }
    });
    shutdown_timer.arm_periodic(
      std::chrono::milliseconds(100/*TODO: use option instead*/));
    return shutdown.get_future();
  }
};
403 | ||
404 | ||
405 | template <size_t NUM_REGISTRIES> | |
406 | class OperationRegistryT : public OperationRegistryI { | |
1e59de90 | 407 | Operation::id_t next_id = 0; |
20effc67 TL |
408 | std::array< |
409 | op_list, | |
410 | NUM_REGISTRIES | |
411 | > registries; | |
412 | ||
20effc67 TL |
413 | protected: |
414 | void do_register(Operation *op) final { | |
1e59de90 TL |
415 | const auto op_type = op->get_type(); |
416 | registries[op_type].push_back(*op); | |
417 | op->set_id(++next_id); | |
20effc67 TL |
418 | } |
419 | ||
420 | bool registries_empty() const final { | |
421 | return std::all_of(registries.begin(), | |
422 | registries.end(), | |
423 | [](auto& opl) { | |
424 | return opl.empty(); | |
425 | }); | |
426 | } | |
1e59de90 TL |
427 | |
428 | protected: | |
429 | OperationRegistryT(core_id_t core) | |
430 | // Use core to initialize upper 8 bits of counters to ensure that | |
431 | // ids generated by different cores are disjoint | |
432 | : next_id(static_cast<id_t>(core) << | |
433 | (std::numeric_limits<id_t>::digits - 8)) | |
434 | {} | |
435 | ||
20effc67 TL |
436 | template <size_t REGISTRY_INDEX> |
437 | const op_list& get_registry() const { | |
438 | static_assert( | |
439 | REGISTRY_INDEX < std::tuple_size<decltype(registries)>::value); | |
440 | return registries[REGISTRY_INDEX]; | |
441 | } | |
1e59de90 TL |
442 | |
443 | template <size_t REGISTRY_INDEX> | |
444 | op_list& get_registry() { | |
445 | static_assert( | |
446 | REGISTRY_INDEX < std::tuple_size<decltype(registries)>::value); | |
447 | return registries[REGISTRY_INDEX]; | |
448 | } | |
449 | ||
450 | public: | |
451 | /// Iterate over live ops | |
452 | template <typename F> | |
453 | void for_each_op(F &&f) const { | |
454 | for (const auto ®istry: registries) { | |
455 | for (const auto &op: registry) { | |
456 | std::invoke(f, op); | |
457 | } | |
458 | } | |
459 | } | |
460 | ||
461 | /// Removes op from registry | |
462 | void remove_from_registry(Operation &op) { | |
463 | const auto op_type = op.get_type(); | |
464 | registries[op_type].erase(op_list::s_iterator_to(op)); | |
465 | } | |
466 | ||
467 | /// Adds op to registry | |
468 | void add_to_registry(Operation &op) { | |
469 | const auto op_type = op.get_type(); | |
470 | registries[op_type].push_back(op); | |
471 | } | |
20effc67 TL |
472 | }; |
473 | ||
// Interface owned by PipelineHandle for leaving a pipeline stage; exactly
// one of exit()/cancel() releases the stage's resources, and the dtor
// must guarantee release either way.
class PipelineExitBarrierI {
public:
  using Ref = std::unique_ptr<PipelineExitBarrierI>;

  /// Waits for exit barrier; nullopt means nothing to wait on
  virtual std::optional<seastar::future<>> wait() = 0;

  /// Releases pipeline stage, can only be called after wait
  virtual void exit() = 0;

  /// Releases pipeline resources without waiting on barrier
  virtual void cancel() = 0;

  /// Must ensure that resources are released, likely by calling cancel()
  virtual ~PipelineExitBarrierI() {}
};
490 | ||
1e59de90 TL |
// CRTP base for pipeline stages: a stage is both a Blocker (for dumping)
// and pinned to the shard it was constructed on.
template <class T>
class PipelineStageIT : public BlockerT<T> {
  const core_id_t core = seastar::this_shard_id();
public:
  core_id_t get_core() const { return core; }

  // Forwards to the concrete stage's enter(). NOTE(review): the derived
  // class is expected to shadow enter(); otherwise this would recurse —
  // confirm every stage type defines its own enter.
  template <class... Args>
  decltype(auto) enter(Args&&... args) {
    return static_cast<T*>(this)->enter(std::forward<Args>(args)...);
  }
};
502 | ||
503 | class PipelineHandle { | |
504 | PipelineExitBarrierI::Ref barrier; | |
505 | ||
aee94f69 TL |
506 | std::optional<seastar::future<>> wait_barrier() { |
507 | return barrier ? barrier->wait() : std::nullopt; | |
20effc67 TL |
508 | } |
509 | ||
510 | public: | |
511 | PipelineHandle() = default; | |
512 | ||
513 | PipelineHandle(const PipelineHandle&) = delete; | |
514 | PipelineHandle(PipelineHandle&&) = default; | |
515 | PipelineHandle &operator=(const PipelineHandle&) = delete; | |
516 | PipelineHandle &operator=(PipelineHandle&&) = default; | |
517 | ||
518 | /** | |
1e59de90 TL |
519 | * Returns a future which unblocks when the handle has entered the passed |
520 | * OrderedPipelinePhase. If already in a phase, enter will also release | |
521 | * that phase after placing itself in the queue for the next one to preserve | |
522 | * ordering. | |
523 | */ | |
524 | template <typename OpT, typename T> | |
525 | seastar::future<> | |
526 | enter(T &stage, typename T::BlockingEvent::template Trigger<OpT>&& t) { | |
527 | ceph_assert(stage.get_core() == seastar::this_shard_id()); | |
aee94f69 TL |
528 | auto wait_fut = wait_barrier(); |
529 | if (wait_fut.has_value()) { | |
530 | return wait_fut.value().then([this, &stage, t=std::move(t)] () mutable { | |
531 | auto fut = t.maybe_record_blocking(stage.enter(t), stage); | |
532 | exit(); | |
533 | return std::move(fut).then( | |
534 | [this, t=std::move(t)](auto &&barrier_ref) mutable { | |
535 | barrier = std::move(barrier_ref); | |
536 | return seastar::now(); | |
537 | }); | |
1e59de90 | 538 | }); |
aee94f69 TL |
539 | } else { |
540 | auto fut = t.maybe_record_blocking(stage.enter(t), stage); | |
541 | exit(); | |
542 | return std::move(fut).then( | |
543 | [this, t=std::move(t)](auto &&barrier_ref) mutable { | |
544 | barrier = std::move(barrier_ref); | |
545 | return seastar::now(); | |
546 | }); | |
547 | } | |
20effc67 TL |
548 | } |
549 | ||
550 | /** | |
551 | * Completes pending exit barrier without entering a new one. | |
552 | */ | |
553 | seastar::future<> complete() { | |
554 | auto ret = wait_barrier(); | |
555 | barrier.reset(); | |
aee94f69 | 556 | return ret ? std::move(ret.value()) : seastar::now(); |
20effc67 TL |
557 | } |
558 | ||
559 | /** | |
560 | * Exits current phase, skips exit barrier, should only be used for op | |
561 | * failure. Permitting the handle to be destructed as the same effect. | |
562 | */ | |
563 | void exit() { | |
564 | barrier.reset(); | |
565 | } | |
566 | ||
567 | }; | |
568 | ||
/**
 * Ensures that at most one op may consider itself in the phase at a time.
 * Ops will see enter() unblock in the order in which they tried to enter
 * the phase. entering (though not necessarily waiting for the future to
 * resolve) a new phase prior to exiting the previous one will ensure that
 * the op ordering is preserved.
 */
template <class T>
class OrderedExclusivePhaseT : public PipelineStageIT<T> {
  void dump_detail(ceph::Formatter *f) const final {
    f->dump_unsigned("waiting", waiting);
    if (held_by != Operation::NULL_ID) {
      f->dump_unsigned("held_by_operation_id", held_by);
    }
  }

  class ExitBarrier final : public PipelineExitBarrierI {
    OrderedExclusivePhaseT *phase;
    Operation::id_t op_id;
  public:
    ExitBarrier(OrderedExclusivePhaseT *phase, Operation::id_t id)
      : phase(phase), op_id(id) {}

    // Nothing to wait on: the mutex was acquired before this barrier
    // was constructed (see enter()).
    std::optional<seastar::future<>> wait() final {
      return std::nullopt;
    }

    void exit() final {
      if (phase) {
        // copy members and null `phase` *before* submitting: the lambda
        // may run after this barrier has been destroyed.
        auto *p = phase;
        auto id = op_id;
        phase = nullptr;
        std::ignore = seastar::smp::submit_to(
          p->get_core(),
          [p, id] {
            p->exit(id);
          });
      }
    }

    void cancel() final {
      exit();
    }

    ~ExitBarrier() final {
      cancel();
    }
  };

  // Runs on the phase's home core: clears ownership and unlocks.
  void exit(Operation::id_t op_id) {
    clear_held_by(op_id);
    mutex.unlock();
  }

public:
  template <class TriggerT>
  seastar::future<PipelineExitBarrierI::Ref> enter(TriggerT& t) {
    waiting++;
    return mutex.lock().then([this, op_id=t.get_op().get_id()] {
      ceph_assert_always(waiting > 0);
      --waiting;
      set_held_by(op_id);
      return PipelineExitBarrierI::Ref(new ExitBarrier{this, op_id});
    });
  }

private:
  void set_held_by(Operation::id_t id) {
    ceph_assert_always(held_by == Operation::NULL_ID);
    held_by = id;
  }

  void clear_held_by(Operation::id_t id) {
    ceph_assert_always(held_by == id);
    held_by = Operation::NULL_ID;
  }

  // diagnostics only; mutated on the phase's home core.
  unsigned waiting = 0;
  seastar::shared_mutex mutex;
  Operation::id_t held_by = Operation::NULL_ID;
};
650 | ||
651 | /** | |
652 | * Permits multiple ops to inhabit the stage concurrently, but ensures that | |
653 | * they will proceed to the next stage in the order in which they called | |
654 | * enter. | |
655 | */ | |
1e59de90 TL |
656 | template <class T> |
657 | class OrderedConcurrentPhaseT : public PipelineStageIT<T> { | |
658 | using base_t = PipelineStageIT<T>; | |
659 | public: | |
660 | struct BlockingEvent : base_t::BlockingEvent { | |
661 | using base_t::BlockingEvent::BlockingEvent; | |
662 | ||
663 | struct ExitBarrierEvent : TimeEvent<ExitBarrierEvent> {}; | |
664 | ||
665 | template <class OpT> | |
666 | struct Trigger : base_t::BlockingEvent::template Trigger<OpT> { | |
667 | using base_t::BlockingEvent::template Trigger<OpT>::Trigger; | |
668 | ||
669 | template <class FutureT> | |
670 | decltype(auto) maybe_record_exit_barrier(FutureT&& fut) { | |
671 | if (!fut.available()) { | |
672 | exit_barrier_event.trigger(this->op); | |
673 | } | |
674 | return std::forward<FutureT>(fut); | |
675 | } | |
676 | ||
677 | ExitBarrierEvent exit_barrier_event; | |
678 | }; | |
679 | }; | |
680 | ||
681 | private: | |
682 | void dump_detail(ceph::Formatter *f) const final {} | |
20effc67 | 683 | |
1e59de90 | 684 | template <class TriggerT> |
20effc67 | 685 | class ExitBarrier final : public PipelineExitBarrierI { |
1e59de90 | 686 | OrderedConcurrentPhaseT *phase; |
20effc67 | 687 | std::optional<seastar::future<>> barrier; |
1e59de90 | 688 | TriggerT trigger; |
20effc67 TL |
689 | public: |
690 | ExitBarrier( | |
1e59de90 TL |
691 | OrderedConcurrentPhaseT *phase, |
692 | seastar::future<> &&barrier, | |
693 | TriggerT& trigger) : phase(phase), barrier(std::move(barrier)), trigger(trigger) {} | |
20effc67 | 694 | |
aee94f69 | 695 | std::optional<seastar::future<>> wait() final { |
20effc67 TL |
696 | assert(phase); |
697 | assert(barrier); | |
698 | auto ret = std::move(*barrier); | |
699 | barrier = std::nullopt; | |
1e59de90 | 700 | return trigger.maybe_record_exit_barrier(std::move(ret)); |
20effc67 TL |
701 | } |
702 | ||
703 | void exit() final { | |
704 | if (barrier) { | |
705 | static_cast<void>( | |
706 | std::move(*barrier).then([phase=this->phase] { phase->mutex.unlock(); })); | |
707 | barrier = std::nullopt; | |
708 | phase = nullptr; | |
709 | } | |
710 | if (phase) { | |
1e59de90 TL |
711 | std::ignore = seastar::smp::submit_to( |
712 | phase->get_core(), | |
713 | [this] { | |
714 | phase->mutex.unlock(); | |
715 | phase = nullptr; | |
716 | }); | |
20effc67 TL |
717 | } |
718 | } | |
719 | ||
720 | void cancel() final { | |
721 | exit(); | |
722 | } | |
723 | ||
724 | ~ExitBarrier() final { | |
725 | cancel(); | |
726 | } | |
727 | }; | |
728 | ||
729 | public: | |
1e59de90 TL |
730 | template <class TriggerT> |
731 | seastar::future<PipelineExitBarrierI::Ref> enter(TriggerT& t) { | |
20effc67 | 732 | return seastar::make_ready_future<PipelineExitBarrierI::Ref>( |
1e59de90 | 733 | new ExitBarrier<TriggerT>{this, mutex.lock(), t}); |
20effc67 TL |
734 | } |
735 | ||
20effc67 | 736 | private: |
20effc67 TL |
737 | seastar::shared_mutex mutex; |
738 | }; | |
739 | ||
740 | /** | |
741 | * Imposes no ordering or exclusivity at all. Ops enter without constraint and | |
742 | * may exit in any order. Useful mainly for informational purposes between | |
743 | * stages with constraints. | |
744 | */ | |
1e59de90 TL |
745 | template <class T> |
746 | class UnorderedStageT : public PipelineStageIT<T> { | |
20effc67 | 747 | void dump_detail(ceph::Formatter *f) const final {} |
20effc67 TL |
748 | |
749 | class ExitBarrier final : public PipelineExitBarrierI { | |
750 | public: | |
751 | ExitBarrier() = default; | |
752 | ||
aee94f69 TL |
753 | std::optional<seastar::future<>> wait() final { |
754 | return std::nullopt; | |
20effc67 TL |
755 | } |
756 | ||
757 | void exit() final {} | |
758 | ||
759 | void cancel() final {} | |
760 | ||
761 | ~ExitBarrier() final {} | |
762 | }; | |
763 | ||
764 | public: | |
1e59de90 TL |
765 | template <class... IgnoreArgs> |
766 | seastar::future<PipelineExitBarrierI::Ref> enter(IgnoreArgs&&...) { | |
20effc67 TL |
767 | return seastar::make_ready_future<PipelineExitBarrierI::Ref>( |
768 | new ExitBarrier); | |
769 | } | |
20effc67 TL |
770 | }; |
771 | ||
20effc67 | 772 | } |
1e59de90 TL |
773 | |
#if FMT_VERSION >= 90000
// fmt v9 dropped the implicit operator<< fallback; opt Operation back into
// ostream-based formatting explicitly.
template <> struct fmt::formatter<crimson::Operation> : fmt::ostream_formatter {};
#endif