]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/common/interruptible_future.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / crimson / common / interruptible_future.h
CommitLineData
20effc67
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#pragma once
5
6#include <seastar/core/future-util.hh>
7#include <seastar/core/do_with.hh>
8#include <seastar/core/when_all.hh>
9#include <seastar/core/thread.hh>
10
11#include "crimson/common/log.h"
12#include "crimson/common/errorator.h"
13#ifndef NDEBUG
14#define INTR_FUT_DEBUG(FMT_MSG, ...) crimson::get_logger(ceph_subsys_).trace(FMT_MSG, ##__VA_ARGS__)
15#else
16#define INTR_FUT_DEBUG(FMT_MSG, ...)
17#endif
18
19// The interrupt condition generally works this way:
20//
21// 1. It is created by call_with_interruption_impl method, and is recorded in the thread
22// local global variable "::crimson::interruptible::interrupt_cond".
23// 2. Any continuation that's created within the execution of the continuation
24// that calls the call_with_interruption_impl method will capture the "interrupt_cond";
25// and when they starts to run, they will put that capture interruption condition
26// into "::crimson::interruptible::interrupt_cond" so that further continuations
27// created can also capture the interruption condition;
28// 3. At the end of the continuation run, the global "interrupt_cond" will be cleared
29// to prevent other continuations that are not supposed to be interrupted wrongly
30// capture an interruption condition.
31// With this approach, continuations capture the interrupt condition at their creation,
32// restore the interrupt conditions at the beginning of their execution and clear those
33// interrupt conditions at the end of their execution. So the global "interrupt_cond"
34// only hold valid interrupt conditions when the corresponding continuations are actually
35// running after which it gets cleared. Since continuations can't be executed simultaneously,
36// different continuation chains won't be able to interfere with each other.
37//
38// The global "interrupt_cond" can work as a signal about whether the continuation
39// is supposed to be interrupted, the reason that the global "interrupt_cond"
40// exists is that there may be this scenario:
41//
42// Say there's some method PG::func1(), in which the continuations created may
43// or may not be supposed to be interrupted in different situations. If we don't
44// have a global signal, we have to add an extra parameter to every method like
45// PG::func1() to indicate whether the current run should create to-be-interrupted
46// continuations or not.
47//
48// interruptor::with_interruption() and helpers can be used by users to wrap a future in
49// the interruption machinery.
50
51namespace crimson::os::seastore {
52 class TransactionConflictCondition;
53}
54
55// GCC tries to instantiate
56// seastar::lw_shared_ptr<crimson::os::seastore::TransactionConflictCondition>.
57// but we *may* not have the definition of TransactionConflictCondition at this moment,
58// a full specialization for lw_shared_ptr_accessors helps to bypass the default
59// lw_shared_ptr_accessors implementation, where std::is_base_of<.., T> is used.
60namespace seastar::internal {
61 template<>
62 struct lw_shared_ptr_accessors<::crimson::os::seastore::TransactionConflictCondition, void>
63 : lw_shared_ptr_accessors_no_esft<::crimson::os::seastore::TransactionConflictCondition>
64 {};
65}
66
1e59de90
TL
67SEASTAR_CONCEPT(
68namespace crimson::interruptible {
69 template<typename InterruptCond, typename FutureType>
70 class interruptible_future_detail;
71}
72namespace seastar::impl {
73 template <typename InterruptCond, typename FutureType, typename... Rest>
74 struct is_tuple_of_futures<std::tuple<crimson::interruptible::interruptible_future_detail<InterruptCond, FutureType>, Rest...>>
75 : is_tuple_of_futures<std::tuple<Rest...>> {};
76}
77)
78
20effc67
TL
79namespace crimson::interruptible {
80
81struct ready_future_marker {};
82struct exception_future_marker {};
83
84template <typename InterruptCond>
85class interruptible_future_builder;
86
87template <typename InterruptCond>
88struct interruptor;
89
90template <typename InterruptCond>
91using InterruptCondRef = seastar::lw_shared_ptr<InterruptCond>;
92
93template <typename InterruptCond>
94struct interrupt_cond_t {
95 InterruptCondRef<InterruptCond> interrupt_cond;
96 uint64_t ref_count = 0;
97 void set(
98 InterruptCondRef<InterruptCond>& ic) {
1e59de90
TL
99 INTR_FUT_DEBUG(
100 "{}: going to set interrupt_cond: {}, ic: {}",
101 __func__,
102 (void*)interrupt_cond.get(),
103 (void*)ic.get());
20effc67
TL
104 if (!interrupt_cond) {
105 interrupt_cond = ic;
106 }
107 assert(interrupt_cond.get() == ic.get());
108 ref_count++;
109 INTR_FUT_DEBUG(
110 "{}: interrupt_cond: {}, ref_count: {}",
111 __func__,
112 (void*)interrupt_cond.get(),
113 ref_count);
114 }
115 void reset() {
116 if (--ref_count == 0) {
117 INTR_FUT_DEBUG(
1e59de90
TL
118 "{}: clearing interrupt_cond: {},{}",
119 __func__,
20effc67
TL
120 (void*)interrupt_cond.get(),
121 typeid(InterruptCond).name());
122 interrupt_cond.release();
123 } else {
124 INTR_FUT_DEBUG(
1e59de90
TL
125 "{}: end without clearing interrupt_cond: {},{}, ref_count: {}",
126 __func__,
20effc67
TL
127 (void*)interrupt_cond.get(),
128 typeid(InterruptCond).name(),
129 ref_count);
130 }
131 }
132};
133
134template <typename InterruptCond>
135thread_local interrupt_cond_t<InterruptCond> interrupt_cond;
136
137extern template thread_local interrupt_cond_t<crimson::os::seastore::TransactionConflictCondition>
138interrupt_cond<crimson::os::seastore::TransactionConflictCondition>;
139
140template <typename InterruptCond, typename FutureType>
141class [[nodiscard]] interruptible_future_detail {};
142
143template <typename FutureType>
144struct is_interruptible_future : public std::false_type {};
145
146template <typename InterruptCond, typename FutureType>
147struct is_interruptible_future<
148 interruptible_future_detail<
149 InterruptCond,
150 FutureType>>
151 : public std::true_type {};
1e59de90
TL
152template <typename FutureType>
153concept IsInterruptibleFuture = is_interruptible_future<FutureType>::value;
154template <typename Func, typename... Args>
155concept InvokeReturnsInterruptibleFuture =
156 IsInterruptibleFuture<std::invoke_result_t<Func, Args...>>;
20effc67
TL
157
158namespace internal {
159
160template <typename InterruptCond, typename Func, typename... Args>
161auto call_with_interruption_impl(
162 InterruptCondRef<InterruptCond> interrupt_condition,
163 Func&& func, Args&&... args)
164{
165 using futurator_t = seastar::futurize<std::invoke_result_t<Func, Args...>>;
166 // there might be a case like this:
167 // with_interruption([] {
168 // interruptor::do_for_each([] {
169 // ...
170 // return interruptible_errorated_future();
171 // }).safe_then_interruptible([] {
172 // ...
173 // });
174 // })
175 // In this case, as crimson::do_for_each would directly do futurize_invoke
176 // for "call_with_interruption", we have to make sure this invocation would
177 // not errorly release ::crimson::interruptible::interrupt_cond<InterruptCond>
178
179 // If there exists an interrupt condition, which means "Func" may not be
180 // permitted to run as a result of the interruption, test it. If it does
181 // need to be interrupted, return an interruption; otherwise, restore the
182 // global "interrupt_cond" with the interruption condition, and go ahead
183 // executing the Func.
184 assert(interrupt_condition);
1e59de90 185 auto fut = interrupt_condition->template may_interrupt<
20effc67
TL
186 typename futurator_t::type>();
187 INTR_FUT_DEBUG(
188 "call_with_interruption_impl: may_interrupt: {}, "
1e59de90 189 "local interrupt_condition: {}, "
20effc67 190 "global interrupt_cond: {},{}",
1e59de90 191 (bool)fut,
20effc67
TL
192 (void*)interrupt_condition.get(),
193 (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
194 typeid(InterruptCond).name());
1e59de90 195 if (fut) {
20effc67
TL
196 return std::move(*fut);
197 }
198 interrupt_cond<InterruptCond>.set(interrupt_condition);
199
200 auto fut2 = seastar::futurize_invoke(
201 std::forward<Func>(func),
202 std::forward<Args>(args)...);
203 // Clear the global "interrupt_cond" to prevent it from interfering other
204 // continuation chains.
205 interrupt_cond<InterruptCond>.reset();
206 return fut2;
207}
208
209}
210
1e59de90
TL
211template <typename InterruptCond, typename Func, seastar::Future Ret>
212requires (!InterruptCond::template is_interruption_v<Ret>)
20effc67
TL
213auto call_with_interruption(
214 InterruptCondRef<InterruptCond> interrupt_condition,
215 Func&& func, Ret&& fut)
216{
1e59de90 217 using Result = std::invoke_result_t<Func, Ret>;
20effc67
TL
218 // if "T" is already an interrupt exception, return it directly;
219 // otherwise, upper layer application may encounter errors executing
220 // the "Func" body.
221 if (fut.failed()) {
222 std::exception_ptr eptr = fut.get_exception();
223 if (interrupt_condition->is_interruption(eptr)) {
224 return seastar::futurize<Result>::make_exception_future(std::move(eptr));
225 }
226 return internal::call_with_interruption_impl(
227 interrupt_condition,
228 std::forward<Func>(func),
229 seastar::futurize<Ret>::make_exception_future(
230 std::move(eptr)));
231 }
232 return internal::call_with_interruption_impl(
233 interrupt_condition,
234 std::forward<Func>(func),
235 std::move(fut));
236}
237
1e59de90
TL
238template <typename InterruptCond, typename Func, typename T>
239requires (InterruptCond::template is_interruption_v<T>)
20effc67
TL
240auto call_with_interruption(
241 InterruptCondRef<InterruptCond> interrupt_condition,
242 Func&& func, T&& arg)
243{
1e59de90 244 using Result = std::invoke_result_t<Func, T>;
20effc67
TL
245 // if "T" is already an interrupt exception, return it directly;
246 // otherwise, upper layer application may encounter errors executing
247 // the "Func" body.
248 return seastar::futurize<Result>::make_exception_future(
249 std::get<0>(std::tuple(std::forward<T>(arg))));
250}
251
1e59de90
TL
252template <typename InterruptCond, typename Func, typename T>
253requires (!InterruptCond::template is_interruption_v<T>) && (!seastar::Future<T>)
20effc67
TL
254auto call_with_interruption(
255 InterruptCondRef<InterruptCond> interrupt_condition,
256 Func&& func, T&& arg)
257{
258 return internal::call_with_interruption_impl(
259 interrupt_condition,
260 std::forward<Func>(func),
261 std::forward<T>(arg));
262}
263
264template <typename InterruptCond, typename Func,
265 typename Result = std::invoke_result_t<Func>>
266auto call_with_interruption(
267 InterruptCondRef<InterruptCond> interrupt_condition,
268 Func&& func)
269{
270 return internal::call_with_interruption_impl(
271 interrupt_condition,
272 std::forward<Func>(func));
273}
274
275template <typename InterruptCond, typename Func, typename... T,
276 typename Result = std::invoke_result_t<Func, T...>>
277Result non_futurized_call_with_interruption(
278 InterruptCondRef<InterruptCond> interrupt_condition,
279 Func&& func, T&&... args)
280{
281 assert(interrupt_condition);
1e59de90 282 auto fut = interrupt_condition->template may_interrupt<seastar::future<>>();
20effc67
TL
283 INTR_FUT_DEBUG(
284 "non_futurized_call_with_interruption may_interrupt: {}, "
285 "interrupt_condition: {}, interrupt_cond: {},{}",
1e59de90 286 (bool)fut,
20effc67
TL
287 (void*)interrupt_condition.get(),
288 (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
289 typeid(InterruptCond).name());
1e59de90 290 if (fut) {
20effc67
TL
291 std::rethrow_exception(fut->get_exception());
292 }
293 interrupt_cond<InterruptCond>.set(interrupt_condition);
294 try {
295 if constexpr (std::is_void_v<Result>) {
296 std::invoke(std::forward<Func>(func), std::forward<T>(args)...);
297
298 // Clear the global "interrupt_cond" to prevent it from interfering other
299 // continuation chains.
300 interrupt_cond<InterruptCond>.reset();
301 return;
302 } else {
303 auto&& err = std::invoke(std::forward<Func>(func), std::forward<T>(args)...);
304 interrupt_cond<InterruptCond>.reset();
305 return std::forward<Result>(err);
306 }
307 } catch (std::exception& e) {
308 // Clear the global "interrupt_cond" to prevent it from interfering other
309 // continuation chains.
310 interrupt_cond<InterruptCond>.reset();
311 throw e;
312 }
313}
314
315template <typename InterruptCond, typename Errorator>
316struct interruptible_errorator;
317
318template <typename T>
319struct parallel_for_each_ret {
1e59de90 320 static_assert(seastar::Future<T>);
20effc67
TL
321 using type = seastar::future<>;
322};
323
324template <template <typename...> typename ErroratedFuture, typename T>
325struct parallel_for_each_ret<
326 ErroratedFuture<
327 ::crimson::errorated_future_marker<T>>> {
328 using type = ErroratedFuture<::crimson::errorated_future_marker<void>>;
329};
330
331template <typename InterruptCond, typename FutureType>
332class parallel_for_each_state final : private seastar::continuation_base<> {
333 using elem_ret_t = std::conditional_t<
1e59de90 334 IsInterruptibleFuture<FutureType>,
20effc67
TL
335 typename FutureType::core_type,
336 FutureType>;
337 using future_t = interruptible_future_detail<
338 InterruptCond,
339 typename parallel_for_each_ret<elem_ret_t>::type>;
340 std::vector<future_t> _incomplete;
341 seastar::promise<> _result;
342 std::exception_ptr _ex;
343private:
344 void wait_for_one() noexcept {
345 while (!_incomplete.empty() && _incomplete.back().available()) {
346 if (_incomplete.back().failed()) {
347 _ex = _incomplete.back().get_exception();
348 }
349 _incomplete.pop_back();
350 }
351 if (!_incomplete.empty()) {
1e59de90
TL
352 seastar::internal::set_callback(std::move(_incomplete.back()),
353 static_cast<continuation_base<>*>(this));
20effc67
TL
354 _incomplete.pop_back();
355 return;
356 }
357 if (__builtin_expect(bool(_ex), false)) {
358 _result.set_exception(std::move(_ex));
359 } else {
360 _result.set_value();
361 }
362 delete this;
363 }
364 virtual void run_and_dispose() noexcept override {
365 if (_state.failed()) {
366 _ex = std::move(_state).get_exception();
367 }
368 _state = {};
369 wait_for_one();
370 }
371 task* waiting_task() noexcept override { return _result.waiting_task(); }
372public:
373 parallel_for_each_state(size_t n) {
374 _incomplete.reserve(n);
375 }
376 void add_future(future_t&& f) {
377 _incomplete.push_back(std::move(f));
378 }
379 future_t get_future() {
380 auto ret = _result.get_future();
381 wait_for_one();
382 return ret;
383 }
384 static future_t now() {
385 return seastar::now();
386 }
387};
388
389template <typename InterruptCond, typename T>
390class [[nodiscard]] interruptible_future_detail<InterruptCond, seastar::future<T>>
391 : private seastar::future<T> {
392public:
393 using core_type = seastar::future<T>;
394 template <typename U>
395 using interrupt_futurize_t =
396 typename interruptor<InterruptCond>::template futurize_t<U>;
397 using core_type::get0;
398 using core_type::core_type;
399 using core_type::get_exception;
400 using core_type::ignore_ready_future;
401
402 [[gnu::always_inline]]
403 interruptible_future_detail(seastar::future<T>&& base)
404 : core_type(std::move(base))
405 {}
406
407 using value_type = typename seastar::future<T>::value_type;
408 using tuple_type = typename seastar::future<T>::tuple_type;
409
410 [[gnu::always_inline]]
411 value_type&& get() {
412 if (core_type::available()) {
413 return core_type::get();
414 } else {
415 // destined to wait!
416 auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond;
417 INTR_FUT_DEBUG(
418 "interruptible_future_detail::get() waiting, interrupt_cond: {},{}",
419 (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
420 typeid(InterruptCond).name());
421 interrupt_cond<InterruptCond>.reset();
422 auto&& value = core_type::get();
423 interrupt_cond<InterruptCond>.set(interruption_condition);
424 INTR_FUT_DEBUG(
425 "interruptible_future_detail::get() got, interrupt_cond: {},{}",
426 (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
427 typeid(InterruptCond).name());
428 return std::move(value);
429 }
430 }
431
432 using core_type::available;
433 using core_type::failed;
434
435 template <typename Func,
436 typename Result = interrupt_futurize_t<
437 std::invoke_result_t<Func, seastar::future<T>>>>
438 [[gnu::always_inline]]
439 Result then_wrapped_interruptible(Func&& func) {
440 ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
441 return core_type::then_wrapped(
442 [func=std::move(func),
443 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
444 (auto&& fut) mutable {
445 return call_with_interruption(
446 std::move(interrupt_condition),
447 std::forward<Func>(func),
448 std::move(fut));
449 });
450 }
451
452 template <typename Func>
453 [[gnu::always_inline]]
454 auto then_interruptible(Func&& func) {
455 ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
456 if constexpr (std::is_void_v<T>) {
457 auto fut = core_type::then(
458 [func=std::move(func),
459 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
460 () mutable {
461 return call_with_interruption(
462 interrupt_condition,
463 std::move(func));
464 });
465 return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
466 } else {
467 auto fut = core_type::then(
468 [func=std::move(func),
469 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
470 (T&& arg) mutable {
471 return call_with_interruption(
472 interrupt_condition,
473 std::move(func),
474 std::forward<T>(arg));
475 });
476 return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
477 }
478 }
479
480 template <typename Func>
481 [[gnu::always_inline]]
482 auto then_unpack_interruptible(Func&& func) {
483 return then_interruptible([func=std::forward<Func>(func)](T&& tuple) mutable {
484 return std::apply(std::forward<Func>(func), std::move(tuple));
485 });
486 }
487
488 template <typename Func,
489 typename Result =interrupt_futurize_t<
490 std::result_of_t<Func(std::exception_ptr)>>>
491 [[gnu::always_inline]]
492 Result handle_exception_interruptible(Func&& func) {
493 ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
494 return core_type::then_wrapped(
495 [func=std::forward<Func>(func),
496 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
497 (auto&& fut) mutable {
498 if (!fut.failed()) {
499 return seastar::make_ready_future<T>(fut.get());
500 } else {
501 return call_with_interruption(
502 interrupt_condition,
503 std::move(func),
504 fut.get_exception());
505 }
506 });
507 }
508
509 template <bool may_interrupt = true, typename Func,
510 typename Result = interrupt_futurize_t<
511 std::result_of_t<Func()>>>
512 [[gnu::always_inline]]
513 Result finally_interruptible(Func&& func) {
514 if constexpr (may_interrupt) {
515 ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
516 return core_type::then_wrapped(
517 [func=std::forward<Func>(func),
518 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
519 (auto&& fut) mutable {
520 return call_with_interruption(
521 interrupt_condition,
522 std::move(func));
523 });
524 } else {
525 return core_type::finally(std::forward<Func>(func));
526 }
527 }
528
529 template <typename Func,
530 typename Result = interrupt_futurize_t<
531 std::result_of_t<Func(
532 typename seastar::function_traits<Func>::template arg<0>::type)>>>
533 [[gnu::always_inline]]
534 Result handle_exception_type_interruptible(Func&& func) {
535 ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
536 using trait = seastar::function_traits<Func>;
537 static_assert(trait::arity == 1, "func can take only one parameter");
538 using ex_type = typename trait::template arg<0>::type;
539 return core_type::then_wrapped(
540 [func=std::forward<Func>(func),
541 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
542 (auto&& fut) mutable -> Result {
543 if (!fut.failed()) {
544 return seastar::make_ready_future<T>(fut.get());
545 } else {
546 try {
547 std::rethrow_exception(fut.get_exception());
548 } catch (ex_type& ex) {
549 return call_with_interruption(
550 interrupt_condition,
551 std::move(func), ex);
552 }
553 }
554 });
555 }
556
557
558 using my_type = interruptible_future_detail<InterruptCond, seastar::future<T>>;
559
560 template <typename Func>
561 [[gnu::always_inline]]
562 my_type finally(Func&& func) {
563 return core_type::finally(std::forward<Func>(func));
564 }
565private:
566 template <typename Func>
567 [[gnu::always_inline]]
568 auto handle_interruption(Func&& func) {
569 return core_type::then_wrapped(
570 [func=std::move(func)](auto&& fut) mutable {
571 if (fut.failed()) {
572 std::exception_ptr ex = fut.get_exception();
573 if (InterruptCond::is_interruption(ex)) {
574 return seastar::futurize_invoke(std::move(func), std::move(ex));
575 } else {
576 return seastar::make_exception_future<T>(std::move(ex));
577 }
578 } else {
579 return seastar::make_ready_future<T>(fut.get());
580 }
581 });
582 }
583
584 seastar::future<T> to_future() {
585 return static_cast<core_type&&>(std::move(*this));
586 }
587 // this is only supposed to be invoked by seastar functions
588 template <typename Func,
589 typename Result = interrupt_futurize_t<
590 std::result_of_t<Func(seastar::future<T>)>>>
591 [[gnu::always_inline]]
592 Result then_wrapped(Func&& func) {
593 return core_type::then_wrapped(
594 [func=std::move(func),
595 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
596 (auto&& fut) mutable {
597 return call_with_interruption(
598 interrupt_condition,
599 std::forward<Func>(func),
600 std::move(fut));
601 });
602 }
603 friend interruptor<InterruptCond>;
604 friend class interruptible_future_builder<InterruptCond>;
605 template <typename U>
606 friend struct ::seastar::futurize;
607 template <typename>
608 friend class ::seastar::future;
609 template <typename HeldState, typename Future>
610 friend class seastar::internal::do_with_state;
611 template<typename TX, typename F>
612 friend inline auto ::seastar::internal::do_with_impl(TX&& rvalue, F&& f);
613 template<typename T1, typename T2, typename T3_or_F, typename... More>
614 friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more);
615 template <typename T1, typename T2, typename... More>
616 friend auto seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, More&&... more);
617 template <typename, typename>
618 friend class ::crimson::maybe_handle_error_t;
619 template <typename>
620 friend class ::seastar::internal::extract_values_from_futures_vector;
621 template <typename, typename>
622 friend class interruptible_future_detail;
623 template <typename ResolvedVectorTransform, typename Future>
624 friend inline typename ResolvedVectorTransform::future_type
625 seastar::internal::complete_when_all(
626 std::vector<Future>&& futures,
627 typename std::vector<Future>::iterator pos) noexcept;
628 template <typename>
629 friend class ::seastar::internal::when_all_state_component;
630 template <typename Lock, typename Func>
631 friend inline auto seastar::with_lock(Lock& lock, Func&& f);
632 template <typename IC, typename FT>
633 friend class parallel_for_each_state;
634};
635
636template <typename InterruptCond, typename Errorator>
637struct interruptible_errorator {
638 using base_ertr = Errorator;
639 using intr_cond_t = InterruptCond;
640
641 template <typename ValueT = void>
642 using future = interruptible_future_detail<InterruptCond,
643 typename Errorator::template future<ValueT>>;
644
645 template <class... NewAllowedErrorsT>
646 using extend = interruptible_errorator<
647 InterruptCond,
648 typename Errorator::template extend<NewAllowedErrorsT...>>;
649
650 template <class Ertr>
651 using extend_ertr = interruptible_errorator<
652 InterruptCond,
653 typename Errorator::template extend_ertr<Ertr>>;
654
655 template <typename ValueT = void, typename... A>
656 static interruptible_future_detail<
657 InterruptCond,
658 typename Errorator::template future<ValueT>>
659 make_ready_future(A&&... value) {
660 return interruptible_future_detail<
661 InterruptCond, typename Errorator::template future<ValueT>>(
662 Errorator::template make_ready_future<ValueT>(
663 std::forward<A>(value)...));
664 }
665 static interruptible_future_detail<
666 InterruptCond,
667 typename Errorator::template future<>> now() {
668 return interruptible_future_detail<
669 InterruptCond, typename Errorator::template future<>>(
670 Errorator::now());
671 }
672
673 using pass_further = typename Errorator::pass_further;
674};
675
676template <typename InterruptCond,
677 template <typename...> typename ErroratedFuture,
678 typename T>
679class [[nodiscard]] interruptible_future_detail<
680 InterruptCond,
681 ErroratedFuture<::crimson::errorated_future_marker<T>>>
682 : private ErroratedFuture<::crimson::errorated_future_marker<T>>
683{
684public:
685 using core_type = ErroratedFuture<crimson::errorated_future_marker<T>>;
686 using errorator_type = typename core_type::errorator_type;
687 using interrupt_errorator_type =
688 interruptible_errorator<InterruptCond, errorator_type>;
689 using interrupt_cond_type = InterruptCond;
690
691 template <typename U>
692 using interrupt_futurize_t =
693 typename interruptor<InterruptCond>::template futurize_t<U>;
694
695 using core_type::available;
696 using core_type::failed;
697 using core_type::core_type;
698 using core_type::get_exception;
699
700 using value_type = typename core_type::value_type;
701
702 interruptible_future_detail(seastar::future<T>&& fut)
703 : core_type(std::move(fut))
704 {}
705
706 template <template <typename...> typename ErroratedFuture2,
707 typename... U>
708 [[gnu::always_inline]]
709 interruptible_future_detail(
710 ErroratedFuture2<::crimson::errorated_future_marker<U...>>&& fut)
711 : core_type(std::move(fut)) {}
712
713 template <template <typename...> typename ErroratedFuture2,
714 typename... U>
715 [[gnu::always_inline]]
716 interruptible_future_detail(
717 interruptible_future_detail<InterruptCond,
718 ErroratedFuture2<::crimson::errorated_future_marker<U...>>>&& fut)
719 : core_type(static_cast<typename std::decay_t<decltype(fut)>::core_type&&>(fut)) {
720 using src_errorator_t = \
721 typename ErroratedFuture2<
722 ::crimson::errorated_future_marker<U...>>::errorator_type;
723 static_assert(core_type::errorator_type::template contains_once_v<
724 src_errorator_t>,
725 "conversion is only possible from less-or-eq errorated future!");
726 }
727
728 [[gnu::always_inline]]
729 interruptible_future_detail(
730 interruptible_future_detail<InterruptCond, seastar::future<T>>&& fut)
731 : core_type(static_cast<seastar::future<T>&&>(fut)) {}
732
733 template <class... A>
734 [[gnu::always_inline]]
735 interruptible_future_detail(ready_future_marker, A&&... a)
736 : core_type(::seastar::make_ready_future<typename core_type::value_type>(
737 std::forward<A>(a)...)) {
738 }
739 [[gnu::always_inline]]
740 interruptible_future_detail(exception_future_marker, ::seastar::future_state_base&& state) noexcept
741 : core_type(::seastar::futurize<core_type>::make_exception_future(std::move(state))) {
742 }
743 [[gnu::always_inline]]
744 interruptible_future_detail(exception_future_marker, std::exception_ptr&& ep) noexcept
745 : core_type(::seastar::futurize<core_type>::make_exception_future(std::move(ep))) {
746 }
747
748 template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT,
749 std::enable_if_t<!interruptible, int> = 0>
750 [[gnu::always_inline]]
751 auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) {
752 auto fut = core_type::safe_then(
753 std::forward<ValueInterruptCondT>(valfunc),
754 std::forward<ErrorVisitorT>(errfunc));
755 return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
756 }
757
758 template <typename... Args>
759 auto si_then(Args&&... args) {
760 return safe_then_interruptible(std::forward<Args>(args)...);
761 }
762
763
764 template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT,
765 typename U = T, std::enable_if_t<!std::is_void_v<U> && interruptible, int> = 0>
766 [[gnu::always_inline]]
767 auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) {
768 ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
769 auto fut = core_type::safe_then(
770 [func=std::move(valfunc),
771 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
772 (T&& args) mutable {
773 return call_with_interruption(
774 interrupt_condition,
775 std::move(func),
776 std::forward<T>(args));
777 }, [func=std::move(errfunc),
778 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
779 (auto&& err) mutable -> decltype(auto) {
780 constexpr bool return_void = std::is_void_v<
781 std::invoke_result_t<ErrorVisitorT,
782 std::decay_t<decltype(err)>>>;
783 constexpr bool return_err = ::crimson::is_error_v<
784 std::decay_t<std::invoke_result_t<ErrorVisitorT,
785 std::decay_t<decltype(err)>>>>;
786 if constexpr (return_err || return_void) {
787 return non_futurized_call_with_interruption(
788 interrupt_condition,
789 std::move(func),
790 std::move(err));
791 } else {
792 return call_with_interruption(
793 interrupt_condition,
794 std::move(func),
795 std::move(err));
796 }
797 });
798 return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
799 }
800
801 template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT,
802 typename U = T, std::enable_if_t<std::is_void_v<U> && interruptible, int> = 0>
803 [[gnu::always_inline]]
804 auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) {
805 ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
806 auto fut = core_type::safe_then(
807 [func=std::move(valfunc),
808 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
809 () mutable {
810 return call_with_interruption(
811 interrupt_condition,
812 std::move(func));
813 }, [func=std::move(errfunc),
814 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
815 (auto&& err) mutable -> decltype(auto) {
816 constexpr bool return_void = std::is_void_v<
817 std::invoke_result_t<ErrorVisitorT,
818 std::decay_t<decltype(err)>>>;
819 constexpr bool return_err = ::crimson::is_error_v<
820 std::decay_t<std::invoke_result_t<ErrorVisitorT,
821 std::decay_t<decltype(err)>>>>;
822 if constexpr (return_err || return_void) {
823 return non_futurized_call_with_interruption(
824 interrupt_condition,
825 std::move(func),
826 std::move(err));
827 } else {
828 return call_with_interruption(
829 interrupt_condition,
830 std::move(func),
831 std::move(err));
832 }
833 });
834 return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
835 }
836
837 template <bool interruptible = true, typename ValueInterruptCondT,
838 typename U = T, std::enable_if_t<std::is_void_v<T> && interruptible, int> = 0>
839 [[gnu::always_inline]]
840 auto safe_then_interruptible(ValueInterruptCondT&& valfunc) {
841 ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
842 auto fut = core_type::safe_then(
843 [func=std::move(valfunc),
844 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
845 () mutable {
846 return call_with_interruption(
847 interrupt_condition,
848 std::move(func));
849 });
850 return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
851 }
852
853 template <typename ValFuncT, typename ErrorFuncT>
854 [[gnu::always_inline]]
855 auto safe_then_unpack_interruptible(ValFuncT&& func, ErrorFuncT&& errfunc) {
856 return safe_then_interruptible([func=std::forward<ValFuncT>(func)](T&& tuple) mutable {
857 return std::apply(std::forward<ValFuncT>(func), std::move(tuple));
858 }, std::forward<ErrorFuncT>(errfunc));
859 }
860
861 template <typename ValFuncT>
862 [[gnu::always_inline]]
863 auto safe_then_unpack_interruptible(ValFuncT&& func) {
864 return safe_then_interruptible([func=std::forward<ValFuncT>(func)](T&& tuple) mutable {
865 return std::apply(std::forward<ValFuncT>(func), std::move(tuple));
866 });
867 }
868
869 template <bool interruptible = true, typename ValueInterruptCondT,
870 typename U = T, std::enable_if_t<!std::is_void_v<T> && interruptible, int> = 0>
871 [[gnu::always_inline]]
872 auto safe_then_interruptible(ValueInterruptCondT&& valfunc) {
873 ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
874 auto fut = core_type::safe_then(
875 [func=std::move(valfunc),
876 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
877 (T&& arg) mutable {
878 return call_with_interruption(
879 interrupt_condition,
880 std::move(func),
881 std::forward<T>(arg));
882 });
883 return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
884 }
885
886 template <bool interruptible = true, typename ValueInterruptCondT,
887 std::enable_if_t<!interruptible, int> = 0>
888 [[gnu::always_inline]]
889 auto safe_then_interruptible(ValueInterruptCondT&& valfunc) {
890 auto fut = core_type::safe_then(std::forward<ValueInterruptCondT>(valfunc));
891 return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
892 }
893
894 template <typename ValueInterruptCondT,
895 typename ErrorVisitorHeadT,
896 typename... ErrorVisitorTailT>
897 [[gnu::always_inline]]
898 auto safe_then_interruptible(ValueInterruptCondT&& valfunc,
899 ErrorVisitorHeadT&& err_func_head,
900 ErrorVisitorTailT&&... err_func_tail) {
901 return safe_then_interruptible(
902 std::forward<ValueInterruptCondT>(valfunc),
903 ::crimson::composer(std::forward<ErrorVisitorHeadT>(err_func_head),
904 std::forward<ErrorVisitorTailT>(err_func_tail)...));
905 }
906
907 template <typename ValueInterruptCondT,
908 typename ErrorVisitorHeadT,
909 typename... ErrorVisitorTailT>
910 [[gnu::always_inline]]
911 auto safe_then_interruptible_tuple(ValueInterruptCondT&& valfunc,
912 ErrorVisitorHeadT&& err_func_head,
913 ErrorVisitorTailT&&... err_func_tail) {
914 return safe_then_interruptible(
915 std::forward<ValueInterruptCondT>(valfunc),
916 ::crimson::composer(std::forward<ErrorVisitorHeadT>(err_func_head),
917 std::forward<ErrorVisitorTailT>(err_func_tail)...));
918 }
919
920 template <typename ValFuncT,
921 typename ErrorVisitorHeadT,
922 typename... ErrorVisitorTailT>
923 [[gnu::always_inline]]
924 auto safe_then_unpack_interruptible_tuple(
925 ValFuncT&& valfunc,
926 ErrorVisitorHeadT&& err_func_head,
927 ErrorVisitorTailT&&... err_func_tail) {
928 return safe_then_interruptible_tuple(
929 [valfunc=std::forward<ValFuncT>(valfunc)](T&& tuple) mutable {
930 return std::apply(std::forward<ValFuncT>(valfunc), std::move(tuple));
931 },
932 ::crimson::composer(std::forward<ErrorVisitorHeadT>(err_func_head),
933 std::forward<ErrorVisitorTailT>(err_func_tail)...));
934 }
935
936 template <bool interruptible = true, typename ErrorFunc>
937 auto handle_error_interruptible(ErrorFunc&& errfunc) {
938 if constexpr (interruptible) {
939 ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
940 auto fut = core_type::handle_error(
941 [errfunc=std::move(errfunc),
942 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
943 (auto&& err) mutable -> decltype(auto) {
944 constexpr bool return_void = std::is_void_v<
945 std::invoke_result_t<ErrorFunc,
946 std::decay_t<decltype(err)>>>;
947 constexpr bool return_err = ::crimson::is_error_v<
948 std::decay_t<std::invoke_result_t<ErrorFunc,
949 std::decay_t<decltype(err)>>>>;
950 if constexpr (return_err || return_void) {
951 return non_futurized_call_with_interruption(
952 interrupt_condition,
953 std::move(errfunc),
954 std::move(err));
955 } else {
956 return call_with_interruption(
957 interrupt_condition,
958 std::move(errfunc),
959 std::move(err));
960 }
961 });
962 return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
963 } else {
964 return core_type::handle_error(std::forward<ErrorFunc>(errfunc));
965 }
966 }
967
968 template <typename ErrorFuncHead,
969 typename... ErrorFuncTail>
970 auto handle_error_interruptible(ErrorFuncHead&& error_func_head,
971 ErrorFuncTail&&... error_func_tail) {
972 ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
973 static_assert(sizeof...(ErrorFuncTail) > 0);
974 return this->handle_error_interruptible(
975 ::crimson::composer(
976 std::forward<ErrorFuncHead>(error_func_head),
977 std::forward<ErrorFuncTail>(error_func_tail)...));
978 }
979
980 template <typename Func>
981 [[gnu::always_inline]]
982 auto finally(Func&& func) {
983 auto fut = core_type::finally(std::forward<Func>(func));
984 return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
985 }
986
987private:
988 using core_type::_then;
989 template <typename Func>
990 [[gnu::always_inline]]
991 auto handle_interruption(Func&& func) {
992 // see errorator.h safe_then definition
993 using func_result_t =
994 typename std::invoke_result<Func, std::exception_ptr>::type;
995 using func_ertr_t =
996 typename core_type::template get_errorator_t<func_result_t>;
997 using this_ertr_t = typename core_type::errorator_type;
998 using ret_ertr_t = typename this_ertr_t::template extend_ertr<func_ertr_t>;
999 using futurator_t = typename ret_ertr_t::template futurize<func_result_t>;
1000 return core_type::then_wrapped(
1001 [func=std::move(func),
1002 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
1003 (auto&& fut) mutable
1004 -> typename futurator_t::type {
1005 if (fut.failed()) {
1006 std::exception_ptr ex = fut.get_exception();
1007 if (InterruptCond::is_interruption(ex)) {
1008 return futurator_t::invoke(std::move(func), std::move(ex));
1009 } else {
1010 return futurator_t::make_exception_future(std::move(ex));
1011 }
1012 } else {
1013 return std::move(fut);
1014 }
1015 });
1016 }
1017
1018 ErroratedFuture<::crimson::errorated_future_marker<T>>
1019 to_future() {
1020 return static_cast<core_type&&>(std::move(*this));
1021 }
1022
1023 friend class interruptor<InterruptCond>;
1024 friend class interruptible_future_builder<InterruptCond>;
1025 template <typename U>
1026 friend struct ::seastar::futurize;
1027 template <typename>
1028 friend class ::seastar::future;
1029 template<typename TX, typename F>
1030 friend inline auto ::seastar::internal::do_with_impl(TX&& rvalue, F&& f);
1031 template<typename T1, typename T2, typename T3_or_F, typename... More>
1032 friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more);
1033 template <typename T1, typename T2, typename... More>
1034 friend auto seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, More&&... more);
1035 template <typename HeldState, typename Future>
1036 friend class seastar::internal::do_with_state;
1037 template <typename, typename>
1038 friend class ::crimson::maybe_handle_error_t;
1039 template <typename, typename>
1040 friend class interruptible_future_detail;
1041 template <typename Lock, typename Func>
1042 friend inline auto seastar::with_lock(Lock& lock, Func&& f);
1043 template <typename IC, typename FT>
1044 friend class parallel_for_each_state;
1045};
1046
1047template <typename InterruptCond, typename T = void>
1048using interruptible_future =
1049 interruptible_future_detail<InterruptCond, seastar::future<T>>;
1050
1051template <typename InterruptCond, typename Errorator, typename T = void>
1052using interruptible_errorated_future =
1053 interruptible_future_detail<
1054 InterruptCond,
1055 typename Errorator::template future<T>>;
1056
1057template <typename InterruptCond>
1058struct interruptor
1059{
1060public:
1061 using condition = InterruptCond;
1062
1063 template <typename FutureType>
1064 [[gnu::always_inline]]
1065 static interruptible_future_detail<InterruptCond, FutureType>
1066 make_interruptible(FutureType&& fut) {
1067 return interruptible_future_detail<InterruptCond, FutureType>(std::move(fut));
1068 }
1069
1070 [[gnu::always_inline]]
1071 static interruptible_future_detail<InterruptCond, seastar::future<>> now() {
1072 return interruptible_future_detail<
1073 InterruptCond,
1074 seastar::future<>>(seastar::now());
1075 }
1076
1077 template <typename ValueT = void, typename... A>
1078 [[gnu::always_inline]]
1079 static interruptible_future_detail<InterruptCond, seastar::future<ValueT>>
1080 make_ready_future(A&&... value) {
1081 return interruptible_future_detail<InterruptCond, seastar::future<ValueT>>(
1082 seastar::make_ready_future<ValueT>(std::forward<A>(value)...));
1083 }
1084
1085 template <typename T>
1086 struct futurize {
1087 using type = interruptible_future_detail<
1088 InterruptCond, typename seastar::futurize<T>::type>;
1089 };
1090
1091 template <typename FutureType>
1092 struct futurize<interruptible_future_detail<InterruptCond, FutureType>> {
1093 using type = interruptible_future_detail<InterruptCond, FutureType>;
1094 };
1095
1096 template <typename T>
1097 using futurize_t = typename futurize<T>::type;
1098
1099 template <typename Container, typename AsyncAction>
1100 [[gnu::always_inline]]
1101 static auto do_for_each(Container& c, AsyncAction&& action) {
1102 return do_for_each(std::begin(c), std::end(c),
1103 std::forward<AsyncAction>(action));
1104 }
1105
1106 template <typename OpFunc, typename OnInterrupt,
1107 typename... Params>
1108 static inline auto with_interruption_cond(
1109 OpFunc&& opfunc, OnInterrupt&& efunc, InterruptCond &&cond, Params&&... params) {
1110 INTR_FUT_DEBUG(
1111 "with_interruption_cond: interrupt_cond: {}",
1112 (void*)interrupt_cond<InterruptCond>.interrupt_cond.get());
1113 return internal::call_with_interruption_impl(
1114 seastar::make_lw_shared<InterruptCond>(std::move(cond)),
1115 std::forward<OpFunc>(opfunc),
1116 std::forward<Params>(params)...
1117 ).template handle_interruption(std::move(efunc));
1118 }
1119
1120 template <typename OpFunc, typename OnInterrupt,
1121 typename... InterruptCondParams>
1122 static inline auto with_interruption(
1123 OpFunc&& opfunc, OnInterrupt&& efunc, InterruptCondParams&&... params) {
1124 return with_interruption_cond(
1125 std::forward<OpFunc>(opfunc),
1126 std::forward<OnInterrupt>(efunc),
1127 InterruptCond(std::forward<InterruptCondParams>(params)...));
1128 }
1129
1130 template <typename Error,
1131 typename Func,
1132 typename... Params>
1133 static inline auto with_interruption_to_error(
1134 Func &&f, InterruptCond &&cond, Params&&... params) {
1135 using func_result_t = std::invoke_result_t<Func, Params...>;
1136 using func_ertr_t =
1137 typename seastar::template futurize<
1138 func_result_t>::core_type::errorator_type;
1139 using with_trans_ertr =
1140 typename func_ertr_t::template extend_ertr<errorator<Error>>;
1141
1142 using value_type = typename func_result_t::value_type;
1143 using ftype = typename std::conditional_t<
1144 std::is_same_v<value_type, seastar::internal::monostate>,
1145 typename with_trans_ertr::template future<>,
1146 typename with_trans_ertr::template future<value_type>>;
1147
1148 return with_interruption_cond(
1149 std::forward<Func>(f),
1150 [](auto e) -> ftype {
1151 return Error::make();
1152 },
1153 std::forward<InterruptCond>(cond),
1154 std::forward<Params>(params)...);
1155 }
1156
1157 template <typename Func>
1158 [[gnu::always_inline]]
1159 static auto wrap_function(Func&& func) {
1160 return [func=std::forward<Func>(func),
1161 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]() mutable {
1162 return call_with_interruption(
1163 interrupt_condition,
1164 std::forward<Func>(func));
1165 };
1166 }
1167
1e59de90
TL
1168 template <typename Iterator,
1169 InvokeReturnsInterruptibleFuture<typename Iterator::reference> AsyncAction>
20effc67
TL
1170 [[gnu::always_inline]]
1171 static auto do_for_each(Iterator begin, Iterator end, AsyncAction&& action) {
1e59de90
TL
1172 using Result = std::invoke_result_t<AsyncAction, typename Iterator::reference>;
1173 if constexpr (seastar::Future<typename Result::core_type>) {
20effc67
TL
1174 return make_interruptible(
1175 ::seastar::do_for_each(begin, end,
1176 [action=std::move(action),
1177 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
1178 (typename Iterator::reference x) mutable {
1179 return call_with_interruption(
1180 interrupt_condition,
1181 std::move(action),
1182 std::forward<decltype(*begin)>(x)).to_future();
1183 })
1184 );
1185 } else {
1186 return make_interruptible(
1187 ::crimson::do_for_each(begin, end,
1188 [action=std::move(action),
1189 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
1190 (typename Iterator::reference x) mutable {
1191 return call_with_interruption(
1192 interrupt_condition,
1193 std::move(action),
1194 std::forward<decltype(*begin)>(x)).to_future();
1195 })
1196 );
1197 }
1198 }
1199
1e59de90
TL
1200 template <typename Iterator, typename AsyncAction>
1201 requires (!InvokeReturnsInterruptibleFuture<AsyncAction, typename Iterator::reference>)
20effc67
TL
1202 [[gnu::always_inline]]
1203 static auto do_for_each(Iterator begin, Iterator end, AsyncAction&& action) {
1e59de90 1204 if constexpr (seastar::InvokeReturnsAnyFuture<AsyncAction, typename Iterator::reference>) {
20effc67
TL
1205 return make_interruptible(
1206 ::seastar::do_for_each(begin, end,
1207 [action=std::move(action),
1208 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
1209 (typename Iterator::reference x) mutable {
1210 return call_with_interruption(
1211 interrupt_condition,
1212 std::move(action),
1213 std::forward<decltype(*begin)>(x));
1214 })
1215 );
1216 } else {
1217 return make_interruptible(
1218 ::crimson::do_for_each(begin, end,
1219 [action=std::move(action),
1220 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
1221 (typename Iterator::reference x) mutable {
1222 return call_with_interruption(
1223 interrupt_condition,
1224 std::move(action),
1225 std::forward<decltype(*begin)>(x));
1226 })
1227 );
1228 }
1229 }
1230
1e59de90 1231 template <InvokeReturnsInterruptibleFuture AsyncAction>
20effc67
TL
1232 [[gnu::always_inline]]
1233 static auto repeat(AsyncAction&& action) {
1e59de90
TL
1234 using Result = std::invoke_result_t<AsyncAction>;
1235 if constexpr (seastar::Future<typename Result::core_type>) {
20effc67
TL
1236 return make_interruptible(
1237 ::seastar::repeat(
1238 [action=std::move(action),
1239 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] {
1240 return call_with_interruption(
1241 interrupt_condition,
1242 std::move(action)).to_future();
1243 })
1244 );
1245 } else {
1246 return make_interruptible(
1247 ::crimson::repeat(
1248 [action=std::move(action),
1e59de90 1249 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]() mutable {
20effc67
TL
1250 return call_with_interruption(
1251 interrupt_condition,
1252 std::move(action)).to_future();
1253 })
1254 );
1255 }
1256 }
1e59de90
TL
1257 template <typename AsyncAction>
1258 requires (!InvokeReturnsInterruptibleFuture<AsyncAction>)
20effc67
TL
1259 [[gnu::always_inline]]
1260 static auto repeat(AsyncAction&& action) {
1e59de90 1261 if constexpr (seastar::InvokeReturnsAnyFuture<AsyncAction>) {
20effc67
TL
1262 return make_interruptible(
1263 ::seastar::repeat(
1264 [action=std::move(action),
1265 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] {
1266 return call_with_interruption(
1267 interrupt_condition,
1268 std::move(action));
1269 })
1270 );
1271 } else {
1272 return make_interruptible(
1273 ::crimson::repeat(
1274 [action=std::move(action),
1275 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] {
1276 return call_with_interruption(
1277 interrupt_condition,
1278 std::move(action));
1279 })
1280 );
1281 }
1282 }
1283
1284 template <typename Iterator, typename Func>
1285 static inline auto parallel_for_each(
1286 Iterator begin,
1287 Iterator end,
1288 Func&& func
1289 ) noexcept {
1290 using ResultType = std::invoke_result_t<Func, typename Iterator::reference>;
1291 parallel_for_each_state<InterruptCond, ResultType>* s = nullptr;
1292 auto decorated_func =
1293 [func=std::forward<Func>(func),
1294 interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
1295 (decltype(*Iterator())&& x) mutable {
1296 return call_with_interruption(
1297 interrupt_condition,
1298 std::forward<Func>(func),
1299 std::forward<decltype(*begin)>(x));
1300 };
1301 // Process all elements, giving each future the following treatment:
1302 // - available, not failed: do nothing
1303 // - available, failed: collect exception in ex
1304 // - not available: collect in s (allocating it if needed)
1305 while (begin != end) {
1306 auto f = seastar::futurize_invoke(decorated_func, *begin++);
1307 if (!f.available() || f.failed()) {
1308 if (!s) {
1309 using itraits = std::iterator_traits<Iterator>;
1310 auto n = (seastar::internal::iterator_range_estimate_vector_capacity(
1311 begin, end, typename itraits::iterator_category()) + 1);
1312 s = new parallel_for_each_state<InterruptCond, ResultType>(n);
1313 }
1314 s->add_future(std::move(f));
1315 }
1316 }
1317 // If any futures were not available, hand off to parallel_for_each_state::start().
1318 // Otherwise we can return a result immediately.
1319 if (s) {
1320 // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
1321 // so this isn't a leak
1322 return s->get_future();
1323 }
1324 return parallel_for_each_state<InterruptCond, ResultType>::now();
1325 }
1326
1327 template <typename Container, typename Func>
1e59de90 1328 static inline auto parallel_for_each(Container& container, Func&& func) noexcept {
20effc67
TL
1329 return parallel_for_each(
1330 std::begin(container),
1331 std::end(container),
1332 std::forward<Func>(func));
1333 }
1334
1335 template <typename Iterator, typename Mapper, typename Initial, typename Reduce>
1336 static inline interruptible_future<InterruptCond, Initial> map_reduce(
1337 Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduce&& reduce) {
1338 struct state {
1339 Initial result;
1340 Reduce reduce;
1341 };
1342 auto s = seastar::make_lw_shared(state{std::move(initial), std::move(reduce)});
1343 interruptible_future<InterruptCond> ret = seastar::make_ready_future<>();
1344 while (begin != end) {
1345 ret = seastar::futurize_invoke(mapper, *begin++).then_wrapped_interruptible(
1346 [s = s.get(), ret = std::move(ret)] (auto f) mutable {
1347 try {
1348 s->result = s->reduce(std::move(s->result), std::move(f.get0()));
1349 return std::move(ret);
1350 } catch (...) {
1351 return std::move(ret).then_wrapped_interruptible([ex = std::current_exception()] (auto f) {
1352 f.ignore_ready_future();
1353 return seastar::make_exception_future<>(ex);
1354 });
1355 }
1356 });
1357 }
1358 return ret.then_interruptible([s] {
1359 return seastar::make_ready_future<Initial>(std::move(s->result));
1360 });
1361 }
1362 template <typename Range, typename Mapper, typename Initial, typename Reduce>
1363 static inline interruptible_future<InterruptCond, Initial> map_reduce(
1364 Range&& range, Mapper&& mapper, Initial initial, Reduce&& reduce) {
1365 return map_reduce(std::begin(range), std::end(range), std::forward<Mapper>(mapper),
1366 std::move(initial), std::move(reduce));
1367 }
1368
1e59de90
TL
1369 template<typename Fut>
1370 requires seastar::Future<Fut> || IsInterruptibleFuture<Fut>
20effc67
TL
1371 static auto futurize_invoke_if_func(Fut&& fut) noexcept {
1372 return std::forward<Fut>(fut);
1373 }
1374
1e59de90
TL
1375 template<typename Func>
1376 requires (!seastar::Future<Func>) && (!IsInterruptibleFuture<Func>)
20effc67
TL
1377 static auto futurize_invoke_if_func(Func&& func) noexcept {
1378 return seastar::futurize_invoke(std::forward<Func>(func));
1379 }
1380
1381 template <typename... FutOrFuncs>
1382 static inline auto when_all(FutOrFuncs&&... fut_or_funcs) noexcept {
1383 return ::seastar::internal::when_all_impl(
1384 futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
1385 }
1386
1387 template <typename... FutOrFuncs>
1388 static inline auto when_all_succeed(FutOrFuncs&&... fut_or_funcs) noexcept {
1389 return ::seastar::internal::when_all_succeed_impl(
1390 futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
1391 }
1392
1393 template <typename Func,
1394 typename Result = futurize_t<std::invoke_result_t<Func>>>
1395 static inline Result async(Func&& func) {
1e59de90
TL
1396 auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond;
1397 INTR_FUT_DEBUG(
1398 "interruptible_future_detail::async() yielding out, "
1399 "interrupt_cond {},{} cleared",
1400 (void*)interruption_condition.get(),
1401 typeid(InterruptCond).name());
1402 interrupt_cond<InterruptCond>.reset();
1403 auto ret = seastar::async([func=std::forward<Func>(func),
1404 interruption_condition] () mutable {
20effc67 1405 return non_futurized_call_with_interruption(
1e59de90 1406 interruption_condition, std::forward<Func>(func));
20effc67 1407 });
1e59de90
TL
1408 interrupt_cond<InterruptCond>.set(interruption_condition);
1409 INTR_FUT_DEBUG(
1410 "interruptible_future_detail::async() yield back, interrupt_cond: {},{}",
1411 (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
1412 typeid(InterruptCond).name());
1413 return ret;
1414 }
1415
1416 template <class FutureT>
1417 static decltype(auto) green_get(FutureT&& fut) {
1418 if (fut.available()) {
1419 return fut.get();
1420 } else {
1421 // destined to wait!
1422 auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond;
1423 INTR_FUT_DEBUG(
1424 "green_get() waiting, interrupt_cond: {},{}",
1425 (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
1426 typeid(InterruptCond).name());
1427 interrupt_cond<InterruptCond>.reset();
1428 auto&& value = fut.get();
1429 interrupt_cond<InterruptCond>.set(interruption_condition);
1430 INTR_FUT_DEBUG(
1431 "green_get() got, interrupt_cond: {},{}",
1432 (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
1433 typeid(InterruptCond).name());
1434 return std::move(value);
1435 }
20effc67
TL
1436 }
1437
1438 static void yield() {
1439 ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
1440 auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond;
1441 INTR_FUT_DEBUG(
1442 "interruptible_future_detail::yield() yielding out, "
1443 "interrupt_cond {},{} cleared",
1444 (void*)interruption_condition.get(),
1445 typeid(InterruptCond).name());
1446 interrupt_cond<InterruptCond>.reset();
1447 seastar::thread::yield();
1448 interrupt_cond<InterruptCond>.set(interruption_condition);
1449 INTR_FUT_DEBUG(
1450 "interruptible_future_detail::yield() yield back, interrupt_cond: {},{}",
1451 (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
1452 typeid(InterruptCond).name());
1453 }
1454
1455 static void maybe_yield() {
1456 ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
1457 if (seastar::thread::should_yield()) {
1458 auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond;
1459 INTR_FUT_DEBUG(
1460 "interruptible_future_detail::may_yield() yielding out, "
1461 "interrupt_cond {},{} cleared",
1462 (void*)interruption_condition.get(),
1463 typeid(InterruptCond).name());
1464 interrupt_cond<InterruptCond>.reset();
1465 seastar::thread::yield();
1466 interrupt_cond<InterruptCond>.set(interruption_condition);
1467 INTR_FUT_DEBUG(
1468 "interruptible_future_detail::may_yield() yield back, interrupt_cond: {},{}",
1469 (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
1470 typeid(InterruptCond).name());
1471 }
1472 }
1473};
1474
1475} // namespace crimson::interruptible
1476
1477namespace seastar {
1478
1479template <typename InterruptCond, typename... T>
1480struct futurize<::crimson::interruptible::interruptible_future_detail<
1481 InterruptCond, seastar::future<T...>>> {
1482 using type = ::crimson::interruptible::interruptible_future_detail<
1483 InterruptCond, seastar::future<T...>>;
1484
1485 using value_type = typename type::value_type;
1486 using tuple_type = typename type::tuple_type;
1487
1488 static type from_tuple(tuple_type&& value) {
1489 return type(ready_future_marker(), std::move(value));
1490 }
1491 static type from_tuple(const tuple_type& value) {
1492 return type(ready_future_marker(), value);
1493 }
1494 static type from_tuple(value_type&& value) {
1495 return type(ready_future_marker(), std::move(value));
1496 }
1497 static type from_tuple(const value_type& value) {
1498 return type(ready_future_marker(), value);
1499 }
1500
1501 template <typename Func, typename... FuncArgs>
1502 [[gnu::always_inline]]
1503 static inline type invoke(Func&& func, FuncArgs&&... args) noexcept {
1504 try {
1505 return func(std::forward<FuncArgs>(args)...);
1506 } catch (...) {
1507 return make_exception_future(std::current_exception());
1508 }
1509 }
1510
1511 template <typename Func>
1512 [[gnu::always_inline]]
1513 static type invoke(Func&& func, seastar::internal::monostate) noexcept {
1514 try {
1515 return ::seastar::futurize_invoke(std::forward<Func>(func));
1516 } catch (...) {
1517 return make_exception_future(std::current_exception());
1518 }
1519 }
1520
1521 template <typename Arg>
1522 static inline type make_exception_future(Arg&& arg) noexcept {
1523 return seastar::make_exception_future<T...>(std::forward<Arg>(arg));
1524 }
1525
1526 static inline type make_exception_future(future_state_base&& state) noexcept {
1527 return seastar::internal::make_exception_future<T...>(std::move(state));
1528 }
1529
1530 template<typename PromiseT, typename Func>
1531 static void satisfy_with_result_of(PromiseT&& pr, Func&& func) {
1532 func().forward_to(std::move(pr));
1533 }
1534};
1535
1536template <typename InterruptCond,
1537 template <typename...> typename ErroratedFuture,
1538 typename... T>
1539struct futurize<
1540 ::crimson::interruptible::interruptible_future_detail<
1541 InterruptCond,
1542 ErroratedFuture<::crimson::errorated_future_marker<T...>>
1543 >
1544> {
1545 using type = ::crimson::interruptible::interruptible_future_detail<
1546 InterruptCond,
1547 ErroratedFuture<::crimson::errorated_future_marker<T...>>>;
1548 using core_type = ErroratedFuture<
1549 ::crimson::errorated_future_marker<T...>>;
1550 using errorator_type =
1551 ::crimson::interruptible::interruptible_errorator<
1552 InterruptCond,
1553 typename ErroratedFuture<
1554 ::crimson::errorated_future_marker<T...>>::errorator_type>;
1555
1556 template<typename Func, typename... FuncArgs>
1557 static inline type invoke(Func&& func, FuncArgs&&... args) noexcept {
1558 try {
1559 return func(std::forward<FuncArgs>(args)...);
1560 } catch (...) {
1561 return make_exception_future(std::current_exception());
1562 }
1563 }
1564
1565 template <typename Func>
1566 [[gnu::always_inline]]
1567 static type invoke(Func&& func, seastar::internal::monostate) noexcept {
1568 try {
1569 return ::seastar::futurize_invoke(std::forward<Func>(func));
1570 } catch (...) {
1571 return make_exception_future(std::current_exception());
1572 }
1573 }
1574
1575 template <typename Arg>
1576 static inline type make_exception_future(Arg&& arg) noexcept {
1577 return core_type::errorator_type::template make_exception_future2<T...>(
1578 std::forward<Arg>(arg));
1579 }
1580
1581 template<typename PromiseT, typename Func>
1582 static void satisfy_with_result_of(PromiseT&& pr, Func&& func) {
1583 func().forward_to(std::move(pr));
1584 }
1585
1586};
1587
1588template <typename InterruptCond, typename FutureType>
1589struct continuation_base_from_future<
1590 ::crimson::interruptible::interruptible_future_detail<InterruptCond, FutureType>> {
1591 using type = typename seastar::continuation_base_from_future<FutureType>::type;
1592};
1593
1e59de90
TL
1594template <typename InterruptCond, typename FutureType>
1595struct is_future<
1596 ::crimson::interruptible::interruptible_future_detail<
1597 InterruptCond,
1598 FutureType>>
1599 : std::true_type {};
20effc67 1600} // namespace seastar