]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/common/operation.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / common / operation.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2 // vim: ts=8 sw=2 smarttab expandtab
3
4 #pragma once
5
6 #include <algorithm>
7 #include <array>
8 #include <set>
9 #include <vector>
10 #include <boost/intrusive/list.hpp>
11 #include <boost/intrusive_ptr.hpp>
12 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
13 #include <seastar/core/shared_mutex.hh>
14 #include <seastar/core/future.hh>
15 #include <seastar/core/timer.hh>
16 #include <seastar/core/lowres_clock.hh>
17 #include <seastar/core/future-util.hh>
18
19 #include "include/ceph_assert.h"
20 #include "crimson/common/interruptible_future.h"
21
// Forward declaration only: this header refers to ceph::Formatter purely
// through pointers, so the complete type is not required here.
namespace ceph { class Formatter; }
25
26 namespace crimson {
27
28 using registry_hook_t = boost::intrusive::list_member_hook<
29 boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
30
31 class Operation;
32 class Blocker;
33
34 /**
35 * Provides an abstraction for registering and unregistering a blocker
36 * for the duration of a future becoming available.
37 */
38 template <typename Fut>
39 class blocking_future_detail {
40 friend class Operation;
41 friend class Blocker;
42 Blocker *blocker;
43 Fut fut;
44 blocking_future_detail(Blocker *b, Fut &&f)
45 : blocker(b), fut(std::move(f)) {}
46
47 template <typename V, typename... U>
48 friend blocking_future_detail<seastar::future<V>>
49 make_ready_blocking_future(U&&... args);
50
51 template <typename V, typename Exception>
52 friend blocking_future_detail<seastar::future<V>>
53 make_exception_blocking_future(Exception&& e);
54
55 template <typename U>
56 friend blocking_future_detail<seastar::future<>> join_blocking_futures(U &&u);
57
58 template <typename InterruptCond, typename T>
59 friend blocking_future_detail<
60 ::crimson::interruptible::interruptible_future<InterruptCond>>
61 join_blocking_interruptible_futures(T&& t);
62
63 template <typename U>
64 friend class blocking_future_detail;
65
66 public:
67 template <typename F>
68 auto then(F &&f) && {
69 using result = decltype(std::declval<Fut>().then(f));
70 return blocking_future_detail<seastar::futurize_t<result>>(
71 blocker,
72 std::move(fut).then(std::forward<F>(f)));
73 }
74 template <typename InterruptCond, typename F>
75 auto then_interruptible(F &&f) && {
76 using result = decltype(std::declval<Fut>().then_interruptible(f));
77 return blocking_future_detail<
78 typename ::crimson::interruptible::interruptor<
79 InterruptCond>::template futurize<result>::type>(
80 blocker,
81 std::move(fut).then_interruptible(std::forward<F>(f)));
82 }
83 };
84
85 template <typename T=void>
86 using blocking_future = blocking_future_detail<seastar::future<T>>;
87
88 template <typename InterruptCond, typename T = void>
89 using blocking_interruptible_future = blocking_future_detail<
90 ::crimson::interruptible::interruptible_future<InterruptCond, T>>;
91
92 template <typename InterruptCond, typename V, typename U>
93 blocking_interruptible_future<InterruptCond, V>
94 make_ready_blocking_interruptible_future(U&& args) {
95 return blocking_interruptible_future<InterruptCond, V>(
96 nullptr,
97 seastar::make_ready_future<V>(std::forward<U>(args)));
98 }
99
100 template <typename InterruptCond, typename V, typename Exception>
101 blocking_interruptible_future<InterruptCond, V>
102 make_exception_blocking_interruptible_future(Exception&& e) {
103 return blocking_interruptible_future<InterruptCond, V>(
104 nullptr,
105 seastar::make_exception_future<InterruptCond, V>(e));
106 }
107
108 template <typename V=void, typename... U>
109 blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&&... args) {
110 return blocking_future<V>(
111 nullptr,
112 seastar::make_ready_future<V>(std::forward<U>(args)...));
113 }
114
115 template <typename V, typename Exception>
116 blocking_future_detail<seastar::future<V>>
117 make_exception_blocking_future(Exception&& e) {
118 return blocking_future<V>(
119 nullptr,
120 seastar::make_exception_future<V>(e));
121 }
122
123 /**
124 * Provides an interface for dumping diagnostic information about
125 * why a particular op is not making progress.
126 */
127 class Blocker {
128 public:
129 template <typename T>
130 blocking_future<T> make_blocking_future(seastar::future<T> &&f) {
131 return blocking_future<T>(this, std::move(f));
132 }
133
134 template <typename InterruptCond, typename T>
135 blocking_interruptible_future<InterruptCond, T>
136 make_blocking_future(
137 crimson::interruptible::interruptible_future<InterruptCond, T> &&f) {
138 return blocking_interruptible_future<InterruptCond, T>(
139 this, std::move(f));
140 }
141 template <typename InterruptCond, typename T = void>
142 blocking_interruptible_future<InterruptCond, T>
143 make_blocking_interruptible_future(seastar::future<T> &&f) {
144 return blocking_interruptible_future<InterruptCond, T>(
145 this,
146 ::crimson::interruptible::interruptor<InterruptCond>::make_interruptible(
147 std::move(f)));
148 }
149
150 void dump(ceph::Formatter *f) const;
151 virtual ~Blocker() = default;
152
153 private:
154 virtual void dump_detail(ceph::Formatter *f) const = 0;
155 virtual const char *get_type_name() const = 0;
156 };
157
158 template <typename T>
159 class BlockerT : public Blocker {
160 public:
161 virtual ~BlockerT() = default;
162 private:
163 const char *get_type_name() const final {
164 return T::type_name;
165 }
166 };
167
168 class AggregateBlocker : public BlockerT<AggregateBlocker> {
169 std::vector<Blocker*> parent_blockers;
170 public:
171 AggregateBlocker(std::vector<Blocker*> &&parent_blockers)
172 : parent_blockers(std::move(parent_blockers)) {}
173 static constexpr const char *type_name = "AggregateBlocker";
174 private:
175 void dump_detail(ceph::Formatter *f) const final;
176 };
177
178 template <typename T>
179 blocking_future<> join_blocking_futures(T &&t) {
180 std::vector<Blocker*> blockers;
181 blockers.reserve(t.size());
182 for (auto &&bf: t) {
183 blockers.push_back(bf.blocker);
184 bf.blocker = nullptr;
185 }
186 auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
187 return agg->make_blocking_future(
188 seastar::parallel_for_each(
189 std::forward<T>(t),
190 [](auto &&bf) {
191 return std::move(bf.fut);
192 }).then([agg=std::move(agg)] {
193 return seastar::make_ready_future<>();
194 }));
195 }
196
197 template <typename InterruptCond, typename T>
198 blocking_interruptible_future<InterruptCond>
199 join_blocking_interruptible_futures(T&& t) {
200 std::vector<Blocker*> blockers;
201 blockers.reserve(t.size());
202 for (auto &&bf: t) {
203 blockers.push_back(bf.blocker);
204 bf.blocker = nullptr;
205 }
206 auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
207 return agg->make_blocking_future(
208 ::crimson::interruptible::interruptor<InterruptCond>::parallel_for_each(
209 std::forward<T>(t),
210 [](auto &&bf) {
211 return std::move(bf.fut);
212 }).then_interruptible([agg=std::move(agg)] {
213 return seastar::make_ready_future<>();
214 }));
215 }
216
217
218 /**
219 * Common base for all crimson-osd operations. Mainly provides
220 * an interface for registering ops in flight and dumping
221 * diagnostic information.
222 */
223 class Operation : public boost::intrusive_ref_counter<
224 Operation, boost::thread_unsafe_counter> {
225 public:
226 uint64_t get_id() const {
227 return id;
228 }
229
230 virtual unsigned get_type() const = 0;
231 virtual const char *get_type_name() const = 0;
232 virtual void print(std::ostream &) const = 0;
233
234 template <typename T>
235 seastar::future<T> with_blocking_future(blocking_future<T> &&f) {
236 if (f.fut.available()) {
237 return std::move(f.fut);
238 }
239 assert(f.blocker);
240 add_blocker(f.blocker);
241 return std::move(f.fut).then_wrapped([this, blocker=f.blocker](auto &&arg) {
242 clear_blocker(blocker);
243 return std::move(arg);
244 });
245 }
246
247 template <typename InterruptCond, typename T>
248 ::crimson::interruptible::interruptible_future<InterruptCond, T>
249 with_blocking_future_interruptible(blocking_future<T> &&f) {
250 if (f.fut.available()) {
251 return std::move(f.fut);
252 }
253 assert(f.blocker);
254 add_blocker(f.blocker);
255 auto fut = std::move(f.fut).then_wrapped([this, blocker=f.blocker](auto &&arg) {
256 clear_blocker(blocker);
257 return std::move(arg);
258 });
259 return ::crimson::interruptible::interruptible_future<
260 InterruptCond, T>(std::move(fut));
261 }
262
263 template <typename InterruptCond, typename T>
264 ::crimson::interruptible::interruptible_future<InterruptCond, T>
265 with_blocking_future_interruptible(
266 blocking_interruptible_future<InterruptCond, T> &&f) {
267 if (f.fut.available()) {
268 return std::move(f.fut);
269 }
270 assert(f.blocker);
271 add_blocker(f.blocker);
272 return std::move(f.fut).template then_wrapped_interruptible(
273 [this, blocker=f.blocker](auto &&arg) {
274 clear_blocker(blocker);
275 return std::move(arg);
276 });
277 }
278
279 void dump(ceph::Formatter *f) const;
280 void dump_brief(ceph::Formatter *f) const;
281 virtual ~Operation() = default;
282
283 private:
284 virtual void dump_detail(ceph::Formatter *f) const = 0;
285
286 registry_hook_t registry_hook;
287
288 std::vector<Blocker*> blockers;
289 uint64_t id = 0;
290 void set_id(uint64_t in_id) {
291 id = in_id;
292 }
293
294 void add_blocker(Blocker *b) {
295 blockers.push_back(b);
296 }
297
298 void clear_blocker(Blocker *b) {
299 auto iter = std::find(blockers.begin(), blockers.end(), b);
300 if (iter != blockers.end()) {
301 blockers.erase(iter);
302 }
303 }
304
305 friend class OperationRegistryI;
306 template <size_t>
307 friend class OperationRegistryT;
308 };
309 using OperationRef = boost::intrusive_ptr<Operation>;
310
311 std::ostream &operator<<(std::ostream &, const Operation &op);
312
313 /**
314 * Maintains a set of lists of all active ops.
315 */
316 class OperationRegistryI {
317 using op_list_member_option = boost::intrusive::member_hook<
318 Operation,
319 registry_hook_t,
320 &Operation::registry_hook
321 >;
322
323 friend class Operation;
324 seastar::timer<seastar::lowres_clock> shutdown_timer;
325 seastar::promise<> shutdown;
326
327 protected:
328 virtual void do_register(Operation *op) = 0;
329 virtual bool registries_empty() const = 0;
330
331 public:
332 using op_list = boost::intrusive::list<
333 Operation,
334 op_list_member_option,
335 boost::intrusive::constant_time_size<false>>;
336
337 template <typename T, typename... Args>
338 typename T::IRef create_operation(Args&&... args) {
339 typename T::IRef op = new T(std::forward<Args>(args)...);
340 do_register(&*op);
341 return op;
342 }
343
344 seastar::future<> stop() {
345 shutdown_timer.set_callback([this] {
346 if (registries_empty()) {
347 shutdown.set_value();
348 shutdown_timer.cancel();
349 }
350 });
351 shutdown_timer.arm_periodic(
352 std::chrono::milliseconds(100/*TODO: use option instead*/));
353 return shutdown.get_future();
354 }
355 };
356
357
358 template <size_t NUM_REGISTRIES>
359 class OperationRegistryT : public OperationRegistryI {
360 std::array<
361 op_list,
362 NUM_REGISTRIES
363 > registries;
364
365 std::array<
366 uint64_t,
367 NUM_REGISTRIES
368 > op_id_counters = {};
369
370 protected:
371 void do_register(Operation *op) final {
372 registries[op->get_type()].push_back(*op);
373 op->set_id(++op_id_counters[op->get_type()]);
374 }
375
376 bool registries_empty() const final {
377 return std::all_of(registries.begin(),
378 registries.end(),
379 [](auto& opl) {
380 return opl.empty();
381 });
382 }
383 public:
384 template <size_t REGISTRY_INDEX>
385 const op_list& get_registry() const {
386 static_assert(
387 REGISTRY_INDEX < std::tuple_size<decltype(registries)>::value);
388 return registries[REGISTRY_INDEX];
389 }
390 };
391
392 class PipelineExitBarrierI {
393 public:
394 using Ref = std::unique_ptr<PipelineExitBarrierI>;
395
396 /// Waits for exit barrier
397 virtual seastar::future<> wait() = 0;
398
399 /// Releases pipeline stage, can only be called after wait
400 virtual void exit() = 0;
401
402 /// Releases pipeline resources without waiting on barrier
403 virtual void cancel() = 0;
404
405 /// Must ensure that resources are released, likely by calling cancel()
406 virtual ~PipelineExitBarrierI() {}
407 };
408
409 class PipelineStageI : public Blocker {
410 public:
411 virtual seastar::future<PipelineExitBarrierI::Ref> enter() = 0;
412 };
413
414 class PipelineHandle {
415 PipelineExitBarrierI::Ref barrier;
416
417 auto wait_barrier() {
418 return barrier ? barrier->wait() : seastar::now();
419 }
420
421 public:
422 PipelineHandle() = default;
423
424 PipelineHandle(const PipelineHandle&) = delete;
425 PipelineHandle(PipelineHandle&&) = default;
426 PipelineHandle &operator=(const PipelineHandle&) = delete;
427 PipelineHandle &operator=(PipelineHandle&&) = default;
428
429 /**
430 * Returns a future which unblocks when the handle has entered the passed
431 * OrderedPipelinePhase. If already in a phase, enter will also release
432 * that phase after placing itself in the queue for the next one to preserve
433 * ordering.
434 */
435 template <typename T>
436 blocking_future<> enter(T &t) {
437 /* Strictly speaking, we probably want the blocker to be registered on
438 * the previous stage until wait_barrier() resolves and on the next
439 * until enter() resolves, but blocking_future will need some refactoring
440 * to permit that. TODO
441 */
442 return t.make_blocking_future(
443 wait_barrier().then([this, &t] {
444 auto fut = t.enter();
445 exit();
446 return std::move(fut).then([this](auto &&barrier_ref) {
447 barrier = std::move(barrier_ref);
448 return seastar::now();
449 });
450 })
451 );
452 }
453
454 /**
455 * Completes pending exit barrier without entering a new one.
456 */
457 seastar::future<> complete() {
458 auto ret = wait_barrier();
459 barrier.reset();
460 return ret;
461 }
462
463 /**
464 * Exits current phase, skips exit barrier, should only be used for op
465 * failure. Permitting the handle to be destructed as the same effect.
466 */
467 void exit() {
468 barrier.reset();
469 }
470
471 };
472
473 /**
474 * Ensures that at most one op may consider itself in the phase at a time.
475 * Ops will see enter() unblock in the order in which they tried to enter
476 * the phase. entering (though not necessarily waiting for the future to
477 * resolve) a new phase prior to exiting the previous one will ensure that
478 * the op ordering is preserved.
479 */
480 class OrderedExclusivePhase : public PipelineStageI {
481 void dump_detail(ceph::Formatter *f) const final;
482 const char *get_type_name() const final {
483 return name;
484 }
485
486 class ExitBarrier final : public PipelineExitBarrierI {
487 OrderedExclusivePhase *phase;
488 public:
489 ExitBarrier(OrderedExclusivePhase *phase) : phase(phase) {}
490
491 seastar::future<> wait() final {
492 return seastar::now();
493 }
494
495 void exit() final {
496 if (phase) {
497 phase->exit();
498 phase = nullptr;
499 }
500 }
501
502 void cancel() final {
503 exit();
504 }
505
506 ~ExitBarrier() final {
507 cancel();
508 }
509 };
510
511 void exit() {
512 mutex.unlock();
513 }
514
515 public:
516 seastar::future<PipelineExitBarrierI::Ref> enter() final {
517 return mutex.lock().then([this] {
518 return PipelineExitBarrierI::Ref(new ExitBarrier{this});
519 });
520 }
521
522 OrderedExclusivePhase(const char *name) : name(name) {}
523
524 private:
525 const char * name;
526 seastar::shared_mutex mutex;
527 };
528
529 /**
530 * Permits multiple ops to inhabit the stage concurrently, but ensures that
531 * they will proceed to the next stage in the order in which they called
532 * enter.
533 */
534 class OrderedConcurrentPhase : public PipelineStageI {
535 void dump_detail(ceph::Formatter *f) const final;
536 const char *get_type_name() const final {
537 return name;
538 }
539
540 class ExitBarrier final : public PipelineExitBarrierI {
541 OrderedConcurrentPhase *phase;
542 std::optional<seastar::future<>> barrier;
543 public:
544 ExitBarrier(
545 OrderedConcurrentPhase *phase,
546 seastar::future<> &&barrier) : phase(phase), barrier(std::move(barrier)) {}
547
548 seastar::future<> wait() final {
549 assert(phase);
550 assert(barrier);
551 auto ret = std::move(*barrier);
552 barrier = std::nullopt;
553 return ret;
554 }
555
556 void exit() final {
557 if (barrier) {
558 static_cast<void>(
559 std::move(*barrier).then([phase=this->phase] { phase->mutex.unlock(); }));
560 barrier = std::nullopt;
561 phase = nullptr;
562 }
563 if (phase) {
564 phase->mutex.unlock();
565 phase = nullptr;
566 }
567 }
568
569 void cancel() final {
570 exit();
571 }
572
573 ~ExitBarrier() final {
574 cancel();
575 }
576 };
577
578 public:
579 seastar::future<PipelineExitBarrierI::Ref> enter() final {
580 return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
581 new ExitBarrier{this, mutex.lock()});
582 }
583
584 OrderedConcurrentPhase(const char *name) : name(name) {}
585
586 private:
587 const char * name;
588 seastar::shared_mutex mutex;
589 };
590
591 /**
592 * Imposes no ordering or exclusivity at all. Ops enter without constraint and
593 * may exit in any order. Useful mainly for informational purposes between
594 * stages with constraints.
595 */
596 class UnorderedStage : public PipelineStageI {
597 void dump_detail(ceph::Formatter *f) const final {}
598 const char *get_type_name() const final {
599 return name;
600 }
601
602 class ExitBarrier final : public PipelineExitBarrierI {
603 public:
604 ExitBarrier() = default;
605
606 seastar::future<> wait() final {
607 return seastar::now();
608 }
609
610 void exit() final {}
611
612 void cancel() final {}
613
614 ~ExitBarrier() final {}
615 };
616
617 public:
618 seastar::future<PipelineExitBarrierI::Ref> enter() final {
619 return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
620 new ExitBarrier);
621 }
622
623 UnorderedStage(const char *name) : name(name) {}
624
625 private:
626 const char * name;
627 };
628
629
630 }