1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
27 #include "arrow/util/async_util.h"
28 #include "arrow/util/functional.h"
29 #include "arrow/util/future.h"
30 #include "arrow/util/io_util.h"
31 #include "arrow/util/iterator.h"
32 #include "arrow/util/mutex.h"
33 #include "arrow/util/optional.h"
34 #include "arrow/util/queue.h"
35 #include "arrow/util/thread_pool.h"
39 // The methods in this file create, modify, and utilize AsyncGenerator which is an
40 // iterator of futures. This allows an asynchronous source (like file input) to be run
41 // through a pipeline in the same way that iterators can be used to create pipelined workflows.
44 // In order to support pipeline parallelism we introduce the concept of asynchronous
45 // reentrancy. This is different than synchronous reentrancy. With synchronous code a
46 // function is reentrant if the function can be called again while a previous call to that
47 // function is still running. Unless otherwise specified none of these generators are
48 // synchronously reentrant. Care should be taken to avoid calling them in such a way (and
49 // the utilities Visit/Collect/Await take care to do this).
51 // Asynchronous reentrancy on the other hand means the function is called again before the
52 // future returned by the function is marked finished (but after the call to get the
53 // future returns). Some of these generators are async-reentrant while others (e.g.
54 // those that depend on ordered processing like decompression) are not. Read the MakeXYZ
55 // function comments to determine which generators support async reentrancy.
57 // Note: Generators that are not asynchronously reentrant can still support readahead
58 // (\see MakeSerialReadaheadGenerator).
60 // Readahead operators, and some other operators, may introduce queueing. Any operators
61 // that introduce buffering should detail the amount of buffering they introduce in their
62 // MakeXYZ function comments.
64 using AsyncGenerator
= std::function
<Future
<T
>()>;
67 struct IterationTraits
<AsyncGenerator
<T
>> {
68 /// \brief by default when iterating through a sequence of AsyncGenerator<T>,
69 /// an empty function indicates the end of iteration.
70 static AsyncGenerator
<T
> End() { return AsyncGenerator
<T
>(); }
72 static bool IsEnd(const AsyncGenerator
<T
>& val
) { return !val
; }
76 Future
<T
> AsyncGeneratorEnd() {
77 return Future
<T
>::MakeFinished(IterationTraits
<T
>::End());
80 /// \brief Visit each item produced by an AsyncGenerator, returning a future that completes when all have been visited
81 template <typename T
, typename Visitor
>
82 Future
<> VisitAsyncGenerator(AsyncGenerator
<T
> generator
, Visitor visitor
) {
85 Result
<ControlFlow
<>> operator()(const T
& next
) {
86 if (IsIterationEnd(next
)) {
89 auto visited
= visitor(next
);
101 Future
<ControlFlow
<>> operator()() {
102 Callback callback
{visitor
};
103 auto next
= generator();
104 return next
.Then(std::move(callback
));
107 AsyncGenerator
<T
> generator
;
111 return Loop(LoopBody
{std::move(generator
), std::move(visitor
)});
114 /// \brief Wait for an async generator to complete, discarding results.
115 template <typename T
>
116 Future
<> DiscardAllFromAsyncGenerator(AsyncGenerator
<T
> generator
) {
117 std::function
<Status(T
)> visitor
= [](const T
&) { return Status::OK(); };
118 return VisitAsyncGenerator(generator
, visitor
);
121 /// \brief Collect the results of an async generator into a vector
122 template <typename T
>
123 Future
<std::vector
<T
>> CollectAsyncGenerator(AsyncGenerator
<T
> generator
) {
124 auto vec
= std::make_shared
<std::vector
<T
>>();
126 Future
<ControlFlow
<std::vector
<T
>>> operator()() {
127 auto next
= generator_();
129 return next
.Then([vec
](const T
& result
) -> Result
<ControlFlow
<std::vector
<T
>>> {
130 if (IsIterationEnd(result
)) {
133 vec
->push_back(result
);
138 AsyncGenerator
<T
> generator_
;
139 std::shared_ptr
<std::vector
<T
>> vec_
;
141 return Loop(LoopBody
{std::move(generator
), std::move(vec
)});
144 /// \see MakeMappedGenerator
145 template <typename T
, typename V
>
146 class MappingGenerator
{
148 MappingGenerator(AsyncGenerator
<T
> source
, std::function
<Future
<V
>(const T
&)> map
)
149 : state_(std::make_shared
<State
>(std::move(source
), std::move(map
))) {}
151 Future
<V
> operator()() {
152 auto future
= Future
<V
>::Make();
155 auto guard
= state_
->mutex
.Lock();
156 if (state_
->finished
) {
157 return AsyncGeneratorEnd
<V
>();
159 should_trigger
= state_
->waiting_jobs
.empty();
160 state_
->waiting_jobs
.push_back(future
);
162 if (should_trigger
) {
163 state_
->source().AddCallback(Callback
{state_
});
170 State(AsyncGenerator
<T
> source
, std::function
<Future
<V
>(const T
&)> map
)
171 : source(std::move(source
)),
178 // This might be called by an original callback (if the source iterator fails or
179 // ends) or by a mapped callback (if the map function fails or ends prematurely).
180 // Either way it should only be called once and after finished is set so there is no
181 // need to guard access to `waiting_jobs`.
182 while (!waiting_jobs
.empty()) {
183 waiting_jobs
.front().MarkFinished(IterationTraits
<V
>::End());
184 waiting_jobs
.pop_front();
188 AsyncGenerator
<T
> source
;
189 std::function
<Future
<V
>(const T
&)> map
;
190 std::deque
<Future
<V
>> waiting_jobs
;
197 struct MappedCallback
{
198 void operator()(const Result
<V
>& maybe_next
) {
199 bool end
= !maybe_next
.ok() || IsIterationEnd(*maybe_next
);
200 bool should_purge
= false;
203 auto guard
= state
->mutex
.Lock();
204 should_purge
= !state
->finished
;
205 state
->finished
= true;
208 sink
.MarkFinished(maybe_next
);
213 std::shared_ptr
<State
> state
;
218 void operator()(const Result
<T
>& maybe_next
) {
220 bool end
= !maybe_next
.ok() || IsIterationEnd(*maybe_next
);
221 bool should_purge
= false;
224 auto guard
= state
->mutex
.Lock();
225 // A MappedCallback may have purged or be purging the queue;
226 // we shouldn't do anything here.
227 if (state
->finished
) return;
229 should_purge
= !state
->finished
;
230 state
->finished
= true;
232 sink
= state
->waiting_jobs
.front();
233 state
->waiting_jobs
.pop_front();
234 should_trigger
= !end
&& !state
->waiting_jobs
.empty();
239 if (should_trigger
) {
240 state
->source().AddCallback(Callback
{state
});
242 if (maybe_next
.ok()) {
243 const T
& val
= maybe_next
.ValueUnsafe();
244 if (IsIterationEnd(val
)) {
245 sink
.MarkFinished(IterationTraits
<V
>::End());
247 Future
<V
> mapped_fut
= state
->map(val
);
248 mapped_fut
.AddCallback(MappedCallback
{std::move(state
), std::move(sink
)});
251 sink
.MarkFinished(maybe_next
.status());
255 std::shared_ptr
<State
> state
;
258 std::shared_ptr
<State
> state_
;
261 /// \brief Create a generator that will apply the map function to each element of
262 /// source. The map function is not called on the end token.
264 /// Note: This function makes a copy of `map` for each item
265 /// Note: Errors returned from the `map` function will be propagated
267 /// If the source generator is async-reentrant then this generator will be also
268 template <typename T
, typename MapFn
,
269 typename Mapped
= detail::result_of_t
<MapFn(const T
&)>,
270 typename V
= typename EnsureFuture
<Mapped
>::type::ValueType
>
271 AsyncGenerator
<V
> MakeMappedGenerator(AsyncGenerator
<T
> source_generator
, MapFn map
) {
275 Future
<V
> operator()(const T
& val
) { return ToFuture(map_(val
)); }
278 return MappingGenerator
<T
, V
>(std::move(source_generator
), MapCallback
{std::move(map
)});
281 /// \brief Create a generator that will apply the map function to
282 /// each element of source. The map function is not called on the end
283 /// token. The result of the map function should be another
284 /// generator; all these generators will then be flattened to produce
285 /// a single stream of items.
287 /// Note: This function makes a copy of `map` for each item
288 /// Note: Errors returned from the `map` function will be propagated
290 /// If the source generator is async-reentrant then this generator will be also
291 template <typename T
, typename MapFn
,
292 typename Mapped
= detail::result_of_t
<MapFn(const T
&)>,
293 typename V
= typename EnsureFuture
<Mapped
>::type::ValueType
>
294 AsyncGenerator
<T
> MakeFlatMappedGenerator(AsyncGenerator
<T
> source_generator
, MapFn map
) {
295 return MakeConcatenatedGenerator(
296 MakeMappedGenerator(std::move(source_generator
), std::move(map
)));
299 /// \see MakeSequencingGenerator
300 template <typename T
, typename ComesAfter
, typename IsNext
>
301 class SequencingGenerator
{
303 SequencingGenerator(AsyncGenerator
<T
> source
, ComesAfter compare
, IsNext is_next
,
305 : state_(std::make_shared
<State
>(std::move(source
), std::move(compare
),
306 std::move(is_next
), std::move(initial_value
))) {}
308 Future
<T
> operator()() {
310 auto guard
= state_
->mutex
.Lock();
311 // We can send a result immediately if the top of the queue is either an
312 // error or the next item
313 if (!state_
->queue
.empty() &&
314 (!state_
->queue
.top().ok() ||
315 state_
->is_next(state_
->previous_value
, *state_
->queue
.top()))) {
316 auto result
= std::move(state_
->queue
.top());
318 state_
->previous_value
= *result
;
321 return Future
<T
>::MakeFinished(result
);
323 if (state_
->finished
) {
324 return AsyncGeneratorEnd
<T
>();
326 // The next item is not in the queue so we will need to wait
327 auto new_waiting_fut
= Future
<T
>::Make();
328 state_
->waiting_future
= new_waiting_fut
;
330 state_
->source().AddCallback(Callback
{state_
});
331 return new_waiting_fut
;
336 struct WrappedComesAfter
{
337 bool operator()(const Result
<T
>& left
, const Result
<T
>& right
) {
338 if (!left
.ok() || !right
.ok()) {
339 // Should never happen
342 return compare(*left
, *right
);
348 State(AsyncGenerator
<T
> source
, ComesAfter compare
, IsNext is_next
, T initial_value
)
349 : source(std::move(source
)),
350 is_next(std::move(is_next
)),
351 previous_value(std::move(initial_value
)),
353 queue(WrappedComesAfter
{compare
}),
357 AsyncGenerator
<T
> source
;
360 Future
<T
> waiting_future
;
361 std::priority_queue
<Result
<T
>, std::vector
<Result
<T
>>, WrappedComesAfter
> queue
;
368 explicit Callback(std::shared_ptr
<State
> state
) : state_(std::move(state
)) {}
370 void operator()(const Result
<T
> result
) {
371 Future
<T
> to_deliver
;
374 auto guard
= state_
->mutex
.Lock();
375 bool ready_to_deliver
= false;
377 // Clear any cached results
378 while (!state_
->queue
.empty()) {
381 ready_to_deliver
= true;
382 state_
->finished
= true;
383 } else if (IsIterationEnd
<T
>(result
.ValueUnsafe())) {
384 ready_to_deliver
= state_
->queue
.empty();
385 state_
->finished
= true;
387 ready_to_deliver
= state_
->is_next(state_
->previous_value
, *result
);
390 if (ready_to_deliver
&& state_
->waiting_future
.is_valid()) {
391 to_deliver
= state_
->waiting_future
;
393 state_
->previous_value
= *result
;
396 state_
->queue
.push(result
);
398 // Capture state_->finished so we can access it outside the mutex
399 finished
= state_
->finished
;
401 // Must deliver result outside of the mutex
402 if (to_deliver
.is_valid()) {
403 to_deliver
.MarkFinished(result
);
405 // Otherwise, if we didn't get the next item (or a terminal item), we
406 // need to keep looking
408 state_
->source().AddCallback(Callback
{state_
});
414 const std::shared_ptr
<State
> state_
;
417 const std::shared_ptr
<State
> state_
;
420 /// \brief Buffer an AsyncGenerator to return values in sequence order ComesAfter
421 /// and IsNext determine the sequence order.
423 /// ComesAfter should be a BinaryPredicate that only returns true if a comes after b
425 /// IsNext should be a BinaryPredicate that returns true, given `a` and `b`, only if
426 /// `b` follows immediately after `a`. It should return true given `initial_value` and
427 /// `b` if `b` is the first item in the sequence.
429 /// This operator will queue unboundedly while waiting for the next item. It is intended
430 /// for jittery sources that might scatter an ordered sequence. It is NOT intended to
431 /// sort. Using it to try and sort could result in excessive RAM usage. This generator
432 /// will queue up to N blocks where N is the max "out of order"ness of the source.
434 /// For example, if the source is 1,6,2,5,4,3 it will queue 3 blocks because 3 is 3
435 /// blocks beyond where it belongs.
437 /// This generator is not async-reentrant but it consists only of a simple log(n)
438 /// insertion into a priority queue.
439 template <typename T
, typename ComesAfter
, typename IsNext
>
440 AsyncGenerator
<T
> MakeSequencingGenerator(AsyncGenerator
<T
> source_generator
,
441 ComesAfter compare
, IsNext is_next
,
443 return SequencingGenerator
<T
, ComesAfter
, IsNext
>(
444 std::move(source_generator
), std::move(compare
), std::move(is_next
),
445 std::move(initial_value
));
448 /// \see MakeTransformedGenerator
449 template <typename T
, typename V
>
450 class TransformingGenerator
{
451 // The transforming generator state will be referenced as an async generator but will
452 // also be referenced via callback to various futures. If the async generator owner
453 // moves it around we need the state to be consistent for future callbacks.
454 struct TransformingGeneratorState
455 : std::enable_shared_from_this
<TransformingGeneratorState
> {
456 TransformingGeneratorState(AsyncGenerator
<T
> generator
, Transformer
<T
, V
> transformer
)
457 : generator_(std::move(generator
)),
458 transformer_(std::move(transformer
)),
462 Future
<V
> operator()() {
464 auto maybe_next_result
= Pump();
465 if (!maybe_next_result
.ok()) {
466 return Future
<V
>::MakeFinished(maybe_next_result
.status());
468 auto maybe_next
= std::move(maybe_next_result
).ValueUnsafe();
469 if (maybe_next
.has_value()) {
470 return Future
<V
>::MakeFinished(*std::move(maybe_next
));
473 auto next_fut
= generator_();
474 // If finished already, process results immediately inside the loop to avoid
476 if (next_fut
.is_finished()) {
477 auto next_result
= next_fut
.result();
478 if (next_result
.ok()) {
479 last_value_
= *next_result
;
481 return Future
<V
>::MakeFinished(next_result
.status());
483 // Otherwise, if not finished immediately, add callback to process results
485 auto self
= this->shared_from_this();
486 return next_fut
.Then([self
](const T
& next_result
) {
487 self
->last_value_
= next_result
;
494 // See comment on TransformingIterator::Pump
495 Result
<util::optional
<V
>> Pump() {
496 if (!finished_
&& last_value_
.has_value()) {
497 ARROW_ASSIGN_OR_RAISE(TransformFlow
<V
> next
, transformer_(*last_value_
));
498 if (next
.ReadyForNext()) {
499 if (IsIterationEnd(*last_value_
)) {
504 if (next
.Finished()) {
507 if (next
.HasValue()) {
512 return IterationTraits
<V
>::End();
514 return util::nullopt
;
517 AsyncGenerator
<T
> generator_
;
518 Transformer
<T
, V
> transformer_
;
519 util::optional
<T
> last_value_
;
524 explicit TransformingGenerator(AsyncGenerator
<T
> generator
,
525 Transformer
<T
, V
> transformer
)
526 : state_(std::make_shared
<TransformingGeneratorState
>(std::move(generator
),
527 std::move(transformer
))) {}
529 Future
<V
> operator()() { return (*state_
)(); }
532 std::shared_ptr
<TransformingGeneratorState
> state_
;
535 /// \brief Transform an async generator using a transformer function returning a new
538 /// The transform function here behaves exactly the same as the transform function in
539 /// MakeTransformedIterator and you can safely use the same transform function to
540 /// transform both synchronous and asynchronous streams.
542 /// This generator is not async-reentrant
544 /// This generator may queue up to 1 instance of T but will not delay
545 template <typename T
, typename V
>
546 AsyncGenerator
<V
> MakeTransformedGenerator(AsyncGenerator
<T
> generator
,
547 Transformer
<T
, V
> transformer
) {
548 return TransformingGenerator
<T
, V
>(generator
, transformer
);
551 /// \see MakeSerialReadaheadGenerator
552 template <typename T
>
553 class SerialReadaheadGenerator
{
555 SerialReadaheadGenerator(AsyncGenerator
<T
> source_generator
, int max_readahead
)
556 : state_(std::make_shared
<State
>(std::move(source_generator
), max_readahead
)) {}
558 Future
<T
> operator()() {
559 if (state_
->first_
) {
560 // Lazy generator, need to wait for the first ask to prime the pump
561 state_
->first_
= false;
562 auto next
= state_
->source_();
563 return next
.Then(Callback
{state_
}, ErrCallback
{state_
});
566 // This generator is not async-reentrant. We won't be called until the last
567 // future finished so we know there is something in the queue
568 auto finished
= state_
->finished_
.load();
569 if (finished
&& state_
->readahead_queue_
.IsEmpty()) {
570 return AsyncGeneratorEnd
<T
>();
573 std::shared_ptr
<Future
<T
>> next
;
574 if (!state_
->readahead_queue_
.Read(next
)) {
575 return Status::UnknownError("Could not read from readahead_queue");
578 auto last_available
= state_
->spaces_available_
.fetch_add(1);
579 if (last_available
== 0 && !finished
) {
580 // Reader idled out, we need to restart it
581 ARROW_RETURN_NOT_OK(state_
->Pump(state_
));
588 State(AsyncGenerator
<T
> source
, int max_readahead
)
590 source_(std::move(source
)),
592 // There is one extra "space" for the in-flight request
593 spaces_available_(max_readahead
+ 1),
594 // The SPSC queue has size-1 "usable" slots so we need to overallocate 1
595 readahead_queue_(max_readahead
+ 1) {}
597 Status
Pump(const std::shared_ptr
<State
>& self
) {
598 // Can't do readahead_queue.write(source().Then(...)) because then the
599 // callback might run immediately and add itself to the queue before this gets added
600 // to the queue messing up the order.
601 auto next_slot
= std::make_shared
<Future
<T
>>();
602 auto written
= readahead_queue_
.Write(next_slot
);
604 return Status::UnknownError("Could not write to readahead_queue");
606 // If this Pump is being called from a callback it is possible for the source to
607 // poll and read from the queue between the Write and this spot where we fill the
608 // value in. However, it is not possible for the future to read this value we are
609 // writing. That is because this callback (the callback for future X) must be
610 // finished before future X is marked complete and this source is not pulled
611 // reentrantly so it will not poll for future X+1 until this callback has completed.
612 *next_slot
= source_().Then(Callback
{self
}, ErrCallback
{self
});
616 // Only accessed by the consumer end
618 // Accessed by both threads
619 AsyncGenerator
<T
> source_
;
620 std::atomic
<bool> finished_
;
621 // The queue has a size but it is not atomic. We keep track of how many spaces are
622 // left in the queue here so we know if we've just written the last value and we need
623 // to stop reading ahead or if we've just read from a full queue and we need to
624 // restart reading ahead
625 std::atomic
<uint32_t> spaces_available_
;
626 // Needs to be a queue of shared_ptr and not Future because we set the value of the
627 // future after we add it to the queue
628 util::SpscQueue
<std::shared_ptr
<Future
<T
>>> readahead_queue_
;
632 Result
<T
> operator()(const T
& next
) {
633 if (IsIterationEnd(next
)) {
634 state_
->finished_
.store(true);
637 auto last_available
= state_
->spaces_available_
.fetch_sub(1);
638 if (last_available
> 1) {
639 ARROW_RETURN_NOT_OK(state_
->Pump(state_
));
644 std::shared_ptr
<State
> state_
;
648 Result
<T
> operator()(const Status
& st
) {
649 state_
->finished_
.store(true);
653 std::shared_ptr
<State
> state_
;
656 std::shared_ptr
<State
> state_
;
659 /// \see MakeFromFuture
660 template <typename T
>
661 class FutureFirstGenerator
{
663 explicit FutureFirstGenerator(Future
<AsyncGenerator
<T
>> future
)
664 : state_(std::make_shared
<State
>(std::move(future
))) {}
666 Future
<T
> operator()() {
667 if (state_
->source_
) {
668 return state_
->source_();
671 return state_
->future_
.Then([state
](const AsyncGenerator
<T
>& source
) {
672 state
->source_
= source
;
673 return state
->source_();
680 explicit State(Future
<AsyncGenerator
<T
>> future
) : future_(future
), source_() {}
682 Future
<AsyncGenerator
<T
>> future_
;
683 AsyncGenerator
<T
> source_
;
686 std::shared_ptr
<State
> state_
;
689 /// \brief Transform a Future<AsyncGenerator<T>> into an AsyncGenerator<T>
690 /// that waits for the future to complete as part of the first item.
692 /// This generator is not async-reentrant (even if the generator yielded by future is)
694 /// This generator does not queue
695 template <typename T
>
696 AsyncGenerator
<T
> MakeFromFuture(Future
<AsyncGenerator
<T
>> future
) {
697 return FutureFirstGenerator
<T
>(std::move(future
));
700 /// \brief Create a generator that will pull from the source into a queue. Unlike
701 /// MakeReadaheadGenerator this will not pull reentrantly from the source.
703 /// The source generator does not need to be async-reentrant
705 /// This generator is not async-reentrant (even if the source is)
707 /// This generator may queue up to max_readahead additional instances of T
708 template <typename T
>
709 AsyncGenerator
<T
> MakeSerialReadaheadGenerator(AsyncGenerator
<T
> source_generator
,
711 return SerialReadaheadGenerator
<T
>(std::move(source_generator
), max_readahead
);
714 /// \brief Create a generator that immediately pulls from the source
716 /// Typical generators do not pull from their source until they themselves
717 /// are pulled. This generator does not follow that convention and will call
718 /// generator() once before it returns. The returned generator will otherwise
719 /// mirror the source.
721 /// This generator forwards async-reentrant pressure to the source
722 /// This generator buffers one item (the first result) until it is delivered.
723 template <typename T
>
724 AsyncGenerator
<T
> MakeAutoStartingGenerator(AsyncGenerator
<T
> generator
) {
725 struct AutostartGenerator
{
726 Future
<T
> operator()() {
727 if (first_future
->is_valid()) {
728 Future
<T
> result
= *first_future
;
729 *first_future
= Future
<T
>();
735 std::shared_ptr
<Future
<T
>> first_future
;
736 AsyncGenerator
<T
> source
;
739 std::shared_ptr
<Future
<T
>> first_future
= std::make_shared
<Future
<T
>>(generator());
740 return AutostartGenerator
{std::move(first_future
), std::move(generator
)};
743 /// \see MakeReadaheadGenerator
744 template <typename T
>
745 class ReadaheadGenerator
{
747 ReadaheadGenerator(AsyncGenerator
<T
> source_generator
, int max_readahead
)
748 : state_(std::make_shared
<State
>(std::move(source_generator
), max_readahead
)) {}
750 Future
<T
> AddMarkFinishedContinuation(Future
<T
> fut
) {
753 [state
](const T
& result
) -> Result
<T
> {
754 state
->MarkFinishedIfDone(result
);
757 [state
](const Status
& err
) -> Result
<T
> {
758 state
->finished
.store(true);
763 Future
<T
> operator()() {
764 if (state_
->readahead_queue
.empty()) {
765 // This is the first request, let's pump the underlying queue
766 for (int i
= 0; i
< state_
->max_readahead
; i
++) {
767 auto next
= state_
->source_generator();
768 auto next_after_check
= AddMarkFinishedContinuation(std::move(next
));
769 state_
->readahead_queue
.push(std::move(next_after_check
));
772 // Pop one and add one
773 auto result
= state_
->readahead_queue
.front();
774 state_
->readahead_queue
.pop();
775 if (state_
->finished
.load()) {
776 state_
->readahead_queue
.push(AsyncGeneratorEnd
<T
>());
778 auto back_of_queue
= state_
->source_generator();
779 auto back_of_queue_after_check
=
780 AddMarkFinishedContinuation(std::move(back_of_queue
));
781 state_
->readahead_queue
.push(std::move(back_of_queue_after_check
));
788 State(AsyncGenerator
<T
> source_generator
, int max_readahead
)
789 : source_generator(std::move(source_generator
)), max_readahead(max_readahead
) {
790 finished
.store(false);
793 void MarkFinishedIfDone(const T
& next_result
) {
794 if (IsIterationEnd(next_result
)) {
795 finished
.store(true);
799 AsyncGenerator
<T
> source_generator
;
801 std::atomic
<bool> finished
;
802 std::queue
<Future
<T
>> readahead_queue
;
805 std::shared_ptr
<State
> state_
;
808 /// \brief A generator where the producer pushes items on a queue.
810 /// By default no back-pressure is applied (the BackpressureOptions constructor
811 /// argument defaults to empty), so without a toggle this generator is mostly useful when
812 /// producing the values is neither CPU- nor memory-expensive (e.g. fetching filesystem metadata).
814 /// This generator is not async-reentrant.
815 template <typename T
>
816 class PushGenerator
{
818 explicit State(util::BackpressureOptions backpressure
)
819 : backpressure(std::move(backpressure
)) {}
821 void OpenBackpressureIfFreeUnlocked(util::Mutex::Guard
&& guard
) {
822 if (backpressure
.toggle
&& result_q
.size() < backpressure
.resume_if_below
) {
823 // Open might trigger callbacks so release the lock first
825 backpressure
.toggle
->Open();
829 void CloseBackpressureIfFullUnlocked() {
830 if (backpressure
.toggle
&& result_q
.size() > backpressure
.pause_if_above
) {
831 backpressure
.toggle
->Close();
835 util::BackpressureOptions backpressure
;
837 std::deque
<Result
<T
>> result_q
;
838 util::optional
<Future
<T
>> consumer_fut
;
839 bool finished
= false;
843 /// Producer API for PushGenerator
846 explicit Producer(const std::shared_ptr
<State
>& state
) : weak_state_(state
) {}
848 /// \brief Push a value on the queue
850 /// True is returned if the value was pushed, false if the generator is
851 /// already closed or destroyed. If the latter, it is recommended to stop
852 /// producing any further values.
853 bool Push(Result
<T
> result
) {
854 auto state
= weak_state_
.lock();
856 // Generator was destroyed
859 auto lock
= state
->mutex
.Lock();
860 if (state
->finished
) {
864 if (state
->consumer_fut
.has_value()) {
865 auto fut
= std::move(state
->consumer_fut
.value());
866 state
->consumer_fut
.reset();
867 lock
.Unlock(); // unlock before potentially invoking a callback
868 fut
.MarkFinished(std::move(result
));
870 state
->result_q
.push_back(std::move(result
));
871 state
->CloseBackpressureIfFullUnlocked();
876 /// \brief Tell the consumer we have finished producing
878 /// It is allowed to call this and later call Push() again ("early close").
879 /// In this case, calls to Push() after the queue is closed are silently
880 /// ignored. This can help implementing non-trivial cancellation cases.
882 /// True is returned on success, false if the generator is already closed
885 auto state
= weak_state_
.lock();
887 // Generator was destroyed
890 auto lock
= state
->mutex
.Lock();
891 if (state
->finished
) {
895 state
->finished
= true;
896 if (state
->consumer_fut
.has_value()) {
897 auto fut
= std::move(state
->consumer_fut
.value());
898 state
->consumer_fut
.reset();
899 lock
.Unlock(); // unlock before potentially invoking a callback
900 fut
.MarkFinished(IterationTraits
<T
>::End());
905 /// Return whether the generator was closed or destroyed.
906 bool is_closed() const {
907 auto state
= weak_state_
.lock();
909 // Generator was destroyed
912 auto lock
= state
->mutex
.Lock();
913 return state
->finished
;
917 const std::weak_ptr
<State
> weak_state_
;
920 explicit PushGenerator(util::BackpressureOptions backpressure
= {})
921 : state_(std::make_shared
<State
>(std::move(backpressure
))) {}
923 /// Read an item from the queue
924 Future
<T
> operator()() const {
925 auto lock
= state_
->mutex
.Lock();
926 assert(!state_
->consumer_fut
.has_value()); // Non-reentrant
927 if (!state_
->result_q
.empty()) {
928 auto fut
= Future
<T
>::MakeFinished(std::move(state_
->result_q
.front()));
929 state_
->result_q
.pop_front();
930 state_
->OpenBackpressureIfFreeUnlocked(std::move(lock
));
933 if (state_
->finished
) {
934 return AsyncGeneratorEnd
<T
>();
936 auto fut
= Future
<T
>::Make();
937 state_
->consumer_fut
= fut
;
941 /// \brief Return producer-side interface
943 /// The returned object must be used by the producer to push values on the queue.
944 /// Only a single Producer object should be instantiated.
945 Producer
producer() { return Producer
{state_
}; }
948 const std::shared_ptr
<State
> state_
;
951 /// \brief Create a generator that pulls reentrantly from a source
952 /// This generator will pull reentrantly from a source, ensuring that max_readahead
953 /// requests are active at any given time.
955 /// The source generator must be async-reentrant
957 /// This generator itself is async-reentrant.
959 /// This generator may queue up to max_readahead instances of T
960 template <typename T
>
961 AsyncGenerator
<T
> MakeReadaheadGenerator(AsyncGenerator
<T
> source_generator
,
963 return ReadaheadGenerator
<T
>(std::move(source_generator
), max_readahead
);
966 /// \brief Creates a generator that will yield finished futures from a vector
968 /// This generator is async-reentrant
969 template <typename T
>
970 AsyncGenerator
<T
> MakeVectorGenerator(std::vector
<T
> vec
) {
972 explicit State(std::vector
<T
> vec_
) : vec(std::move(vec_
)), vec_idx(0) {}
975 std::atomic
<std::size_t> vec_idx
;
978 auto state
= std::make_shared
<State
>(std::move(vec
));
980 auto idx
= state
->vec_idx
.fetch_add(1);
981 if (idx
>= state
->vec
.size()) {
982 // Eagerly return memory
984 return AsyncGeneratorEnd
<T
>();
986 return Future
<T
>::MakeFinished(state
->vec
[idx
]);
990 /// \see MakeMergedGenerator
991 template <typename T
>
992 class MergedGenerator
{
994 explicit MergedGenerator(AsyncGenerator
<AsyncGenerator
<T
>> source
,
995 int max_subscriptions
)
996 : state_(std::make_shared
<State
>(std::move(source
), max_subscriptions
)) {}
998 Future
<T
> operator()() {
999 Future
<T
> waiting_future
;
1000 std::shared_ptr
<DeliveredJob
> delivered_job
;
1002 auto guard
= state_
->mutex
.Lock();
1003 if (!state_
->delivered_jobs
.empty()) {
1004 delivered_job
= std::move(state_
->delivered_jobs
.front());
1005 state_
->delivered_jobs
.pop_front();
1006 } else if (state_
->finished
) {
1007 return IterationTraits
<T
>::End();
1009 waiting_future
= Future
<T
>::Make();
1010 state_
->waiting_jobs
.push_back(std::make_shared
<Future
<T
>>(waiting_future
));
1013 if (delivered_job
) {
1014 // deliverer will be invalid if outer callback encounters an error and delivers a
1016 if (delivered_job
->deliverer
) {
1017 delivered_job
->deliverer().AddCallback(
1018 InnerCallback
{state_
, delivered_job
->index
});
1020 return std::move(delivered_job
->value
);
1022 if (state_
->first
) {
1023 state_
->first
= false;
1024 for (std::size_t i
= 0; i
< state_
->active_subscriptions
.size(); i
++) {
1025 state_
->PullSource().AddCallback(OuterCallback
{state_
, i
});
1028 return waiting_future
;
1032 struct DeliveredJob
{
1033 explicit DeliveredJob(AsyncGenerator
<T
> deliverer_
, Result
<T
> value_
,
1035 : deliverer(deliverer_
), value(std::move(value_
)), index(index_
) {}
1037 AsyncGenerator
<T
> deliverer
;
1043 State(AsyncGenerator
<AsyncGenerator
<T
>> source
, int max_subscriptions
)
1044 : source(std::move(source
)),
1045 active_subscriptions(max_subscriptions
),
1050 source_exhausted(false),
1052 num_active_subscriptions(max_subscriptions
) {}
1054 Future
<AsyncGenerator
<T
>> PullSource() {
1055 // Need to guard access to source() so we don't pull sync-reentrantly which
1057 auto lock
= mutex
.Lock();
1061 AsyncGenerator
<AsyncGenerator
<T
>> source
;
1062 // active_subscriptions and delivered_jobs will be bounded by max_subscriptions
1063 std::vector
<AsyncGenerator
<T
>> active_subscriptions
;
1064 std::deque
<std::shared_ptr
<DeliveredJob
>> delivered_jobs
;
1065 // waiting_jobs is unbounded, reentrant pulls (e.g. AddReadahead) will provide the
1067 std::deque
<std::shared_ptr
<Future
<T
>>> waiting_jobs
;
1070 bool source_exhausted
;
1072 int num_active_subscriptions
;
1075 struct InnerCallback
{
1076 void operator()(const Result
<T
>& maybe_next_ref
) {
1078 const Result
<T
>* maybe_next
= &maybe_next_ref
;
1082 bool sub_finished
= maybe_next
->ok() && IsIterationEnd(**maybe_next
);
1084 auto guard
= state
->mutex
.Lock();
1085 if (state
->finished
) {
1086 // We've errored out so just ignore this result and don't keep pumping
1089 if (!sub_finished
) {
1090 if (state
->waiting_jobs
.empty()) {
1091 state
->delivered_jobs
.push_back(std::make_shared
<DeliveredJob
>(
1092 state
->active_subscriptions
[index
], *maybe_next
, index
));
1094 sink
= std::move(*state
->waiting_jobs
.front());
1095 state
->waiting_jobs
.pop_front();
1100 state
->PullSource().AddCallback(OuterCallback
{state
, index
});
1101 } else if (sink
.is_valid()) {
1102 sink
.MarkFinished(*maybe_next
);
1103 if (!maybe_next
->ok()) return;
1105 next_fut
= state
->active_subscriptions
[index
]();
1106 if (next_fut
.TryAddCallback([this]() { return *this; })) {
1109 // Already completed. Avoid very deep recursion by looping
1110 // here instead of relying on the callback.
1111 maybe_next
= &next_fut
.result();
1117 std::shared_ptr
<State
> state
;
1121 struct OuterCallback
{
1122 void operator()(const Result
<AsyncGenerator
<T
>>& maybe_next
) {
1123 bool should_purge
= false;
1124 bool should_continue
= false;
1125 Future
<T
> error_sink
;
1127 auto guard
= state
->mutex
.Lock();
1128 if (!maybe_next
.ok() || IsIterationEnd(*maybe_next
)) {
1129 state
->source_exhausted
= true;
1130 if (!maybe_next
.ok() || --state
->num_active_subscriptions
== 0) {
1131 state
->finished
= true;
1132 should_purge
= true;
1134 if (!maybe_next
.ok()) {
1135 if (state
->waiting_jobs
.empty()) {
1136 state
->delivered_jobs
.push_back(std::make_shared
<DeliveredJob
>(
1137 AsyncGenerator
<T
>(), maybe_next
.status(), index
));
1139 error_sink
= std::move(*state
->waiting_jobs
.front());
1140 state
->waiting_jobs
.pop_front();
1144 state
->active_subscriptions
[index
] = *maybe_next
;
1145 should_continue
= true;
1148 if (error_sink
.is_valid()) {
1149 error_sink
.MarkFinished(maybe_next
.status());
1151 if (should_continue
) {
1152 (*maybe_next
)().AddCallback(InnerCallback
{state
, index
});
1153 } else if (should_purge
) {
1154 // At this point state->finished has been marked true so no one else
1155 // will be interacting with waiting_jobs and we can iterate outside lock
1156 while (!state
->waiting_jobs
.empty()) {
1157 state
->waiting_jobs
.front()->MarkFinished(IterationTraits
<T
>::End());
1158 state
->waiting_jobs
.pop_front();
1162 std::shared_ptr
<State
> state
;
1166 std::shared_ptr
<State
> state_
;
1169 /// \brief Create a generator that takes in a stream of generators and pulls from up to
1170 /// max_subscriptions at a time
1172 /// Note: This may deliver items out of sequence. For example, items from the third
1173 /// AsyncGenerator generated by the source may be emitted before some items from the first
1174 /// AsyncGenerator generated by the source.
1176 /// This generator will pull from source async-reentrantly unless max_subscriptions is 1
1177 /// This generator will not pull from the individual subscriptions reentrantly. Add
1178 /// readahead to the individual subscriptions if that is desired.
1179 /// This generator is async-reentrant
1181 /// This generator may queue up to max_subscriptions instances of T
1182 template <typename T
>
1183 AsyncGenerator
<T
> MakeMergedGenerator(AsyncGenerator
<AsyncGenerator
<T
>> source
,
1184 int max_subscriptions
) {
1185 return MergedGenerator
<T
>(std::move(source
), max_subscriptions
);
1188 template <typename T
>
1189 Result
<AsyncGenerator
<T
>> MakeSequencedMergedGenerator(
1190 AsyncGenerator
<AsyncGenerator
<T
>> source
, int max_subscriptions
) {
1191 if (max_subscriptions
< 0) {
1192 return Status::Invalid("max_subscriptions must be a positive integer");
1194 if (max_subscriptions
== 1) {
1195 return Status::Invalid("Use MakeConcatenatedGenerator if max_subscriptions is 1");
1197 AsyncGenerator
<AsyncGenerator
<T
>> autostarting_source
= MakeMappedGenerator(
1199 [](const AsyncGenerator
<T
>& sub
) { return MakeAutoStartingGenerator(sub
); });
1200 AsyncGenerator
<AsyncGenerator
<T
>> sub_readahead
=
1201 MakeSerialReadaheadGenerator(std::move(autostarting_source
), max_subscriptions
- 1);
1202 return MakeConcatenatedGenerator(std::move(sub_readahead
));
1205 /// \brief Create a generator that takes in a stream of generators and pulls from each
1206 /// one in sequence.
1208 /// This generator is async-reentrant but will never pull from source reentrantly and
1209 /// will never pull from any subscription reentrantly.
1211 /// This generator may queue 1 instance of T
1213 /// TODO: Could potentially make a bespoke implementation instead of MergedGenerator that
1214 /// forwards async-reentrant requests instead of buffering them (which is what
1215 /// MergedGenerator does)
1216 template <typename T
>
1217 AsyncGenerator
<T
> MakeConcatenatedGenerator(AsyncGenerator
<AsyncGenerator
<T
>> source
) {
1218 return MergedGenerator
<T
>(std::move(source
), 1);
1221 template <typename T
>
1228 template <typename T
>
1229 struct IterationTraits
<Enumerated
<T
>> {
1230 static Enumerated
<T
> End() { return Enumerated
<T
>{IterationEnd
<T
>(), -1, false}; }
1231 static bool IsEnd(const Enumerated
<T
>& val
) { return val
.index
< 0; }
1234 /// \see MakeEnumeratedGenerator
1235 template <typename T
>
1236 class EnumeratingGenerator
{
1238 EnumeratingGenerator(AsyncGenerator
<T
> source
, T initial_value
)
1239 : state_(std::make_shared
<State
>(std::move(source
), std::move(initial_value
))) {}
1241 Future
<Enumerated
<T
>> operator()() {
1242 if (state_
->finished
) {
1243 return AsyncGeneratorEnd
<Enumerated
<T
>>();
1245 auto state
= state_
;
1246 return state
->source().Then([state
](const T
& next
) {
1247 auto finished
= IsIterationEnd
<T
>(next
);
1248 auto prev
= Enumerated
<T
>{state
->prev_value
, state
->prev_index
, finished
};
1249 state
->prev_value
= next
;
1250 state
->prev_index
++;
1251 state
->finished
= finished
;
1259 State(AsyncGenerator
<T
> source
, T initial_value
)
1260 : source(std::move(source
)), prev_value(std::move(initial_value
)), prev_index(0) {
1261 finished
= IsIterationEnd
<T
>(prev_value
);
1264 AsyncGenerator
<T
> source
;
1270 std::shared_ptr
<State
> state_
;
1273 /// Wrap items from a source generator with positional information
1275 /// When used with MakeMergedGenerator and MakeSequencingGenerator this allows items to be
1276 /// processed in a "first-available" fashion and later resequenced which can reduce the
1277 /// impact of sources with erratic performance (e.g. a filesystem where some items may
1278 /// take longer to read than others).
1280 /// TODO(ARROW-12371) Would require this generator be async-reentrant
1282 /// \see MakeSequencingGenerator for an example of putting items back in order
1284 /// This generator is not async-reentrant
1286 /// This generator buffers one item (so it knows which item is the last item)
1287 template <typename T
>
1288 AsyncGenerator
<Enumerated
<T
>> MakeEnumeratedGenerator(AsyncGenerator
<T
> source
) {
1289 return FutureFirstGenerator
<Enumerated
<T
>>(
1290 source().Then([source
](const T
& initial_value
) -> AsyncGenerator
<Enumerated
<T
>> {
1291 return EnumeratingGenerator
<T
>(std::move(source
), initial_value
);
1295 /// \see MakeTransferredGenerator
1296 template <typename T
>
1297 class TransferringGenerator
{
1299 explicit TransferringGenerator(AsyncGenerator
<T
> source
, internal::Executor
* executor
)
1300 : source_(std::move(source
)), executor_(executor
) {}
1302 Future
<T
> operator()() { return executor_
->Transfer(source_()); }
1305 AsyncGenerator
<T
> source_
;
1306 internal::Executor
* executor_
;
1309 /// \brief Transfer a future to an underlying executor.
1311 /// Continuations run on the returned future will be run on the given executor
1312 /// if they cannot be run synchronously.
1314 /// This is often needed to move computation off I/O threads or other external
1315 /// completion sources and back on to the CPU executor so the I/O thread can
1316 /// stay busy and focused on I/O
1318 /// Keep in mind that continuations called on an already completed future will
1319 /// always be run synchronously and so no transfer will happen in that case.
1321 /// This generator is async reentrant if the source is
1323 /// This generator will not queue
1324 template <typename T
>
1325 AsyncGenerator
<T
> MakeTransferredGenerator(AsyncGenerator
<T
> source
,
1326 internal::Executor
* executor
) {
1327 return TransferringGenerator
<T
>(std::move(source
), executor
);
1330 /// \see MakeBackgroundGenerator
1331 template <typename T
>
1332 class BackgroundGenerator
{
1334 explicit BackgroundGenerator(Iterator
<T
> it
, internal::Executor
* io_executor
, int max_q
,
1336 : state_(std::make_shared
<State
>(io_executor
, std::move(it
), max_q
, q_restart
)),
1337 cleanup_(std::make_shared
<Cleanup
>(state_
.get())) {}
1339 Future
<T
> operator()() {
1340 auto guard
= state_
->mutex
.Lock();
1341 Future
<T
> waiting_future
;
1342 if (state_
->queue
.empty()) {
1343 if (state_
->finished
) {
1344 return AsyncGeneratorEnd
<T
>();
1346 waiting_future
= Future
<T
>::Make();
1347 state_
->waiting_future
= waiting_future
;
1350 auto next
= Future
<T
>::MakeFinished(std::move(state_
->queue
.front()));
1351 state_
->queue
.pop();
1352 if (state_
->NeedsRestart()) {
1353 return state_
->RestartTask(state_
, std::move(guard
), std::move(next
));
1357 // This should only trigger the very first time this method is called
1358 if (state_
->NeedsRestart()) {
1359 return state_
->RestartTask(state_
, std::move(guard
), std::move(waiting_future
));
1361 return waiting_future
;
1365 static constexpr uint64_t kUnlikelyThreadId
{std::numeric_limits
<uint64_t>::max()};
1368 State(internal::Executor
* io_executor
, Iterator
<T
> it
, int max_q
, int q_restart
)
1369 : io_executor(io_executor
),
1371 q_restart(q_restart
),
1375 should_shutdown(false) {}
1378 while (!queue
.empty()) {
1383 bool TaskIsRunning() const { return task_finished
.is_valid(); }
1385 bool NeedsRestart() const {
1386 return !finished
&& !reading
&& static_cast<int>(queue
.size()) <= q_restart
;
1389 void DoRestartTask(std::shared_ptr
<State
> state
, util::Mutex::Guard guard
) {
1390 // If we get here we are actually going to start a new task so let's create a
1391 // task_finished future for it
1392 state
->task_finished
= Future
<>::Make();
1393 state
->reading
= true;
1394 auto spawn_status
= io_executor
->Spawn(
1395 [state
]() { BackgroundGenerator::WorkerTask(std::move(state
)); });
1396 if (!spawn_status
.ok()) {
1397 // If we can't spawn a new task then send an error to the consumer (either via a
1398 // waiting future or the queue) and mark ourselves finished
1399 state
->finished
= true;
1400 state
->task_finished
= Future
<>();
1401 if (waiting_future
.has_value()) {
1402 auto to_deliver
= std::move(waiting_future
.value());
1403 waiting_future
.reset();
1405 to_deliver
.MarkFinished(spawn_status
);
1408 queue
.push(spawn_status
);
1413 Future
<T
> RestartTask(std::shared_ptr
<State
> state
, util::Mutex::Guard guard
,
1415 if (TaskIsRunning()) {
1416 // If the task is still cleaning up we need to wait for it to finish before
1417 // restarting. We also want to block the consumer until we've restarted the
1418 // reader to avoid multiple restarts
1419 return task_finished
.Then([state
, next
]() {
1420 // This may appear dangerous (recursive mutex) but we should be guaranteed the
1421 // outer guard has been released by this point. We know...
1422 // * task_finished is not already finished (it would be invalid in that case)
1423 // * task_finished will not be marked complete until we've given up the mutex
1424 auto guard_
= state
->mutex
.Lock();
1425 state
->DoRestartTask(state
, std::move(guard_
));
1429 // Otherwise we can restart immediately
1430 DoRestartTask(std::move(state
), std::move(guard
));
1434 internal::Executor
* io_executor
;
1436 const int q_restart
;
1438 std::atomic
<uint64_t> worker_thread_id
{kUnlikelyThreadId
};
1440 // If true, the task is actively pumping items from the queue and does not need a
1443 // Set to true when a terminal item arrives
1445 // Signal to the background task to end early because consumers have given up on it
1446 bool should_shutdown
;
1447 // If the queue is empty, the consumer will create a waiting future and wait for it
1448 std::queue
<Result
<T
>> queue
;
1449 util::optional
<Future
<T
>> waiting_future
;
1450 // Every background task is given a future to complete when it is entirely finished
1451 // processing and ready for the next task to start or for State to be destroyed
1452 Future
<> task_finished
;
1456 // Cleanup task that will be run when all consumer references to the generator are lost
1458 explicit Cleanup(State
* state
) : state(state
) {}
1460 /// TODO: Once ARROW-13109 is available then we can be force consumers to spawn and
1461 /// there is no need to perform this check.
1463 /// It's a deadlock if we enter cleanup from
1464 /// the worker thread but it can happen if the consumer doesn't transfer away
1465 assert(state
->worker_thread_id
.load() != ::arrow::internal::GetThreadId());
1466 Future
<> finish_fut
;
1468 auto lock
= state
->mutex
.Lock();
1469 if (!state
->TaskIsRunning()) {
1472 // Signal the current task to stop and wait for it to finish
1473 state
->should_shutdown
= true;
1474 finish_fut
= state
->task_finished
;
1476 // Using future as a condition variable here
1477 Status st
= finish_fut
.status();
1483 static void WorkerTask(std::shared_ptr
<State
> state
) {
1484 state
->worker_thread_id
.store(::arrow::internal::GetThreadId());
1485 // We need to capture the state to read while outside the mutex
1486 bool reading
= true;
1488 auto next
= state
->it
.Next();
1489 // Need to capture state->waiting_future inside the mutex to mark finished outside
1490 Future
<T
> waiting_future
;
1492 auto guard
= state
->mutex
.Lock();
1494 if (state
->should_shutdown
) {
1495 state
->finished
= true;
1499 if (!next
.ok() || IsIterationEnd
<T
>(*next
)) {
1500 // Terminal item. Mark finished to true, send this last item, and quit
1501 state
->finished
= true;
1503 state
->ClearQueue();
1506 // At this point we are going to send an item. Either we will add it to the
1507 // queue or deliver it to a waiting future.
1508 if (state
->waiting_future
.has_value()) {
1509 waiting_future
= std::move(state
->waiting_future
.value());
1510 state
->waiting_future
.reset();
1512 state
->queue
.push(std::move(next
));
1513 // We just filled up the queue so it is time to quit. We may need to notify
1514 // a cleanup task so we transition to Quitting
1515 if (static_cast<int>(state
->queue
.size()) >= state
->max_q
) {
1516 state
->reading
= false;
1519 reading
= state
->reading
&& !state
->finished
;
1521 // This should happen outside the mutex. Presumably there is a
1522 // transferring generator on the other end that will quickly transfer any
1523 // callbacks off of this thread so we can continue looping. Still, best not to
1525 if (waiting_future
.is_valid()) {
1526 waiting_future
.MarkFinished(next
);
1529 // Once we've sent our last item we can notify any waiters that we are done and so
1530 // either state can be cleaned up or a new background task can be started
1531 Future
<> task_finished
;
1533 auto guard
= state
->mutex
.Lock();
1534 // After we give up the mutex state can be safely deleted. We will no longer
1535 // reference it. We can safely transition to idle now.
1536 task_finished
= state
->task_finished
;
1537 state
->task_finished
= Future
<>();
1538 state
->worker_thread_id
.store(kUnlikelyThreadId
);
1540 task_finished
.MarkFinished();
1543 std::shared_ptr
<State
> state_
;
1544 // state_ is held by both the generator and the background thread so it won't be cleaned
1545 // up when all consumer references are relinquished. cleanup_ is only held by the
1546 // generator so it will be destructed when the last consumer reference is gone. We use
1547 // this to cleanup / stop the background generator in case the consuming end stops
1548 // listening (e.g. due to a downstream error)
1549 std::shared_ptr
<Cleanup
> cleanup_
;
// Default value of MakeBackgroundGenerator's max_q parameter.
1552 constexpr int kDefaultBackgroundMaxQ
= 32;
// Default value of MakeBackgroundGenerator's q_restart parameter.
1553 constexpr int kDefaultBackgroundQRestart
= 16;
1555 /// \brief Create an AsyncGenerator<T> by iterating over an Iterator<T> on a background
1558 /// The parameter max_q and q_restart control queue size and background thread task
1559 /// management. If the background task is fast you typically don't want it creating a
1560 /// thread task for every item. Instead the background thread will run until it fills
1561 /// up a readahead queue.
1563 /// Once the queue has filled up the background thread task will terminate (allowing other
1564 /// I/O tasks to use the thread). Once the queue has been drained enough (specified by
1565 /// q_restart) then the background thread task will be restarted. If q_restart is too low
1566 /// then you may exhaust the queue waiting for the background thread task to start running
1567 /// again. If it is too high then it will be constantly stopping and restarting the
1568 /// background queue task
1570 /// The "background thread" is a logical thread and will run as tasks on the io_executor.
1571 /// This thread may stop and start when the queue fills up but there will only be one
1572 /// active background thread task at any given time. You MUST transfer away from this
1573 /// background generator. Otherwise there could be a race condition if a callback on the
1574 /// background thread deletes the last consumer reference to the background generator. You
1575 /// can transfer onto the same executor as the background thread, it is only neccesary to
1576 /// create a new thread task, not to switch executors.
1578 /// This generator is not async-reentrant
1580 /// This generator will queue up to max_q blocks
1581 template <typename T
>
1582 static Result
<AsyncGenerator
<T
>> MakeBackgroundGenerator(
1583 Iterator
<T
> iterator
, internal::Executor
* io_executor
,
1584 int max_q
= kDefaultBackgroundMaxQ
, int q_restart
= kDefaultBackgroundQRestart
) {
1585 if (max_q
< q_restart
) {
1586 return Status::Invalid("max_q must be >= q_restart");
1588 return BackgroundGenerator
<T
>(std::move(iterator
), io_executor
, max_q
, q_restart
);
1591 /// \see MakeGeneratorIterator
1592 template <typename T
>
1593 class GeneratorIterator
{
1595 explicit GeneratorIterator(AsyncGenerator
<T
> source
) : source_(std::move(source
)) {}
1597 Result
<T
> Next() { return source_().result(); }
1600 AsyncGenerator
<T
> source_
;
1603 /// \brief Convert an AsyncGenerator<T> to an Iterator<T> which blocks until each future
1605 template <typename T
>
1606 Iterator
<T
> MakeGeneratorIterator(AsyncGenerator
<T
> source
) {
1607 return Iterator
<T
>(GeneratorIterator
<T
>(std::move(source
)));
1610 /// \brief Add readahead to an iterator using a background thread.
1612 /// Under the hood this is converting the iterator to a generator using
1613 /// MakeBackgroundGenerator, adding readahead to the converted generator with
1614 /// MakeReadaheadGenerator, and then converting back to an iterator using
1615 /// MakeGeneratorIterator.
1616 template <typename T
>
1617 Result
<Iterator
<T
>> MakeReadaheadIterator(Iterator
<T
> it
, int readahead_queue_size
) {
1618 ARROW_ASSIGN_OR_RAISE(auto io_executor
, internal::ThreadPool::Make(1));
1619 auto max_q
= readahead_queue_size
;
1620 auto q_restart
= std::max(1, max_q
/ 2);
1621 ARROW_ASSIGN_OR_RAISE(
1622 auto background_generator
,
1623 MakeBackgroundGenerator(std::move(it
), io_executor
.get(), max_q
, q_restart
));
1624 // Capture io_executor to keep it alive as long as owned_bg_generator is still
1626 AsyncGenerator
<T
> owned_bg_generator
= [io_executor
, background_generator
]() {
1627 return background_generator();
1629 return MakeGeneratorIterator(std::move(owned_bg_generator
));
1632 /// \brief Make a generator that returns a single pre-generated future
1634 /// This generator is async-reentrant.
1635 template <typename T
>
1636 std::function
<Future
<T
>()> MakeSingleFutureGenerator(Future
<T
> future
) {
1637 assert(future
.is_valid());
1638 auto state
= std::make_shared
<Future
<T
>>(std::move(future
));
1639 return [state
]() -> Future
<T
> {
1640 auto fut
= std::move(*state
);
1641 if (fut
.is_valid()) {
1644 return AsyncGeneratorEnd
<T
>();
1649 /// \brief Make a generator that immediately ends.
1651 /// This generator is async-reentrant.
1652 template <typename T
>
1653 std::function
<Future
<T
>()> MakeEmptyGenerator() {
1654 return []() -> Future
<T
> { return AsyncGeneratorEnd
<T
>(); };
1657 /// \brief Make a generator that always fails with a given error
1659 /// This generator is async-reentrant.
1660 template <typename T
>
1661 AsyncGenerator
<T
> MakeFailingGenerator(Status st
) {
1663 auto state
= std::make_shared
<Status
>(std::move(st
));
1664 return [state
]() -> Future
<T
> {
1665 auto st
= std::move(*state
);
1667 return std::move(st
);
1669 return AsyncGeneratorEnd
<T
>();
1674 /// \brief Make a generator that always fails with a given error
1676 /// This overload allows inferring the return type from the argument.
1677 template <typename T
>
1678 AsyncGenerator
<T
> MakeFailingGenerator(const Result
<T
>& result
) {
1679 return MakeFailingGenerator
<T
>(result
.status());
1682 /// \brief Prepend initial_values onto a generator
1684 /// This generator is async-reentrant but will buffer requests and will not
1685 /// pull from following_values async-reentrantly.
1686 template <typename T
>
1687 AsyncGenerator
<T
> MakeGeneratorStartsWith(std::vector
<T
> initial_values
,
1688 AsyncGenerator
<T
> following_values
) {
1689 auto initial_values_vec_gen
= MakeVectorGenerator(std::move(initial_values
));
1690 auto gen_gen
= MakeVectorGenerator
<AsyncGenerator
<T
>>(
1691 {std::move(initial_values_vec_gen
), std::move(following_values
)});
1692 return MakeConcatenatedGenerator(std::move(gen_gen
));
1695 template <typename T
>
1696 struct CancellableGenerator
{
1697 Future
<T
> operator()() {
1698 if (stop_token
.IsStopRequested()) {
1699 return stop_token
.Poll();
1704 AsyncGenerator
<T
> source
;
1705 StopToken stop_token
;
1708 /// \brief Allow an async generator to be cancelled
1710 /// This generator is async-reentrant
1711 template <typename T
>
1712 AsyncGenerator
<T
> MakeCancellable(AsyncGenerator
<T
> source
, StopToken stop_token
) {
1713 return CancellableGenerator
<T
>{std::move(source
), std::move(stop_token
)};
1716 template <typename T
>
1717 struct PauseableGenerator
{
1719 PauseableGenerator(AsyncGenerator
<T
> source
, std::shared_ptr
<util::AsyncToggle
> toggle
)
1720 : state_(std::make_shared
<PauseableGeneratorState
>(std::move(source
),
1721 std::move(toggle
))) {}
1723 Future
<T
> operator()() { return (*state_
)(); }
1726 struct PauseableGeneratorState
1727 : public std::enable_shared_from_this
<PauseableGeneratorState
> {
1728 PauseableGeneratorState(AsyncGenerator
<T
> source
,
1729 std::shared_ptr
<util::AsyncToggle
> toggle
)
1730 : source_(std::move(source
)), toggle_(std::move(toggle
)) {}
1732 Future
<T
> operator()() {
1733 std::shared_ptr
<PauseableGeneratorState
> self
= this->shared_from_this();
1734 return toggle_
->WhenOpen().Then([self
] {
1735 util::Mutex::Guard guard
= self
->mutex_
.Lock();
1736 return self
->source_();
1740 AsyncGenerator
<T
> source_
;
1741 std::shared_ptr
<util::AsyncToggle
> toggle_
;
1744 std::shared_ptr
<PauseableGeneratorState
> state_
;
1747 /// \brief Allow an async generator to be paused
1749 /// This generator is NOT async-reentrant and calling it in an async-reentrant fashion
1750 /// may lead to items getting reordered (and potentially truncated if the end token is
1751 /// reordered ahead of valid items)
1753 /// This generator forwards async-reentrant pressure
1754 template <typename T
>
1755 AsyncGenerator
<T
> MakePauseable(AsyncGenerator
<T
> source
,
1756 std::shared_ptr
<util::AsyncToggle
> toggle
) {
1757 return PauseableGenerator
<T
>(std::move(source
), std::move(toggle
));
1760 template <typename T
>
1761 class DefaultIfEmptyGenerator
{
1763 DefaultIfEmptyGenerator(AsyncGenerator
<T
> source
, T or_value
)
1764 : state_(std::make_shared
<State
>(std::move(source
), std::move(or_value
))) {}
1766 Future
<T
> operator()() {
1767 if (state_
->first
) {
1768 state_
->first
= false;
1772 Result
<T
> operator()(const T
& value
) {
1773 if (IterationTraits
<T
>::IsEnd(value
)) {
1774 return std::move(or_value
);
1779 Continuation
.or_value
= std::move(state_
->or_value
);
1780 return state_
->source().Then(std::move(Continuation
));
1782 return state_
->source();
1787 AsyncGenerator
<T
> source
;
1790 State(AsyncGenerator
<T
> source_
, T or_value_
)
1791 : source(std::move(source_
)), or_value(std::move(or_value_
)), first(true) {}
1793 std::shared_ptr
<State
> state_
;
1796 /// \brief If the generator is empty, return the given value, else
1797 /// forward the values from the generator.
1799 /// This generator is async-reentrant.
1800 template <typename T
>
1801 AsyncGenerator
<T
> MakeDefaultIfEmptyGenerator(AsyncGenerator
<T
> source
, T or_value
) {
1802 return DefaultIfEmptyGenerator
<T
>(std::move(source
), std::move(or_value
));
1804 } // namespace arrow