]>
Commit | Line | Data |
---|---|---|
20effc67 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #pragma once | |
5 | ||
6 | #include <seastar/core/future-util.hh> | |
7 | #include <seastar/core/do_with.hh> | |
8 | #include <seastar/core/when_all.hh> | |
9 | #include <seastar/core/thread.hh> | |
10 | ||
11 | #include "crimson/common/log.h" | |
12 | #include "crimson/common/errorator.h" | |
13 | #ifndef NDEBUG | |
14 | #define INTR_FUT_DEBUG(FMT_MSG, ...) crimson::get_logger(ceph_subsys_).trace(FMT_MSG, ##__VA_ARGS__) | |
15 | #else | |
16 | #define INTR_FUT_DEBUG(FMT_MSG, ...) | |
17 | #endif | |
18 | ||
19 | // The interrupt condition generally works this way: | |
20 | // | |
21 | // 1. It is created by call_with_interruption_impl method, and is recorded in the thread | |
22 | // local global variable "::crimson::interruptible::interrupt_cond". | |
23 | // 2. Any continuation that's created within the execution of the continuation | |
24 | // that calls the call_with_interruption_impl method will capture the "interrupt_cond"; | |
25 | // and when they start to run, they will put that captured interruption condition | |
26 | // into "::crimson::interruptible::interrupt_cond" so that further continuations | |
27 | // created can also capture the interruption condition; | |
28 | // 3. At the end of the continuation run, the global "interrupt_cond" will be cleared | |
29 | // to prevent other continuations that are not supposed to be interrupted from | |
30 | // wrongly capturing an interruption condition. | |
31 | // With this approach, continuations capture the interrupt condition at their creation, | |
32 | // restore the interrupt conditions at the beginning of their execution and clear those | |
33 | // interrupt conditions at the end of their execution. So the global "interrupt_cond" | |
34 | // only holds valid interrupt conditions when the corresponding continuations are actually | |
35 | // running after which it gets cleared. Since continuations can't be executed simultaneously, | |
36 | // different continuation chains won't be able to interfere with each other. | |
37 | // | |
38 | // The global "interrupt_cond" can work as a signal about whether the continuation | |
39 | // is supposed to be interrupted, the reason that the global "interrupt_cond" | |
40 | // exists is that there may be this scenario: | |
41 | // | |
42 | // Say there's some method PG::func1(), in which the continuations created may | |
43 | // or may not be supposed to be interrupted in different situations. If we don't | |
44 | // have a global signal, we have to add an extra parameter to every method like | |
45 | // PG::func1() to indicate whether the current run should create to-be-interrupted | |
46 | // continuations or not. | |
47 | // | |
48 | // interruptor::with_interruption() and helpers can be used by users to wrap a future in | |
49 | // the interruption machinery. | |
50 | ||
51 | namespace crimson::os::seastore { | |
52 | class TransactionConflictCondition; | |
53 | } | |
54 | ||
55 | // GCC tries to instantiate | |
56 | // seastar::lw_shared_ptr<crimson::os::seastore::TransactionConflictCondition>. | |
57 | // but we *may* not have the definition of TransactionConflictCondition at this moment, | |
58 | // a full specialization for lw_shared_ptr_accessors helps to bypass the default | |
59 | // lw_shared_ptr_accessors implementation, where std::is_base_of<.., T> is used. | |
60 | namespace seastar::internal { | |
61 | template<> | |
62 | struct lw_shared_ptr_accessors<::crimson::os::seastore::TransactionConflictCondition, void> | |
63 | : lw_shared_ptr_accessors_no_esft<::crimson::os::seastore::TransactionConflictCondition> | |
64 | {}; | |
65 | } | |
66 | ||
1e59de90 TL |
67 | SEASTAR_CONCEPT( |
68 | namespace crimson::interruptible { | |
69 | template<typename InterruptCond, typename FutureType> | |
70 | class interruptible_future_detail; | |
71 | } | |
72 | namespace seastar::impl { | |
73 | template <typename InterruptCond, typename FutureType, typename... Rest> | |
74 | struct is_tuple_of_futures<std::tuple<crimson::interruptible::interruptible_future_detail<InterruptCond, FutureType>, Rest...>> | |
75 | : is_tuple_of_futures<std::tuple<Rest...>> {}; | |
76 | } | |
77 | ) | |
78 | ||
20effc67 TL |
79 | namespace crimson::interruptible { |
80 | ||
81 | struct ready_future_marker {}; | |
82 | struct exception_future_marker {}; | |
83 | ||
84 | template <typename InterruptCond> | |
85 | class interruptible_future_builder; | |
86 | ||
87 | template <typename InterruptCond> | |
88 | struct interruptor; | |
89 | ||
90 | template <typename InterruptCond> | |
91 | using InterruptCondRef = seastar::lw_shared_ptr<InterruptCond>; | |
92 | ||
93 | template <typename InterruptCond> | |
94 | struct interrupt_cond_t { | |
95 | InterruptCondRef<InterruptCond> interrupt_cond; | |
96 | uint64_t ref_count = 0; | |
97 | void set( | |
98 | InterruptCondRef<InterruptCond>& ic) { | |
1e59de90 TL |
99 | INTR_FUT_DEBUG( |
100 | "{}: going to set interrupt_cond: {}, ic: {}", | |
101 | __func__, | |
102 | (void*)interrupt_cond.get(), | |
103 | (void*)ic.get()); | |
20effc67 TL |
104 | if (!interrupt_cond) { |
105 | interrupt_cond = ic; | |
106 | } | |
107 | assert(interrupt_cond.get() == ic.get()); | |
108 | ref_count++; | |
109 | INTR_FUT_DEBUG( | |
110 | "{}: interrupt_cond: {}, ref_count: {}", | |
111 | __func__, | |
112 | (void*)interrupt_cond.get(), | |
113 | ref_count); | |
114 | } | |
115 | void reset() { | |
116 | if (--ref_count == 0) { | |
117 | INTR_FUT_DEBUG( | |
1e59de90 TL |
118 | "{}: clearing interrupt_cond: {},{}", |
119 | __func__, | |
20effc67 TL |
120 | (void*)interrupt_cond.get(), |
121 | typeid(InterruptCond).name()); | |
122 | interrupt_cond.release(); | |
123 | } else { | |
124 | INTR_FUT_DEBUG( | |
1e59de90 TL |
125 | "{}: end without clearing interrupt_cond: {},{}, ref_count: {}", |
126 | __func__, | |
20effc67 TL |
127 | (void*)interrupt_cond.get(), |
128 | typeid(InterruptCond).name(), | |
129 | ref_count); | |
130 | } | |
131 | } | |
132 | }; | |
133 | ||
134 | template <typename InterruptCond> | |
135 | thread_local interrupt_cond_t<InterruptCond> interrupt_cond; | |
136 | ||
137 | extern template thread_local interrupt_cond_t<crimson::os::seastore::TransactionConflictCondition> | |
138 | interrupt_cond<crimson::os::seastore::TransactionConflictCondition>; | |
139 | ||
// Primary template: deliberately empty. The usable implementations are the
// partial specializations for seastar::future and for errorated futures.
template <typename InterruptCond, typename FutureType>
class [[nodiscard]] interruptible_future_detail {};

// Type trait: derives from std::true_type exactly when the argument is a
// specialization of interruptible_future_detail, std::false_type otherwise.
template <typename Candidate>
struct is_interruptible_future : std::false_type {};

template <typename InterruptCond, typename FutureType>
struct is_interruptible_future<
  interruptible_future_detail<InterruptCond, FutureType>>
  : std::true_type {};
1e59de90 TL |
// Satisfied by any specialization of interruptible_future_detail (as
// reported by the is_interruptible_future trait).
template <typename FutureType>
concept IsInterruptibleFuture = is_interruptible_future<FutureType>::value;
// Satisfied when invoking Func with Args... yields a type that models
// IsInterruptibleFuture.
template <typename Func, typename... Args>
concept InvokeReturnsInterruptibleFuture =
  IsInterruptibleFuture<std::invoke_result_t<Func, Args...>>;
20effc67 TL |
157 | |
158 | namespace internal { | |
159 | ||
160 | template <typename InterruptCond, typename Func, typename... Args> | |
161 | auto call_with_interruption_impl( | |
162 | InterruptCondRef<InterruptCond> interrupt_condition, | |
163 | Func&& func, Args&&... args) | |
164 | { | |
165 | using futurator_t = seastar::futurize<std::invoke_result_t<Func, Args...>>; | |
166 | // there might be a case like this: | |
167 | // with_interruption([] { | |
168 | // interruptor::do_for_each([] { | |
169 | // ... | |
170 | // return interruptible_errorated_future(); | |
171 | // }).safe_then_interruptible([] { | |
172 | // ... | |
173 | // }); | |
174 | // }) | |
175 | // In this case, as crimson::do_for_each would directly do futurize_invoke | |
176 | // for "call_with_interruption", we have to make sure this invocation would | |
177 | // not errorly release ::crimson::interruptible::interrupt_cond<InterruptCond> | |
178 | ||
179 | // If there exists an interrupt condition, which means "Func" may not be | |
180 | // permitted to run as a result of the interruption, test it. If it does | |
181 | // need to be interrupted, return an interruption; otherwise, restore the | |
182 | // global "interrupt_cond" with the interruption condition, and go ahead | |
183 | // executing the Func. | |
184 | assert(interrupt_condition); | |
1e59de90 | 185 | auto fut = interrupt_condition->template may_interrupt< |
20effc67 TL |
186 | typename futurator_t::type>(); |
187 | INTR_FUT_DEBUG( | |
188 | "call_with_interruption_impl: may_interrupt: {}, " | |
1e59de90 | 189 | "local interrupt_condition: {}, " |
20effc67 | 190 | "global interrupt_cond: {},{}", |
1e59de90 | 191 | (bool)fut, |
20effc67 TL |
192 | (void*)interrupt_condition.get(), |
193 | (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), | |
194 | typeid(InterruptCond).name()); | |
1e59de90 | 195 | if (fut) { |
20effc67 TL |
196 | return std::move(*fut); |
197 | } | |
198 | interrupt_cond<InterruptCond>.set(interrupt_condition); | |
199 | ||
200 | auto fut2 = seastar::futurize_invoke( | |
201 | std::forward<Func>(func), | |
202 | std::forward<Args>(args)...); | |
203 | // Clear the global "interrupt_cond" to prevent it from interfering other | |
204 | // continuation chains. | |
205 | interrupt_cond<InterruptCond>.reset(); | |
206 | return fut2; | |
207 | } | |
208 | ||
209 | } | |
210 | ||
1e59de90 TL |
211 | template <typename InterruptCond, typename Func, seastar::Future Ret> |
212 | requires (!InterruptCond::template is_interruption_v<Ret>) | |
20effc67 TL |
213 | auto call_with_interruption( |
214 | InterruptCondRef<InterruptCond> interrupt_condition, | |
215 | Func&& func, Ret&& fut) | |
216 | { | |
1e59de90 | 217 | using Result = std::invoke_result_t<Func, Ret>; |
20effc67 TL |
218 | // if "T" is already an interrupt exception, return it directly; |
219 | // otherwise, upper layer application may encounter errors executing | |
220 | // the "Func" body. | |
221 | if (fut.failed()) { | |
222 | std::exception_ptr eptr = fut.get_exception(); | |
223 | if (interrupt_condition->is_interruption(eptr)) { | |
224 | return seastar::futurize<Result>::make_exception_future(std::move(eptr)); | |
225 | } | |
226 | return internal::call_with_interruption_impl( | |
227 | interrupt_condition, | |
228 | std::forward<Func>(func), | |
229 | seastar::futurize<Ret>::make_exception_future( | |
230 | std::move(eptr))); | |
231 | } | |
232 | return internal::call_with_interruption_impl( | |
233 | interrupt_condition, | |
234 | std::forward<Func>(func), | |
235 | std::move(fut)); | |
236 | } | |
237 | ||
1e59de90 TL |
238 | template <typename InterruptCond, typename Func, typename T> |
239 | requires (InterruptCond::template is_interruption_v<T>) | |
20effc67 TL |
240 | auto call_with_interruption( |
241 | InterruptCondRef<InterruptCond> interrupt_condition, | |
242 | Func&& func, T&& arg) | |
243 | { | |
1e59de90 | 244 | using Result = std::invoke_result_t<Func, T>; |
20effc67 TL |
245 | // if "T" is already an interrupt exception, return it directly; |
246 | // otherwise, upper layer application may encounter errors executing | |
247 | // the "Func" body. | |
248 | return seastar::futurize<Result>::make_exception_future( | |
249 | std::get<0>(std::tuple(std::forward<T>(arg)))); | |
250 | } | |
251 | ||
1e59de90 TL |
// Overload for a single argument that is neither an interruption type nor a
// seastar future: forwards `func` and `arg` unchanged to
// internal::call_with_interruption_impl.
template <typename InterruptCond, typename Func, typename T>
requires (!InterruptCond::template is_interruption_v<T>) && (!seastar::Future<T>)
auto call_with_interruption(
  InterruptCondRef<InterruptCond> interrupt_condition,
  Func&& func, T&& arg)
{
  return internal::call_with_interruption_impl(
    interrupt_condition,
    std::forward<Func>(func),
    std::forward<T>(arg));
}
263 | ||
// Overload for a nullary `func`: forwards it to
// internal::call_with_interruption_impl. The defaulted `Result` parameter
// requires `func` to be invocable without arguments.
template <typename InterruptCond, typename Func,
	  typename Result = std::invoke_result_t<Func>>
auto call_with_interruption(
  InterruptCondRef<InterruptCond> interrupt_condition,
  Func&& func)
{
  return internal::call_with_interruption_impl(
    interrupt_condition,
    std::forward<Func>(func));
}
274 | ||
275 | template <typename InterruptCond, typename Func, typename... T, | |
276 | typename Result = std::invoke_result_t<Func, T...>> | |
277 | Result non_futurized_call_with_interruption( | |
278 | InterruptCondRef<InterruptCond> interrupt_condition, | |
279 | Func&& func, T&&... args) | |
280 | { | |
281 | assert(interrupt_condition); | |
1e59de90 | 282 | auto fut = interrupt_condition->template may_interrupt<seastar::future<>>(); |
20effc67 TL |
283 | INTR_FUT_DEBUG( |
284 | "non_futurized_call_with_interruption may_interrupt: {}, " | |
285 | "interrupt_condition: {}, interrupt_cond: {},{}", | |
1e59de90 | 286 | (bool)fut, |
20effc67 TL |
287 | (void*)interrupt_condition.get(), |
288 | (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), | |
289 | typeid(InterruptCond).name()); | |
1e59de90 | 290 | if (fut) { |
20effc67 TL |
291 | std::rethrow_exception(fut->get_exception()); |
292 | } | |
293 | interrupt_cond<InterruptCond>.set(interrupt_condition); | |
294 | try { | |
295 | if constexpr (std::is_void_v<Result>) { | |
296 | std::invoke(std::forward<Func>(func), std::forward<T>(args)...); | |
297 | ||
298 | // Clear the global "interrupt_cond" to prevent it from interfering other | |
299 | // continuation chains. | |
300 | interrupt_cond<InterruptCond>.reset(); | |
301 | return; | |
302 | } else { | |
303 | auto&& err = std::invoke(std::forward<Func>(func), std::forward<T>(args)...); | |
304 | interrupt_cond<InterruptCond>.reset(); | |
305 | return std::forward<Result>(err); | |
306 | } | |
307 | } catch (std::exception& e) { | |
308 | // Clear the global "interrupt_cond" to prevent it from interfering other | |
309 | // continuation chains. | |
310 | interrupt_cond<InterruptCond>.reset(); | |
311 | throw e; | |
312 | } | |
313 | } | |
314 | ||
315 | template <typename InterruptCond, typename Errorator> | |
316 | struct interruptible_errorator; | |
317 | ||
// Maps the future type returned per element to the future type used for the
// aggregate result.
//
// Primary template: T must model seastar::Future; the aggregate is a plain
// seastar::future<>.
template <typename T>
struct parallel_for_each_ret {
  static_assert(seastar::Future<T>);
  using type = seastar::future<>;
};

// Errorated futures: the aggregate keeps the same ErroratedFuture template
// but carries void instead of T.
template <template <typename...> typename ErroratedFuture, typename T>
struct parallel_for_each_ret<
  ErroratedFuture<
    ::crimson::errorated_future_marker<T>>> {
  using type = ErroratedFuture<::crimson::errorated_future_marker<void>>;
};
330 | ||
331 | template <typename InterruptCond, typename FutureType> | |
332 | class parallel_for_each_state final : private seastar::continuation_base<> { | |
333 | using elem_ret_t = std::conditional_t< | |
1e59de90 | 334 | IsInterruptibleFuture<FutureType>, |
20effc67 TL |
335 | typename FutureType::core_type, |
336 | FutureType>; | |
337 | using future_t = interruptible_future_detail< | |
338 | InterruptCond, | |
339 | typename parallel_for_each_ret<elem_ret_t>::type>; | |
340 | std::vector<future_t> _incomplete; | |
341 | seastar::promise<> _result; | |
342 | std::exception_ptr _ex; | |
343 | private: | |
344 | void wait_for_one() noexcept { | |
345 | while (!_incomplete.empty() && _incomplete.back().available()) { | |
346 | if (_incomplete.back().failed()) { | |
347 | _ex = _incomplete.back().get_exception(); | |
348 | } | |
349 | _incomplete.pop_back(); | |
350 | } | |
351 | if (!_incomplete.empty()) { | |
1e59de90 TL |
352 | seastar::internal::set_callback(std::move(_incomplete.back()), |
353 | static_cast<continuation_base<>*>(this)); | |
20effc67 TL |
354 | _incomplete.pop_back(); |
355 | return; | |
356 | } | |
357 | if (__builtin_expect(bool(_ex), false)) { | |
358 | _result.set_exception(std::move(_ex)); | |
359 | } else { | |
360 | _result.set_value(); | |
361 | } | |
362 | delete this; | |
363 | } | |
364 | virtual void run_and_dispose() noexcept override { | |
365 | if (_state.failed()) { | |
366 | _ex = std::move(_state).get_exception(); | |
367 | } | |
368 | _state = {}; | |
369 | wait_for_one(); | |
370 | } | |
371 | task* waiting_task() noexcept override { return _result.waiting_task(); } | |
372 | public: | |
373 | parallel_for_each_state(size_t n) { | |
374 | _incomplete.reserve(n); | |
375 | } | |
376 | void add_future(future_t&& f) { | |
377 | _incomplete.push_back(std::move(f)); | |
378 | } | |
379 | future_t get_future() { | |
380 | auto ret = _result.get_future(); | |
381 | wait_for_one(); | |
382 | return ret; | |
383 | } | |
384 | static future_t now() { | |
385 | return seastar::now(); | |
386 | } | |
387 | }; | |
388 | ||
389 | template <typename InterruptCond, typename T> | |
390 | class [[nodiscard]] interruptible_future_detail<InterruptCond, seastar::future<T>> | |
391 | : private seastar::future<T> { | |
392 | public: | |
393 | using core_type = seastar::future<T>; | |
394 | template <typename U> | |
395 | using interrupt_futurize_t = | |
396 | typename interruptor<InterruptCond>::template futurize_t<U>; | |
397 | using core_type::get0; | |
398 | using core_type::core_type; | |
399 | using core_type::get_exception; | |
400 | using core_type::ignore_ready_future; | |
401 | ||
402 | [[gnu::always_inline]] | |
403 | interruptible_future_detail(seastar::future<T>&& base) | |
404 | : core_type(std::move(base)) | |
405 | {} | |
406 | ||
407 | using value_type = typename seastar::future<T>::value_type; | |
408 | using tuple_type = typename seastar::future<T>::tuple_type; | |
409 | ||
410 | [[gnu::always_inline]] | |
411 | value_type&& get() { | |
412 | if (core_type::available()) { | |
413 | return core_type::get(); | |
414 | } else { | |
415 | // destined to wait! | |
416 | auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond; | |
417 | INTR_FUT_DEBUG( | |
418 | "interruptible_future_detail::get() waiting, interrupt_cond: {},{}", | |
419 | (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), | |
420 | typeid(InterruptCond).name()); | |
421 | interrupt_cond<InterruptCond>.reset(); | |
422 | auto&& value = core_type::get(); | |
423 | interrupt_cond<InterruptCond>.set(interruption_condition); | |
424 | INTR_FUT_DEBUG( | |
425 | "interruptible_future_detail::get() got, interrupt_cond: {},{}", | |
426 | (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), | |
427 | typeid(InterruptCond).name()); | |
428 | return std::move(value); | |
429 | } | |
430 | } | |
431 | ||
432 | using core_type::available; | |
433 | using core_type::failed; | |
434 | ||
435 | template <typename Func, | |
436 | typename Result = interrupt_futurize_t< | |
437 | std::invoke_result_t<Func, seastar::future<T>>>> | |
438 | [[gnu::always_inline]] | |
439 | Result then_wrapped_interruptible(Func&& func) { | |
440 | ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); | |
441 | return core_type::then_wrapped( | |
442 | [func=std::move(func), | |
443 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
444 | (auto&& fut) mutable { | |
445 | return call_with_interruption( | |
446 | std::move(interrupt_condition), | |
447 | std::forward<Func>(func), | |
448 | std::move(fut)); | |
449 | }); | |
450 | } | |
451 | ||
452 | template <typename Func> | |
453 | [[gnu::always_inline]] | |
454 | auto then_interruptible(Func&& func) { | |
455 | ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); | |
456 | if constexpr (std::is_void_v<T>) { | |
457 | auto fut = core_type::then( | |
458 | [func=std::move(func), | |
459 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
460 | () mutable { | |
461 | return call_with_interruption( | |
462 | interrupt_condition, | |
463 | std::move(func)); | |
464 | }); | |
465 | return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); | |
466 | } else { | |
467 | auto fut = core_type::then( | |
468 | [func=std::move(func), | |
469 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
470 | (T&& arg) mutable { | |
471 | return call_with_interruption( | |
472 | interrupt_condition, | |
473 | std::move(func), | |
474 | std::forward<T>(arg)); | |
475 | }); | |
476 | return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); | |
477 | } | |
478 | } | |
479 | ||
480 | template <typename Func> | |
481 | [[gnu::always_inline]] | |
482 | auto then_unpack_interruptible(Func&& func) { | |
483 | return then_interruptible([func=std::forward<Func>(func)](T&& tuple) mutable { | |
484 | return std::apply(std::forward<Func>(func), std::move(tuple)); | |
485 | }); | |
486 | } | |
487 | ||
488 | template <typename Func, | |
489 | typename Result =interrupt_futurize_t< | |
490 | std::result_of_t<Func(std::exception_ptr)>>> | |
491 | [[gnu::always_inline]] | |
492 | Result handle_exception_interruptible(Func&& func) { | |
493 | ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); | |
494 | return core_type::then_wrapped( | |
495 | [func=std::forward<Func>(func), | |
496 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
497 | (auto&& fut) mutable { | |
498 | if (!fut.failed()) { | |
499 | return seastar::make_ready_future<T>(fut.get()); | |
500 | } else { | |
501 | return call_with_interruption( | |
502 | interrupt_condition, | |
503 | std::move(func), | |
504 | fut.get_exception()); | |
505 | } | |
506 | }); | |
507 | } | |
508 | ||
509 | template <bool may_interrupt = true, typename Func, | |
510 | typename Result = interrupt_futurize_t< | |
511 | std::result_of_t<Func()>>> | |
512 | [[gnu::always_inline]] | |
513 | Result finally_interruptible(Func&& func) { | |
514 | if constexpr (may_interrupt) { | |
515 | ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); | |
516 | return core_type::then_wrapped( | |
517 | [func=std::forward<Func>(func), | |
518 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
519 | (auto&& fut) mutable { | |
520 | return call_with_interruption( | |
521 | interrupt_condition, | |
522 | std::move(func)); | |
523 | }); | |
524 | } else { | |
525 | return core_type::finally(std::forward<Func>(func)); | |
526 | } | |
527 | } | |
528 | ||
529 | template <typename Func, | |
530 | typename Result = interrupt_futurize_t< | |
531 | std::result_of_t<Func( | |
532 | typename seastar::function_traits<Func>::template arg<0>::type)>>> | |
533 | [[gnu::always_inline]] | |
534 | Result handle_exception_type_interruptible(Func&& func) { | |
535 | ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); | |
536 | using trait = seastar::function_traits<Func>; | |
537 | static_assert(trait::arity == 1, "func can take only one parameter"); | |
538 | using ex_type = typename trait::template arg<0>::type; | |
539 | return core_type::then_wrapped( | |
540 | [func=std::forward<Func>(func), | |
541 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
542 | (auto&& fut) mutable -> Result { | |
543 | if (!fut.failed()) { | |
544 | return seastar::make_ready_future<T>(fut.get()); | |
545 | } else { | |
546 | try { | |
547 | std::rethrow_exception(fut.get_exception()); | |
548 | } catch (ex_type& ex) { | |
549 | return call_with_interruption( | |
550 | interrupt_condition, | |
551 | std::move(func), ex); | |
552 | } | |
553 | } | |
554 | }); | |
555 | } | |
556 | ||
557 | ||
558 | using my_type = interruptible_future_detail<InterruptCond, seastar::future<T>>; | |
559 | ||
560 | template <typename Func> | |
561 | [[gnu::always_inline]] | |
562 | my_type finally(Func&& func) { | |
563 | return core_type::finally(std::forward<Func>(func)); | |
564 | } | |
565 | private: | |
566 | template <typename Func> | |
567 | [[gnu::always_inline]] | |
568 | auto handle_interruption(Func&& func) { | |
569 | return core_type::then_wrapped( | |
570 | [func=std::move(func)](auto&& fut) mutable { | |
571 | if (fut.failed()) { | |
572 | std::exception_ptr ex = fut.get_exception(); | |
573 | if (InterruptCond::is_interruption(ex)) { | |
574 | return seastar::futurize_invoke(std::move(func), std::move(ex)); | |
575 | } else { | |
576 | return seastar::make_exception_future<T>(std::move(ex)); | |
577 | } | |
578 | } else { | |
579 | return seastar::make_ready_future<T>(fut.get()); | |
580 | } | |
581 | }); | |
582 | } | |
583 | ||
584 | seastar::future<T> to_future() { | |
585 | return static_cast<core_type&&>(std::move(*this)); | |
586 | } | |
587 | // this is only supposed to be invoked by seastar functions | |
588 | template <typename Func, | |
589 | typename Result = interrupt_futurize_t< | |
590 | std::result_of_t<Func(seastar::future<T>)>>> | |
591 | [[gnu::always_inline]] | |
592 | Result then_wrapped(Func&& func) { | |
593 | return core_type::then_wrapped( | |
594 | [func=std::move(func), | |
595 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
596 | (auto&& fut) mutable { | |
597 | return call_with_interruption( | |
598 | interrupt_condition, | |
599 | std::forward<Func>(func), | |
600 | std::move(fut)); | |
601 | }); | |
602 | } | |
603 | friend interruptor<InterruptCond>; | |
604 | friend class interruptible_future_builder<InterruptCond>; | |
605 | template <typename U> | |
606 | friend struct ::seastar::futurize; | |
607 | template <typename> | |
608 | friend class ::seastar::future; | |
609 | template <typename HeldState, typename Future> | |
610 | friend class seastar::internal::do_with_state; | |
611 | template<typename TX, typename F> | |
612 | friend inline auto ::seastar::internal::do_with_impl(TX&& rvalue, F&& f); | |
613 | template<typename T1, typename T2, typename T3_or_F, typename... More> | |
614 | friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more); | |
615 | template <typename T1, typename T2, typename... More> | |
616 | friend auto seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, More&&... more); | |
617 | template <typename, typename> | |
618 | friend class ::crimson::maybe_handle_error_t; | |
619 | template <typename> | |
620 | friend class ::seastar::internal::extract_values_from_futures_vector; | |
621 | template <typename, typename> | |
622 | friend class interruptible_future_detail; | |
623 | template <typename ResolvedVectorTransform, typename Future> | |
624 | friend inline typename ResolvedVectorTransform::future_type | |
625 | seastar::internal::complete_when_all( | |
626 | std::vector<Future>&& futures, | |
627 | typename std::vector<Future>::iterator pos) noexcept; | |
628 | template <typename> | |
629 | friend class ::seastar::internal::when_all_state_component; | |
630 | template <typename Lock, typename Func> | |
631 | friend inline auto seastar::with_lock(Lock& lock, Func&& f); | |
632 | template <typename IC, typename FT> | |
633 | friend class parallel_for_each_state; | |
634 | }; | |
635 | ||
636 | template <typename InterruptCond, typename Errorator> | |
637 | struct interruptible_errorator { | |
638 | using base_ertr = Errorator; | |
639 | using intr_cond_t = InterruptCond; | |
640 | ||
641 | template <typename ValueT = void> | |
642 | using future = interruptible_future_detail<InterruptCond, | |
643 | typename Errorator::template future<ValueT>>; | |
644 | ||
645 | template <class... NewAllowedErrorsT> | |
646 | using extend = interruptible_errorator< | |
647 | InterruptCond, | |
648 | typename Errorator::template extend<NewAllowedErrorsT...>>; | |
649 | ||
650 | template <class Ertr> | |
651 | using extend_ertr = interruptible_errorator< | |
652 | InterruptCond, | |
653 | typename Errorator::template extend_ertr<Ertr>>; | |
654 | ||
655 | template <typename ValueT = void, typename... A> | |
656 | static interruptible_future_detail< | |
657 | InterruptCond, | |
658 | typename Errorator::template future<ValueT>> | |
659 | make_ready_future(A&&... value) { | |
660 | return interruptible_future_detail< | |
661 | InterruptCond, typename Errorator::template future<ValueT>>( | |
662 | Errorator::template make_ready_future<ValueT>( | |
663 | std::forward<A>(value)...)); | |
664 | } | |
665 | static interruptible_future_detail< | |
666 | InterruptCond, | |
667 | typename Errorator::template future<>> now() { | |
668 | return interruptible_future_detail< | |
669 | InterruptCond, typename Errorator::template future<>>( | |
670 | Errorator::now()); | |
671 | } | |
672 | ||
673 | using pass_further = typename Errorator::pass_further; | |
674 | }; | |
675 | ||
676 | template <typename InterruptCond, | |
677 | template <typename...> typename ErroratedFuture, | |
678 | typename T> | |
679 | class [[nodiscard]] interruptible_future_detail< | |
680 | InterruptCond, | |
681 | ErroratedFuture<::crimson::errorated_future_marker<T>>> | |
682 | : private ErroratedFuture<::crimson::errorated_future_marker<T>> | |
683 | { | |
684 | public: | |
685 | using core_type = ErroratedFuture<crimson::errorated_future_marker<T>>; | |
686 | using errorator_type = typename core_type::errorator_type; | |
687 | using interrupt_errorator_type = | |
688 | interruptible_errorator<InterruptCond, errorator_type>; | |
689 | using interrupt_cond_type = InterruptCond; | |
690 | ||
691 | template <typename U> | |
692 | using interrupt_futurize_t = | |
693 | typename interruptor<InterruptCond>::template futurize_t<U>; | |
694 | ||
695 | using core_type::available; | |
696 | using core_type::failed; | |
697 | using core_type::core_type; | |
698 | using core_type::get_exception; | |
699 | ||
700 | using value_type = typename core_type::value_type; | |
701 | ||
702 | interruptible_future_detail(seastar::future<T>&& fut) | |
703 | : core_type(std::move(fut)) | |
704 | {} | |
705 | ||
706 | template <template <typename...> typename ErroratedFuture2, | |
707 | typename... U> | |
708 | [[gnu::always_inline]] | |
709 | interruptible_future_detail( | |
710 | ErroratedFuture2<::crimson::errorated_future_marker<U...>>&& fut) | |
711 | : core_type(std::move(fut)) {} | |
712 | ||
713 | template <template <typename...> typename ErroratedFuture2, | |
714 | typename... U> | |
715 | [[gnu::always_inline]] | |
716 | interruptible_future_detail( | |
717 | interruptible_future_detail<InterruptCond, | |
718 | ErroratedFuture2<::crimson::errorated_future_marker<U...>>>&& fut) | |
719 | : core_type(static_cast<typename std::decay_t<decltype(fut)>::core_type&&>(fut)) { | |
720 | using src_errorator_t = \ | |
721 | typename ErroratedFuture2< | |
722 | ::crimson::errorated_future_marker<U...>>::errorator_type; | |
723 | static_assert(core_type::errorator_type::template contains_once_v< | |
724 | src_errorator_t>, | |
725 | "conversion is only possible from less-or-eq errorated future!"); | |
726 | } | |
727 | ||
728 | [[gnu::always_inline]] | |
729 | interruptible_future_detail( | |
730 | interruptible_future_detail<InterruptCond, seastar::future<T>>&& fut) | |
731 | : core_type(static_cast<seastar::future<T>&&>(fut)) {} | |
732 | ||
733 | template <class... A> | |
734 | [[gnu::always_inline]] | |
735 | interruptible_future_detail(ready_future_marker, A&&... a) | |
736 | : core_type(::seastar::make_ready_future<typename core_type::value_type>( | |
737 | std::forward<A>(a)...)) { | |
738 | } | |
739 | [[gnu::always_inline]] | |
740 | interruptible_future_detail(exception_future_marker, ::seastar::future_state_base&& state) noexcept | |
741 | : core_type(::seastar::futurize<core_type>::make_exception_future(std::move(state))) { | |
742 | } | |
743 | [[gnu::always_inline]] | |
744 | interruptible_future_detail(exception_future_marker, std::exception_ptr&& ep) noexcept | |
745 | : core_type(::seastar::futurize<core_type>::make_exception_future(std::move(ep))) { | |
746 | } | |
747 | ||
748 | template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT, | |
749 | std::enable_if_t<!interruptible, int> = 0> | |
750 | [[gnu::always_inline]] | |
751 | auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) { | |
752 | auto fut = core_type::safe_then( | |
753 | std::forward<ValueInterruptCondT>(valfunc), | |
754 | std::forward<ErrorVisitorT>(errfunc)); | |
755 | return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); | |
756 | } | |
757 | ||
758 | template <typename... Args> | |
759 | auto si_then(Args&&... args) { | |
760 | return safe_then_interruptible(std::forward<Args>(args)...); | |
761 | } | |
762 | ||
763 | ||
764 | template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT, | |
765 | typename U = T, std::enable_if_t<!std::is_void_v<U> && interruptible, int> = 0> | |
766 | [[gnu::always_inline]] | |
767 | auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) { | |
768 | ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); | |
769 | auto fut = core_type::safe_then( | |
770 | [func=std::move(valfunc), | |
771 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
772 | (T&& args) mutable { | |
773 | return call_with_interruption( | |
774 | interrupt_condition, | |
775 | std::move(func), | |
776 | std::forward<T>(args)); | |
777 | }, [func=std::move(errfunc), | |
778 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
779 | (auto&& err) mutable -> decltype(auto) { | |
780 | constexpr bool return_void = std::is_void_v< | |
781 | std::invoke_result_t<ErrorVisitorT, | |
782 | std::decay_t<decltype(err)>>>; | |
783 | constexpr bool return_err = ::crimson::is_error_v< | |
784 | std::decay_t<std::invoke_result_t<ErrorVisitorT, | |
785 | std::decay_t<decltype(err)>>>>; | |
786 | if constexpr (return_err || return_void) { | |
787 | return non_futurized_call_with_interruption( | |
788 | interrupt_condition, | |
789 | std::move(func), | |
790 | std::move(err)); | |
791 | } else { | |
792 | return call_with_interruption( | |
793 | interrupt_condition, | |
794 | std::move(func), | |
795 | std::move(err)); | |
796 | } | |
797 | }); | |
798 | return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); | |
799 | } | |
800 | ||
801 | template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT, | |
802 | typename U = T, std::enable_if_t<std::is_void_v<U> && interruptible, int> = 0> | |
803 | [[gnu::always_inline]] | |
804 | auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) { | |
805 | ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); | |
806 | auto fut = core_type::safe_then( | |
807 | [func=std::move(valfunc), | |
808 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
809 | () mutable { | |
810 | return call_with_interruption( | |
811 | interrupt_condition, | |
812 | std::move(func)); | |
813 | }, [func=std::move(errfunc), | |
814 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
815 | (auto&& err) mutable -> decltype(auto) { | |
816 | constexpr bool return_void = std::is_void_v< | |
817 | std::invoke_result_t<ErrorVisitorT, | |
818 | std::decay_t<decltype(err)>>>; | |
819 | constexpr bool return_err = ::crimson::is_error_v< | |
820 | std::decay_t<std::invoke_result_t<ErrorVisitorT, | |
821 | std::decay_t<decltype(err)>>>>; | |
822 | if constexpr (return_err || return_void) { | |
823 | return non_futurized_call_with_interruption( | |
824 | interrupt_condition, | |
825 | std::move(func), | |
826 | std::move(err)); | |
827 | } else { | |
828 | return call_with_interruption( | |
829 | interrupt_condition, | |
830 | std::move(func), | |
831 | std::move(err)); | |
832 | } | |
833 | }); | |
834 | return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); | |
835 | } | |
836 | ||
837 | template <bool interruptible = true, typename ValueInterruptCondT, | |
838 | typename U = T, std::enable_if_t<std::is_void_v<T> && interruptible, int> = 0> | |
839 | [[gnu::always_inline]] | |
840 | auto safe_then_interruptible(ValueInterruptCondT&& valfunc) { | |
841 | ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); | |
842 | auto fut = core_type::safe_then( | |
843 | [func=std::move(valfunc), | |
844 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
845 | () mutable { | |
846 | return call_with_interruption( | |
847 | interrupt_condition, | |
848 | std::move(func)); | |
849 | }); | |
850 | return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); | |
851 | } | |
852 | ||
853 | template <typename ValFuncT, typename ErrorFuncT> | |
854 | [[gnu::always_inline]] | |
855 | auto safe_then_unpack_interruptible(ValFuncT&& func, ErrorFuncT&& errfunc) { | |
856 | return safe_then_interruptible([func=std::forward<ValFuncT>(func)](T&& tuple) mutable { | |
857 | return std::apply(std::forward<ValFuncT>(func), std::move(tuple)); | |
858 | }, std::forward<ErrorFuncT>(errfunc)); | |
859 | } | |
860 | ||
861 | template <typename ValFuncT> | |
862 | [[gnu::always_inline]] | |
863 | auto safe_then_unpack_interruptible(ValFuncT&& func) { | |
864 | return safe_then_interruptible([func=std::forward<ValFuncT>(func)](T&& tuple) mutable { | |
865 | return std::apply(std::forward<ValFuncT>(func), std::move(tuple)); | |
866 | }); | |
867 | } | |
868 | ||
869 | template <bool interruptible = true, typename ValueInterruptCondT, | |
870 | typename U = T, std::enable_if_t<!std::is_void_v<T> && interruptible, int> = 0> | |
871 | [[gnu::always_inline]] | |
872 | auto safe_then_interruptible(ValueInterruptCondT&& valfunc) { | |
873 | ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); | |
874 | auto fut = core_type::safe_then( | |
875 | [func=std::move(valfunc), | |
876 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
877 | (T&& arg) mutable { | |
878 | return call_with_interruption( | |
879 | interrupt_condition, | |
880 | std::move(func), | |
881 | std::forward<T>(arg)); | |
882 | }); | |
883 | return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); | |
884 | } | |
885 | ||
886 | template <bool interruptible = true, typename ValueInterruptCondT, | |
887 | std::enable_if_t<!interruptible, int> = 0> | |
888 | [[gnu::always_inline]] | |
889 | auto safe_then_interruptible(ValueInterruptCondT&& valfunc) { | |
890 | auto fut = core_type::safe_then(std::forward<ValueInterruptCondT>(valfunc)); | |
891 | return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); | |
892 | } | |
893 | ||
894 | template <typename ValueInterruptCondT, | |
895 | typename ErrorVisitorHeadT, | |
896 | typename... ErrorVisitorTailT> | |
897 | [[gnu::always_inline]] | |
898 | auto safe_then_interruptible(ValueInterruptCondT&& valfunc, | |
899 | ErrorVisitorHeadT&& err_func_head, | |
900 | ErrorVisitorTailT&&... err_func_tail) { | |
901 | return safe_then_interruptible( | |
902 | std::forward<ValueInterruptCondT>(valfunc), | |
903 | ::crimson::composer(std::forward<ErrorVisitorHeadT>(err_func_head), | |
904 | std::forward<ErrorVisitorTailT>(err_func_tail)...)); | |
905 | } | |
906 | ||
907 | template <typename ValueInterruptCondT, | |
908 | typename ErrorVisitorHeadT, | |
909 | typename... ErrorVisitorTailT> | |
910 | [[gnu::always_inline]] | |
911 | auto safe_then_interruptible_tuple(ValueInterruptCondT&& valfunc, | |
912 | ErrorVisitorHeadT&& err_func_head, | |
913 | ErrorVisitorTailT&&... err_func_tail) { | |
914 | return safe_then_interruptible( | |
915 | std::forward<ValueInterruptCondT>(valfunc), | |
916 | ::crimson::composer(std::forward<ErrorVisitorHeadT>(err_func_head), | |
917 | std::forward<ErrorVisitorTailT>(err_func_tail)...)); | |
918 | } | |
919 | ||
920 | template <typename ValFuncT, | |
921 | typename ErrorVisitorHeadT, | |
922 | typename... ErrorVisitorTailT> | |
923 | [[gnu::always_inline]] | |
924 | auto safe_then_unpack_interruptible_tuple( | |
925 | ValFuncT&& valfunc, | |
926 | ErrorVisitorHeadT&& err_func_head, | |
927 | ErrorVisitorTailT&&... err_func_tail) { | |
928 | return safe_then_interruptible_tuple( | |
929 | [valfunc=std::forward<ValFuncT>(valfunc)](T&& tuple) mutable { | |
930 | return std::apply(std::forward<ValFuncT>(valfunc), std::move(tuple)); | |
931 | }, | |
932 | ::crimson::composer(std::forward<ErrorVisitorHeadT>(err_func_head), | |
933 | std::forward<ErrorVisitorTailT>(err_func_tail)...)); | |
934 | } | |
935 | ||
936 | template <bool interruptible = true, typename ErrorFunc> | |
937 | auto handle_error_interruptible(ErrorFunc&& errfunc) { | |
938 | if constexpr (interruptible) { | |
939 | ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); | |
940 | auto fut = core_type::handle_error( | |
941 | [errfunc=std::move(errfunc), | |
942 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
943 | (auto&& err) mutable -> decltype(auto) { | |
944 | constexpr bool return_void = std::is_void_v< | |
945 | std::invoke_result_t<ErrorFunc, | |
946 | std::decay_t<decltype(err)>>>; | |
947 | constexpr bool return_err = ::crimson::is_error_v< | |
948 | std::decay_t<std::invoke_result_t<ErrorFunc, | |
949 | std::decay_t<decltype(err)>>>>; | |
950 | if constexpr (return_err || return_void) { | |
951 | return non_futurized_call_with_interruption( | |
952 | interrupt_condition, | |
953 | std::move(errfunc), | |
954 | std::move(err)); | |
955 | } else { | |
956 | return call_with_interruption( | |
957 | interrupt_condition, | |
958 | std::move(errfunc), | |
959 | std::move(err)); | |
960 | } | |
961 | }); | |
962 | return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); | |
963 | } else { | |
964 | return core_type::handle_error(std::forward<ErrorFunc>(errfunc)); | |
965 | } | |
966 | } | |
967 | ||
968 | template <typename ErrorFuncHead, | |
969 | typename... ErrorFuncTail> | |
970 | auto handle_error_interruptible(ErrorFuncHead&& error_func_head, | |
971 | ErrorFuncTail&&... error_func_tail) { | |
972 | ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); | |
973 | static_assert(sizeof...(ErrorFuncTail) > 0); | |
974 | return this->handle_error_interruptible( | |
975 | ::crimson::composer( | |
976 | std::forward<ErrorFuncHead>(error_func_head), | |
977 | std::forward<ErrorFuncTail>(error_func_tail)...)); | |
978 | } | |
979 | ||
980 | template <typename Func> | |
981 | [[gnu::always_inline]] | |
982 | auto finally(Func&& func) { | |
983 | auto fut = core_type::finally(std::forward<Func>(func)); | |
984 | return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); | |
985 | } | |
986 | ||
987 | private: | |
988 | using core_type::_then; | |
989 | template <typename Func> | |
990 | [[gnu::always_inline]] | |
991 | auto handle_interruption(Func&& func) { | |
992 | // see errorator.h safe_then definition | |
993 | using func_result_t = | |
994 | typename std::invoke_result<Func, std::exception_ptr>::type; | |
995 | using func_ertr_t = | |
996 | typename core_type::template get_errorator_t<func_result_t>; | |
997 | using this_ertr_t = typename core_type::errorator_type; | |
998 | using ret_ertr_t = typename this_ertr_t::template extend_ertr<func_ertr_t>; | |
999 | using futurator_t = typename ret_ertr_t::template futurize<func_result_t>; | |
1000 | return core_type::then_wrapped( | |
1001 | [func=std::move(func), | |
1002 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
1003 | (auto&& fut) mutable | |
1004 | -> typename futurator_t::type { | |
1005 | if (fut.failed()) { | |
1006 | std::exception_ptr ex = fut.get_exception(); | |
1007 | if (InterruptCond::is_interruption(ex)) { | |
1008 | return futurator_t::invoke(std::move(func), std::move(ex)); | |
1009 | } else { | |
1010 | return futurator_t::make_exception_future(std::move(ex)); | |
1011 | } | |
1012 | } else { | |
1013 | return std::move(fut); | |
1014 | } | |
1015 | }); | |
1016 | } | |
1017 | ||
1018 | ErroratedFuture<::crimson::errorated_future_marker<T>> | |
1019 | to_future() { | |
1020 | return static_cast<core_type&&>(std::move(*this)); | |
1021 | } | |
1022 | ||
1023 | friend class interruptor<InterruptCond>; | |
1024 | friend class interruptible_future_builder<InterruptCond>; | |
1025 | template <typename U> | |
1026 | friend struct ::seastar::futurize; | |
1027 | template <typename> | |
1028 | friend class ::seastar::future; | |
1029 | template<typename TX, typename F> | |
1030 | friend inline auto ::seastar::internal::do_with_impl(TX&& rvalue, F&& f); | |
1031 | template<typename T1, typename T2, typename T3_or_F, typename... More> | |
1032 | friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more); | |
1033 | template <typename T1, typename T2, typename... More> | |
1034 | friend auto seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, More&&... more); | |
1035 | template <typename HeldState, typename Future> | |
1036 | friend class seastar::internal::do_with_state; | |
1037 | template <typename, typename> | |
1038 | friend class ::crimson::maybe_handle_error_t; | |
1039 | template <typename, typename> | |
1040 | friend class interruptible_future_detail; | |
1041 | template <typename Lock, typename Func> | |
1042 | friend inline auto seastar::with_lock(Lock& lock, Func&& f); | |
1043 | template <typename IC, typename FT> | |
1044 | friend class parallel_for_each_state; | |
1045 | }; | |
1046 | ||
1047 | template <typename InterruptCond, typename T = void> | |
1048 | using interruptible_future = | |
1049 | interruptible_future_detail<InterruptCond, seastar::future<T>>; | |
1050 | ||
1051 | template <typename InterruptCond, typename Errorator, typename T = void> | |
1052 | using interruptible_errorated_future = | |
1053 | interruptible_future_detail< | |
1054 | InterruptCond, | |
1055 | typename Errorator::template future<T>>; | |
1056 | ||
1057 | template <typename InterruptCond> | |
1058 | struct interruptor | |
1059 | { | |
1060 | public: | |
1061 | using condition = InterruptCond; | |
1062 | ||
1063 | template <typename FutureType> | |
1064 | [[gnu::always_inline]] | |
1065 | static interruptible_future_detail<InterruptCond, FutureType> | |
1066 | make_interruptible(FutureType&& fut) { | |
1067 | return interruptible_future_detail<InterruptCond, FutureType>(std::move(fut)); | |
1068 | } | |
1069 | ||
1070 | [[gnu::always_inline]] | |
1071 | static interruptible_future_detail<InterruptCond, seastar::future<>> now() { | |
1072 | return interruptible_future_detail< | |
1073 | InterruptCond, | |
1074 | seastar::future<>>(seastar::now()); | |
1075 | } | |
1076 | ||
1077 | template <typename ValueT = void, typename... A> | |
1078 | [[gnu::always_inline]] | |
1079 | static interruptible_future_detail<InterruptCond, seastar::future<ValueT>> | |
1080 | make_ready_future(A&&... value) { | |
1081 | return interruptible_future_detail<InterruptCond, seastar::future<ValueT>>( | |
1082 | seastar::make_ready_future<ValueT>(std::forward<A>(value)...)); | |
1083 | } | |
1084 | ||
1085 | template <typename T> | |
1086 | struct futurize { | |
1087 | using type = interruptible_future_detail< | |
1088 | InterruptCond, typename seastar::futurize<T>::type>; | |
1089 | }; | |
1090 | ||
1091 | template <typename FutureType> | |
1092 | struct futurize<interruptible_future_detail<InterruptCond, FutureType>> { | |
1093 | using type = interruptible_future_detail<InterruptCond, FutureType>; | |
1094 | }; | |
1095 | ||
1096 | template <typename T> | |
1097 | using futurize_t = typename futurize<T>::type; | |
1098 | ||
// Container convenience overload: runs action over [std::begin(c), std::end(c))
// by delegating to the iterator-pair do_for_each.
template <typename Container, typename AsyncAction>
[[gnu::always_inline]]
static auto do_for_each(Container& c, AsyncAction&& action) {
  return do_for_each(
    std::begin(c),
    std::end(c),
    std::forward<AsyncAction>(action));
}
1105 | ||
1106 | template <typename OpFunc, typename OnInterrupt, | |
1107 | typename... Params> | |
1108 | static inline auto with_interruption_cond( | |
1109 | OpFunc&& opfunc, OnInterrupt&& efunc, InterruptCond &&cond, Params&&... params) { | |
1110 | INTR_FUT_DEBUG( | |
1111 | "with_interruption_cond: interrupt_cond: {}", | |
1112 | (void*)interrupt_cond<InterruptCond>.interrupt_cond.get()); | |
1113 | return internal::call_with_interruption_impl( | |
1114 | seastar::make_lw_shared<InterruptCond>(std::move(cond)), | |
1115 | std::forward<OpFunc>(opfunc), | |
1116 | std::forward<Params>(params)... | |
1117 | ).template handle_interruption(std::move(efunc)); | |
1118 | } | |
1119 | ||
// Constructs an InterruptCond from params and delegates to
// with_interruption_cond with opfunc and efunc forwarded unchanged.
template <typename OpFunc, typename OnInterrupt,
          typename... InterruptCondParams>
static inline auto with_interruption(
  OpFunc&& opfunc, OnInterrupt&& efunc, InterruptCondParams&&... params)
{
  return with_interruption_cond(
    std::forward<OpFunc>(opfunc),
    std::forward<OnInterrupt>(efunc),
    InterruptCond(std::forward<InterruptCondParams>(params)...));
}
1129 | ||
1130 | template <typename Error, | |
1131 | typename Func, | |
1132 | typename... Params> | |
1133 | static inline auto with_interruption_to_error( | |
1134 | Func &&f, InterruptCond &&cond, Params&&... params) { | |
1135 | using func_result_t = std::invoke_result_t<Func, Params...>; | |
1136 | using func_ertr_t = | |
1137 | typename seastar::template futurize< | |
1138 | func_result_t>::core_type::errorator_type; | |
1139 | using with_trans_ertr = | |
1140 | typename func_ertr_t::template extend_ertr<errorator<Error>>; | |
1141 | ||
1142 | using value_type = typename func_result_t::value_type; | |
1143 | using ftype = typename std::conditional_t< | |
1144 | std::is_same_v<value_type, seastar::internal::monostate>, | |
1145 | typename with_trans_ertr::template future<>, | |
1146 | typename with_trans_ertr::template future<value_type>>; | |
1147 | ||
1148 | return with_interruption_cond( | |
1149 | std::forward<Func>(f), | |
1150 | [](auto e) -> ftype { | |
1151 | return Error::make(); | |
1152 | }, | |
1153 | std::forward<InterruptCond>(cond), | |
1154 | std::forward<Params>(params)...); | |
1155 | } | |
1156 | ||
1157 | template <typename Func> | |
1158 | [[gnu::always_inline]] | |
1159 | static auto wrap_function(Func&& func) { | |
1160 | return [func=std::forward<Func>(func), | |
1161 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]() mutable { | |
1162 | return call_with_interruption( | |
1163 | interrupt_condition, | |
1164 | std::forward<Func>(func)); | |
1165 | }; | |
1166 | } | |
1167 | ||
1e59de90 TL |
1168 | template <typename Iterator, |
1169 | InvokeReturnsInterruptibleFuture<typename Iterator::reference> AsyncAction> | |
20effc67 TL |
1170 | [[gnu::always_inline]] |
1171 | static auto do_for_each(Iterator begin, Iterator end, AsyncAction&& action) { | |
1e59de90 TL |
1172 | using Result = std::invoke_result_t<AsyncAction, typename Iterator::reference>; |
1173 | if constexpr (seastar::Future<typename Result::core_type>) { | |
20effc67 TL |
1174 | return make_interruptible( |
1175 | ::seastar::do_for_each(begin, end, | |
1176 | [action=std::move(action), | |
1177 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
1178 | (typename Iterator::reference x) mutable { | |
1179 | return call_with_interruption( | |
1180 | interrupt_condition, | |
1181 | std::move(action), | |
1182 | std::forward<decltype(*begin)>(x)).to_future(); | |
1183 | }) | |
1184 | ); | |
1185 | } else { | |
1186 | return make_interruptible( | |
1187 | ::crimson::do_for_each(begin, end, | |
1188 | [action=std::move(action), | |
1189 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
1190 | (typename Iterator::reference x) mutable { | |
1191 | return call_with_interruption( | |
1192 | interrupt_condition, | |
1193 | std::move(action), | |
1194 | std::forward<decltype(*begin)>(x)).to_future(); | |
1195 | }) | |
1196 | ); | |
1197 | } | |
1198 | } | |
1199 | ||
1e59de90 TL |
1200 | template <typename Iterator, typename AsyncAction> |
1201 | requires (!InvokeReturnsInterruptibleFuture<AsyncAction, typename Iterator::reference>) | |
20effc67 TL |
1202 | [[gnu::always_inline]] |
1203 | static auto do_for_each(Iterator begin, Iterator end, AsyncAction&& action) { | |
1e59de90 | 1204 | if constexpr (seastar::InvokeReturnsAnyFuture<AsyncAction, typename Iterator::reference>) { |
20effc67 TL |
1205 | return make_interruptible( |
1206 | ::seastar::do_for_each(begin, end, | |
1207 | [action=std::move(action), | |
1208 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
1209 | (typename Iterator::reference x) mutable { | |
1210 | return call_with_interruption( | |
1211 | interrupt_condition, | |
1212 | std::move(action), | |
1213 | std::forward<decltype(*begin)>(x)); | |
1214 | }) | |
1215 | ); | |
1216 | } else { | |
1217 | return make_interruptible( | |
1218 | ::crimson::do_for_each(begin, end, | |
1219 | [action=std::move(action), | |
1220 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
1221 | (typename Iterator::reference x) mutable { | |
1222 | return call_with_interruption( | |
1223 | interrupt_condition, | |
1224 | std::move(action), | |
1225 | std::forward<decltype(*begin)>(x)); | |
1226 | }) | |
1227 | ); | |
1228 | } | |
1229 | } | |
1230 | ||
1e59de90 | 1231 | template <InvokeReturnsInterruptibleFuture AsyncAction> |
20effc67 TL |
1232 | [[gnu::always_inline]] |
1233 | static auto repeat(AsyncAction&& action) { | |
1e59de90 TL |
1234 | using Result = std::invoke_result_t<AsyncAction>; |
1235 | if constexpr (seastar::Future<typename Result::core_type>) { | |
20effc67 TL |
1236 | return make_interruptible( |
1237 | ::seastar::repeat( | |
1238 | [action=std::move(action), | |
1239 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] { | |
1240 | return call_with_interruption( | |
1241 | interrupt_condition, | |
1242 | std::move(action)).to_future(); | |
1243 | }) | |
1244 | ); | |
1245 | } else { | |
1246 | return make_interruptible( | |
1247 | ::crimson::repeat( | |
1248 | [action=std::move(action), | |
1e59de90 | 1249 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]() mutable { |
20effc67 TL |
1250 | return call_with_interruption( |
1251 | interrupt_condition, | |
1252 | std::move(action)).to_future(); | |
1253 | }) | |
1254 | ); | |
1255 | } | |
1256 | } | |
1e59de90 TL |
1257 | template <typename AsyncAction> |
1258 | requires (!InvokeReturnsInterruptibleFuture<AsyncAction>) | |
20effc67 TL |
1259 | [[gnu::always_inline]] |
1260 | static auto repeat(AsyncAction&& action) { | |
1e59de90 | 1261 | if constexpr (seastar::InvokeReturnsAnyFuture<AsyncAction>) { |
20effc67 TL |
1262 | return make_interruptible( |
1263 | ::seastar::repeat( | |
1264 | [action=std::move(action), | |
1265 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] { | |
1266 | return call_with_interruption( | |
1267 | interrupt_condition, | |
1268 | std::move(action)); | |
1269 | }) | |
1270 | ); | |
1271 | } else { | |
1272 | return make_interruptible( | |
1273 | ::crimson::repeat( | |
1274 | [action=std::move(action), | |
1275 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] { | |
1276 | return call_with_interruption( | |
1277 | interrupt_condition, | |
1278 | std::move(action)); | |
1279 | }) | |
1280 | ); | |
1281 | } | |
1282 | } | |
1283 | ||
1284 | template <typename Iterator, typename Func> | |
1285 | static inline auto parallel_for_each( | |
1286 | Iterator begin, | |
1287 | Iterator end, | |
1288 | Func&& func | |
1289 | ) noexcept { | |
1290 | using ResultType = std::invoke_result_t<Func, typename Iterator::reference>; | |
1291 | parallel_for_each_state<InterruptCond, ResultType>* s = nullptr; | |
1292 | auto decorated_func = | |
1293 | [func=std::forward<Func>(func), | |
1294 | interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] | |
1295 | (decltype(*Iterator())&& x) mutable { | |
1296 | return call_with_interruption( | |
1297 | interrupt_condition, | |
1298 | std::forward<Func>(func), | |
1299 | std::forward<decltype(*begin)>(x)); | |
1300 | }; | |
1301 | // Process all elements, giving each future the following treatment: | |
1302 | // - available, not failed: do nothing | |
1303 | // - available, failed: collect exception in ex | |
1304 | // - not available: collect in s (allocating it if needed) | |
1305 | while (begin != end) { | |
1306 | auto f = seastar::futurize_invoke(decorated_func, *begin++); | |
1307 | if (!f.available() || f.failed()) { | |
1308 | if (!s) { | |
1309 | using itraits = std::iterator_traits<Iterator>; | |
1310 | auto n = (seastar::internal::iterator_range_estimate_vector_capacity( | |
1311 | begin, end, typename itraits::iterator_category()) + 1); | |
1312 | s = new parallel_for_each_state<InterruptCond, ResultType>(n); | |
1313 | } | |
1314 | s->add_future(std::move(f)); | |
1315 | } | |
1316 | } | |
1317 | // If any futures were not available, hand off to parallel_for_each_state::start(). | |
1318 | // Otherwise we can return a result immediately. | |
1319 | if (s) { | |
1320 | // s->get_future() takes ownership of s (and chains it to one of the futures it contains) | |
1321 | // so this isn't a leak | |
1322 | return s->get_future(); | |
1323 | } | |
1324 | return parallel_for_each_state<InterruptCond, ResultType>::now(); | |
1325 | } | |
1326 | ||
// Container convenience overload: delegates to the iterator-pair
// parallel_for_each over [std::begin(container), std::end(container)).
template <typename Container, typename Func>
static inline auto parallel_for_each(Container& container, Func&& func) noexcept
{
  return parallel_for_each(
    std::begin(container),
    std::end(container),
    std::forward<Func>(func));
}
1334 | ||
1335 | template <typename Iterator, typename Mapper, typename Initial, typename Reduce> | |
1336 | static inline interruptible_future<InterruptCond, Initial> map_reduce( | |
1337 | Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduce&& reduce) { | |
1338 | struct state { | |
1339 | Initial result; | |
1340 | Reduce reduce; | |
1341 | }; | |
1342 | auto s = seastar::make_lw_shared(state{std::move(initial), std::move(reduce)}); | |
1343 | interruptible_future<InterruptCond> ret = seastar::make_ready_future<>(); | |
1344 | while (begin != end) { | |
1345 | ret = seastar::futurize_invoke(mapper, *begin++).then_wrapped_interruptible( | |
1346 | [s = s.get(), ret = std::move(ret)] (auto f) mutable { | |
1347 | try { | |
1348 | s->result = s->reduce(std::move(s->result), std::move(f.get0())); | |
1349 | return std::move(ret); | |
1350 | } catch (...) { | |
1351 | return std::move(ret).then_wrapped_interruptible([ex = std::current_exception()] (auto f) { | |
1352 | f.ignore_ready_future(); | |
1353 | return seastar::make_exception_future<>(ex); | |
1354 | }); | |
1355 | } | |
1356 | }); | |
1357 | } | |
1358 | return ret.then_interruptible([s] { | |
1359 | return seastar::make_ready_future<Initial>(std::move(s->result)); | |
1360 | }); | |
1361 | } | |
1362 | template <typename Range, typename Mapper, typename Initial, typename Reduce> | |
1363 | static inline interruptible_future<InterruptCond, Initial> map_reduce( | |
1364 | Range&& range, Mapper&& mapper, Initial initial, Reduce&& reduce) { | |
1365 | return map_reduce(std::begin(range), std::end(range), std::forward<Mapper>(mapper), | |
1366 | std::move(initial), std::move(reduce)); | |
1367 | } | |
1368 | ||
1e59de90 TL |
1369 | template<typename Fut> |
1370 | requires seastar::Future<Fut> || IsInterruptibleFuture<Fut> | |
20effc67 TL |
1371 | static auto futurize_invoke_if_func(Fut&& fut) noexcept { |
1372 | return std::forward<Fut>(fut); | |
1373 | } | |
1374 | ||
1e59de90 TL |
1375 | template<typename Func> |
1376 | requires (!seastar::Future<Func>) && (!IsInterruptibleFuture<Func>) | |
20effc67 TL |
1377 | static auto futurize_invoke_if_func(Func&& func) noexcept { |
1378 | return seastar::futurize_invoke(std::forward<Func>(func)); | |
1379 | } | |
1380 | ||
1381 | template <typename... FutOrFuncs> | |
1382 | static inline auto when_all(FutOrFuncs&&... fut_or_funcs) noexcept { | |
1383 | return ::seastar::internal::when_all_impl( | |
1384 | futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...); | |
1385 | } | |
1386 | ||
1387 | template <typename... FutOrFuncs> | |
1388 | static inline auto when_all_succeed(FutOrFuncs&&... fut_or_funcs) noexcept { | |
1389 | return ::seastar::internal::when_all_succeed_impl( | |
1390 | futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...); | |
1391 | } | |
1392 | ||
1393 | template <typename Func, | |
1394 | typename Result = futurize_t<std::invoke_result_t<Func>>> | |
1395 | static inline Result async(Func&& func) { | |
1e59de90 TL |
1396 | auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond; |
1397 | INTR_FUT_DEBUG( | |
1398 | "interruptible_future_detail::async() yielding out, " | |
1399 | "interrupt_cond {},{} cleared", | |
1400 | (void*)interruption_condition.get(), | |
1401 | typeid(InterruptCond).name()); | |
1402 | interrupt_cond<InterruptCond>.reset(); | |
1403 | auto ret = seastar::async([func=std::forward<Func>(func), | |
1404 | interruption_condition] () mutable { | |
20effc67 | 1405 | return non_futurized_call_with_interruption( |
1e59de90 | 1406 | interruption_condition, std::forward<Func>(func)); |
20effc67 | 1407 | }); |
1e59de90 TL |
1408 | interrupt_cond<InterruptCond>.set(interruption_condition); |
1409 | INTR_FUT_DEBUG( | |
1410 | "interruptible_future_detail::async() yield back, interrupt_cond: {},{}", | |
1411 | (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), | |
1412 | typeid(InterruptCond).name()); | |
1413 | return ret; | |
1414 | } | |
1415 | ||
1416 | template <class FutureT> | |
1417 | static decltype(auto) green_get(FutureT&& fut) { | |
1418 | if (fut.available()) { | |
1419 | return fut.get(); | |
1420 | } else { | |
1421 | // destined to wait! | |
1422 | auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond; | |
1423 | INTR_FUT_DEBUG( | |
1424 | "green_get() waiting, interrupt_cond: {},{}", | |
1425 | (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), | |
1426 | typeid(InterruptCond).name()); | |
1427 | interrupt_cond<InterruptCond>.reset(); | |
1428 | auto&& value = fut.get(); | |
1429 | interrupt_cond<InterruptCond>.set(interruption_condition); | |
1430 | INTR_FUT_DEBUG( | |
1431 | "green_get() got, interrupt_cond: {},{}", | |
1432 | (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), | |
1433 | typeid(InterruptCond).name()); | |
1434 | return std::move(value); | |
1435 | } | |
20effc67 TL |
1436 | } |
1437 | ||
1438 | static void yield() { | |
1439 | ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); | |
1440 | auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond; | |
1441 | INTR_FUT_DEBUG( | |
1442 | "interruptible_future_detail::yield() yielding out, " | |
1443 | "interrupt_cond {},{} cleared", | |
1444 | (void*)interruption_condition.get(), | |
1445 | typeid(InterruptCond).name()); | |
1446 | interrupt_cond<InterruptCond>.reset(); | |
1447 | seastar::thread::yield(); | |
1448 | interrupt_cond<InterruptCond>.set(interruption_condition); | |
1449 | INTR_FUT_DEBUG( | |
1450 | "interruptible_future_detail::yield() yield back, interrupt_cond: {},{}", | |
1451 | (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), | |
1452 | typeid(InterruptCond).name()); | |
1453 | } | |
1454 | ||
1455 | static void maybe_yield() { | |
1456 | ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); | |
1457 | if (seastar::thread::should_yield()) { | |
1458 | auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond; | |
1459 | INTR_FUT_DEBUG( | |
1460 | "interruptible_future_detail::may_yield() yielding out, " | |
1461 | "interrupt_cond {},{} cleared", | |
1462 | (void*)interruption_condition.get(), | |
1463 | typeid(InterruptCond).name()); | |
1464 | interrupt_cond<InterruptCond>.reset(); | |
1465 | seastar::thread::yield(); | |
1466 | interrupt_cond<InterruptCond>.set(interruption_condition); | |
1467 | INTR_FUT_DEBUG( | |
1468 | "interruptible_future_detail::may_yield() yield back, interrupt_cond: {},{}", | |
1469 | (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), | |
1470 | typeid(InterruptCond).name()); | |
1471 | } | |
1472 | } | |
1473 | }; | |
1474 | ||
1475 | } // namespace crimson::interruptible | |
1476 | ||
1477 | namespace seastar { | |
1478 | ||
1479 | template <typename InterruptCond, typename... T> | |
1480 | struct futurize<::crimson::interruptible::interruptible_future_detail< | |
1481 | InterruptCond, seastar::future<T...>>> { | |
1482 | using type = ::crimson::interruptible::interruptible_future_detail< | |
1483 | InterruptCond, seastar::future<T...>>; | |
1484 | ||
1485 | using value_type = typename type::value_type; | |
1486 | using tuple_type = typename type::tuple_type; | |
1487 | ||
1488 | static type from_tuple(tuple_type&& value) { | |
1489 | return type(ready_future_marker(), std::move(value)); | |
1490 | } | |
1491 | static type from_tuple(const tuple_type& value) { | |
1492 | return type(ready_future_marker(), value); | |
1493 | } | |
1494 | static type from_tuple(value_type&& value) { | |
1495 | return type(ready_future_marker(), std::move(value)); | |
1496 | } | |
1497 | static type from_tuple(const value_type& value) { | |
1498 | return type(ready_future_marker(), value); | |
1499 | } | |
1500 | ||
1501 | template <typename Func, typename... FuncArgs> | |
1502 | [[gnu::always_inline]] | |
1503 | static inline type invoke(Func&& func, FuncArgs&&... args) noexcept { | |
1504 | try { | |
1505 | return func(std::forward<FuncArgs>(args)...); | |
1506 | } catch (...) { | |
1507 | return make_exception_future(std::current_exception()); | |
1508 | } | |
1509 | } | |
1510 | ||
1511 | template <typename Func> | |
1512 | [[gnu::always_inline]] | |
1513 | static type invoke(Func&& func, seastar::internal::monostate) noexcept { | |
1514 | try { | |
1515 | return ::seastar::futurize_invoke(std::forward<Func>(func)); | |
1516 | } catch (...) { | |
1517 | return make_exception_future(std::current_exception()); | |
1518 | } | |
1519 | } | |
1520 | ||
1521 | template <typename Arg> | |
1522 | static inline type make_exception_future(Arg&& arg) noexcept { | |
1523 | return seastar::make_exception_future<T...>(std::forward<Arg>(arg)); | |
1524 | } | |
1525 | ||
1526 | static inline type make_exception_future(future_state_base&& state) noexcept { | |
1527 | return seastar::internal::make_exception_future<T...>(std::move(state)); | |
1528 | } | |
1529 | ||
1530 | template<typename PromiseT, typename Func> | |
1531 | static void satisfy_with_result_of(PromiseT&& pr, Func&& func) { | |
1532 | func().forward_to(std::move(pr)); | |
1533 | } | |
1534 | }; | |
1535 | ||
1536 | template <typename InterruptCond, | |
1537 | template <typename...> typename ErroratedFuture, | |
1538 | typename... T> | |
1539 | struct futurize< | |
1540 | ::crimson::interruptible::interruptible_future_detail< | |
1541 | InterruptCond, | |
1542 | ErroratedFuture<::crimson::errorated_future_marker<T...>> | |
1543 | > | |
1544 | > { | |
1545 | using type = ::crimson::interruptible::interruptible_future_detail< | |
1546 | InterruptCond, | |
1547 | ErroratedFuture<::crimson::errorated_future_marker<T...>>>; | |
1548 | using core_type = ErroratedFuture< | |
1549 | ::crimson::errorated_future_marker<T...>>; | |
1550 | using errorator_type = | |
1551 | ::crimson::interruptible::interruptible_errorator< | |
1552 | InterruptCond, | |
1553 | typename ErroratedFuture< | |
1554 | ::crimson::errorated_future_marker<T...>>::errorator_type>; | |
1555 | ||
1556 | template<typename Func, typename... FuncArgs> | |
1557 | static inline type invoke(Func&& func, FuncArgs&&... args) noexcept { | |
1558 | try { | |
1559 | return func(std::forward<FuncArgs>(args)...); | |
1560 | } catch (...) { | |
1561 | return make_exception_future(std::current_exception()); | |
1562 | } | |
1563 | } | |
1564 | ||
1565 | template <typename Func> | |
1566 | [[gnu::always_inline]] | |
1567 | static type invoke(Func&& func, seastar::internal::monostate) noexcept { | |
1568 | try { | |
1569 | return ::seastar::futurize_invoke(std::forward<Func>(func)); | |
1570 | } catch (...) { | |
1571 | return make_exception_future(std::current_exception()); | |
1572 | } | |
1573 | } | |
1574 | ||
1575 | template <typename Arg> | |
1576 | static inline type make_exception_future(Arg&& arg) noexcept { | |
1577 | return core_type::errorator_type::template make_exception_future2<T...>( | |
1578 | std::forward<Arg>(arg)); | |
1579 | } | |
1580 | ||
1581 | template<typename PromiseT, typename Func> | |
1582 | static void satisfy_with_result_of(PromiseT&& pr, Func&& func) { | |
1583 | func().forward_to(std::move(pr)); | |
1584 | } | |
1585 | ||
1586 | }; | |
1587 | ||
1588 | template <typename InterruptCond, typename FutureType> | |
1589 | struct continuation_base_from_future< | |
1590 | ::crimson::interruptible::interruptible_future_detail<InterruptCond, FutureType>> { | |
1591 | using type = typename seastar::continuation_base_from_future<FutureType>::type; | |
1592 | }; | |
1593 | ||
1e59de90 TL |
1594 | template <typename InterruptCond, typename FutureType> |
1595 | struct is_future< | |
1596 | ::crimson::interruptible::interruptible_future_detail< | |
1597 | InterruptCond, | |
1598 | FutureType>> | |
1599 | : std::true_type {}; | |
20effc67 | 1600 | } // namespace seastar |