1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #pragma once
19
20 #include <atomic>
21 #include <cassert>
22 #include <cstring>
23 #include <deque>
24 #include <limits>
25 #include <queue>
26
27 #include "arrow/util/async_util.h"
28 #include "arrow/util/functional.h"
29 #include "arrow/util/future.h"
30 #include "arrow/util/io_util.h"
31 #include "arrow/util/iterator.h"
32 #include "arrow/util/mutex.h"
33 #include "arrow/util/optional.h"
34 #include "arrow/util/queue.h"
35 #include "arrow/util/thread_pool.h"
36
37 namespace arrow {
38
39 // The methods in this file create, modify, and utilize AsyncGenerator, which is an
40 // iterator of futures. This allows an asynchronous source (like file input) to be run
41 // through a pipeline in the same way that iterators can be used to create pipelined
42 // workflows.
43 //
44 // In order to support pipeline parallelism we introduce the concept of asynchronous
45 // reentrancy. This is different from synchronous reentrancy. With synchronous code, a
46 // function is reentrant if the function can be called again while a previous call to that
47 // function is still running. Unless otherwise specified, none of these generators are
48 // synchronously reentrant. Care should be taken to avoid calling them in such a way (and
49 // the utilities Visit/Collect/Await take care to do this).
50 //
51 // Asynchronous reentrancy, on the other hand, means the function is called again before
52 // the future returned by the function is marked finished (but after the call to get the
53 // future returns). Some of these generators are async-reentrant while others (e.g.
54 // those that depend on ordered processing like decompression) are not. Read the MakeXYZ
55 // function comments to determine which generators support async reentrancy.
56 //
57 // Note: Generators that are not asynchronously reentrant can still support readahead
58 // (\see MakeSerialReadaheadGenerator).
59 //
60 // Readahead operators, and some other operators, may introduce queueing. Any operators
61 // that introduce buffering should detail the amount of buffering they introduce in their
62 // MakeXYZ function comments.
63 template <typename T>
64 using AsyncGenerator = std::function<Future<T>()>;
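// Illustrative sketch (not part of the Arrow API): a hand-rolled AsyncGenerator<int> that
// yields 1, 2, 3 and then the end token from IterationTraits<int>::End(). Every call
// returns an already-finished future, so this particular generator is trivially
// async-reentrant.
inline AsyncGenerator<int> ExampleCountingGenerator() {
  auto next = std::make_shared<int>(0);
  return [next]() -> Future<int> {
    if (*next < 3) {
      return Future<int>::MakeFinished(++*next);
    }
    return Future<int>::MakeFinished(IterationTraits<int>::End());
  };
}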
65
66 template <typename T>
67 struct IterationTraits<AsyncGenerator<T>> {
68 /// \brief by default when iterating through a sequence of AsyncGenerator<T>,
69 /// an empty function indicates the end of iteration.
70 static AsyncGenerator<T> End() { return AsyncGenerator<T>(); }
71
72 static bool IsEnd(const AsyncGenerator<T>& val) { return !val; }
73 };
74
75 template <typename T>
76 Future<T> AsyncGeneratorEnd() {
77 return Future<T>::MakeFinished(IterationTraits<T>::End());
78 }
79
80 /// \brief Iterate through a generator of futures, visiting the result of each one and
/// returning a future that completes when all have been visited
81 template <typename T, typename Visitor>
82 Future<> VisitAsyncGenerator(AsyncGenerator<T> generator, Visitor visitor) {
83 struct LoopBody {
84 struct Callback {
85 Result<ControlFlow<>> operator()(const T& next) {
86 if (IsIterationEnd(next)) {
87 return Break();
88 } else {
89 auto visited = visitor(next);
90 if (visited.ok()) {
91 return Continue();
92 } else {
93 return visited;
94 }
95 }
96 }
97
98 Visitor visitor;
99 };
100
101 Future<ControlFlow<>> operator()() {
102 Callback callback{visitor};
103 auto next = generator();
104 return next.Then(std::move(callback));
105 }
106
107 AsyncGenerator<T> generator;
108 Visitor visitor;
109 };
110
111 return Loop(LoopBody{std::move(generator), std::move(visitor)});
112 }
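// Hedged usage sketch: sum every item produced by an AsyncGenerator<int>. The shared
// accumulator and the function name are assumptions of this example; the returned
// Future<> completes once the end token has been seen (or an error stops the visit).
inline Future<> ExampleSumWithVisit(AsyncGenerator<int> gen, std::shared_ptr<int> total) {
  return VisitAsyncGenerator<int>(std::move(gen), [total](const int& value) {
    *total += value;
    return Status::OK();
  });
}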
113
114 /// \brief Wait for an async generator to complete, discarding results.
115 template <typename T>
116 Future<> DiscardAllFromAsyncGenerator(AsyncGenerator<T> generator) {
117 std::function<Status(T)> visitor = [](const T&) { return Status::OK(); };
118 return VisitAsyncGenerator(generator, visitor);
119 }
120
121 /// \brief Collect the results of an async generator into a vector
122 template <typename T>
123 Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
124 auto vec = std::make_shared<std::vector<T>>();
125 struct LoopBody {
126 Future<ControlFlow<std::vector<T>>> operator()() {
127 auto next = generator_();
128 auto vec = vec_;
129 return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
130 if (IsIterationEnd(result)) {
131 return Break(*vec);
132 } else {
133 vec->push_back(result);
134 return Continue();
135 }
136 });
137 }
138 AsyncGenerator<T> generator_;
139 std::shared_ptr<std::vector<T>> vec_;
140 };
141 return Loop(LoopBody{std::move(generator), std::move(vec)});
142 }
143
144 /// \see MakeMappedGenerator
145 template <typename T, typename V>
146 class MappingGenerator {
147 public:
148 MappingGenerator(AsyncGenerator<T> source, std::function<Future<V>(const T&)> map)
149 : state_(std::make_shared<State>(std::move(source), std::move(map))) {}
150
151 Future<V> operator()() {
152 auto future = Future<V>::Make();
153 bool should_trigger;
154 {
155 auto guard = state_->mutex.Lock();
156 if (state_->finished) {
157 return AsyncGeneratorEnd<V>();
158 }
159 should_trigger = state_->waiting_jobs.empty();
160 state_->waiting_jobs.push_back(future);
161 }
162 if (should_trigger) {
163 state_->source().AddCallback(Callback{state_});
164 }
165 return future;
166 }
167
168 private:
169 struct State {
170 State(AsyncGenerator<T> source, std::function<Future<V>(const T&)> map)
171 : source(std::move(source)),
172 map(std::move(map)),
173 waiting_jobs(),
174 mutex(),
175 finished(false) {}
176
177 void Purge() {
178 // This might be called by an original callback (if the source generator fails or
179 // ends) or by a mapped callback (if the map function fails or ends prematurely).
180 // Either way it should only be called once and after finished is set so there is no
181 // need to guard access to `waiting_jobs`.
182 while (!waiting_jobs.empty()) {
183 waiting_jobs.front().MarkFinished(IterationTraits<V>::End());
184 waiting_jobs.pop_front();
185 }
186 }
187
188 AsyncGenerator<T> source;
189 std::function<Future<V>(const T&)> map;
190 std::deque<Future<V>> waiting_jobs;
191 util::Mutex mutex;
192 bool finished;
193 };
194
195 struct Callback;
196
197 struct MappedCallback {
198 void operator()(const Result<V>& maybe_next) {
199 bool end = !maybe_next.ok() || IsIterationEnd(*maybe_next);
200 bool should_purge = false;
201 if (end) {
202 {
203 auto guard = state->mutex.Lock();
204 should_purge = !state->finished;
205 state->finished = true;
206 }
207 }
208 sink.MarkFinished(maybe_next);
209 if (should_purge) {
210 state->Purge();
211 }
212 }
213 std::shared_ptr<State> state;
214 Future<V> sink;
215 };
216
217 struct Callback {
218 void operator()(const Result<T>& maybe_next) {
219 Future<V> sink;
220 bool end = !maybe_next.ok() || IsIterationEnd(*maybe_next);
221 bool should_purge = false;
222 bool should_trigger;
223 {
224 auto guard = state->mutex.Lock();
225 // A MappedCallback may have purged or be purging the queue;
226 // we shouldn't do anything here.
227 if (state->finished) return;
228 if (end) {
229 should_purge = !state->finished;
230 state->finished = true;
231 }
232 sink = state->waiting_jobs.front();
233 state->waiting_jobs.pop_front();
234 should_trigger = !end && !state->waiting_jobs.empty();
235 }
236 if (should_purge) {
237 state->Purge();
238 }
239 if (should_trigger) {
240 state->source().AddCallback(Callback{state});
241 }
242 if (maybe_next.ok()) {
243 const T& val = maybe_next.ValueUnsafe();
244 if (IsIterationEnd(val)) {
245 sink.MarkFinished(IterationTraits<V>::End());
246 } else {
247 Future<V> mapped_fut = state->map(val);
248 mapped_fut.AddCallback(MappedCallback{std::move(state), std::move(sink)});
249 }
250 } else {
251 sink.MarkFinished(maybe_next.status());
252 }
253 }
254
255 std::shared_ptr<State> state;
256 };
257
258 std::shared_ptr<State> state_;
259 };
260
261 /// \brief Create a generator that will apply the map function to each element of
262 /// source. The map function is not called on the end token.
263 ///
264 /// Note: This function makes a copy of `map` for each item
265 /// Note: Errors returned from the `map` function will be propagated
266 ///
267 /// If the source generator is async-reentrant then this generator will be also
268 template <typename T, typename MapFn,
269 typename Mapped = detail::result_of_t<MapFn(const T&)>,
270 typename V = typename EnsureFuture<Mapped>::type::ValueType>
271 AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator, MapFn map) {
272 struct MapCallback {
273 MapFn map_;
274
275 Future<V> operator()(const T& val) { return ToFuture(map_(val)); }
276 };
277
278 return MappingGenerator<T, V>(std::move(source_generator), MapCallback{std::move(map)});
279 }
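// Hedged usage sketch: apply a synchronous map function to each element of an async
// stream. The lambda here returns a plain value; map functions returning Future<V> are
// also accepted. The function name is illustrative, not an Arrow API.
inline AsyncGenerator<int> ExampleDoubledGenerator(AsyncGenerator<int> source) {
  return MakeMappedGenerator(std::move(source), [](const int& v) { return v * 2; });
}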
280
281 /// \brief Create a generator that will apply the map function to
282 /// each element of source. The map function is not called on the end
283 /// token. The result of the map function should be another
284 /// generator; all these generators will then be flattened to produce
285 /// a single stream of items.
286 ///
287 /// Note: This function makes a copy of `map` for each item
288 /// Note: Errors returned from the `map` function will be propagated
289 ///
290 /// If the source generator is async-reentrant then this generator will be also
291 template <typename T, typename MapFn,
292 typename Mapped = detail::result_of_t<MapFn(const T&)>,
293 typename V = typename EnsureFuture<Mapped>::type::ValueType>
294 AsyncGenerator<T> MakeFlatMappedGenerator(AsyncGenerator<T> source_generator, MapFn map) {
295 return MakeConcatenatedGenerator(
296 MakeMappedGenerator(std::move(source_generator), std::move(map)));
297 }
298
299 /// \see MakeSequencingGenerator
300 template <typename T, typename ComesAfter, typename IsNext>
301 class SequencingGenerator {
302 public:
303 SequencingGenerator(AsyncGenerator<T> source, ComesAfter compare, IsNext is_next,
304 T initial_value)
305 : state_(std::make_shared<State>(std::move(source), std::move(compare),
306 std::move(is_next), std::move(initial_value))) {}
307
308 Future<T> operator()() {
309 {
310 auto guard = state_->mutex.Lock();
311 // We can send a result immediately if the top of the queue is either an
312 // error or the next item
313 if (!state_->queue.empty() &&
314 (!state_->queue.top().ok() ||
315 state_->is_next(state_->previous_value, *state_->queue.top()))) {
316 auto result = std::move(state_->queue.top());
317 if (result.ok()) {
318 state_->previous_value = *result;
319 }
320 state_->queue.pop();
321 return Future<T>::MakeFinished(result);
322 }
323 if (state_->finished) {
324 return AsyncGeneratorEnd<T>();
325 }
326 // The next item is not in the queue so we will need to wait
327 auto new_waiting_fut = Future<T>::Make();
328 state_->waiting_future = new_waiting_fut;
329 guard.Unlock();
330 state_->source().AddCallback(Callback{state_});
331 return new_waiting_fut;
332 }
333 }
334
335 private:
336 struct WrappedComesAfter {
337 bool operator()(const Result<T>& left, const Result<T>& right) {
338 if (!left.ok() || !right.ok()) {
339 // Should never happen
340 return false;
341 }
342 return compare(*left, *right);
343 }
344 ComesAfter compare;
345 };
346
347 struct State {
348 State(AsyncGenerator<T> source, ComesAfter compare, IsNext is_next, T initial_value)
349 : source(std::move(source)),
350 is_next(std::move(is_next)),
351 previous_value(std::move(initial_value)),
352 waiting_future(),
353 queue(WrappedComesAfter{compare}),
354 finished(false),
355 mutex() {}
356
357 AsyncGenerator<T> source;
358 IsNext is_next;
359 T previous_value;
360 Future<T> waiting_future;
361 std::priority_queue<Result<T>, std::vector<Result<T>>, WrappedComesAfter> queue;
362 bool finished;
363 util::Mutex mutex;
364 };
365
366 class Callback {
367 public:
368 explicit Callback(std::shared_ptr<State> state) : state_(std::move(state)) {}
369
370 void operator()(const Result<T> result) {
371 Future<T> to_deliver;
372 bool finished;
373 {
374 auto guard = state_->mutex.Lock();
375 bool ready_to_deliver = false;
376 if (!result.ok()) {
377 // Clear any cached results
378 while (!state_->queue.empty()) {
379 state_->queue.pop();
380 }
381 ready_to_deliver = true;
382 state_->finished = true;
383 } else if (IsIterationEnd<T>(result.ValueUnsafe())) {
384 ready_to_deliver = state_->queue.empty();
385 state_->finished = true;
386 } else {
387 ready_to_deliver = state_->is_next(state_->previous_value, *result);
388 }
389
390 if (ready_to_deliver && state_->waiting_future.is_valid()) {
391 to_deliver = state_->waiting_future;
392 if (result.ok()) {
393 state_->previous_value = *result;
394 }
395 } else {
396 state_->queue.push(result);
397 }
398 // Capture state_->finished so we can access it outside the mutex
399 finished = state_->finished;
400 }
401 // Must deliver result outside of the mutex
402 if (to_deliver.is_valid()) {
403 to_deliver.MarkFinished(result);
404 } else {
405 // Otherwise, if we didn't get the next item (or a terminal item), we
406 // need to keep looking
407 if (!finished) {
408 state_->source().AddCallback(Callback{state_});
409 }
410 }
411 }
412
413 private:
414 const std::shared_ptr<State> state_;
415 };
416
417 const std::shared_ptr<State> state_;
418 };
419
420 /// \brief Buffer an AsyncGenerator to return values in sequence order. ComesAfter
421 /// and IsNext determine the sequence order.
422 ///
423 /// ComesAfter should be a BinaryPredicate that only returns true if a comes after b
424 ///
425 /// IsNext should be a BinaryPredicate that returns true, given `a` and `b`, only if
426 /// `b` follows immediately after `a`. It should return true given `initial_value` and
427 /// `b` if `b` is the first item in the sequence.
428 ///
429 /// This operator will queue unboundedly while waiting for the next item. It is intended
430 /// for jittery sources that might scatter an ordered sequence. It is NOT intended to
431 // sort. Using it to try to sort could result in excessive RAM usage. This generator
432 /// will queue up to N blocks where N is the max "out of order"ness of the source.
433 ///
434 /// For example, if the source is 1,6,2,5,4,3 it will queue 3 blocks because 3 is 3
435 /// blocks beyond where it belongs.
436 ///
437 /// This generator is not async-reentrant but it consists only of a simple log(n)
438 /// insertion into a priority queue.
439 template <typename T, typename ComesAfter, typename IsNext>
440 AsyncGenerator<T> MakeSequencingGenerator(AsyncGenerator<T> source_generator,
441 ComesAfter compare, IsNext is_next,
442 T initial_value) {
443 return SequencingGenerator<T, ComesAfter, IsNext>(
444 std::move(source_generator), std::move(compare), std::move(is_next),
445 std::move(initial_value));
446 }
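// Hedged usage sketch: restore 1, 2, 3, ... order for a stream of positive integers that
// arrives slightly out of order. ComesAfter orders the internal priority queue and IsNext
// recognizes when the next expected value has arrived. The function name and the choice
// of int are illustrative only.
inline AsyncGenerator<int> ExampleResequenceInts(AsyncGenerator<int> jittery_source) {
  auto comes_after = [](const int& a, const int& b) { return a > b; };
  auto is_next = [](const int& prev, const int& next) { return next == prev + 1; };
  return MakeSequencingGenerator(std::move(jittery_source), std::move(comes_after),
                                 std::move(is_next), /*initial_value=*/0);
}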
447
448 /// \see MakeTransformedGenerator
449 template <typename T, typename V>
450 class TransformingGenerator {
451 // The transforming generator state will be referenced as an async generator but will
452 // also be referenced via callback to various futures. If the async generator owner
453 // moves it around we need the state to be consistent for future callbacks.
454 struct TransformingGeneratorState
455 : std::enable_shared_from_this<TransformingGeneratorState> {
456 TransformingGeneratorState(AsyncGenerator<T> generator, Transformer<T, V> transformer)
457 : generator_(std::move(generator)),
458 transformer_(std::move(transformer)),
459 last_value_(),
460 finished_() {}
461
462 Future<V> operator()() {
463 while (true) {
464 auto maybe_next_result = Pump();
465 if (!maybe_next_result.ok()) {
466 return Future<V>::MakeFinished(maybe_next_result.status());
467 }
468 auto maybe_next = std::move(maybe_next_result).ValueUnsafe();
469 if (maybe_next.has_value()) {
470 return Future<V>::MakeFinished(*std::move(maybe_next));
471 }
472
473 auto next_fut = generator_();
474 // If finished already, process results immediately inside the loop to avoid
475 // stack overflow
476 if (next_fut.is_finished()) {
477 auto next_result = next_fut.result();
478 if (next_result.ok()) {
479 last_value_ = *next_result;
480 } else {
481 return Future<V>::MakeFinished(next_result.status());
482 }
483 // Otherwise, if not finished immediately, add callback to process results
484 } else {
485 auto self = this->shared_from_this();
486 return next_fut.Then([self](const T& next_result) {
487 self->last_value_ = next_result;
488 return (*self)();
489 });
490 }
491 }
492 }
493
494 // See comment on TransformingIterator::Pump
495 Result<util::optional<V>> Pump() {
496 if (!finished_ && last_value_.has_value()) {
497 ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
498 if (next.ReadyForNext()) {
499 if (IsIterationEnd(*last_value_)) {
500 finished_ = true;
501 }
502 last_value_.reset();
503 }
504 if (next.Finished()) {
505 finished_ = true;
506 }
507 if (next.HasValue()) {
508 return next.Value();
509 }
510 }
511 if (finished_) {
512 return IterationTraits<V>::End();
513 }
514 return util::nullopt;
515 }
516
517 AsyncGenerator<T> generator_;
518 Transformer<T, V> transformer_;
519 util::optional<T> last_value_;
520 bool finished_;
521 };
522
523 public:
524 explicit TransformingGenerator(AsyncGenerator<T> generator,
525 Transformer<T, V> transformer)
526 : state_(std::make_shared<TransformingGeneratorState>(std::move(generator),
527 std::move(transformer))) {}
528
529 Future<V> operator()() { return (*state_)(); }
530
531 protected:
532 std::shared_ptr<TransformingGeneratorState> state_;
533 };
534
535 /// \brief Transform an async generator using a transformer function returning a new
536 /// AsyncGenerator
537 ///
538 /// The transform function here behaves exactly the same as the transform function in
539 /// MakeTransformedIterator and you can safely use the same transform function to
540 /// transform both synchronous and asynchronous streams.
541 ///
542 /// This generator is not async-reentrant
543 ///
544 /// This generator may queue up to 1 instance of T but will not otherwise delay items
545 template <typename T, typename V>
546 AsyncGenerator<V> MakeTransformedGenerator(AsyncGenerator<T> generator,
547 Transformer<T, V> transformer) {
548 return TransformingGenerator<T, V>(generator, transformer);
549 }
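// Hedged usage sketch (assumes the Transformer alias and the TransformYield /
// TransformSkip helpers declared in arrow/util/iterator.h): drop odd values from an async
// stream. The end token is forwarded unchanged so the output stream terminates properly.
inline AsyncGenerator<int> ExampleKeepEven(AsyncGenerator<int> source) {
  Transformer<int, int> keep_even = [](int value) -> Result<TransformFlow<int>> {
    if (IsIterationEnd(value) || value % 2 == 0) {
      return TransformYield(value);
    }
    return TransformSkip();
  };
  return MakeTransformedGenerator(std::move(source), std::move(keep_even));
}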
550
551 /// \see MakeSerialReadaheadGenerator
552 template <typename T>
553 class SerialReadaheadGenerator {
554 public:
555 SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead)
556 : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {}
557
558 Future<T> operator()() {
559 if (state_->first_) {
560 // Lazy generator, need to wait for the first ask to prime the pump
561 state_->first_ = false;
562 auto next = state_->source_();
563 return next.Then(Callback{state_}, ErrCallback{state_});
564 }
565
566 // This generator is not async-reentrant. We won't be called until the last
567 // future has finished, so we know there is something in the queue
568 auto finished = state_->finished_.load();
569 if (finished && state_->readahead_queue_.IsEmpty()) {
570 return AsyncGeneratorEnd<T>();
571 }
572
573 std::shared_ptr<Future<T>> next;
574 if (!state_->readahead_queue_.Read(next)) {
575 return Status::UnknownError("Could not read from readahead_queue");
576 }
577
578 auto last_available = state_->spaces_available_.fetch_add(1);
579 if (last_available == 0 && !finished) {
580 // Reader idled out, we need to restart it
581 ARROW_RETURN_NOT_OK(state_->Pump(state_));
582 }
583 return *next;
584 }
585
586 private:
587 struct State {
588 State(AsyncGenerator<T> source, int max_readahead)
589 : first_(true),
590 source_(std::move(source)),
591 finished_(false),
592 // There is one extra "space" for the in-flight request
593 spaces_available_(max_readahead + 1),
594 // The SPSC queue has size-1 "usable" slots so we need to overallocate 1
595 readahead_queue_(max_readahead + 1) {}
596
597 Status Pump(const std::shared_ptr<State>& self) {
598 // Can't do readahead_queue_.Write(source().Then(...)) because then the
599 // callback might run immediately and add itself to the queue before this slot gets
600 // added to the queue, messing up the order.
601 auto next_slot = std::make_shared<Future<T>>();
602 auto written = readahead_queue_.Write(next_slot);
603 if (!written) {
604 return Status::UnknownError("Could not write to readahead_queue");
605 }
606 // If this Pump is being called from a callback it is possible for the source to
607 // poll and read from the queue between the Write and this spot where we fill the
608 // value in. However, it is not possible for the future to read this value we are
609 // writing. That is because this callback (the callback for future X) must be
610 // finished before future X is marked complete and this source is not pulled
611 // reentrantly so it will not poll for future X+1 until this callback has completed.
612 *next_slot = source_().Then(Callback{self}, ErrCallback{self});
613 return Status::OK();
614 }
615
616 // Only accessed by the consumer end
617 bool first_;
618 // Accessed by both threads
619 AsyncGenerator<T> source_;
620 std::atomic<bool> finished_;
621 // The queue has a size but it is not atomic. We keep track of how many spaces are
622 // left in the queue here so we know if we've just written the last value and we need
623 // to stop reading ahead or if we've just read from a full queue and we need to
624 // restart reading ahead
625 std::atomic<uint32_t> spaces_available_;
626 // Needs to be a queue of shared_ptr and not Future because we set the value of the
627 // future after we add it to the queue
628 util::SpscQueue<std::shared_ptr<Future<T>>> readahead_queue_;
629 };
630
631 struct Callback {
632 Result<T> operator()(const T& next) {
633 if (IsIterationEnd(next)) {
634 state_->finished_.store(true);
635 return next;
636 }
637 auto last_available = state_->spaces_available_.fetch_sub(1);
638 if (last_available > 1) {
639 ARROW_RETURN_NOT_OK(state_->Pump(state_));
640 }
641 return next;
642 }
643
644 std::shared_ptr<State> state_;
645 };
646
647 struct ErrCallback {
648 Result<T> operator()(const Status& st) {
649 state_->finished_.store(true);
650 return st;
651 }
652
653 std::shared_ptr<State> state_;
654 };
655
656 std::shared_ptr<State> state_;
657 };
658
659 /// \see MakeFromFuture
660 template <typename T>
661 class FutureFirstGenerator {
662 public:
663 explicit FutureFirstGenerator(Future<AsyncGenerator<T>> future)
664 : state_(std::make_shared<State>(std::move(future))) {}
665
666 Future<T> operator()() {
667 if (state_->source_) {
668 return state_->source_();
669 } else {
670 auto state = state_;
671 return state_->future_.Then([state](const AsyncGenerator<T>& source) {
672 state->source_ = source;
673 return state->source_();
674 });
675 }
676 }
677
678 private:
679 struct State {
680 explicit State(Future<AsyncGenerator<T>> future) : future_(future), source_() {}
681
682 Future<AsyncGenerator<T>> future_;
683 AsyncGenerator<T> source_;
684 };
685
686 std::shared_ptr<State> state_;
687 };
688
689 /// \brief Transform a Future<AsyncGenerator<T>> into an AsyncGenerator<T>
690 /// that waits for the future to complete as part of the first item.
691 ///
692 /// This generator is not async-reentrant (even if the generator yielded by future is)
693 ///
694 /// This generator does not queue
695 template <typename T>
696 AsyncGenerator<T> MakeFromFuture(Future<AsyncGenerator<T>> future) {
697 return FutureFirstGenerator<T>(std::move(future));
698 }
699
700 /// \brief Create a generator that will pull from the source into a queue. Unlike
701 /// MakeReadaheadGenerator this will not pull reentrantly from the source.
702 ///
703 /// The source generator does not need to be async-reentrant
704 ///
705 /// This generator is not async-reentrant (even if the source is)
706 ///
707 /// This generator may queue up to max_readahead additional instances of T
708 template <typename T>
709 AsyncGenerator<T> MakeSerialReadaheadGenerator(AsyncGenerator<T> source_generator,
710 int max_readahead) {
711 return SerialReadaheadGenerator<T>(std::move(source_generator), max_readahead);
712 }
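// Hedged usage sketch: keep a non-reentrant source (e.g. a decompression step) up to
// eight items ahead of its consumer without ever pulling it reentrantly. The readahead
// amount and the function name are illustrative choices.
inline AsyncGenerator<int> ExampleSerialReadahead(AsyncGenerator<int> non_reentrant_source) {
  return MakeSerialReadaheadGenerator(std::move(non_reentrant_source), /*max_readahead=*/8);
}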
713
714 /// \brief Create a generator that immediately pulls from the source
715 ///
716 /// Typical generators do not pull from their source until they themselves
717 /// are pulled. This generator does not follow that convention and will call
718 /// generator() once before it returns. The returned generator will otherwise
719 /// mirror the source.
720 ///
721 /// This generator forwards async-reentrant pressure to the source
722 /// This generator buffers one item (the first result) until it is delivered.
723 template <typename T>
724 AsyncGenerator<T> MakeAutoStartingGenerator(AsyncGenerator<T> generator) {
725 struct AutostartGenerator {
726 Future<T> operator()() {
727 if (first_future->is_valid()) {
728 Future<T> result = *first_future;
729 *first_future = Future<T>();
730 return result;
731 }
732 return source();
733 }
734
735 std::shared_ptr<Future<T>> first_future;
736 AsyncGenerator<T> source;
737 };
738
739 std::shared_ptr<Future<T>> first_future = std::make_shared<Future<T>>(generator());
740 return AutostartGenerator{std::move(first_future), std::move(generator)};
741 }
742
743 /// \see MakeReadaheadGenerator
744 template <typename T>
745 class ReadaheadGenerator {
746 public:
747 ReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead)
748 : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {}
749
750 Future<T> AddMarkFinishedContinuation(Future<T> fut) {
751 auto state = state_;
752 return fut.Then(
753 [state](const T& result) -> Result<T> {
754 state->MarkFinishedIfDone(result);
755 return result;
756 },
757 [state](const Status& err) -> Result<T> {
758 state->finished.store(true);
759 return err;
760 });
761 }
762
763 Future<T> operator()() {
764 if (state_->readahead_queue.empty()) {
765 // This is the first request, let's pump the underlying queue
766 for (int i = 0; i < state_->max_readahead; i++) {
767 auto next = state_->source_generator();
768 auto next_after_check = AddMarkFinishedContinuation(std::move(next));
769 state_->readahead_queue.push(std::move(next_after_check));
770 }
771 }
772 // Pop one and add one
773 auto result = state_->readahead_queue.front();
774 state_->readahead_queue.pop();
775 if (state_->finished.load()) {
776 state_->readahead_queue.push(AsyncGeneratorEnd<T>());
777 } else {
778 auto back_of_queue = state_->source_generator();
779 auto back_of_queue_after_check =
780 AddMarkFinishedContinuation(std::move(back_of_queue));
781 state_->readahead_queue.push(std::move(back_of_queue_after_check));
782 }
783 return result;
784 }
785
786 private:
787 struct State {
788 State(AsyncGenerator<T> source_generator, int max_readahead)
789 : source_generator(std::move(source_generator)), max_readahead(max_readahead) {
790 finished.store(false);
791 }
792
793 void MarkFinishedIfDone(const T& next_result) {
794 if (IsIterationEnd(next_result)) {
795 finished.store(true);
796 }
797 }
798
799 AsyncGenerator<T> source_generator;
800 int max_readahead;
801 std::atomic<bool> finished;
802 std::queue<Future<T>> readahead_queue;
803 };
804
805 std::shared_ptr<State> state_;
806 };
807
808 /// \brief A generator where the producer pushes items on a queue.
809 ///
810 /// No back-pressure is applied, so this generator is mostly useful when
811 /// producing the values is neither CPU- nor memory-expensive (e.g. fetching
812 /// filesystem metadata).
813 ///
814 /// This generator is not async-reentrant.
815 template <typename T>
816 class PushGenerator {
817 struct State {
818 explicit State(util::BackpressureOptions backpressure)
819 : backpressure(std::move(backpressure)) {}
820
821 void OpenBackpressureIfFreeUnlocked(util::Mutex::Guard&& guard) {
822 if (backpressure.toggle && result_q.size() < backpressure.resume_if_below) {
823 // Open might trigger callbacks so release the lock first
824 guard.Unlock();
825 backpressure.toggle->Open();
826 }
827 }
828
829 void CloseBackpressureIfFullUnlocked() {
830 if (backpressure.toggle && result_q.size() > backpressure.pause_if_above) {
831 backpressure.toggle->Close();
832 }
833 }
834
835 util::BackpressureOptions backpressure;
836 util::Mutex mutex;
837 std::deque<Result<T>> result_q;
838 util::optional<Future<T>> consumer_fut;
839 bool finished = false;
840 };
841
842 public:
843 /// Producer API for PushGenerator
844 class Producer {
845 public:
846 explicit Producer(const std::shared_ptr<State>& state) : weak_state_(state) {}
847
848 /// \brief Push a value on the queue
849 ///
850 /// True is returned if the value was pushed, false if the generator is
851 /// already closed or destroyed. If the latter, it is recommended to stop
852 /// producing any further values.
853 bool Push(Result<T> result) {
854 auto state = weak_state_.lock();
855 if (!state) {
856 // Generator was destroyed
857 return false;
858 }
859 auto lock = state->mutex.Lock();
860 if (state->finished) {
861 // Closed early
862 return false;
863 }
864 if (state->consumer_fut.has_value()) {
865 auto fut = std::move(state->consumer_fut.value());
866 state->consumer_fut.reset();
867 lock.Unlock(); // unlock before potentially invoking a callback
868 fut.MarkFinished(std::move(result));
869 } else {
870 state->result_q.push_back(std::move(result));
871 state->CloseBackpressureIfFullUnlocked();
872 }
873 return true;
874 }
875
876 /// \brief Tell the consumer we have finished producing
877 ///
878 /// It is allowed to call this and later call Push() again ("early close").
879 /// In this case, calls to Push() after the queue is closed are silently
880 /// ignored. This can help with implementing non-trivial cancellation cases.
881 ///
882 /// True is returned on success, false if the generator is already closed
883 /// or destroyed.
884 bool Close() {
885 auto state = weak_state_.lock();
886 if (!state) {
887 // Generator was destroyed
888 return false;
889 }
890 auto lock = state->mutex.Lock();
891 if (state->finished) {
892 // Already closed
893 return false;
894 }
895 state->finished = true;
896 if (state->consumer_fut.has_value()) {
897 auto fut = std::move(state->consumer_fut.value());
898 state->consumer_fut.reset();
899 lock.Unlock(); // unlock before potentially invoking a callback
900 fut.MarkFinished(IterationTraits<T>::End());
901 }
902 return true;
903 }
904
905 /// Return whether the generator was closed or destroyed.
906 bool is_closed() const {
907 auto state = weak_state_.lock();
908 if (!state) {
909 // Generator was destroyed
910 return true;
911 }
912 auto lock = state->mutex.Lock();
913 return state->finished;
914 }
915
916 private:
917 const std::weak_ptr<State> weak_state_;
918 };
919
920 explicit PushGenerator(util::BackpressureOptions backpressure = {})
921 : state_(std::make_shared<State>(std::move(backpressure))) {}
922
923 /// Read an item from the queue
924 Future<T> operator()() const {
925 auto lock = state_->mutex.Lock();
926 assert(!state_->consumer_fut.has_value()); // Non-reentrant
927 if (!state_->result_q.empty()) {
928 auto fut = Future<T>::MakeFinished(std::move(state_->result_q.front()));
929 state_->result_q.pop_front();
930 state_->OpenBackpressureIfFreeUnlocked(std::move(lock));
931 return fut;
932 }
933 if (state_->finished) {
934 return AsyncGeneratorEnd<T>();
935 }
936 auto fut = Future<T>::Make();
937 state_->consumer_fut = fut;
938 return fut;
939 }
940
941 /// \brief Return producer-side interface
942 ///
943 /// The returned object must be used by the producer to push values on the queue.
944 /// Only a single Producer object should be instantiated.
945 Producer producer() { return Producer{state_}; }
946
947 private:
948 const std::shared_ptr<State> state_;
949 };
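// Hedged usage sketch: push a few values from the producer side and collect them on the
// consumer side. In real usage the producer would typically run on another thread; the
// default BackpressureOptions (no backpressure) are used here and the names are
// illustrative only.
inline Future<std::vector<int>> ExamplePushAndCollect() {
  PushGenerator<int> push_gen;
  auto producer = push_gen.producer();
  producer.Push(1);
  producer.Push(2);
  producer.Close();  // Signal the end of the stream to the consumer side
  return CollectAsyncGenerator<int>(push_gen);
}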
950
951 /// \brief Create a generator that pulls reentrantly from a source
952 /// This generator will pull reentrantly from a source, ensuring that max_readahead
953 /// requests are active at any given time.
954 ///
955 /// The source generator must be async-reentrant
956 ///
957 /// This generator itself is async-reentrant.
958 ///
959 /// This generator may queue up to max_readahead instances of T
960 template <typename T>
961 AsyncGenerator<T> MakeReadaheadGenerator(AsyncGenerator<T> source_generator,
962 int max_readahead) {
963 return ReadaheadGenerator<T>(std::move(source_generator), max_readahead);
964 }
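// Hedged usage sketch: keep four requests in flight against an async-reentrant source.
// Contrast with MakeSerialReadaheadGenerator above, which never issues overlapping
// requests to its source. The function name is illustrative.
inline AsyncGenerator<int> ExampleParallelReadahead(AsyncGenerator<int> reentrant_source) {
  return MakeReadaheadGenerator(std::move(reentrant_source), /*max_readahead=*/4);
}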
965
966 /// \brief Creates a generator that will yield finished futures from a vector
967 ///
968 /// This generator is async-reentrant
969 template <typename T>
970 AsyncGenerator<T> MakeVectorGenerator(std::vector<T> vec) {
971 struct State {
972 explicit State(std::vector<T> vec_) : vec(std::move(vec_)), vec_idx(0) {}
973
974 std::vector<T> vec;
975 std::atomic<std::size_t> vec_idx;
976 };
977
978 auto state = std::make_shared<State>(std::move(vec));
979 return [state]() {
980 auto idx = state->vec_idx.fetch_add(1);
981 if (idx >= state->vec.size()) {
982 // Eagerly return memory
983 state->vec.clear();
984 return AsyncGeneratorEnd<T>();
985 }
986 return Future<T>::MakeFinished(state->vec[idx]);
987 };
988 }
989
990 /// \see MakeMergedGenerator
991 template <typename T>
992 class MergedGenerator {
993 public:
994 explicit MergedGenerator(AsyncGenerator<AsyncGenerator<T>> source,
995 int max_subscriptions)
996 : state_(std::make_shared<State>(std::move(source), max_subscriptions)) {}
997
998 Future<T> operator()() {
999 Future<T> waiting_future;
1000 std::shared_ptr<DeliveredJob> delivered_job;
1001 {
1002 auto guard = state_->mutex.Lock();
1003 if (!state_->delivered_jobs.empty()) {
1004 delivered_job = std::move(state_->delivered_jobs.front());
1005 state_->delivered_jobs.pop_front();
1006 } else if (state_->finished) {
1007 return IterationTraits<T>::End();
1008 } else {
1009 waiting_future = Future<T>::Make();
1010 state_->waiting_jobs.push_back(std::make_shared<Future<T>>(waiting_future));
1011 }
1012 }
1013 if (delivered_job) {
1014 // deliverer will be invalid if outer callback encounters an error and delivers a
1015 // failed result
1016 if (delivered_job->deliverer) {
1017 delivered_job->deliverer().AddCallback(
1018 InnerCallback{state_, delivered_job->index});
1019 }
1020 return std::move(delivered_job->value);
1021 }
1022 if (state_->first) {
1023 state_->first = false;
1024 for (std::size_t i = 0; i < state_->active_subscriptions.size(); i++) {
1025 state_->PullSource().AddCallback(OuterCallback{state_, i});
1026 }
1027 }
1028 return waiting_future;
1029 }
1030
1031 private:
1032 struct DeliveredJob {
1033 explicit DeliveredJob(AsyncGenerator<T> deliverer_, Result<T> value_,
1034 std::size_t index_)
1035 : deliverer(deliverer_), value(std::move(value_)), index(index_) {}
1036
1037 AsyncGenerator<T> deliverer;
1038 Result<T> value;
1039 std::size_t index;
1040 };
1041
1042 struct State {
1043 State(AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions)
1044 : source(std::move(source)),
1045 active_subscriptions(max_subscriptions),
1046 delivered_jobs(),
1047 waiting_jobs(),
1048 mutex(),
1049 first(true),
1050 source_exhausted(false),
1051 finished(false),
1052 num_active_subscriptions(max_subscriptions) {}
1053
1054 Future<AsyncGenerator<T>> PullSource() {
1055 // Need to guard access to source() so we don't pull sync-reentrantly which
1056 // is never valid.
1057 auto lock = mutex.Lock();
1058 return source();
1059 }
1060
1061 AsyncGenerator<AsyncGenerator<T>> source;
1062 // active_subscriptions and delivered_jobs will be bounded by max_subscriptions
1063 std::vector<AsyncGenerator<T>> active_subscriptions;
1064 std::deque<std::shared_ptr<DeliveredJob>> delivered_jobs;
1065 // waiting_jobs is unbounded, reentrant pulls (e.g. AddReadahead) will provide the
1066 // backpressure
1067 std::deque<std::shared_ptr<Future<T>>> waiting_jobs;
1068 util::Mutex mutex;
1069 bool first;
1070 bool source_exhausted;
1071 bool finished;
1072 int num_active_subscriptions;
1073 };
1074
1075 struct InnerCallback {
1076 void operator()(const Result<T>& maybe_next_ref) {
1077 Future<T> next_fut;
1078 const Result<T>* maybe_next = &maybe_next_ref;
1079
1080 while (true) {
1081 Future<T> sink;
1082 bool sub_finished = maybe_next->ok() && IsIterationEnd(**maybe_next);
1083 {
1084 auto guard = state->mutex.Lock();
1085 if (state->finished) {
1086 // We've errored out so just ignore this result and don't keep pumping
1087 return;
1088 }
1089 if (!sub_finished) {
1090 if (state->waiting_jobs.empty()) {
1091 state->delivered_jobs.push_back(std::make_shared<DeliveredJob>(
1092 state->active_subscriptions[index], *maybe_next, index));
1093 } else {
1094 sink = std::move(*state->waiting_jobs.front());
1095 state->waiting_jobs.pop_front();
1096 }
1097 }
1098 }
1099 if (sub_finished) {
1100 state->PullSource().AddCallback(OuterCallback{state, index});
1101 } else if (sink.is_valid()) {
1102 sink.MarkFinished(*maybe_next);
1103 if (!maybe_next->ok()) return;
1104
1105 next_fut = state->active_subscriptions[index]();
1106 if (next_fut.TryAddCallback([this]() { return *this; })) {
1107 return;
1108 }
1109 // Already completed. Avoid very deep recursion by looping
1110 // here instead of relying on the callback.
1111 maybe_next = &next_fut.result();
1112 continue;
1113 }
1114 return;
1115 }
1116 }
1117 std::shared_ptr<State> state;
1118 std::size_t index;
1119 };
1120
1121 struct OuterCallback {
1122 void operator()(const Result<AsyncGenerator<T>>& maybe_next) {
1123 bool should_purge = false;
1124 bool should_continue = false;
1125 Future<T> error_sink;
1126 {
1127 auto guard = state->mutex.Lock();
1128 if (!maybe_next.ok() || IsIterationEnd(*maybe_next)) {
1129 state->source_exhausted = true;
1130 if (!maybe_next.ok() || --state->num_active_subscriptions == 0) {
1131 state->finished = true;
1132 should_purge = true;
1133 }
1134 if (!maybe_next.ok()) {
1135 if (state->waiting_jobs.empty()) {
1136 state->delivered_jobs.push_back(std::make_shared<DeliveredJob>(
1137 AsyncGenerator<T>(), maybe_next.status(), index));
1138 } else {
1139 error_sink = std::move(*state->waiting_jobs.front());
1140 state->waiting_jobs.pop_front();
1141 }
1142 }
1143 } else {
1144 state->active_subscriptions[index] = *maybe_next;
1145 should_continue = true;
1146 }
1147 }
1148 if (error_sink.is_valid()) {
1149 error_sink.MarkFinished(maybe_next.status());
1150 }
1151 if (should_continue) {
1152 (*maybe_next)().AddCallback(InnerCallback{state, index});
1153 } else if (should_purge) {
1154 // At this point state->finished has been marked true so no one else
1155 // will be interacting with waiting_jobs and we can iterate outside lock
1156 while (!state->waiting_jobs.empty()) {
1157 state->waiting_jobs.front()->MarkFinished(IterationTraits<T>::End());
1158 state->waiting_jobs.pop_front();
1159 }
1160 }
1161 }
1162 std::shared_ptr<State> state;
1163 std::size_t index;
1164 };
1165
1166 std::shared_ptr<State> state_;
1167 };
1168
1169 /// \brief Create a generator that takes in a stream of generators and pulls from up to
1170 /// max_subscriptions at a time
1171 ///
1172 /// Note: This may deliver items out of sequence. For example, items from the third
1173 /// AsyncGenerator generated by the source may be emitted before some items from the first
1174 /// AsyncGenerator generated by the source.
1175 ///
1176 /// This generator will pull from source async-reentrantly unless max_subscriptions is 1
1177 /// This generator will not pull from the individual subscriptions reentrantly. Add
1178 /// readahead to the individual subscriptions if that is desired.
1179 /// This generator is async-reentrant
1180 ///
1181 /// This generator may queue up to max_subscriptions instances of T
1182 template <typename T>
1183 AsyncGenerator<T> MakeMergedGenerator(AsyncGenerator<AsyncGenerator<T>> source,
1184 int max_subscriptions) {
1185 return MergedGenerator<T>(std::move(source), max_subscriptions);
1186 }
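// Hedged usage sketch: interleave the items of several inner generators, reading from at
// most four of them at once. Items may be delivered out of order across the inner
// generators; see MakeSequencedMergedGenerator and MakeConcatenatedGenerator below for
// ordered alternatives. The names are illustrative only.
inline AsyncGenerator<int> ExampleMergeStreams(std::vector<AsyncGenerator<int>> streams) {
  AsyncGenerator<AsyncGenerator<int>> sources = MakeVectorGenerator(std::move(streams));
  return MakeMergedGenerator(std::move(sources), /*max_subscriptions=*/4);
}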
1187
1188 template <typename T>
1189 Result<AsyncGenerator<T>> MakeSequencedMergedGenerator(
1190 AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions) {
1191 if (max_subscriptions < 0) {
1192 return Status::Invalid("max_subscriptions must be a positive integer");
1193 }
1194 if (max_subscriptions == 1) {
1195 return Status::Invalid("Use MakeConcatenatedGenerator if max_subscriptions is 1");
1196 }
1197 AsyncGenerator<AsyncGenerator<T>> autostarting_source = MakeMappedGenerator(
1198 std::move(source),
1199 [](const AsyncGenerator<T>& sub) { return MakeAutoStartingGenerator(sub); });
1200 AsyncGenerator<AsyncGenerator<T>> sub_readahead =
1201 MakeSerialReadaheadGenerator(std::move(autostarting_source), max_subscriptions - 1);
1202 return MakeConcatenatedGenerator(std::move(sub_readahead));
1203 }
1204
1205 /// \brief Create a generator that takes in a stream of generators and pulls from each
1206 /// one in sequence.
1207 ///
1208 /// This generator is async-reentrant but will never pull from source reentrantly and
1209 /// will never pull from any subscription reentrantly.
1210 ///
1211 /// This generator may queue 1 instance of T
1212 ///
1213 /// TODO: Could potentially make a bespoke implementation instead of MergedGenerator that
1214 /// forwards async-reentrant requests instead of buffering them (which is what
1215 /// MergedGenerator does)
1216 template <typename T>
1217 AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> source) {
1218 return MergedGenerator<T>(std::move(source), 1);
1219 }
1220
1221 template <typename T>
1222 struct Enumerated {
1223 T value;
1224 int index;
1225 bool last;
1226 };
1227
1228 template <typename T>
1229 struct IterationTraits<Enumerated<T>> {
1230 static Enumerated<T> End() { return Enumerated<T>{IterationEnd<T>(), -1, false}; }
1231 static bool IsEnd(const Enumerated<T>& val) { return val.index < 0; }
1232 };
1233
1234 /// \see MakeEnumeratedGenerator
1235 template <typename T>
1236 class EnumeratingGenerator {
1237 public:
1238 EnumeratingGenerator(AsyncGenerator<T> source, T initial_value)
1239 : state_(std::make_shared<State>(std::move(source), std::move(initial_value))) {}
1240
1241 Future<Enumerated<T>> operator()() {
1242 if (state_->finished) {
1243 return AsyncGeneratorEnd<Enumerated<T>>();
1244 } else {
1245 auto state = state_;
1246 return state->source().Then([state](const T& next) {
1247 auto finished = IsIterationEnd<T>(next);
1248 auto prev = Enumerated<T>{state->prev_value, state->prev_index, finished};
1249 state->prev_value = next;
1250 state->prev_index++;
1251 state->finished = finished;
1252 return prev;
1253 });
1254 }
1255 }
1256
1257 private:
1258 struct State {
1259 State(AsyncGenerator<T> source, T initial_value)
1260 : source(std::move(source)), prev_value(std::move(initial_value)), prev_index(0) {
1261 finished = IsIterationEnd<T>(prev_value);
1262 }
1263
1264 AsyncGenerator<T> source;
1265 T prev_value;
1266 int prev_index;
1267 bool finished;
1268 };
1269
1270 std::shared_ptr<State> state_;
1271 };
1272
1273 /// Wrap items from a source generator with positional information
1274 ///
1275 /// When used with MakeMergedGenerator and MakeSequencingGenerator this allows items to be
1276 /// processed in a "first-available" fashion and later resequenced, which can reduce the
1277 /// impact of sources with erratic performance (e.g. a filesystem where some items may
1278 /// take longer to read than others).
1279 ///
1280 /// TODO(ARROW-12371) Would require this generator be async-reentrant
1281 ///
1282 /// \see MakeSequencingGenerator for an example of putting items back in order
1283 ///
1284 /// This generator is not async-reentrant
1285 ///
1286 /// This generator buffers one item (so it knows which item is the last item)
1287 template <typename T>
1288 AsyncGenerator<Enumerated<T>> MakeEnumeratedGenerator(AsyncGenerator<T> source) {
1289 return FutureFirstGenerator<Enumerated<T>>(
1290 source().Then([source](const T& initial_value) -> AsyncGenerator<Enumerated<T>> {
1291 return EnumeratingGenerator<T>(std::move(source), initial_value);
1292 }));
1293 }
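// Hedged usage sketch: after items tagged by MakeEnumeratedGenerator have been scattered
// by out-of-order processing, put them back in their original order by keying
// MakeSequencingGenerator off Enumerated<T>::index. The function name and the seed value
// are illustrative only.
template <typename T>
AsyncGenerator<Enumerated<T>> ExampleResequenceByIndex(
    AsyncGenerator<Enumerated<T>> scattered) {
  auto comes_after = [](const Enumerated<T>& a, const Enumerated<T>& b) {
    return a.index > b.index;
  };
  auto is_next = [](const Enumerated<T>& prev, const Enumerated<T>& next) {
    return next.index == prev.index + 1;
  };
  return MakeSequencingGenerator(std::move(scattered), std::move(comes_after),
                                 std::move(is_next),
                                 /*initial_value=*/Enumerated<T>{T(), -1, false});
}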
1294
1295 /// \see MakeTransferredGenerator
1296 template <typename T>
1297 class TransferringGenerator {
1298 public:
1299 explicit TransferringGenerator(AsyncGenerator<T> source, internal::Executor* executor)
1300 : source_(std::move(source)), executor_(executor) {}
1301
1302 Future<T> operator()() { return executor_->Transfer(source_()); }
1303
1304 private:
1305 AsyncGenerator<T> source_;
1306 internal::Executor* executor_;
1307 };
1308
1309 /// \brief Transfer a future to an underlying executor.
1310 ///
1311 /// Continuations run on the returned future will be run on the given executor
1312 /// if they cannot be run synchronously.
1313 ///
1314 /// This is often needed to move computation off I/O threads or other external
1315 /// completion sources and back on to the CPU executor so the I/O thread can
1316 /// stay busy and focused on I/O
1317 ///
1318 /// Keep in mind that continuations called on an already completed future will
1319 /// always be run synchronously and so no transfer will happen in that case.
1320 ///
1321 /// This generator is async reentrant if the source is
1322 ///
1323 /// This generator will not queue
1324 template <typename T>
1325 AsyncGenerator<T> MakeTransferredGenerator(AsyncGenerator<T> source,
1326 internal::Executor* executor) {
1327 return TransferringGenerator<T>(std::move(source), executor);
1328 }
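// Hedged usage sketch: move continuations from an I/O-driven source onto the CPU thread
// pool so the I/O thread is not tied up with compute work. Assumes
// ::arrow::internal::GetCpuThreadPool() from arrow/util/thread_pool.h.
template <typename T>
AsyncGenerator<T> ExampleTransferToCpu(AsyncGenerator<T> io_generator) {
  return MakeTransferredGenerator(std::move(io_generator),
                                  ::arrow::internal::GetCpuThreadPool());
}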
1329
1330 /// \see MakeBackgroundGenerator
1331 template <typename T>
1332 class BackgroundGenerator {
1333 public:
1334 explicit BackgroundGenerator(Iterator<T> it, internal::Executor* io_executor, int max_q,
1335 int q_restart)
1336 : state_(std::make_shared<State>(io_executor, std::move(it), max_q, q_restart)),
1337 cleanup_(std::make_shared<Cleanup>(state_.get())) {}
1338
1339 Future<T> operator()() {
1340 auto guard = state_->mutex.Lock();
1341 Future<T> waiting_future;
1342 if (state_->queue.empty()) {
1343 if (state_->finished) {
1344 return AsyncGeneratorEnd<T>();
1345 } else {
1346 waiting_future = Future<T>::Make();
1347 state_->waiting_future = waiting_future;
1348 }
1349 } else {
1350 auto next = Future<T>::MakeFinished(std::move(state_->queue.front()));
1351 state_->queue.pop();
1352 if (state_->NeedsRestart()) {
1353 return state_->RestartTask(state_, std::move(guard), std::move(next));
1354 }
1355 return next;
1356 }
1357 // This should only trigger the very first time this method is called
1358 if (state_->NeedsRestart()) {
1359 return state_->RestartTask(state_, std::move(guard), std::move(waiting_future));
1360 }
1361 return waiting_future;
1362 }
1363
1364 protected:
1365 static constexpr uint64_t kUnlikelyThreadId{std::numeric_limits<uint64_t>::max()};
1366
1367 struct State {
1368 State(internal::Executor* io_executor, Iterator<T> it, int max_q, int q_restart)
1369 : io_executor(io_executor),
1370 max_q(max_q),
1371 q_restart(q_restart),
1372 it(std::move(it)),
1373 reading(false),
1374 finished(false),
1375 should_shutdown(false) {}
1376
1377 void ClearQueue() {
1378 while (!queue.empty()) {
1379 queue.pop();
1380 }
1381 }
1382
1383 bool TaskIsRunning() const { return task_finished.is_valid(); }
1384
1385 bool NeedsRestart() const {
1386 return !finished && !reading && static_cast<int>(queue.size()) <= q_restart;
1387 }
1388
1389 void DoRestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard) {
1390 // If we get here we are actually going to start a new task so let's create a
1391 // task_finished future for it
1392 state->task_finished = Future<>::Make();
1393 state->reading = true;
1394 auto spawn_status = io_executor->Spawn(
1395 [state]() { BackgroundGenerator::WorkerTask(std::move(state)); });
1396 if (!spawn_status.ok()) {
1397 // If we can't spawn a new task then send an error to the consumer (either via a
1398 // waiting future or the queue) and mark ourselves finished
1399 state->finished = true;
1400 state->task_finished = Future<>();
1401 if (waiting_future.has_value()) {
1402 auto to_deliver = std::move(waiting_future.value());
1403 waiting_future.reset();
1404 guard.Unlock();
1405 to_deliver.MarkFinished(spawn_status);
1406 } else {
1407 ClearQueue();
1408 queue.push(spawn_status);
1409 }
1410 }
1411 }
1412
1413 Future<T> RestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard,
1414 Future<T> next) {
1415 if (TaskIsRunning()) {
1416 // If the task is still cleaning up we need to wait for it to finish before
1417 // restarting. We also want to block the consumer until we've restarted the
1418 // reader to avoid multiple restarts
1419 return task_finished.Then([state, next]() {
1420 // This may appear dangerous (recursive mutex) but we should be guaranteed the
1421 // outer guard has been released by this point. We know...
1422 // * task_finished is not already finished (it would be invalid in that case)
1423 // * task_finished will not be marked complete until we've given up the mutex
1424 auto guard_ = state->mutex.Lock();
1425 state->DoRestartTask(state, std::move(guard_));
1426 return next;
1427 });
1428 }
1429 // Otherwise we can restart immediately
1430 DoRestartTask(std::move(state), std::move(guard));
1431 return next;
1432 }
1433
1434 internal::Executor* io_executor;
1435 const int max_q;
1436 const int q_restart;
1437 Iterator<T> it;
1438 std::atomic<uint64_t> worker_thread_id{kUnlikelyThreadId};
1439
1440 // If true, the task is actively pumping items from the queue and does not need a
1441 // restart
1442 bool reading;
1443 // Set to true when a terminal item arrives
1444 bool finished;
1445 // Signal to the background task to end early because consumers have given up on it
1446 bool should_shutdown;
1447 // If the queue is empty, the consumer will create a waiting future and wait for it
1448 std::queue<Result<T>> queue;
1449 util::optional<Future<T>> waiting_future;
1450 // Every background task is given a future to complete when it is entirely finished
1451 // processing and ready for the next task to start or for State to be destroyed
1452 Future<> task_finished;
1453 util::Mutex mutex;
1454 };
1455
1456 // Cleanup task that will be run when all consumer references to the generator are lost
1457 struct Cleanup {
1458 explicit Cleanup(State* state) : state(state) {}
1459 ~Cleanup() {
1460 /// TODO: Once ARROW-13109 is available we can force consumers to spawn and there
1461 /// will be no need to perform this check.
1462 ///
1463 /// It's a deadlock if we enter cleanup from
1464 /// the worker thread, but it can happen if the consumer doesn't transfer away
1465 assert(state->worker_thread_id.load() != ::arrow::internal::GetThreadId());
1466 Future<> finish_fut;
1467 {
1468 auto lock = state->mutex.Lock();
1469 if (!state->TaskIsRunning()) {
1470 return;
1471 }
1472 // Signal the current task to stop and wait for it to finish
1473 state->should_shutdown = true;
1474 finish_fut = state->task_finished;
1475 }
1476 // Using future as a condition variable here
1477 Status st = finish_fut.status();
1478 ARROW_UNUSED(st);
1479 }
1480 State* state;
1481 };
1482
1483 static void WorkerTask(std::shared_ptr<State> state) {
1484 state->worker_thread_id.store(::arrow::internal::GetThreadId());
1485 // We need to capture the state to read while outside the mutex
1486 bool reading = true;
1487 while (reading) {
1488 auto next = state->it.Next();
1489 // Need to capture state->waiting_future inside the mutex to mark finished outside
1490 Future<T> waiting_future;
1491 {
1492 auto guard = state->mutex.Lock();
1493
1494 if (state->should_shutdown) {
1495 state->finished = true;
1496 break;
1497 }
1498
1499 if (!next.ok() || IsIterationEnd<T>(*next)) {
1500 // Terminal item. Set finished to true, send this last item, and quit
1501 state->finished = true;
1502 if (!next.ok()) {
1503 state->ClearQueue();
1504 }
1505 }
1506 // At this point we are going to send an item. Either we will add it to the
1507 // queue or deliver it to a waiting future.
1508 if (state->waiting_future.has_value()) {
1509 waiting_future = std::move(state->waiting_future.value());
1510 state->waiting_future.reset();
1511 } else {
1512 state->queue.push(std::move(next));
1513 // We just filled up the queue so it is time to quit. We may need to notify
1514 // a cleanup task so we transition to Quitting
1515 if (static_cast<int>(state->queue.size()) >= state->max_q) {
1516 state->reading = false;
1517 }
1518 }
1519 reading = state->reading && !state->finished;
1520 }
1521 // This should happen outside the mutex. Presumably there is a
1522 // transferring generator on the other end that will quickly transfer any
1523 // callbacks off of this thread so we can continue looping. Still, best not to
1524 // rely on that
1525 if (waiting_future.is_valid()) {
1526 waiting_future.MarkFinished(next);
1527 }
1528 }
1529 // Once we've sent our last item we can notify any waiters that we are done and so
1530 // either state can be cleaned up or a new background task can be started
1531 Future<> task_finished;
1532 {
1533 auto guard = state->mutex.Lock();
1534 // After we give up the mutex state can be safely deleted. We will no longer
1535 // reference it. We can safely transition to idle now.
1536 task_finished = state->task_finished;
1537 state->task_finished = Future<>();
1538 state->worker_thread_id.store(kUnlikelyThreadId);
1539 }
1540 task_finished.MarkFinished();
1541 }
1542
1543 std::shared_ptr<State> state_;
1544 // state_ is held by both the generator and the background thread so it won't be cleaned
1545 // up when all consumer references are relinquished. cleanup_ is only held by the
1546 // generator so it will be destructed when the last consumer reference is gone. We use
1547 // this to cleanup / stop the background generator in case the consuming end stops
1548 // listening (e.g. due to a downstream error)
1549 std::shared_ptr<Cleanup> cleanup_;
1550 };
1551
1552 constexpr int kDefaultBackgroundMaxQ = 32;
1553 constexpr int kDefaultBackgroundQRestart = 16;
1554
1555 /// \brief Create an AsyncGenerator<T> by iterating over an Iterator<T> on a background
1556 /// thread
1557 ///
1558 /// The parameters max_q and q_restart control queue size and background thread task
1559 /// management. If the background task is fast you typically don't want it creating a
1560 /// thread task for every item. Instead the background thread will run until it fills
1561 /// up a readahead queue.
1562 ///
1563 /// Once the queue has filled up the background thread task will terminate (allowing other
1564 /// I/O tasks to use the thread). Once the queue has been drained enough (specified by
1565 /// q_restart) the background thread task will be restarted. If q_restart is too low
1566 /// then you may exhaust the queue waiting for the background thread task to start running
1567 /// again. If it is too high then it will be constantly stopping and restarting the
1568 /// background queue task.
1569 ///
1570 /// The "background thread" is a logical thread and will run as tasks on the io_executor.
1571 /// This thread may stop and start when the queue fills up but there will only be one
1572 /// active background thread task at any given time. You MUST transfer away from this
1573 /// background generator. Otherwise there could be a race condition if a callback on the
1574 /// background thread deletes the last consumer reference to the background generator. You
1575 /// can transfer onto the same executor as the background thread; it is only necessary to
1576 /// create a new thread task, not to switch executors.
1577 ///
1578 /// This generator is not async-reentrant
1579 ///
1580 /// This generator will queue up to max_q blocks
1581 template <typename T>
1582 static Result<AsyncGenerator<T>> MakeBackgroundGenerator(
1583 Iterator<T> iterator, internal::Executor* io_executor,
1584 int max_q = kDefaultBackgroundMaxQ, int q_restart = kDefaultBackgroundQRestart) {
1585 if (max_q < q_restart) {
1586 return Status::Invalid("max_q must be >= q_restart");
1587 }
1588 return BackgroundGenerator<T>(std::move(iterator), io_executor, max_q, q_restart);
1589 }
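// Usage sketch (illustrative; the iterator contents are arbitrary and `io_executor` is
// assumed to be an I/O executor obtained elsewhere, e.g. a dedicated thread pool):
//
//   Iterator<int> source_it = MakeVectorIterator(std::vector<int>{1, 2, 3});
//   ARROW_ASSIGN_OR_RAISE(AsyncGenerator<int> bg_gen,
//                         MakeBackgroundGenerator(std::move(source_it), io_executor));
//   // Per the note above, transfer away from the background generator before
//   // attaching callbacks or consuming it
//   AsyncGenerator<int> gen =
//       MakeTransferredGenerator(std::move(bg_gen), internal::GetCpuThreadPool());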
1590
1591 /// \see MakeGeneratorIterator
1592 template <typename T>
1593 class GeneratorIterator {
1594 public:
1595 explicit GeneratorIterator(AsyncGenerator<T> source) : source_(std::move(source)) {}
1596
1597 Result<T> Next() { return source_().result(); }
1598
1599 private:
1600 AsyncGenerator<T> source_;
1601 };
1602
1603 /// \brief Convert an AsyncGenerator<T> to an Iterator<T> which blocks until each future
1604 /// is finished
1605 template <typename T>
1606 Iterator<T> MakeGeneratorIterator(AsyncGenerator<T> source) {
1607 return Iterator<T>(GeneratorIterator<T>(std::move(source)));
1608 }
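// Usage sketch (illustrative; `gen` is assumed to be any AsyncGenerator<int>):
//
//   Iterator<int> it = MakeGeneratorIterator(std::move(gen));
//   ARROW_ASSIGN_OR_RAISE(int value, it.Next());  // blocks until the future finishes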
1609
1610 /// \brief Add readahead to an iterator using a background thread.
1611 ///
1612 /// Under the hood this converts the iterator to a generator using
1613 /// MakeBackgroundGenerator, whose internal queue (sized by readahead_queue_size)
1614 /// provides the readahead, and then converts back to an iterator using
1615 /// MakeGeneratorIterator.
1616 template <typename T>
1617 Result<Iterator<T>> MakeReadaheadIterator(Iterator<T> it, int readahead_queue_size) {
1618 ARROW_ASSIGN_OR_RAISE(auto io_executor, internal::ThreadPool::Make(1));
1619 auto max_q = readahead_queue_size;
1620 auto q_restart = std::max(1, max_q / 2);
1621 ARROW_ASSIGN_OR_RAISE(
1622 auto background_generator,
1623 MakeBackgroundGenerator(std::move(it), io_executor.get(), max_q, q_restart));
1624 // Capture io_executor to keep it alive as long as owned_bg_generator is still
1625 // referenced
1626 AsyncGenerator<T> owned_bg_generator = [io_executor, background_generator]() {
1627 return background_generator();
1628 };
1629 return MakeGeneratorIterator(std::move(owned_bg_generator));
1630 }
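// Usage sketch (illustrative; the source iterator and queue size are arbitrary
// example values):
//
//   ARROW_ASSIGN_OR_RAISE(
//       Iterator<int> readahead_it,
//       MakeReadaheadIterator(MakeVectorIterator(std::vector<int>{1, 2, 3}),
//                             /*readahead_queue_size=*/8));
//   ARROW_ASSIGN_OR_RAISE(int first, readahead_it.Next());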
1631
1632 /// \brief Make a generator that returns a single pre-generated future
1633 ///
1634 /// This generator is async-reentrant.
1635 template <typename T>
1636 std::function<Future<T>()> MakeSingleFutureGenerator(Future<T> future) {
1637 assert(future.is_valid());
1638 auto state = std::make_shared<Future<T>>(std::move(future));
1639 return [state]() -> Future<T> {
1640 auto fut = std::move(*state);
1641 if (fut.is_valid()) {
1642 return fut;
1643 } else {
1644 return AsyncGeneratorEnd<T>();
1645 }
1646 };
1647 }
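// Usage sketch (illustrative): the first call yields the provided future, every
// subsequent call yields the end token.
//
//   auto gen = MakeSingleFutureGenerator(Future<int>::MakeFinished(42));
//   Future<int> first = gen();   // resolves to 42
//   Future<int> second = gen();  // already-finished end token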
1648
1649 /// \brief Make a generator that immediately ends.
1650 ///
1651 /// This generator is async-reentrant.
1652 template <typename T>
1653 std::function<Future<T>()> MakeEmptyGenerator() {
1654 return []() -> Future<T> { return AsyncGeneratorEnd<T>(); };
1655 }
1656
1657 /// \brief Make a generator that always fails with a given error
1658 ///
1659 /// This generator is async-reentrant.
1660 template <typename T>
1661 AsyncGenerator<T> MakeFailingGenerator(Status st) {
1662 assert(!st.ok());
1663 auto state = std::make_shared<Status>(std::move(st));
1664 return [state]() -> Future<T> {
1665 auto st = std::move(*state);
1666 if (!st.ok()) {
1667 return std::move(st);
1668 } else {
1669 return AsyncGeneratorEnd<T>();
1670 }
1671 };
1672 }
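// Usage sketch (illustrative; the error is an arbitrary example): the first call
// yields the error, subsequent calls yield the end token.
//
//   auto gen = MakeFailingGenerator<int>(Status::Invalid("example failure"));
//   Future<int> first = gen();   // resolves to Status::Invalid
//   Future<int> second = gen();  // end token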
1673
1674 /// \brief Make a generator that always fails with a given error
1675 ///
1676 /// This overload allows inferring the return type from the argument.
1677 template <typename T>
1678 AsyncGenerator<T> MakeFailingGenerator(const Result<T>& result) {
1679 return MakeFailingGenerator<T>(result.status());
1680 }
1681
1682 /// \brief Prepend initial_values onto a generator
1683 ///
1684 /// This generator is async-reentrant but will buffer requests and will not
1685 /// pull from following_values async-reentrantly.
1686 template <typename T>
1687 AsyncGenerator<T> MakeGeneratorStartsWith(std::vector<T> initial_values,
1688 AsyncGenerator<T> following_values) {
1689 auto initial_values_vec_gen = MakeVectorGenerator(std::move(initial_values));
1690 auto gen_gen = MakeVectorGenerator<AsyncGenerator<T>>(
1691 {std::move(initial_values_vec_gen), std::move(following_values)});
1692 return MakeConcatenatedGenerator(std::move(gen_gen));
1693 }
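// Usage sketch (illustrative; `rest` is assumed to be any AsyncGenerator<int>):
//
//   AsyncGenerator<int> gen =
//       MakeGeneratorStartsWith(std::vector<int>{1, 2}, std::move(rest));
//   // yields 1, 2, and then whatever `rest` yields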
1694
1695 template <typename T>
1696 struct CancellableGenerator {
1697 Future<T> operator()() {
1698 if (stop_token.IsStopRequested()) {
1699 return stop_token.Poll();
1700 }
1701 return source();
1702 }
1703
1704 AsyncGenerator<T> source;
1705 StopToken stop_token;
1706 };
1707
1708 /// \brief Allow an async generator to be cancelled
1709 ///
1710 /// This generator is async-reentrant
1711 template <typename T>
1712 AsyncGenerator<T> MakeCancellable(AsyncGenerator<T> source, StopToken stop_token) {
1713 return CancellableGenerator<T>{std::move(source), std::move(stop_token)};
1714 }
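// Usage sketch (illustrative; `source` is assumed to be any AsyncGenerator<int> and
// StopSource comes from arrow/util/cancel.h):
//
//   StopSource stop_source;
//   AsyncGenerator<int> cancellable =
//       MakeCancellable(std::move(source), stop_source.token());
//   // ... later, e.g. in response to user input:
//   stop_source.RequestStop();  // subsequent pulls resolve to the stop error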
1715
1716 template <typename T>
1717 struct PauseableGenerator {
1718 public:
1719 PauseableGenerator(AsyncGenerator<T> source, std::shared_ptr<util::AsyncToggle> toggle)
1720 : state_(std::make_shared<PauseableGeneratorState>(std::move(source),
1721 std::move(toggle))) {}
1722
1723 Future<T> operator()() { return (*state_)(); }
1724
1725 private:
1726 struct PauseableGeneratorState
1727 : public std::enable_shared_from_this<PauseableGeneratorState> {
1728 PauseableGeneratorState(AsyncGenerator<T> source,
1729 std::shared_ptr<util::AsyncToggle> toggle)
1730 : source_(std::move(source)), toggle_(std::move(toggle)) {}
1731
1732 Future<T> operator()() {
1733 std::shared_ptr<PauseableGeneratorState> self = this->shared_from_this();
1734 return toggle_->WhenOpen().Then([self] {
1735 util::Mutex::Guard guard = self->mutex_.Lock();
1736 return self->source_();
1737 });
1738 }
1739
1740 AsyncGenerator<T> source_;
1741 std::shared_ptr<util::AsyncToggle> toggle_;
1742 util::Mutex mutex_;
1743 };
1744 std::shared_ptr<PauseableGeneratorState> state_;
1745 };
1746
1747 /// \brief Allow an async generator to be paused
1748 ///
1749 /// This generator is NOT async-reentrant and calling it in an async-reentrant fashion
1750 /// may lead to items getting reordered (and potentially truncated if the end token is
1751 /// reordered ahead of valid items)
1752 ///
1753 /// This generator forwards async-reentrant pressure to the source generator
1754 template <typename T>
1755 AsyncGenerator<T> MakePauseable(AsyncGenerator<T> source,
1756 std::shared_ptr<util::AsyncToggle> toggle) {
1757 return PauseableGenerator<T>(std::move(source), std::move(toggle));
1758 }
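// Usage sketch (illustrative; `source` is assumed to be any AsyncGenerator<int>):
//
//   auto toggle = std::make_shared<util::AsyncToggle>();
//   AsyncGenerator<int> pauseable = MakePauseable(std::move(source), toggle);
//   // While the toggle is closed, each pull waits on toggle->WhenOpen() before it
//   // reaches the underlying source (see PauseableGeneratorState above)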
1759
1760 template <typename T>
1761 class DefaultIfEmptyGenerator {
1762 public:
1763 DefaultIfEmptyGenerator(AsyncGenerator<T> source, T or_value)
1764 : state_(std::make_shared<State>(std::move(source), std::move(or_value))) {}
1765
1766 Future<T> operator()() {
1767 if (state_->first) {
1768 state_->first = false;
1769 struct {
1770 T or_value;
1771
1772 Result<T> operator()(const T& value) {
1773 if (IterationTraits<T>::IsEnd(value)) {
1774 return std::move(or_value);
1775 }
1776 return value;
1777 }
1778 } Continuation;
1779 Continuation.or_value = std::move(state_->or_value);
1780 return state_->source().Then(std::move(Continuation));
1781 }
1782 return state_->source();
1783 }
1784
1785 private:
1786 struct State {
1787 AsyncGenerator<T> source;
1788 T or_value;
1789 bool first;
1790 State(AsyncGenerator<T> source_, T or_value_)
1791 : source(std::move(source_)), or_value(std::move(or_value_)), first(true) {}
1792 };
1793 std::shared_ptr<State> state_;
1794 };
1795
1796 /// \brief If the generator is empty, return the given value, else
1797 /// forward the values from the generator.
1798 ///
1799 /// This generator is async-reentrant.
1800 template <typename T>
1801 AsyncGenerator<T> MakeDefaultIfEmptyGenerator(AsyncGenerator<T> source, T or_value) {
1802 return DefaultIfEmptyGenerator<T>(std::move(source), std::move(or_value));
1803 }
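// Usage sketch (illustrative; assumes the default IterationTraits, where a
// default-constructed value is the end sentinel for a plain type like int):
//
//   AsyncGenerator<int> maybe_empty = MakeEmptyGenerator<int>();
//   AsyncGenerator<int> gen =
//       MakeDefaultIfEmptyGenerator(std::move(maybe_empty), 42);
//   Future<int> first = gen();  // resolves to 42 since the source was empty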
1804 } // namespace arrow