]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/cpp/src/arrow/util/async_generator.h
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / util / async_generator.h
diff --git a/ceph/src/arrow/cpp/src/arrow/util/async_generator.h b/ceph/src/arrow/cpp/src/arrow/util/async_generator.h
new file mode 100644 (file)
index 0000000..0948e55
--- /dev/null
@@ -0,0 +1,1804 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <cstring>
+#include <deque>
+#include <limits>
+#include <queue>
+
+#include "arrow/util/async_util.h"
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/io_util.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/mutex.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/queue.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+// The methods in this file create, modify, and utilize AsyncGenerator which is an
+// iterator of futures.  This allows an asynchronous source (like file input) to be run
+// through a pipeline in the same way that iterators can be used to create pipelined
+// workflows.
+//
+// In order to support pipeline parallelism we introduce the concept of asynchronous
+// reentrancy. This is different than synchronous reentrancy.  With synchronous code a
+// function is reentrant if the function can be called again while a previous call to that
+// function is still running.  Unless otherwise specified none of these generators are
+// synchronously reentrant.  Care should be taken to avoid calling them in such a way (and
+// the utilities Visit/Collect/Await take care to do this).
+//
+// Asynchronous reentrancy on the other hand means the function is called again before the
+// future returned by the function is marked finished (but after the call to get the
+// future returns).  Some of these generators are async-reentrant while others (e.g.
+// those that depend on ordered processing like decompression) are not.  Read the MakeXYZ
+// function comments to determine which generators support async reentrancy.
+//
+// Note: Generators that are not asynchronously reentrant can still support readahead
+// (\see MakeSerialReadaheadGenerator).
+//
+// Readahead operators, and some other operators, may introduce queueing.  Any operators
+// that introduce buffering should detail the amount of buffering they introduce in their
+// MakeXYZ function comments.
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+template <typename T>
+struct IterationTraits<AsyncGenerator<T>> {
+  /// \brief by default when iterating through a sequence of AsyncGenerator<T>,
+  /// an empty function indicates the end of iteration.
+  static AsyncGenerator<T> End() { return AsyncGenerator<T>(); }
+
+  static bool IsEnd(const AsyncGenerator<T>& val) { return !val; }
+};
+
+template <typename T>
+Future<T> AsyncGeneratorEnd() {
+  return Future<T>::MakeFinished(IterationTraits<T>::End());
+}
+
+/// returning a future that completes when all have been visited
+template <typename T, typename Visitor>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator, Visitor visitor) {
+  struct LoopBody {
+    struct Callback {
+      Result<ControlFlow<>> operator()(const T& next) {
+        if (IsIterationEnd(next)) {
+          return Break();
+        } else {
+          auto visited = visitor(next);
+          if (visited.ok()) {
+            return Continue();
+          } else {
+            return visited;
+          }
+        }
+      }
+
+      Visitor visitor;
+    };
+
+    Future<ControlFlow<>> operator()() {
+      Callback callback{visitor};
+      auto next = generator();
+      return next.Then(std::move(callback));
+    }
+
+    AsyncGenerator<T> generator;
+    Visitor visitor;
+  };
+
+  return Loop(LoopBody{std::move(generator), std::move(visitor)});
+}
+
+/// \brief Wait for an async generator to complete, discarding results.
+template <typename T>
+Future<> DiscardAllFromAsyncGenerator(AsyncGenerator<T> generator) {
+  std::function<Status(T)> visitor = [](const T&) { return Status::OK(); };
+  return VisitAsyncGenerator(generator, visitor);
+}
+
+/// \brief Collect the results of an async generator into a vector
+template <typename T>
+Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
+  auto vec = std::make_shared<std::vector<T>>();
+  struct LoopBody {
+    Future<ControlFlow<std::vector<T>>> operator()() {
+      auto next = generator_();
+      auto vec = vec_;
+      return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
+        if (IsIterationEnd(result)) {
+          return Break(*vec);
+        } else {
+          vec->push_back(result);
+          return Continue();
+        }
+      });
+    }
+    AsyncGenerator<T> generator_;
+    std::shared_ptr<std::vector<T>> vec_;
+  };
+  return Loop(LoopBody{std::move(generator), std::move(vec)});
+}
+
+/// \see MakeMappedGenerator
+template <typename T, typename V>
+class MappingGenerator {
+ public:
+  MappingGenerator(AsyncGenerator<T> source, std::function<Future<V>(const T&)> map)
+      : state_(std::make_shared<State>(std::move(source), std::move(map))) {}
+
+  Future<V> operator()() {
+    auto future = Future<V>::Make();
+    bool should_trigger;
+    {
+      auto guard = state_->mutex.Lock();
+      if (state_->finished) {
+        return AsyncGeneratorEnd<V>();
+      }
+      should_trigger = state_->waiting_jobs.empty();
+      state_->waiting_jobs.push_back(future);
+    }
+    if (should_trigger) {
+      state_->source().AddCallback(Callback{state_});
+    }
+    return future;
+  }
+
+ private:
+  struct State {
+    State(AsyncGenerator<T> source, std::function<Future<V>(const T&)> map)
+        : source(std::move(source)),
+          map(std::move(map)),
+          waiting_jobs(),
+          mutex(),
+          finished(false) {}
+
+    void Purge() {
+      // This might be called by an original callback (if the source iterator fails or
+      // ends) or by a mapped callback (if the map function fails or ends prematurely).
+      // Either way it should only be called once and after finished is set so there is no
+      // need to guard access to `waiting_jobs`.
+      while (!waiting_jobs.empty()) {
+        waiting_jobs.front().MarkFinished(IterationTraits<V>::End());
+        waiting_jobs.pop_front();
+      }
+    }
+
+    AsyncGenerator<T> source;
+    std::function<Future<V>(const T&)> map;
+    std::deque<Future<V>> waiting_jobs;
+    util::Mutex mutex;
+    bool finished;
+  };
+
+  struct Callback;
+
+  struct MappedCallback {
+    void operator()(const Result<V>& maybe_next) {
+      bool end = !maybe_next.ok() || IsIterationEnd(*maybe_next);
+      bool should_purge = false;
+      if (end) {
+        {
+          auto guard = state->mutex.Lock();
+          should_purge = !state->finished;
+          state->finished = true;
+        }
+      }
+      sink.MarkFinished(maybe_next);
+      if (should_purge) {
+        state->Purge();
+      }
+    }
+    std::shared_ptr<State> state;
+    Future<V> sink;
+  };
+
+  struct Callback {
+    void operator()(const Result<T>& maybe_next) {
+      Future<V> sink;
+      bool end = !maybe_next.ok() || IsIterationEnd(*maybe_next);
+      bool should_purge = false;
+      bool should_trigger;
+      {
+        auto guard = state->mutex.Lock();
+        // A MappedCallback may have purged or be purging the queue;
+        // we shouldn't do anything here.
+        if (state->finished) return;
+        if (end) {
+          should_purge = !state->finished;
+          state->finished = true;
+        }
+        sink = state->waiting_jobs.front();
+        state->waiting_jobs.pop_front();
+        should_trigger = !end && !state->waiting_jobs.empty();
+      }
+      if (should_purge) {
+        state->Purge();
+      }
+      if (should_trigger) {
+        state->source().AddCallback(Callback{state});
+      }
+      if (maybe_next.ok()) {
+        const T& val = maybe_next.ValueUnsafe();
+        if (IsIterationEnd(val)) {
+          sink.MarkFinished(IterationTraits<V>::End());
+        } else {
+          Future<V> mapped_fut = state->map(val);
+          mapped_fut.AddCallback(MappedCallback{std::move(state), std::move(sink)});
+        }
+      } else {
+        sink.MarkFinished(maybe_next.status());
+      }
+    }
+
+    std::shared_ptr<State> state;
+  };
+
+  std::shared_ptr<State> state_;
+};
+
+/// \brief Create a generator that will apply the map function to each element of
+/// source.  The map function is not called on the end token.
+///
+/// Note: This function makes a copy of `map` for each item
+/// Note: Errors returned from the `map` function will be propagated
+///
+/// If the source generator is async-reentrant then this generator will be also
+template <typename T, typename MapFn,
+          typename Mapped = detail::result_of_t<MapFn(const T&)>,
+          typename V = typename EnsureFuture<Mapped>::type::ValueType>
+AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator, MapFn map) {
+  struct MapCallback {
+    MapFn map_;
+
+    Future<V> operator()(const T& val) { return ToFuture(map_(val)); }
+  };
+
+  return MappingGenerator<T, V>(std::move(source_generator), MapCallback{std::move(map)});
+}
+
+/// \brief Create a generator that will apply the map function to
+/// each element of source.  The map function is not called on the end
+/// token.  The result of the map function should be another
+/// generator; all these generators will then be flattened to produce
+/// a single stream of items.
+///
+/// Note: This function makes a copy of `map` for each item
+/// Note: Errors returned from the `map` function will be propagated
+///
+/// If the source generator is async-reentrant then this generator will be also
+template <typename T, typename MapFn,
+          typename Mapped = detail::result_of_t<MapFn(const T&)>,
+          typename V = typename EnsureFuture<Mapped>::type::ValueType>
+AsyncGenerator<T> MakeFlatMappedGenerator(AsyncGenerator<T> source_generator, MapFn map) {
+  return MakeConcatenatedGenerator(
+      MakeMappedGenerator(std::move(source_generator), std::move(map)));
+}
+
+/// \see MakeSequencingGenerator
+template <typename T, typename ComesAfter, typename IsNext>
+class SequencingGenerator {
+ public:
+  SequencingGenerator(AsyncGenerator<T> source, ComesAfter compare, IsNext is_next,
+                      T initial_value)
+      : state_(std::make_shared<State>(std::move(source), std::move(compare),
+                                       std::move(is_next), std::move(initial_value))) {}
+
+  Future<T> operator()() {
+    {
+      auto guard = state_->mutex.Lock();
+      // We can send a result immediately if the top of the queue is either an
+      // error or the next item
+      if (!state_->queue.empty() &&
+          (!state_->queue.top().ok() ||
+           state_->is_next(state_->previous_value, *state_->queue.top()))) {
+        auto result = std::move(state_->queue.top());
+        if (result.ok()) {
+          state_->previous_value = *result;
+        }
+        state_->queue.pop();
+        return Future<T>::MakeFinished(result);
+      }
+      if (state_->finished) {
+        return AsyncGeneratorEnd<T>();
+      }
+      // The next item is not in the queue so we will need to wait
+      auto new_waiting_fut = Future<T>::Make();
+      state_->waiting_future = new_waiting_fut;
+      guard.Unlock();
+      state_->source().AddCallback(Callback{state_});
+      return new_waiting_fut;
+    }
+  }
+
+ private:
+  struct WrappedComesAfter {
+    bool operator()(const Result<T>& left, const Result<T>& right) {
+      if (!left.ok() || !right.ok()) {
+        // Should never happen
+        return false;
+      }
+      return compare(*left, *right);
+    }
+    ComesAfter compare;
+  };
+
+  struct State {
+    State(AsyncGenerator<T> source, ComesAfter compare, IsNext is_next, T initial_value)
+        : source(std::move(source)),
+          is_next(std::move(is_next)),
+          previous_value(std::move(initial_value)),
+          waiting_future(),
+          queue(WrappedComesAfter{compare}),
+          finished(false),
+          mutex() {}
+
+    AsyncGenerator<T> source;
+    IsNext is_next;
+    T previous_value;
+    Future<T> waiting_future;
+    std::priority_queue<Result<T>, std::vector<Result<T>>, WrappedComesAfter> queue;
+    bool finished;
+    util::Mutex mutex;
+  };
+
+  class Callback {
+   public:
+    explicit Callback(std::shared_ptr<State> state) : state_(std::move(state)) {}
+
+    void operator()(const Result<T> result) {
+      Future<T> to_deliver;
+      bool finished;
+      {
+        auto guard = state_->mutex.Lock();
+        bool ready_to_deliver = false;
+        if (!result.ok()) {
+          // Clear any cached results
+          while (!state_->queue.empty()) {
+            state_->queue.pop();
+          }
+          ready_to_deliver = true;
+          state_->finished = true;
+        } else if (IsIterationEnd<T>(result.ValueUnsafe())) {
+          ready_to_deliver = state_->queue.empty();
+          state_->finished = true;
+        } else {
+          ready_to_deliver = state_->is_next(state_->previous_value, *result);
+        }
+
+        if (ready_to_deliver && state_->waiting_future.is_valid()) {
+          to_deliver = state_->waiting_future;
+          if (result.ok()) {
+            state_->previous_value = *result;
+          }
+        } else {
+          state_->queue.push(result);
+        }
+        // Capture state_->finished so we can access it outside the mutex
+        finished = state_->finished;
+      }
+      // Must deliver result outside of the mutex
+      if (to_deliver.is_valid()) {
+        to_deliver.MarkFinished(result);
+      } else {
+        // Otherwise, if we didn't get the next item (or a terminal item), we
+        // need to keep looking
+        if (!finished) {
+          state_->source().AddCallback(Callback{state_});
+        }
+      }
+    }
+
+   private:
+    const std::shared_ptr<State> state_;
+  };
+
+  const std::shared_ptr<State> state_;
+};
+
+/// \brief Buffer an AsyncGenerator to return values in sequence order  ComesAfter
+/// and IsNext determine the sequence order.
+///
+/// ComesAfter should be a BinaryPredicate that only returns true if a comes after b
+///
+/// IsNext should be a BinaryPredicate that returns true, given `a` and `b`, only if
+/// `b` follows immediately after `a`.  It should return true given `initial_value` and
+/// `b` if `b` is the first item in the sequence.
+///
+/// This operator will queue unboundedly while waiting for the next item.  It is intended
+/// for jittery sources that might scatter an ordered sequence.  It is NOT intended to
+/// sort.  Using it to try and sort could result in excessive RAM usage.  This generator
+/// will queue up to N blocks where N is the max "out of order"ness of the source.
+///
+/// For example, if the source is 1,6,2,5,4,3 it will queue 3 blocks because 3 is 3
+/// blocks beyond where it belongs.
+///
+/// This generator is not async-reentrant but it consists only of a simple log(n)
+/// insertion into a priority queue.
+template <typename T, typename ComesAfter, typename IsNext>
+AsyncGenerator<T> MakeSequencingGenerator(AsyncGenerator<T> source_generator,
+                                          ComesAfter compare, IsNext is_next,
+                                          T initial_value) {
+  return SequencingGenerator<T, ComesAfter, IsNext>(
+      std::move(source_generator), std::move(compare), std::move(is_next),
+      std::move(initial_value));
+}
+
+/// \see MakeTransformedGenerator
+template <typename T, typename V>
+class TransformingGenerator {
+  // The transforming generator state will be referenced as an async generator but will
+  // also be referenced via callback to various futures.  If the async generator owner
+  // moves it around we need the state to be consistent for future callbacks.
+  struct TransformingGeneratorState
+      : std::enable_shared_from_this<TransformingGeneratorState> {
+    TransformingGeneratorState(AsyncGenerator<T> generator, Transformer<T, V> transformer)
+        : generator_(std::move(generator)),
+          transformer_(std::move(transformer)),
+          last_value_(),
+          finished_() {}
+
+    Future<V> operator()() {
+      while (true) {
+        auto maybe_next_result = Pump();
+        if (!maybe_next_result.ok()) {
+          return Future<V>::MakeFinished(maybe_next_result.status());
+        }
+        auto maybe_next = std::move(maybe_next_result).ValueUnsafe();
+        if (maybe_next.has_value()) {
+          return Future<V>::MakeFinished(*std::move(maybe_next));
+        }
+
+        auto next_fut = generator_();
+        // If finished already, process results immediately inside the loop to avoid
+        // stack overflow
+        if (next_fut.is_finished()) {
+          auto next_result = next_fut.result();
+          if (next_result.ok()) {
+            last_value_ = *next_result;
+          } else {
+            return Future<V>::MakeFinished(next_result.status());
+          }
+          // Otherwise, if not finished immediately, add callback to process results
+        } else {
+          auto self = this->shared_from_this();
+          return next_fut.Then([self](const T& next_result) {
+            self->last_value_ = next_result;
+            return (*self)();
+          });
+        }
+      }
+    }
+
+    // See comment on TransformingIterator::Pump
+    Result<util::optional<V>> Pump() {
+      if (!finished_ && last_value_.has_value()) {
+        ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
+        if (next.ReadyForNext()) {
+          if (IsIterationEnd(*last_value_)) {
+            finished_ = true;
+          }
+          last_value_.reset();
+        }
+        if (next.Finished()) {
+          finished_ = true;
+        }
+        if (next.HasValue()) {
+          return next.Value();
+        }
+      }
+      if (finished_) {
+        return IterationTraits<V>::End();
+      }
+      return util::nullopt;
+    }
+
+    AsyncGenerator<T> generator_;
+    Transformer<T, V> transformer_;
+    util::optional<T> last_value_;
+    bool finished_;
+  };
+
+ public:
+  explicit TransformingGenerator(AsyncGenerator<T> generator,
+                                 Transformer<T, V> transformer)
+      : state_(std::make_shared<TransformingGeneratorState>(std::move(generator),
+                                                            std::move(transformer))) {}
+
+  Future<V> operator()() { return (*state_)(); }
+
+ protected:
+  std::shared_ptr<TransformingGeneratorState> state_;
+};
+
+/// \brief Transform an async generator using a transformer function returning a new
+/// AsyncGenerator
+///
+/// The transform function here behaves exactly the same as the transform function in
+/// MakeTransformedIterator and you can safely use the same transform function to
+/// transform both synchronous and asynchronous streams.
+///
+/// This generator is not async-reentrant
+///
+/// This generator may queue up to 1 instance of T but will not delay
+template <typename T, typename V>
+AsyncGenerator<V> MakeTransformedGenerator(AsyncGenerator<T> generator,
+                                           Transformer<T, V> transformer) {
+  return TransformingGenerator<T, V>(generator, transformer);
+}
+
+/// \see MakeSerialReadaheadGenerator
+template <typename T>
+class SerialReadaheadGenerator {
+ public:
+  SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead)
+      : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {}
+
+  Future<T> operator()() {
+    if (state_->first_) {
+      // Lazy generator, need to wait for the first ask to prime the pump
+      state_->first_ = false;
+      auto next = state_->source_();
+      return next.Then(Callback{state_}, ErrCallback{state_});
+    }
+
+    // This generator is not async-reentrant.  We won't be called until the last
+    // future finished so we know there is something in the queue
+    auto finished = state_->finished_.load();
+    if (finished && state_->readahead_queue_.IsEmpty()) {
+      return AsyncGeneratorEnd<T>();
+    }
+
+    std::shared_ptr<Future<T>> next;
+    if (!state_->readahead_queue_.Read(next)) {
+      return Status::UnknownError("Could not read from readahead_queue");
+    }
+
+    auto last_available = state_->spaces_available_.fetch_add(1);
+    if (last_available == 0 && !finished) {
+      // Reader idled out, we need to restart it
+      ARROW_RETURN_NOT_OK(state_->Pump(state_));
+    }
+    return *next;
+  }
+
+ private:
+  struct State {
+    State(AsyncGenerator<T> source, int max_readahead)
+        : first_(true),
+          source_(std::move(source)),
+          finished_(false),
+          // There is one extra "space" for the in-flight request
+          spaces_available_(max_readahead + 1),
+          // The SPSC queue has size-1 "usable" slots so we need to overallocate 1
+          readahead_queue_(max_readahead + 1) {}
+
+    Status Pump(const std::shared_ptr<State>& self) {
+      // Can't do readahead_queue.write(source().Then(...)) because then the
+      // callback might run immediately and add itself to the queue before this gets added
+      // to the queue messing up the order.
+      auto next_slot = std::make_shared<Future<T>>();
+      auto written = readahead_queue_.Write(next_slot);
+      if (!written) {
+        return Status::UnknownError("Could not write to readahead_queue");
+      }
+      // If this Pump is being called from a callback it is possible for the source to
+      // poll and read from the queue between the Write and this spot where we fill the
+      // value in. However, it is not possible for the future to read this value we are
+      // writing.  That is because this callback (the callback for future X) must be
+      // finished before future X is marked complete and this source is not pulled
+      // reentrantly so it will not poll for future X+1 until this callback has completed.
+      *next_slot = source_().Then(Callback{self}, ErrCallback{self});
+      return Status::OK();
+    }
+
+    // Only accessed by the consumer end
+    bool first_;
+    // Accessed by both threads
+    AsyncGenerator<T> source_;
+    std::atomic<bool> finished_;
+    // The queue has a size but it is not atomic.  We keep track of how many spaces are
+    // left in the queue here so we know if we've just written the last value and we need
+    // to stop reading ahead or if we've just read from a full queue and we need to
+    // restart reading ahead
+    std::atomic<uint32_t> spaces_available_;
+    // Needs to be a queue of shared_ptr and not Future because we set the value of the
+    // future after we add it to the queue
+    util::SpscQueue<std::shared_ptr<Future<T>>> readahead_queue_;
+  };
+
+  struct Callback {
+    Result<T> operator()(const T& next) {
+      if (IsIterationEnd(next)) {
+        state_->finished_.store(true);
+        return next;
+      }
+      auto last_available = state_->spaces_available_.fetch_sub(1);
+      if (last_available > 1) {
+        ARROW_RETURN_NOT_OK(state_->Pump(state_));
+      }
+      return next;
+    }
+
+    std::shared_ptr<State> state_;
+  };
+
+  struct ErrCallback {
+    Result<T> operator()(const Status& st) {
+      state_->finished_.store(true);
+      return st;
+    }
+
+    std::shared_ptr<State> state_;
+  };
+
+  std::shared_ptr<State> state_;
+};
+
+/// \see MakeFromFuture
+template <typename T>
+class FutureFirstGenerator {
+ public:
+  explicit FutureFirstGenerator(Future<AsyncGenerator<T>> future)
+      : state_(std::make_shared<State>(std::move(future))) {}
+
+  Future<T> operator()() {
+    if (state_->source_) {
+      return state_->source_();
+    } else {
+      auto state = state_;
+      return state_->future_.Then([state](const AsyncGenerator<T>& source) {
+        state->source_ = source;
+        return state->source_();
+      });
+    }
+  }
+
+ private:
+  struct State {
+    explicit State(Future<AsyncGenerator<T>> future) : future_(future), source_() {}
+
+    Future<AsyncGenerator<T>> future_;
+    AsyncGenerator<T> source_;
+  };
+
+  std::shared_ptr<State> state_;
+};
+
+/// \brief Transform a Future<AsyncGenerator<T>> into an AsyncGenerator<T>
+/// that waits for the future to complete as part of the first item.
+///
+/// This generator is not async-reentrant (even if the generator yielded by future is)
+///
+/// This generator does not queue
+template <typename T>
+AsyncGenerator<T> MakeFromFuture(Future<AsyncGenerator<T>> future) {
+  return FutureFirstGenerator<T>(std::move(future));
+}
+
+/// \brief Create a generator that will pull from the source into a queue.  Unlike
+/// MakeReadaheadGenerator this will not pull reentrantly from the source.
+///
+/// The source generator does not need to be async-reentrant
+///
+/// This generator is not async-reentrant (even if the source is)
+///
+/// This generator may queue up to max_readahead additional instances of T
+template <typename T>
+AsyncGenerator<T> MakeSerialReadaheadGenerator(AsyncGenerator<T> source_generator,
+                                               int max_readahead) {
+  return SerialReadaheadGenerator<T>(std::move(source_generator), max_readahead);
+}
+
+/// \brief Create a generator that immediately pulls from the source
+///
+/// Typical generators do not pull from their source until they themselves
+/// are pulled.  This generator does not follow that convention and will call
+/// generator() once before it returns.  The returned generator will otherwise
+/// mirror the source.
+///
+/// This generator forwards aysnc-reentrant pressure to the source
+/// This generator buffers one item (the first result) until it is delivered.
+template <typename T>
+AsyncGenerator<T> MakeAutoStartingGenerator(AsyncGenerator<T> generator) {
+  struct AutostartGenerator {
+    Future<T> operator()() {
+      if (first_future->is_valid()) {
+        Future<T> result = *first_future;
+        *first_future = Future<T>();
+        return result;
+      }
+      return source();
+    }
+
+    std::shared_ptr<Future<T>> first_future;
+    AsyncGenerator<T> source;
+  };
+
+  std::shared_ptr<Future<T>> first_future = std::make_shared<Future<T>>(generator());
+  return AutostartGenerator{std::move(first_future), std::move(generator)};
+}
+
+/// \see MakeReadaheadGenerator
+template <typename T>
+class ReadaheadGenerator {
+ public:
+  ReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead)
+      : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {}
+
+  Future<T> AddMarkFinishedContinuation(Future<T> fut) {
+    auto state = state_;
+    return fut.Then(
+        [state](const T& result) -> Result<T> {
+          state->MarkFinishedIfDone(result);
+          return result;
+        },
+        [state](const Status& err) -> Result<T> {
+          state->finished.store(true);
+          return err;
+        });
+  }
+
+  Future<T> operator()() {
+    if (state_->readahead_queue.empty()) {
+      // This is the first request, let's pump the underlying queue
+      for (int i = 0; i < state_->max_readahead; i++) {
+        auto next = state_->source_generator();
+        auto next_after_check = AddMarkFinishedContinuation(std::move(next));
+        state_->readahead_queue.push(std::move(next_after_check));
+      }
+    }
+    // Pop one and add one
+    auto result = state_->readahead_queue.front();
+    state_->readahead_queue.pop();
+    if (state_->finished.load()) {
+      state_->readahead_queue.push(AsyncGeneratorEnd<T>());
+    } else {
+      auto back_of_queue = state_->source_generator();
+      auto back_of_queue_after_check =
+          AddMarkFinishedContinuation(std::move(back_of_queue));
+      state_->readahead_queue.push(std::move(back_of_queue_after_check));
+    }
+    return result;
+  }
+
+ private:
+  struct State {
+    State(AsyncGenerator<T> source_generator, int max_readahead)
+        : source_generator(std::move(source_generator)), max_readahead(max_readahead) {
+      finished.store(false);
+    }
+
+    void MarkFinishedIfDone(const T& next_result) {
+      if (IsIterationEnd(next_result)) {
+        finished.store(true);
+      }
+    }
+
+    AsyncGenerator<T> source_generator;
+    int max_readahead;
+    std::atomic<bool> finished;
+    std::queue<Future<T>> readahead_queue;
+  };
+
+  std::shared_ptr<State> state_;
+};
+
+/// \brief A generator where the producer pushes items on a queue.
+///
+/// No back-pressure is applied, so this generator is mostly useful when
+/// producing the values is neither CPU- nor memory-expensive (e.g. fetching
+/// filesystem metadata).
+///
+/// This generator is not async-reentrant.
+template <typename T>
+class PushGenerator {
+  struct State {
+    explicit State(util::BackpressureOptions backpressure)
+        : backpressure(std::move(backpressure)) {}
+
+    void OpenBackpressureIfFreeUnlocked(util::Mutex::Guard&& guard) {
+      if (backpressure.toggle && result_q.size() < backpressure.resume_if_below) {
+        // Open might trigger callbacks so release the lock first
+        guard.Unlock();
+        backpressure.toggle->Open();
+      }
+    }
+
+    void CloseBackpressureIfFullUnlocked() {
+      if (backpressure.toggle && result_q.size() > backpressure.pause_if_above) {
+        backpressure.toggle->Close();
+      }
+    }
+
+    util::BackpressureOptions backpressure;
+    util::Mutex mutex;
+    std::deque<Result<T>> result_q;
+    util::optional<Future<T>> consumer_fut;
+    bool finished = false;
+  };
+
+ public:
+  /// Producer API for PushGenerator
+  class Producer {
+   public:
+    explicit Producer(const std::shared_ptr<State>& state) : weak_state_(state) {}
+
+    /// \brief Push a value on the queue
+    ///
+    /// True is returned if the value was pushed, false if the generator is
+    /// already closed or destroyed.  If the latter, it is recommended to stop
+    /// producing any further values.
+    bool Push(Result<T> result) {
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return false;
+      }
+      auto lock = state->mutex.Lock();
+      if (state->finished) {
+        // Closed early
+        return false;
+      }
+      if (state->consumer_fut.has_value()) {
+        auto fut = std::move(state->consumer_fut.value());
+        state->consumer_fut.reset();
+        lock.Unlock();  // unlock before potentially invoking a callback
+        fut.MarkFinished(std::move(result));
+      } else {
+        state->result_q.push_back(std::move(result));
+        state->CloseBackpressureIfFullUnlocked();
+      }
+      return true;
+    }
+
+    /// \brief Tell the consumer we have finished producing
+    ///
+    /// It is allowed to call this and later call Push() again ("early close").
+    /// In this case, calls to Push() after the queue is closed are silently
+    /// ignored.  This can help implementing non-trivial cancellation cases.
+    ///
+    /// True is returned on success, false if the generator is already closed
+    /// or destroyed.
+    bool Close() {
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return false;
+      }
+      auto lock = state->mutex.Lock();
+      if (state->finished) {
+        // Already closed
+        return false;
+      }
+      state->finished = true;
+      if (state->consumer_fut.has_value()) {
+        auto fut = std::move(state->consumer_fut.value());
+        state->consumer_fut.reset();
+        lock.Unlock();  // unlock before potentially invoking a callback
+        fut.MarkFinished(IterationTraits<T>::End());
+      }
+      return true;
+    }
+
+    /// Return whether the generator was closed or destroyed.
+    bool is_closed() const {
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return true;
+      }
+      auto lock = state->mutex.Lock();
+      return state->finished;
+    }
+
+   private:
+    const std::weak_ptr<State> weak_state_;
+  };
+
+  explicit PushGenerator(util::BackpressureOptions backpressure = {})
+      : state_(std::make_shared<State>(std::move(backpressure))) {}
+
+  /// Read an item from the queue
+  Future<T> operator()() const {
+    auto lock = state_->mutex.Lock();
+    assert(!state_->consumer_fut.has_value());  // Non-reentrant
+    if (!state_->result_q.empty()) {
+      auto fut = Future<T>::MakeFinished(std::move(state_->result_q.front()));
+      state_->result_q.pop_front();
+      state_->OpenBackpressureIfFreeUnlocked(std::move(lock));
+      return fut;
+    }
+    if (state_->finished) {
+      return AsyncGeneratorEnd<T>();
+    }
+    auto fut = Future<T>::Make();
+    state_->consumer_fut = fut;
+    return fut;
+  }
+
+  /// \brief Return producer-side interface
+  ///
+  /// The returned object must be used by the producer to push values on the queue.
+  /// Only a single Producer object should be instantiated.
+  Producer producer() { return Producer{state_}; }
+
+ private:
+  const std::shared_ptr<State> state_;
+};
+
+/// \brief Create a generator that pulls reentrantly from a source
+/// This generator will pull reentrantly from a source, ensuring that max_readahead
+/// requests are active at any given time.
+///
+/// The source generator must be async-reentrant
+///
+/// This generator itself is async-reentrant.
+///
+/// This generator may queue up to max_readahead instances of T
+template <typename T>
+AsyncGenerator<T> MakeReadaheadGenerator(AsyncGenerator<T> source_generator,
+                                         int max_readahead) {
+  return ReadaheadGenerator<T>(std::move(source_generator), max_readahead);
+}
+
+/// \brief Creates a generator that will yield finished futures from a vector
+///
+/// This generator is async-reentrant
+template <typename T>
+AsyncGenerator<T> MakeVectorGenerator(std::vector<T> vec) {
+  struct State {
+    explicit State(std::vector<T> vec_) : vec(std::move(vec_)), vec_idx(0) {}
+
+    std::vector<T> vec;
+    std::atomic<std::size_t> vec_idx;
+  };
+
+  auto state = std::make_shared<State>(std::move(vec));
+  return [state]() {
+    auto idx = state->vec_idx.fetch_add(1);
+    if (idx >= state->vec.size()) {
+      // Eagerly return memory
+      state->vec.clear();
+      return AsyncGeneratorEnd<T>();
+    }
+    return Future<T>::MakeFinished(state->vec[idx]);
+  };
+}
+
+/// \see MakeMergedGenerator
+template <typename T>
+class MergedGenerator {
+ public:
+  explicit MergedGenerator(AsyncGenerator<AsyncGenerator<T>> source,
+                           int max_subscriptions)
+      : state_(std::make_shared<State>(std::move(source), max_subscriptions)) {}
+
+  Future<T> operator()() {
+    Future<T> waiting_future;
+    std::shared_ptr<DeliveredJob> delivered_job;
+    {
+      auto guard = state_->mutex.Lock();
+      if (!state_->delivered_jobs.empty()) {
+        delivered_job = std::move(state_->delivered_jobs.front());
+        state_->delivered_jobs.pop_front();
+      } else if (state_->finished) {
+        return IterationTraits<T>::End();
+      } else {
+        waiting_future = Future<T>::Make();
+        state_->waiting_jobs.push_back(std::make_shared<Future<T>>(waiting_future));
+      }
+    }
+    if (delivered_job) {
+      // deliverer will be invalid if outer callback encounters an error and delivers a
+      // failed result
+      if (delivered_job->deliverer) {
+        delivered_job->deliverer().AddCallback(
+            InnerCallback{state_, delivered_job->index});
+      }
+      return std::move(delivered_job->value);
+    }
+    if (state_->first) {
+      state_->first = false;
+      for (std::size_t i = 0; i < state_->active_subscriptions.size(); i++) {
+        state_->PullSource().AddCallback(OuterCallback{state_, i});
+      }
+    }
+    return waiting_future;
+  }
+
+ private:
+  struct DeliveredJob {
+    explicit DeliveredJob(AsyncGenerator<T> deliverer_, Result<T> value_,
+                          std::size_t index_)
+        : deliverer(deliverer_), value(std::move(value_)), index(index_) {}
+
+    AsyncGenerator<T> deliverer;
+    Result<T> value;
+    std::size_t index;
+  };
+
+  struct State {
+    State(AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions)
+        : source(std::move(source)),
+          active_subscriptions(max_subscriptions),
+          delivered_jobs(),
+          waiting_jobs(),
+          mutex(),
+          first(true),
+          source_exhausted(false),
+          finished(false),
+          num_active_subscriptions(max_subscriptions) {}
+
+    Future<AsyncGenerator<T>> PullSource() {
+      // Need to guard access to source() so we don't pull sync-reentrantly which
+      // is never valid.
+      auto lock = mutex.Lock();
+      return source();
+    }
+
+    AsyncGenerator<AsyncGenerator<T>> source;
+    // active_subscriptions and delivered_jobs will be bounded by max_subscriptions
+    std::vector<AsyncGenerator<T>> active_subscriptions;
+    std::deque<std::shared_ptr<DeliveredJob>> delivered_jobs;
+    // waiting_jobs is unbounded, reentrant pulls (e.g. AddReadahead) will provide the
+    // backpressure
+    std::deque<std::shared_ptr<Future<T>>> waiting_jobs;
+    util::Mutex mutex;
+    bool first;
+    bool source_exhausted;
+    bool finished;
+    int num_active_subscriptions;
+  };
+
+  struct InnerCallback {
+    void operator()(const Result<T>& maybe_next_ref) {
+      Future<T> next_fut;
+      const Result<T>* maybe_next = &maybe_next_ref;
+
+      while (true) {
+        Future<T> sink;
+        bool sub_finished = maybe_next->ok() && IsIterationEnd(**maybe_next);
+        {
+          auto guard = state->mutex.Lock();
+          if (state->finished) {
+            // We've errored out so just ignore this result and don't keep pumping
+            return;
+          }
+          if (!sub_finished) {
+            if (state->waiting_jobs.empty()) {
+              state->delivered_jobs.push_back(std::make_shared<DeliveredJob>(
+                  state->active_subscriptions[index], *maybe_next, index));
+            } else {
+              sink = std::move(*state->waiting_jobs.front());
+              state->waiting_jobs.pop_front();
+            }
+          }
+        }
+        if (sub_finished) {
+          state->PullSource().AddCallback(OuterCallback{state, index});
+        } else if (sink.is_valid()) {
+          sink.MarkFinished(*maybe_next);
+          if (!maybe_next->ok()) return;
+
+          next_fut = state->active_subscriptions[index]();
+          if (next_fut.TryAddCallback([this]() { return *this; })) {
+            return;
+          }
+          // Already completed. Avoid very deep recursion by looping
+          // here instead of relying on the callback.
+          maybe_next = &next_fut.result();
+          continue;
+        }
+        return;
+      }
+    }
+    std::shared_ptr<State> state;
+    std::size_t index;
+  };
+
+  struct OuterCallback {
+    void operator()(const Result<AsyncGenerator<T>>& maybe_next) {
+      bool should_purge = false;
+      bool should_continue = false;
+      Future<T> error_sink;
+      {
+        auto guard = state->mutex.Lock();
+        if (!maybe_next.ok() || IsIterationEnd(*maybe_next)) {
+          state->source_exhausted = true;
+          if (!maybe_next.ok() || --state->num_active_subscriptions == 0) {
+            state->finished = true;
+            should_purge = true;
+          }
+          if (!maybe_next.ok()) {
+            if (state->waiting_jobs.empty()) {
+              state->delivered_jobs.push_back(std::make_shared<DeliveredJob>(
+                  AsyncGenerator<T>(), maybe_next.status(), index));
+            } else {
+              error_sink = std::move(*state->waiting_jobs.front());
+              state->waiting_jobs.pop_front();
+            }
+          }
+        } else {
+          state->active_subscriptions[index] = *maybe_next;
+          should_continue = true;
+        }
+      }
+      if (error_sink.is_valid()) {
+        error_sink.MarkFinished(maybe_next.status());
+      }
+      if (should_continue) {
+        (*maybe_next)().AddCallback(InnerCallback{state, index});
+      } else if (should_purge) {
+        // At this point state->finished has been marked true so no one else
+        // will be interacting with waiting_jobs and we can iterate outside lock
+        while (!state->waiting_jobs.empty()) {
+          state->waiting_jobs.front()->MarkFinished(IterationTraits<T>::End());
+          state->waiting_jobs.pop_front();
+        }
+      }
+    }
+    std::shared_ptr<State> state;
+    std::size_t index;
+  };
+
+  std::shared_ptr<State> state_;
+};
+
+/// \brief Create a generator that takes in a stream of generators and pulls from up to
+/// max_subscriptions at a time
+///
+/// Note: This may deliver items out of sequence. For example, items from the third
+/// AsyncGenerator generated by the source may be emitted before some items from the first
+/// AsyncGenerator generated by the source.
+///
+/// This generator will pull from source async-reentrantly unless max_subscriptions is 1
+/// This generator will not pull from the individual subscriptions reentrantly.  Add
+/// readahead to the individual subscriptions if that is desired.
+/// This generator is async-reentrant
+///
+/// This generator may queue up to max_subscriptions instances of T
+template <typename T>
+AsyncGenerator<T> MakeMergedGenerator(AsyncGenerator<AsyncGenerator<T>> source,
+                                      int max_subscriptions) {
+  return MergedGenerator<T>(std::move(source), max_subscriptions);
+}
+
+template <typename T>
+Result<AsyncGenerator<T>> MakeSequencedMergedGenerator(
+    AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions) {
+  if (max_subscriptions < 0) {
+    return Status::Invalid("max_subscriptions must be a positive integer");
+  }
+  if (max_subscriptions == 1) {
+    return Status::Invalid("Use MakeConcatenatedGenerator if max_subscriptions is 1");
+  }
+  AsyncGenerator<AsyncGenerator<T>> autostarting_source = MakeMappedGenerator(
+      std::move(source),
+      [](const AsyncGenerator<T>& sub) { return MakeAutoStartingGenerator(sub); });
+  AsyncGenerator<AsyncGenerator<T>> sub_readahead =
+      MakeSerialReadaheadGenerator(std::move(autostarting_source), max_subscriptions - 1);
+  return MakeConcatenatedGenerator(std::move(sub_readahead));
+}
+
+/// \brief Create a generator that takes in a stream of generators and pulls from each
+/// one in sequence.
+///
+/// This generator is async-reentrant but will never pull from source reentrantly and
+/// will never pull from any subscription reentrantly.
+///
+/// This generator may queue 1 instance of T
+///
+/// TODO: Could potentially make a bespoke implementation instead of MergedGenerator that
+/// forwards async-reentrant requests instead of buffering them (which is what
+/// MergedGenerator does)
+template <typename T>
+AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> source) {
+  return MergedGenerator<T>(std::move(source), 1);
+}
+
+template <typename T>
+struct Enumerated {
+  T value;
+  int index;
+  bool last;
+};
+
+template <typename T>
+struct IterationTraits<Enumerated<T>> {
+  static Enumerated<T> End() { return Enumerated<T>{IterationEnd<T>(), -1, false}; }
+  static bool IsEnd(const Enumerated<T>& val) { return val.index < 0; }
+};
+
+/// \see MakeEnumeratedGenerator
+template <typename T>
+class EnumeratingGenerator {
+ public:
+  EnumeratingGenerator(AsyncGenerator<T> source, T initial_value)
+      : state_(std::make_shared<State>(std::move(source), std::move(initial_value))) {}
+
+  Future<Enumerated<T>> operator()() {
+    if (state_->finished) {
+      return AsyncGeneratorEnd<Enumerated<T>>();
+    } else {
+      auto state = state_;
+      return state->source().Then([state](const T& next) {
+        auto finished = IsIterationEnd<T>(next);
+        auto prev = Enumerated<T>{state->prev_value, state->prev_index, finished};
+        state->prev_value = next;
+        state->prev_index++;
+        state->finished = finished;
+        return prev;
+      });
+    }
+  }
+
+ private:
+  struct State {
+    State(AsyncGenerator<T> source, T initial_value)
+        : source(std::move(source)), prev_value(std::move(initial_value)), prev_index(0) {
+      finished = IsIterationEnd<T>(prev_value);
+    }
+
+    AsyncGenerator<T> source;
+    T prev_value;
+    int prev_index;
+    bool finished;
+  };
+
+  std::shared_ptr<State> state_;
+};
+
+/// Wrap items from a source generator with positional information
+///
+/// When used with MakeMergedGenerator and MakeSequencingGenerator this allows items to be
+/// processed in a "first-available" fashion and later resequenced which can reduce the
+/// impact of sources with erratic performance (e.g. a filesystem where some items may
+/// take longer to read than others).
+///
+/// TODO(ARROW-12371) Would require this generator be async-reentrant
+///
+/// \see MakeSequencingGenerator for an example of putting items back in order
+///
+/// This generator is not async-reentrant
+///
+/// This generator buffers one item (so it knows which item is the last item)
+template <typename T>
+AsyncGenerator<Enumerated<T>> MakeEnumeratedGenerator(AsyncGenerator<T> source) {
+  return FutureFirstGenerator<Enumerated<T>>(
+      source().Then([source](const T& initial_value) -> AsyncGenerator<Enumerated<T>> {
+        return EnumeratingGenerator<T>(std::move(source), initial_value);
+      }));
+}
+
+/// \see MakeTransferredGenerator
+template <typename T>
+class TransferringGenerator {
+ public:
+  explicit TransferringGenerator(AsyncGenerator<T> source, internal::Executor* executor)
+      : source_(std::move(source)), executor_(executor) {}
+
+  Future<T> operator()() { return executor_->Transfer(source_()); }
+
+ private:
+  AsyncGenerator<T> source_;
+  internal::Executor* executor_;
+};
+
+/// \brief Transfer a future to an underlying executor.
+///
+/// Continuations run on the returned future will be run on the given executor
+/// if they cannot be run synchronously.
+///
+/// This is often needed to move computation off I/O threads or other external
+/// completion sources and back on to the CPU executor so the I/O thread can
+/// stay busy and focused on I/O
+///
+/// Keep in mind that continuations called on an already completed future will
+/// always be run synchronously and so no transfer will happen in that case.
+///
+/// This generator is async reentrant if the source is
+///
+/// This generator will not queue
+template <typename T>
+AsyncGenerator<T> MakeTransferredGenerator(AsyncGenerator<T> source,
+                                           internal::Executor* executor) {
+  return TransferringGenerator<T>(std::move(source), executor);
+}
+
+/// \see MakeBackgroundGenerator
+template <typename T>
+class BackgroundGenerator {
+ public:
+  explicit BackgroundGenerator(Iterator<T> it, internal::Executor* io_executor, int max_q,
+                               int q_restart)
+      : state_(std::make_shared<State>(io_executor, std::move(it), max_q, q_restart)),
+        cleanup_(std::make_shared<Cleanup>(state_.get())) {}
+
+  Future<T> operator()() {
+    auto guard = state_->mutex.Lock();
+    Future<T> waiting_future;
+    if (state_->queue.empty()) {
+      if (state_->finished) {
+        return AsyncGeneratorEnd<T>();
+      } else {
+        waiting_future = Future<T>::Make();
+        state_->waiting_future = waiting_future;
+      }
+    } else {
+      auto next = Future<T>::MakeFinished(std::move(state_->queue.front()));
+      state_->queue.pop();
+      if (state_->NeedsRestart()) {
+        return state_->RestartTask(state_, std::move(guard), std::move(next));
+      }
+      return next;
+    }
+    // This should only trigger the very first time this method is called
+    if (state_->NeedsRestart()) {
+      return state_->RestartTask(state_, std::move(guard), std::move(waiting_future));
+    }
+    return waiting_future;
+  }
+
+ protected:
+  static constexpr uint64_t kUnlikelyThreadId{std::numeric_limits<uint64_t>::max()};
+
+  struct State {
+    State(internal::Executor* io_executor, Iterator<T> it, int max_q, int q_restart)
+        : io_executor(io_executor),
+          max_q(max_q),
+          q_restart(q_restart),
+          it(std::move(it)),
+          reading(false),
+          finished(false),
+          should_shutdown(false) {}
+
+    void ClearQueue() {
+      while (!queue.empty()) {
+        queue.pop();
+      }
+    }
+
+    bool TaskIsRunning() const { return task_finished.is_valid(); }
+
+    bool NeedsRestart() const {
+      return !finished && !reading && static_cast<int>(queue.size()) <= q_restart;
+    }
+
+    void DoRestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard) {
+      // If we get here we are actually going to start a new task so let's create a
+      // task_finished future for it
+      state->task_finished = Future<>::Make();
+      state->reading = true;
+      auto spawn_status = io_executor->Spawn(
+          [state]() { BackgroundGenerator::WorkerTask(std::move(state)); });
+      if (!spawn_status.ok()) {
+        // If we can't spawn a new task then send an error to the consumer (either via a
+        // waiting future or the queue) and mark ourselves finished
+        state->finished = true;
+        state->task_finished = Future<>();
+        if (waiting_future.has_value()) {
+          auto to_deliver = std::move(waiting_future.value());
+          waiting_future.reset();
+          guard.Unlock();
+          to_deliver.MarkFinished(spawn_status);
+        } else {
+          ClearQueue();
+          queue.push(spawn_status);
+        }
+      }
+    }
+
+    Future<T> RestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard,
+                          Future<T> next) {
+      if (TaskIsRunning()) {
+        // If the task is still cleaning up we need to wait for it to finish before
+        // restarting.  We also want to block the consumer until we've restarted the
+        // reader to avoid multiple restarts
+        return task_finished.Then([state, next]() {
+          // This may appear dangerous (recursive mutex) but we should be guaranteed the
+          // outer guard has been released by this point.  We know...
+          // * task_finished is not already finished (it would be invalid in that case)
+          // * task_finished will not be marked complete until we've given up the mutex
+          auto guard_ = state->mutex.Lock();
+          state->DoRestartTask(state, std::move(guard_));
+          return next;
+        });
+      }
+      // Otherwise we can restart immediately
+      DoRestartTask(std::move(state), std::move(guard));
+      return next;
+    }
+
+    internal::Executor* io_executor;
+    const int max_q;
+    const int q_restart;
+    Iterator<T> it;
+    std::atomic<uint64_t> worker_thread_id{kUnlikelyThreadId};
+
+    // If true, the task is actively pumping items from the queue and does not need a
+    // restart
+    bool reading;
+    // Set to true when a terminal item arrives
+    bool finished;
+    // Signal to the background task to end early because consumers have given up on it
+    bool should_shutdown;
+    // If the queue is empty, the consumer will create a waiting future and wait for it
+    std::queue<Result<T>> queue;
+    util::optional<Future<T>> waiting_future;
+    // Every background task is given a future to complete when it is entirely finished
+    // processing and ready for the next task to start or for State to be destroyed
+    Future<> task_finished;
+    util::Mutex mutex;
+  };
+
+  // Cleanup task that will be run when all consumer references to the generator are lost
+  struct Cleanup {
+    explicit Cleanup(State* state) : state(state) {}
+    ~Cleanup() {
+      /// TODO: Once ARROW-13109 is available then we can be force consumers to spawn and
+      /// there is no need to perform this check.
+      ///
+      /// It's a deadlock if we enter cleanup from
+      /// the worker thread but it can happen if the consumer doesn't transfer away
+      assert(state->worker_thread_id.load() != ::arrow::internal::GetThreadId());
+      Future<> finish_fut;
+      {
+        auto lock = state->mutex.Lock();
+        if (!state->TaskIsRunning()) {
+          return;
+        }
+        // Signal the current task to stop and wait for it to finish
+        state->should_shutdown = true;
+        finish_fut = state->task_finished;
+      }
+      // Using future as a condition variable here
+      Status st = finish_fut.status();
+      ARROW_UNUSED(st);
+    }
+    State* state;
+  };
+
+  static void WorkerTask(std::shared_ptr<State> state) {
+    state->worker_thread_id.store(::arrow::internal::GetThreadId());
+    // We need to capture the state to read while outside the mutex
+    bool reading = true;
+    while (reading) {
+      auto next = state->it.Next();
+      // Need to capture state->waiting_future inside the mutex to mark finished outside
+      Future<T> waiting_future;
+      {
+        auto guard = state->mutex.Lock();
+
+        if (state->should_shutdown) {
+          state->finished = true;
+          break;
+        }
+
+        if (!next.ok() || IsIterationEnd<T>(*next)) {
+          // Terminal item.  Mark finished to true, send this last item, and quit
+          state->finished = true;
+          if (!next.ok()) {
+            state->ClearQueue();
+          }
+        }
+        // At this point we are going to send an item.  Either we will add it to the
+        // queue or deliver it to a waiting future.
+        if (state->waiting_future.has_value()) {
+          waiting_future = std::move(state->waiting_future.value());
+          state->waiting_future.reset();
+        } else {
+          state->queue.push(std::move(next));
+          // We just filled up the queue so it is time to quit.  We may need to notify
+          // a cleanup task so we transition to Quitting
+          if (static_cast<int>(state->queue.size()) >= state->max_q) {
+            state->reading = false;
+          }
+        }
+        reading = state->reading && !state->finished;
+      }
+      // This should happen outside the mutex.  Presumably there is a
+      // transferring generator on the other end that will quickly transfer any
+      // callbacks off of this thread so we can continue looping.  Still, best not to
+      // rely on that
+      if (waiting_future.is_valid()) {
+        waiting_future.MarkFinished(next);
+      }
+    }
+    // Once we've sent our last item we can notify any waiters that we are done and so
+    // either state can be cleaned up or a new background task can be started
+    Future<> task_finished;
+    {
+      auto guard = state->mutex.Lock();
+      // After we give up the mutex state can be safely deleted.  We will no longer
+      // reference it.  We can safely transition to idle now.
+      task_finished = state->task_finished;
+      state->task_finished = Future<>();
+      state->worker_thread_id.store(kUnlikelyThreadId);
+    }
+    task_finished.MarkFinished();
+  }
+
+  std::shared_ptr<State> state_;
+  // state_ is held by both the generator and the background thread so it won't be cleaned
+  // up when all consumer references are relinquished.  cleanup_ is only held by the
+  // generator so it will be destructed when the last consumer reference is gone.  We use
+  // this to cleanup / stop the background generator in case the consuming end stops
+  // listening (e.g. due to a downstream error)
+  std::shared_ptr<Cleanup> cleanup_;
+};
+
+constexpr int kDefaultBackgroundMaxQ = 32;
+constexpr int kDefaultBackgroundQRestart = 16;
+
+/// \brief Create an AsyncGenerator<T> by iterating over an Iterator<T> on a background
+/// thread
+///
+/// The parameter max_q and q_restart control queue size and background thread task
+/// management. If the background task is fast you typically don't want it creating a
+/// thread task for every item.  Instead the background thread will run until it fills
+/// up a readahead queue.
+///
+/// Once the queue has filled up the background thread task will terminate (allowing other
+/// I/O tasks to use the thread).  Once the queue has been drained enough (specified by
+/// q_restart) then the background thread task will be restarted.  If q_restart is too low
+/// then you may exhaust the queue waiting for the background thread task to start running
+/// again.  If it is too high then it will be constantly stopping and restarting the
+/// background queue task
+///
+/// The "background thread" is a logical thread and will run as tasks on the io_executor.
+/// This thread may stop and start when the queue fills up but there will only be one
+/// active background thread task at any given time.  You MUST transfer away from this
+/// background generator.  Otherwise there could be a race condition if a callback on the
+/// background thread deletes the last consumer reference to the background generator. You
+/// can transfer onto the same executor as the background thread, it is only neccesary to
+/// create a new thread task, not to switch executors.
+///
+/// This generator is not async-reentrant
+///
+/// This generator will queue up to max_q blocks
+template <typename T>
+static Result<AsyncGenerator<T>> MakeBackgroundGenerator(
+    Iterator<T> iterator, internal::Executor* io_executor,
+    int max_q = kDefaultBackgroundMaxQ, int q_restart = kDefaultBackgroundQRestart) {
+  if (max_q < q_restart) {
+    return Status::Invalid("max_q must be >= q_restart");
+  }
+  return BackgroundGenerator<T>(std::move(iterator), io_executor, max_q, q_restart);
+}
+
+/// \see MakeGeneratorIterator
+template <typename T>
+class GeneratorIterator {
+ public:
+  explicit GeneratorIterator(AsyncGenerator<T> source) : source_(std::move(source)) {}
+
+  Result<T> Next() { return source_().result(); }
+
+ private:
+  AsyncGenerator<T> source_;
+};
+
+/// \brief Convert an AsyncGenerator<T> to an Iterator<T> which blocks until each future
+/// is finished
+template <typename T>
+Iterator<T> MakeGeneratorIterator(AsyncGenerator<T> source) {
+  return Iterator<T>(GeneratorIterator<T>(std::move(source)));
+}
+
+/// \brief Add readahead to an iterator using a background thread.
+///
+/// Under the hood this is converting the iterator to a generator using
+/// MakeBackgroundGenerator, adding readahead to the converted generator with
+/// MakeReadaheadGenerator, and then converting back to an iterator using
+/// MakeGeneratorIterator.
+template <typename T>
+Result<Iterator<T>> MakeReadaheadIterator(Iterator<T> it, int readahead_queue_size) {
+  ARROW_ASSIGN_OR_RAISE(auto io_executor, internal::ThreadPool::Make(1));
+  auto max_q = readahead_queue_size;
+  auto q_restart = std::max(1, max_q / 2);
+  ARROW_ASSIGN_OR_RAISE(
+      auto background_generator,
+      MakeBackgroundGenerator(std::move(it), io_executor.get(), max_q, q_restart));
+  // Capture io_executor to keep it alive as long as owned_bg_generator is still
+  // referenced
+  AsyncGenerator<T> owned_bg_generator = [io_executor, background_generator]() {
+    return background_generator();
+  };
+  return MakeGeneratorIterator(std::move(owned_bg_generator));
+}
+
+/// \brief Make a generator that returns a single pre-generated future
+///
+/// This generator is async-reentrant.
+template <typename T>
+std::function<Future<T>()> MakeSingleFutureGenerator(Future<T> future) {
+  assert(future.is_valid());
+  auto state = std::make_shared<Future<T>>(std::move(future));
+  return [state]() -> Future<T> {
+    auto fut = std::move(*state);
+    if (fut.is_valid()) {
+      return fut;
+    } else {
+      return AsyncGeneratorEnd<T>();
+    }
+  };
+}
+
+/// \brief Make a generator that immediately ends.
+///
+/// This generator is async-reentrant.
+template <typename T>
+std::function<Future<T>()> MakeEmptyGenerator() {
+  return []() -> Future<T> { return AsyncGeneratorEnd<T>(); };
+}
+
+/// \brief Make a generator that always fails with a given error
+///
+/// This generator is async-reentrant.
+template <typename T>
+AsyncGenerator<T> MakeFailingGenerator(Status st) {
+  assert(!st.ok());
+  auto state = std::make_shared<Status>(std::move(st));
+  return [state]() -> Future<T> {
+    auto st = std::move(*state);
+    if (!st.ok()) {
+      return std::move(st);
+    } else {
+      return AsyncGeneratorEnd<T>();
+    }
+  };
+}
+
+/// \brief Make a generator that always fails with a given error
+///
+/// This overload allows inferring the return type from the argument.
+template <typename T>
+AsyncGenerator<T> MakeFailingGenerator(const Result<T>& result) {
+  return MakeFailingGenerator<T>(result.status());
+}
+
+/// \brief Prepend initial_values onto a generator
+///
+/// This generator is async-reentrant but will buffer requests and will not
+/// pull from following_values async-reentrantly.
+template <typename T>
+AsyncGenerator<T> MakeGeneratorStartsWith(std::vector<T> initial_values,
+                                          AsyncGenerator<T> following_values) {
+  auto initial_values_vec_gen = MakeVectorGenerator(std::move(initial_values));
+  auto gen_gen = MakeVectorGenerator<AsyncGenerator<T>>(
+      {std::move(initial_values_vec_gen), std::move(following_values)});
+  return MakeConcatenatedGenerator(std::move(gen_gen));
+}
+
+template <typename T>
+struct CancellableGenerator {
+  Future<T> operator()() {
+    if (stop_token.IsStopRequested()) {
+      return stop_token.Poll();
+    }
+    return source();
+  }
+
+  AsyncGenerator<T> source;
+  StopToken stop_token;
+};
+
+/// \brief Allow an async generator to be cancelled
+///
+/// This generator is async-reentrant
+template <typename T>
+AsyncGenerator<T> MakeCancellable(AsyncGenerator<T> source, StopToken stop_token) {
+  return CancellableGenerator<T>{std::move(source), std::move(stop_token)};
+}
+
+template <typename T>
+struct PauseableGenerator {
+ public:
+  PauseableGenerator(AsyncGenerator<T> source, std::shared_ptr<util::AsyncToggle> toggle)
+      : state_(std::make_shared<PauseableGeneratorState>(std::move(source),
+                                                         std::move(toggle))) {}
+
+  Future<T> operator()() { return (*state_)(); }
+
+ private:
+  struct PauseableGeneratorState
+      : public std::enable_shared_from_this<PauseableGeneratorState> {
+    PauseableGeneratorState(AsyncGenerator<T> source,
+                            std::shared_ptr<util::AsyncToggle> toggle)
+        : source_(std::move(source)), toggle_(std::move(toggle)) {}
+
+    Future<T> operator()() {
+      std::shared_ptr<PauseableGeneratorState> self = this->shared_from_this();
+      return toggle_->WhenOpen().Then([self] {
+        util::Mutex::Guard guard = self->mutex_.Lock();
+        return self->source_();
+      });
+    }
+
+    AsyncGenerator<T> source_;
+    std::shared_ptr<util::AsyncToggle> toggle_;
+    util::Mutex mutex_;
+  };
+  std::shared_ptr<PauseableGeneratorState> state_;
+};
+
+/// \brief Allow an async generator to be paused
+///
+/// This generator is NOT async-reentrant and calling it in an async-reentrant fashion
+/// may lead to items getting reordered (and potentially truncated if the end token is
+/// reordered ahead of valid items)
+///
+/// This generator forwards async-reentrant pressure
+template <typename T>
+AsyncGenerator<T> MakePauseable(AsyncGenerator<T> source,
+                                std::shared_ptr<util::AsyncToggle> toggle) {
+  return PauseableGenerator<T>(std::move(source), std::move(toggle));
+}
+
+template <typename T>
+class DefaultIfEmptyGenerator {
+ public:
+  DefaultIfEmptyGenerator(AsyncGenerator<T> source, T or_value)
+      : state_(std::make_shared<State>(std::move(source), std::move(or_value))) {}
+
+  Future<T> operator()() {
+    if (state_->first) {
+      state_->first = false;
+      struct {
+        T or_value;
+
+        Result<T> operator()(const T& value) {
+          if (IterationTraits<T>::IsEnd(value)) {
+            return std::move(or_value);
+          }
+          return value;
+        }
+      } Continuation;
+      Continuation.or_value = std::move(state_->or_value);
+      return state_->source().Then(std::move(Continuation));
+    }
+    return state_->source();
+  }
+
+ private:
+  struct State {
+    AsyncGenerator<T> source;
+    T or_value;
+    bool first;
+    State(AsyncGenerator<T> source_, T or_value_)
+        : source(std::move(source_)), or_value(std::move(or_value_)), first(true) {}
+  };
+  std::shared_ptr<State> state_;
+};
+
+/// \brief If the generator is empty, return the given value, else
+/// forward the values from the generator.
+///
+/// This generator is async-reentrant.
+template <typename T>
+AsyncGenerator<T> MakeDefaultIfEmptyGenerator(AsyncGenerator<T> source, T or_value) {
+  return DefaultIfEmptyGenerator<T>(std::move(source), std::move(or_value));
+}
+}  // namespace arrow