]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/jaegertracing/thrift/lib/d/src/thrift/codegen/async_client_pool.d
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / jaegertracing / thrift / lib / d / src / thrift / codegen / async_client_pool.d
diff --git a/ceph/src/jaegertracing/thrift/lib/d/src/thrift/codegen/async_client_pool.d b/ceph/src/jaegertracing/thrift/lib/d/src/thrift/codegen/async_client_pool.d
new file mode 100644 (file)
index 0000000..26cb975
--- /dev/null
@@ -0,0 +1,906 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Utilities for asynchronously querying multiple servers, building on
+ * TAsyncClient.
+ *
+ * Terminology note: The names of the artifacts defined in this module are
+ *   derived from »client pool«, because they operate on a pool of
+ *   TAsyncClients. However, from a architectural point of view, they often
+ *   represent a pool of hosts a Thrift client application communicates with
+ *   using RPC calls.
+ */
+module thrift.codegen.async_client_pool;
+
+import core.sync.mutex;
+import core.time : Duration, dur;
+import std.algorithm : map;
+import std.array : array, empty;
+import std.exception : enforce;
+import std.traits : ParameterTypeTuple, ReturnType;
+import thrift.base;
+import thrift.codegen.base;
+import thrift.codegen.async_client;
+import thrift.internal.algorithm;
+import thrift.internal.codegen;
+import thrift.util.awaitable;
+import thrift.util.cancellation;
+import thrift.util.future;
+import thrift.internal.resource_pool;
+
+/**
+ * Represents a generic client pool which implements TFutureInterface!Interface
+ * using multiple TAsyncClients.
+ */
+interface TAsyncClientPoolBase(Interface) if (isService!Interface) :
+  TFutureInterface!Interface
+{
+  /// Shorthand for the client type this pool operates on.
+  alias TAsyncClientBase!Interface Client;
+
+  /**
+   * Adds a client to the pool.
+   */
+  void addClient(Client client);
+
+  /**
+   * Removes a client from the pool.
+   *
+   * Returns: Whether the client was found in the pool.
+   */
+  bool removeClient(Client client);
+
+  /**
+   * Called to determine whether an exception comes from a client from the
+   * pool not working properly, or if it an exception thrown at the
+   * application level.
+   *
+   * If the delegate returns true, the server/connection is considered to be
+   * at fault, if it returns false, the exception is just passed on to the
+   * caller.
+   *
+   * By default, returns true for instances of TTransportException and
+   * TApplicationException, false otherwise.
+   */
+  bool delegate(Exception) rpcFaultFilter() const @property;
+  void rpcFaultFilter(bool delegate(Exception)) @property; /// Ditto
+
+  /**
+   * Whether to open the underlying transports of a client before trying to
+   * execute a method if they are not open. This is usually desirable
+   * because it allows e.g. to automatically reconnect to a remote server
+   * if the network connection is dropped.
+   *
+   * Defaults to true.
+   */
+  bool reopenTransports() const @property;
+  void reopenTransports(bool) @property; /// Ditto
+}
+
+immutable bool delegate(Exception) defaultRpcFaultFilter;
+static this() {
+  defaultRpcFaultFilter = (Exception e) {
+    import thrift.protocol.base;
+    import thrift.transport.base;
+    return (
+      (cast(TTransportException)e !is null) ||
+      (cast(TApplicationException)e !is null)
+    );
+  };
+}
+
+/**
+ * A TAsyncClientPoolBase implementation which queries multiple servers in a
+ * row until a request succeeds, the result of which is then returned.
+ *
+ * The definition of »success« can be customized using the rpcFaultFilter()
+ * delegate property. If it is non-null and calling it for an exception set by
+ * a failed method invocation returns true, the error is considered to be
+ * caused by the RPC layer rather than the application layer, and the next
+ * server in the pool is tried. If there are no more clients to try, the
+ * operation is marked as failed with a TCompoundOperationException.
+ *
+ * If a TAsyncClient in the pool fails with an RPC exception for a number of
+ * consecutive tries, it is temporarily disabled (not tried any longer) for
+ * a certain duration. Both the limit and the timeout can be configured. If all
+ * clients fail (and keepTrying is false), the operation fails with a
+ * TCompoundOperationException which contains the collected RPC exceptions.
+ */
+final class TAsyncClientPool(Interface) if (isService!Interface) :
+  TAsyncClientPoolBase!Interface
+{
+  ///
+  this(Client[] clients) {
+    pool_ = new TResourcePool!Client(clients);
+    rpcFaultFilter_ = defaultRpcFaultFilter;
+    reopenTransports_ = true;
+  }
+
+  /+override+/ void addClient(Client client) {
+    pool_.add(client);
+  }
+
+  /+override+/ bool removeClient(Client client) {
+    return pool_.remove(client);
+  }
+
+  /**
+   * Whether to keep trying to find a working client if all have failed in a
+   * row.
+   *
+   * Defaults to false.
+   */
+  bool keepTrying() const @property {
+    return pool_.cycle;
+  }
+
+  /// Ditto
+  void keepTrying(bool value) @property {
+    pool_.cycle = value;
+  }
+
+  /**
+   * Whether to use a random permutation of the client pool on every call to
+   * execute(). This can be used e.g. as a simple form of load balancing.
+   *
+   * Defaults to true.
+   */
+  bool permuteClients() const @property {
+    return pool_.permute;
+  }
+
+  /// Ditto
+  void permuteClients(bool value) @property {
+    pool_.permute = value;
+  }
+
+  /**
+   * The number of consecutive faults after which a client is disabled until
+   * faultDisableDuration has passed. 0 to never disable clients.
+   *
+   * Defaults to 0.
+   */
+  ushort faultDisableCount() const @property {
+    return pool_.faultDisableCount;
+  }
+
+  /// Ditto
+  void faultDisableCount(ushort value) @property {
+    pool_.faultDisableCount = value;
+  }
+
+  /**
+   * The duration for which a client is no longer considered after it has
+   * failed too often.
+   *
+   * Defaults to one second.
+   */
+  Duration faultDisableDuration() const @property {
+    return pool_.faultDisableDuration;
+  }
+
+  /// Ditto
+  void faultDisableDuration(Duration value) @property {
+    pool_.faultDisableDuration = value;
+  }
+
+  /+override+/ bool delegate(Exception) rpcFaultFilter() const @property {
+    return rpcFaultFilter_;
+  }
+
+  /+override+/ void rpcFaultFilter(bool delegate(Exception) value) @property {
+    rpcFaultFilter_ = value;
+  }
+
+  /+override+/ bool reopenTransports() const @property {
+    return reopenTransports_;
+  }
+
+  /+override+/ void reopenTransports(bool value) @property {
+    reopenTransports_ = value;
+  }
+
+  mixin(fallbackPoolForwardCode!Interface());
+
+protected:
+  // The actual worker implementation to which RPC method calls are forwarded.
+  auto executeOnPool(string method, Args...)(Args args,
+    TCancellation cancellation
+  ) {
+    auto clients = pool_[];
+    if (clients.empty) {
+      throw new TException("No clients available to try.");
+    }
+
+    auto promise = new TPromise!(ReturnType!(MemberType!(Interface, method)));
+    Exception[] rpcExceptions;
+
+    void tryNext() {
+      while (clients.empty) {
+        Client next;
+        Duration waitTime;
+        if (clients.willBecomeNonempty(next, waitTime)) {
+          if (waitTime > dur!"hnsecs"(0)) {
+            if (waitTime < dur!"usecs"(10)) {
+              import core.thread;
+              Thread.sleep(waitTime);
+            } else {
+              next.transport.asyncManager.delay(waitTime, { tryNext(); });
+              return;
+            }
+          }
+        } else {
+          promise.fail(new TCompoundOperationException("All clients failed.",
+            rpcExceptions));
+          return;
+        }
+      }
+
+      auto client = clients.front;
+      clients.popFront;
+
+      if (reopenTransports) {
+        if (!client.transport.isOpen) {
+          try {
+            client.transport.open();
+          } catch (Exception e) {
+            pool_.recordFault(client);
+            tryNext();
+            return;
+          }
+        }
+      }
+
+      auto future = mixin("client." ~ method)(args, cancellation);
+      future.completion.addCallback({
+        if (future.status == TFutureStatus.CANCELLED) {
+          promise.cancel();
+          return;
+        }
+
+        auto e = future.getException();
+        if (e) {
+          if (rpcFaultFilter_ && rpcFaultFilter_(e)) {
+            pool_.recordFault(client);
+            rpcExceptions ~= e;
+            tryNext();
+            return;
+          }
+        }
+        pool_.recordSuccess(client);
+        promise.complete(future);
+      });
+    }
+
+    tryNext();
+    return promise;
+  }
+
+private:
+  TResourcePool!Client pool_;
+  bool delegate(Exception) rpcFaultFilter_;
+  bool reopenTransports_;
+}
+
+/**
+ * TAsyncClientPool construction helper to avoid having to explicitly
+ * specify the interface type, i.e. to allow the constructor being called
+ * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
+ */
+TAsyncClientPool!Interface tAsyncClientPool(Interface)(
+  TAsyncClientBase!Interface[] clients
+) if (isService!Interface) {
+  return new typeof(return)(clients);
+}
+
+private {
+  // Cannot use an anonymous delegate literal for this because they aren't
+  // allowed in class scope.
+  string fallbackPoolForwardCode(Interface)() {
+    string code = "";
+
+    foreach (methodName; AllMemberMethodNames!Interface) {
+      enum qn = "Interface." ~ methodName;
+      code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~
+        "(ParameterTypeTuple!(" ~ qn ~ ") args, TCancellation cancellation = null) {\n";
+      code ~= "return executeOnPool!(`" ~ methodName ~ "`)(args, cancellation);\n";
+      code ~= "}\n";
+    }
+
+    return code;
+  }
+}
+
+/**
+ * A TAsyncClientPoolBase implementation which queries multiple servers at
+ * the same time and returns the first success response.
+ *
+ * The definition of »success« can be customized using the rpcFaultFilter()
+ * delegate property. If it is non-null and calling it for an exception set by
+ * a failed method invocation returns true, the error is considered to be
+ * caused by the RPC layer rather than the application layer, and the next
+ * server in the pool is tried. If all clients fail, the operation is marked
+ * as failed with a TCompoundOperationException.
+ */
+final class TAsyncFastestClientPool(Interface) if (isService!Interface) :
+  TAsyncClientPoolBase!Interface
+{
+  ///
+  this(Client[] clients) {
+    clients_ = clients;
+    rpcFaultFilter_ = defaultRpcFaultFilter;
+    reopenTransports_ = true;
+  }
+
+  /+override+/ void addClient(Client client) {
+    clients_ ~= client;
+  }
+
+  /+override+/ bool removeClient(Client client) {
+    auto oldLength = clients_.length;
+    clients_ = removeEqual(clients_, client);
+    return clients_.length < oldLength;
+  }
+
+
+  /+override+/ bool delegate(Exception) rpcFaultFilter() const @property {
+    return rpcFaultFilter_;
+  }
+
+  /+override+/ void rpcFaultFilter(bool delegate(Exception) value) @property {
+    rpcFaultFilter_ = value;
+  }
+
+  /+override+/bool reopenTransports() const @property {
+    return reopenTransports_;
+  }
+
+  /+override+/ void reopenTransports(bool value) @property {
+    reopenTransports_ = value;
+  }
+
+  mixin(fastestPoolForwardCode!Interface());
+
+private:
+  Client[] clients_;
+  bool delegate(Exception) rpcFaultFilter_;
+  bool reopenTransports_;
+}
+
+/**
+ * TAsyncFastestClientPool construction helper to avoid having to explicitly
+ * specify the interface type, i.e. to allow the constructor being called
+ * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
+ */
+TAsyncFastestClientPool!Interface tAsyncFastestClientPool(Interface)(
+  TAsyncClientBase!Interface[] clients
+) if (isService!Interface) {
+  return new typeof(return)(clients);
+}
+
+private {
+  // Cannot use an anonymous delegate literal for this because they aren't
+  // allowed in class scope.
+  string fastestPoolForwardCode(Interface)() {
+    string code = "";
+
+    foreach (methodName; AllMemberMethodNames!Interface) {
+      enum qn = "Interface." ~ methodName;
+      code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~
+        "(ParameterTypeTuple!(" ~ qn ~ ") args, " ~
+        "TCancellation cancellation = null) {\n";
+      code ~= "enum methodName = `" ~ methodName ~ "`;\n";
+      code ~= q{
+        alias ReturnType!(MemberType!(Interface, methodName)) ResultType;
+
+        auto childCancellation = new TCancellationOrigin;
+
+        TFuture!ResultType[] futures;
+        futures.reserve(clients_.length);
+
+        foreach (c; clients_) {
+          if (reopenTransports) {
+            if (!c.transport.isOpen) {
+              try {
+                c.transport.open();
+              } catch (Exception e) {
+                continue;
+              }
+            }
+          }
+          futures ~= mixin("c." ~ methodName)(args, childCancellation);
+        }
+
+        return new FastestPoolJob!(ResultType)(
+          futures, rpcFaultFilter, cancellation, childCancellation);
+      };
+      code ~= "}\n";
+    }
+
+    return code;
+  }
+
+  final class FastestPoolJob(Result) : TFuture!Result {
+    this(TFuture!Result[] poolFutures, bool delegate(Exception) rpcFaultFilter,
+      TCancellation cancellation, TCancellationOrigin childCancellation
+    ) {
+      resultPromise_ = new TPromise!Result;
+      poolFutures_ = poolFutures;
+      rpcFaultFilter_ = rpcFaultFilter;
+      childCancellation_ = childCancellation;
+
+      foreach (future; poolFutures) {
+        future.completion.addCallback({
+          auto f = future;
+          return { completionCallback(f); };
+        }());
+        if (future.status != TFutureStatus.RUNNING) {
+          // If the current future is already completed, we are done, don't
+          // bother adding callbacks for the others (they would just return
+          // immediately after acquiring the lock).
+          return;
+        }
+      }
+
+      if (cancellation) {
+        cancellation.triggering.addCallback({
+          resultPromise_.cancel();
+          childCancellation.trigger();
+        });
+      }
+    }
+
+    TFutureStatus status() const @property {
+      return resultPromise_.status;
+    }
+
+    TAwaitable completion() @property {
+      return resultPromise_.completion;
+    }
+
+    Result get() {
+      return resultPromise_.get();
+    }
+
+    Exception getException() {
+      return resultPromise_.getException();
+    }
+
+  private:
+    void completionCallback(TFuture!Result future) {
+      synchronized {
+        if (future.status == TFutureStatus.CANCELLED) {
+          assert(resultPromise_.status != TFutureStatus.RUNNING);
+          return;
+        }
+
+        if (resultPromise_.status != TFutureStatus.RUNNING) {
+          // The operation has already been completed. This can happen if
+          // another client completed first, but this callback was already
+          // waiting for the lock when it called cancel().
+          return;
+        }
+
+        if (future.status == TFutureStatus.FAILED) {
+          auto e = future.getException();
+          if (rpcFaultFilter_ && rpcFaultFilter_(e)) {
+            rpcExceptions_ ~= e;
+
+            if (rpcExceptions_.length == poolFutures_.length) {
+              resultPromise_.fail(new TCompoundOperationException(
+                "All child operations failed, unable to retrieve a result.",
+                rpcExceptions_
+              ));
+            }
+
+            return;
+          }
+        }
+
+        // Store the result to the target promise.
+        resultPromise_.complete(future);
+
+        // Cancel the other futures, we would just discard their results.
+        // Note: We do this after we have stored the results to our promise,
+        // see the assert at the top of the function.
+        childCancellation_.trigger();
+      }
+    }
+
+    TPromise!Result resultPromise_;
+    TFuture!Result[] poolFutures_;
+    Exception[] rpcExceptions_;
+    bool delegate(Exception) rpcFaultFilter_;
+    TCancellationOrigin childCancellation_;
+  }
+}
+
+/**
+ * Allows easily aggregating results from a number of TAsyncClients.
+ *
+ * Contrary to TAsync{Fallback, Fastest}ClientPool, this class does not
+ * simply implement TFutureInterface!Interface. It manages a pool of clients,
+ * but allows the user to specify a custom accumulator function to use or to
+ * iterate over the results using a TFutureAggregatorRange.
+ *
+ * For each service method, TAsyncAggregator offers a method
+ * accepting the same arguments, and an optional TCancellation instance, just
+ * like with TFutureInterface. The return type, however, is a proxy object
+ * that offers the following methods:
+ * ---
+ * /++
+ *  + Returns a thrift.util.future.TFutureAggregatorRange for the results of
+ *  + the client pool method invocations.
+ *  +
+ *  + The [] (slicing) operator can also be used to obtain the range.
+ *  +
+ *  + Params:
+ *  +   timeout = A timeout to pass to the TFutureAggregatorRange constructor,
+ *  +     defaults to zero (no timeout).
+ *  +/
+ * TFutureAggregatorRange!ReturnType range(Duration timeout = dur!"hnsecs"(0));
+ * auto opSlice() { return range(); } /// Ditto
+ *
+ * /++
+ *  + Returns a future that gathers the results from the clients in the pool
+ *  + and invokes a user-supplied accumulator function on them, returning its
+ *  + return value to the client.
+ *  +
+ *  + In addition to the TFuture!AccumulatedType interface (where
+ *  + AccumulatedType is the return type of the accumulator function), the
+ *  + returned object also offers two additional methods, finish() and
+ *  + finishGet(): By default, the accumulator functions is called after all
+ *  + the results from the pool clients have become available. Calling finish()
+ *  + causes the accumulator future to stop waiting for other results and
+ *  + immediately invoking the accumulator function on the results currently
+ *  + available. If all results are already available, finish() is a no-op.
+ *  + finishGet() is a convenience shortcut for combining it with
+ *  + a call to get() immediately afterwards, like waitGet() is for wait().
+ *  +
+ *  + The acc alias can point to any callable accepting either an array of
+ *  + return values or an array of return values and an array of exceptions;
+ *  + see isAccumulator!() for details. The default accumulator concatenates
+ *  + return values that can be concatenated with each others (e.g. arrays),
+ *  + and simply returns an array of values otherwise, failing with a
+ *  + TCompoundOperationException no values were returned.
+ *  +
+ *  + The accumulator function is not executed in any of the async manager
+ *  + worker threads associated with the async clients, but instead it is
+ *  + invoked when the actual result is requested for the first time after the
+ *  + operation has been completed. This also includes checking the status
+ *  + of the operation once it is no longer running, since the accumulator
+ *  + has to be run to determine whether the operation succeeded or failed.
+ *  +/
+ * auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!acc);
+ * ---
+ *
+ * Example:
+ * ---
+ * // Some Thrift service.
+ * interface Foo {
+ *   int foo(string name);
+ *   byte[] bar();
+ * }
+ *
+ * // Create the aggregator pool – client0, client1, client2 are some
+ * // TAsyncClient!Foo instances, but in theory could also be other
+ * // TFutureInterface!Foo implementations (e.g. some async client pool).
+ * auto pool = new TAsyncAggregator!Foo([client0, client1, client2]);
+ *
+ * foreach (val; pool.foo("baz").range(dur!"seconds"(1))) {
+ *   // Process all the results that are available before a second has passed,
+ *   // in the order they arrive.
+ *   writeln(val);
+ * }
+ *
+ * auto sumRoots = pool.foo("baz").accumulate!((int[] vals, Exceptions[] exs){
+ *   if (vals.empty) {
+ *     throw new TCompoundOperationException("All clients failed", exs);
+ *   }
+ *
+ *   // Just to illustrate that the type of the values can change, convert the
+ *   // numbers to double and sum up their roots.
+ *   double result = 0;
+ *   foreach (v; vals) result += sqrt(cast(double)v);
+ *   return result;
+ * })();
+ *
+ * // Wait up to three seconds for the result, and then accumulate what has
+ * // arrived so far.
+ * sumRoots.completion.wait(dur!"seconds"(3));
+ * writeln(sumRoots.finishGet());
+ *
+ * // For scalars, the default accumulator returns an array of the values.
+ * pragma(msg, typeof(pool.foo("").accumulate().get()); // int[].
+ *
+ * // For lists, etc., it concatenates the results together.
+ * pragma(msg, typeof(pool.bar().accumulate().get())); // byte[].
+ * ---
+ *
+ * Note: For the accumulate!() interface, you might currently hit a »cannot use
+ * local '…' as parameter to non-global template accumulate«-error, see
+ * $(DMDBUG 5710, DMD issue 5710). If your accumulator function does not need
+ * to access the surrounding scope, you might want to use a function literal
+ * instead of a delegate to avoid the issue.
+ */
+class TAsyncAggregator(Interface) if (isBaseService!Interface) {
+  /// Shorthand for the client type this instance operates on.
+  alias TAsyncClientBase!Interface Client;
+
+  ///
+  this(Client[] clients) {
+    clients_ = clients;
+  }
+
+  /// Whether to open the underlying transports of a client before trying to
+  /// execute a method if they are not open. This is usually desirable
+  /// because it allows e.g. to automatically reconnect to a remote server
+  /// if the network connection is dropped.
+  ///
+  /// Defaults to true.
+  bool reopenTransports = true;
+
+  mixin AggregatorOpDispatch!();
+
+private:
+  Client[] clients_;
+}
+
+/// Ditto
+class TAsyncAggregator(Interface) if (isDerivedService!Interface) :
+  TAsyncAggregator!(BaseService!Interface)
+{
+  /// Shorthand for the client type this instance operates on.
+  alias TAsyncClientBase!Interface Client;
+
+  ///
+  this(Client[] clients) {
+    super(cast(TAsyncClientBase!(BaseService!Interface)[])clients);
+  }
+
+  mixin AggregatorOpDispatch!();
+}
+
+/**
+ * Whether fun is a valid accumulator function for values of type ValueType.
+ *
+ * For this to be true, fun must be a callable matching one of the following
+ * argument lists:
+ * ---
+ * fun(ValueType[] values);
+ * fun(ValueType[] values, Exception[] exceptions);
+ * ---
+ *
+ * The second version is passed the collected array exceptions from all the
+ * clients in the pool.
+ *
+ * The return value of the accumulator function is passed to the client (via
+ * the result future). If it throws an exception, the operation is marked as
+ * failed with the given exception instead.
+ */
+template isAccumulator(ValueType, alias fun) {
+  enum isAccumulator = is(typeof(fun(cast(ValueType[])[]))) ||
+    is(typeof(fun(cast(ValueType[])[], cast(Exception[])[])));
+}
+
+/**
+ * TAsyncAggregator construction helper to avoid having to explicitly
+ * specify the interface type, i.e. to allow the constructor being called
+ * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
+ */
+TAsyncAggregator!Interface tAsyncAggregator(Interface)(
+  TAsyncClientBase!Interface[] clients
+) if (isService!Interface) {
+  return new typeof(return)(clients);
+}
+
+private {
+  mixin template AggregatorOpDispatch() {
+    auto opDispatch(string name, Args...)(Args args) if (
+      is(typeof(mixin("Interface.init." ~ name)(args)))
+    ) {
+      alias ReturnType!(MemberType!(Interface, name)) ResultType;
+
+      auto childCancellation = new TCancellationOrigin;
+
+      TFuture!ResultType[] futures;
+      futures.reserve(clients_.length);
+
+      foreach (c; cast(Client[])clients_) {
+        if (reopenTransports) {
+          if (!c.transport.isOpen) {
+            try {
+              c.transport.open();
+            } catch (Exception e) {
+              continue;
+            }
+          }
+        }
+        futures ~= mixin("c." ~ name)(args, childCancellation);
+      }
+
+      return AggregationResult!ResultType(futures, childCancellation);
+    }
+  }
+
+  struct AggregationResult(T) {
+    auto opSlice() {
+      return range();
+    }
+
+    auto range(Duration timeout = dur!"hnsecs"(0)) {
+      return tFutureAggregatorRange(futures_, childCancellation_, timeout);
+    }
+
+    auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!(T, acc)) {
+      return new AccumulatorJob!(T, acc)(futures_, childCancellation_);
+    }
+
+  private:
+    TFuture!T[] futures_;
+    TCancellationOrigin childCancellation_;
+  }
+
+  auto defaultAccumulator(T)(T[] values, Exception[] exceptions) {
+    if (values.empty) {
+      throw new TCompoundOperationException("All clients failed",
+        exceptions);
+    }
+
+    static if (is(typeof(T.init ~ T.init))) {
+      import std.algorithm;
+      return reduce!"a ~ b"(values);
+    } else {
+      return values;
+    }
+  }
+
+  final class AccumulatorJob(T, alias accumulator) if (
+    isAccumulator!(T, accumulator)
+  ) : TFuture!(AccumulatorResult!(T, accumulator)) {
+    this(TFuture!T[] futures, TCancellationOrigin childCancellation) {
+      futures_ = futures;
+      childCancellation_ = childCancellation;
+      resultMutex_ = new Mutex;
+      completionEvent_ = new TOneshotEvent;
+
+      foreach (future; futures) {
+        future.completion.addCallback({
+          auto f = future;
+          return {
+            synchronized (resultMutex_) {
+              if (f.status == TFutureStatus.CANCELLED) {
+                if (!finished_) {
+                  status_ = TFutureStatus.CANCELLED;
+                  finished_ = true;
+                }
+                return;
+              }
+
+              if (f.status == TFutureStatus.FAILED) {
+                exceptions_ ~= f.getException();
+              } else {
+                results_ ~= f.get();
+              }
+
+              if (results_.length + exceptions_.length == futures_.length) {
+                finished_ = true;
+                completionEvent_.trigger();
+              }
+            }
+          };
+        }());
+      }
+    }
+
+    TFutureStatus status() @property {
+      synchronized (resultMutex_) {
+        if (!finished_) return TFutureStatus.RUNNING;
+        if (status_ != TFutureStatus.RUNNING) return status_;
+
+        try {
+          result_ = invokeAccumulator!accumulator(results_, exceptions_);
+          status_ = TFutureStatus.SUCCEEDED;
+        } catch (Exception e) {
+          exception_ = e;
+          status_ = TFutureStatus.FAILED;
+        }
+
+        return status_;
+      }
+    }
+
+    TAwaitable completion() @property {
+      return completionEvent_;
+    }
+
+    AccumulatorResult!(T, accumulator) get() {
+      auto s = status;
+
+      enforce(s != TFutureStatus.RUNNING,
+        new TFutureException("Operation not yet completed."));
+
+      if (s == TFutureStatus.CANCELLED) throw new TCancelledException;
+      if (s == TFutureStatus.FAILED) throw exception_;
+      return result_;
+    }
+
+    Exception getException() {
+      auto s = status;
+      enforce(s != TFutureStatus.RUNNING,
+        new TFutureException("Operation not yet completed."));
+
+      if (s == TFutureStatus.CANCELLED) throw new TCancelledException;
+
+      if (s == TFutureStatus.SUCCEEDED) {
+        return null;
+      }
+      return exception_;
+    }
+
+    void finish() {
+      synchronized (resultMutex_) {
+        if (!finished_) {
+          finished_ = true;
+          childCancellation_.trigger();
+          completionEvent_.trigger();
+        }
+      }
+    }
+
+    auto finishGet() {
+      finish();
+      return get();
+    }
+
+  private:
+    TFuture!T[] futures_;
+    TCancellationOrigin childCancellation_;
+
+    bool finished_;
+    T[] results_;
+    Exception[] exceptions_;
+
+    TFutureStatus status_;
+    Mutex resultMutex_;
+    union {
+      AccumulatorResult!(T, accumulator) result_;
+      Exception exception_;
+    }
+    TOneshotEvent completionEvent_;
+  }
+
+  auto invokeAccumulator(alias accumulator, T)(
+    T[] values, Exception[] exceptions
+  ) if (
+    isAccumulator!(T, accumulator)
+  ) {
+    static if (is(typeof(accumulator(values, exceptions)))) {
+      return accumulator(values, exceptions);
+    } else {
+      return accumulator(values);
+    }
+  }
+
+  template AccumulatorResult(T, alias acc) {
+    alias typeof(invokeAccumulator!acc(cast(T[])[], cast(Exception[])[]))
+      AccumulatorResult;
+  }
+}