]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | ||
20 | /** | |
21 | * Utilities for asynchronously querying multiple servers, building on | |
22 | * TAsyncClient. | |
23 | * | |
24 | * Terminology note: The names of the artifacts defined in this module are | |
25 | * derived from »client pool«, because they operate on a pool of | |
26 | * TAsyncClients. However, from a architectural point of view, they often | |
27 | * represent a pool of hosts a Thrift client application communicates with | |
28 | * using RPC calls. | |
29 | */ | |
30 | module thrift.codegen.async_client_pool; | |
31 | ||
32 | import core.sync.mutex; | |
33 | import core.time : Duration, dur; | |
34 | import std.algorithm : map; | |
35 | import std.array : array, empty; | |
36 | import std.exception : enforce; | |
37 | import std.traits : ParameterTypeTuple, ReturnType; | |
38 | import thrift.base; | |
39 | import thrift.codegen.base; | |
40 | import thrift.codegen.async_client; | |
41 | import thrift.internal.algorithm; | |
42 | import thrift.internal.codegen; | |
43 | import thrift.util.awaitable; | |
44 | import thrift.util.cancellation; | |
45 | import thrift.util.future; | |
46 | import thrift.internal.resource_pool; | |
47 | ||
48 | /** | |
49 | * Represents a generic client pool which implements TFutureInterface!Interface | |
50 | * using multiple TAsyncClients. | |
51 | */ | |
52 | interface TAsyncClientPoolBase(Interface) if (isService!Interface) : | |
53 | TFutureInterface!Interface | |
54 | { | |
55 | /// Shorthand for the client type this pool operates on. | |
56 | alias TAsyncClientBase!Interface Client; | |
57 | ||
58 | /** | |
59 | * Adds a client to the pool. | |
60 | */ | |
61 | void addClient(Client client); | |
62 | ||
63 | /** | |
64 | * Removes a client from the pool. | |
65 | * | |
66 | * Returns: Whether the client was found in the pool. | |
67 | */ | |
68 | bool removeClient(Client client); | |
69 | ||
70 | /** | |
71 | * Called to determine whether an exception comes from a client from the | |
72 | * pool not working properly, or if it an exception thrown at the | |
73 | * application level. | |
74 | * | |
75 | * If the delegate returns true, the server/connection is considered to be | |
76 | * at fault, if it returns false, the exception is just passed on to the | |
77 | * caller. | |
78 | * | |
79 | * By default, returns true for instances of TTransportException and | |
80 | * TApplicationException, false otherwise. | |
81 | */ | |
82 | bool delegate(Exception) rpcFaultFilter() const @property; | |
83 | void rpcFaultFilter(bool delegate(Exception)) @property; /// Ditto | |
84 | ||
85 | /** | |
86 | * Whether to open the underlying transports of a client before trying to | |
87 | * execute a method if they are not open. This is usually desirable | |
88 | * because it allows e.g. to automatically reconnect to a remote server | |
89 | * if the network connection is dropped. | |
90 | * | |
91 | * Defaults to true. | |
92 | */ | |
93 | bool reopenTransports() const @property; | |
94 | void reopenTransports(bool) @property; /// Ditto | |
95 | } | |
96 | ||
97 | immutable bool delegate(Exception) defaultRpcFaultFilter; | |
98 | static this() { | |
99 | defaultRpcFaultFilter = (Exception e) { | |
100 | import thrift.protocol.base; | |
101 | import thrift.transport.base; | |
102 | return ( | |
103 | (cast(TTransportException)e !is null) || | |
104 | (cast(TApplicationException)e !is null) | |
105 | ); | |
106 | }; | |
107 | } | |
108 | ||
109 | /** | |
110 | * A TAsyncClientPoolBase implementation which queries multiple servers in a | |
111 | * row until a request succeeds, the result of which is then returned. | |
112 | * | |
113 | * The definition of »success« can be customized using the rpcFaultFilter() | |
114 | * delegate property. If it is non-null and calling it for an exception set by | |
115 | * a failed method invocation returns true, the error is considered to be | |
116 | * caused by the RPC layer rather than the application layer, and the next | |
117 | * server in the pool is tried. If there are no more clients to try, the | |
118 | * operation is marked as failed with a TCompoundOperationException. | |
119 | * | |
120 | * If a TAsyncClient in the pool fails with an RPC exception for a number of | |
121 | * consecutive tries, it is temporarily disabled (not tried any longer) for | |
122 | * a certain duration. Both the limit and the timeout can be configured. If all | |
123 | * clients fail (and keepTrying is false), the operation fails with a | |
124 | * TCompoundOperationException which contains the collected RPC exceptions. | |
125 | */ | |
126 | final class TAsyncClientPool(Interface) if (isService!Interface) : | |
127 | TAsyncClientPoolBase!Interface | |
128 | { | |
129 | /// | |
130 | this(Client[] clients) { | |
131 | pool_ = new TResourcePool!Client(clients); | |
132 | rpcFaultFilter_ = defaultRpcFaultFilter; | |
133 | reopenTransports_ = true; | |
134 | } | |
135 | ||
136 | /+override+/ void addClient(Client client) { | |
137 | pool_.add(client); | |
138 | } | |
139 | ||
140 | /+override+/ bool removeClient(Client client) { | |
141 | return pool_.remove(client); | |
142 | } | |
143 | ||
144 | /** | |
145 | * Whether to keep trying to find a working client if all have failed in a | |
146 | * row. | |
147 | * | |
148 | * Defaults to false. | |
149 | */ | |
150 | bool keepTrying() const @property { | |
151 | return pool_.cycle; | |
152 | } | |
153 | ||
154 | /// Ditto | |
155 | void keepTrying(bool value) @property { | |
156 | pool_.cycle = value; | |
157 | } | |
158 | ||
159 | /** | |
160 | * Whether to use a random permutation of the client pool on every call to | |
161 | * execute(). This can be used e.g. as a simple form of load balancing. | |
162 | * | |
163 | * Defaults to true. | |
164 | */ | |
165 | bool permuteClients() const @property { | |
166 | return pool_.permute; | |
167 | } | |
168 | ||
169 | /// Ditto | |
170 | void permuteClients(bool value) @property { | |
171 | pool_.permute = value; | |
172 | } | |
173 | ||
174 | /** | |
175 | * The number of consecutive faults after which a client is disabled until | |
176 | * faultDisableDuration has passed. 0 to never disable clients. | |
177 | * | |
178 | * Defaults to 0. | |
179 | */ | |
180 | ushort faultDisableCount() const @property { | |
181 | return pool_.faultDisableCount; | |
182 | } | |
183 | ||
184 | /// Ditto | |
185 | void faultDisableCount(ushort value) @property { | |
186 | pool_.faultDisableCount = value; | |
187 | } | |
188 | ||
189 | /** | |
190 | * The duration for which a client is no longer considered after it has | |
191 | * failed too often. | |
192 | * | |
193 | * Defaults to one second. | |
194 | */ | |
195 | Duration faultDisableDuration() const @property { | |
196 | return pool_.faultDisableDuration; | |
197 | } | |
198 | ||
199 | /// Ditto | |
200 | void faultDisableDuration(Duration value) @property { | |
201 | pool_.faultDisableDuration = value; | |
202 | } | |
203 | ||
204 | /+override+/ bool delegate(Exception) rpcFaultFilter() const @property { | |
205 | return rpcFaultFilter_; | |
206 | } | |
207 | ||
208 | /+override+/ void rpcFaultFilter(bool delegate(Exception) value) @property { | |
209 | rpcFaultFilter_ = value; | |
210 | } | |
211 | ||
212 | /+override+/ bool reopenTransports() const @property { | |
213 | return reopenTransports_; | |
214 | } | |
215 | ||
216 | /+override+/ void reopenTransports(bool value) @property { | |
217 | reopenTransports_ = value; | |
218 | } | |
219 | ||
220 | mixin(fallbackPoolForwardCode!Interface()); | |
221 | ||
222 | protected: | |
223 | // The actual worker implementation to which RPC method calls are forwarded. | |
224 | auto executeOnPool(string method, Args...)(Args args, | |
225 | TCancellation cancellation | |
226 | ) { | |
227 | auto clients = pool_[]; | |
228 | if (clients.empty) { | |
229 | throw new TException("No clients available to try."); | |
230 | } | |
231 | ||
232 | auto promise = new TPromise!(ReturnType!(MemberType!(Interface, method))); | |
233 | Exception[] rpcExceptions; | |
234 | ||
235 | void tryNext() { | |
236 | while (clients.empty) { | |
237 | Client next; | |
238 | Duration waitTime; | |
239 | if (clients.willBecomeNonempty(next, waitTime)) { | |
240 | if (waitTime > dur!"hnsecs"(0)) { | |
241 | if (waitTime < dur!"usecs"(10)) { | |
242 | import core.thread; | |
243 | Thread.sleep(waitTime); | |
244 | } else { | |
245 | next.transport.asyncManager.delay(waitTime, { tryNext(); }); | |
246 | return; | |
247 | } | |
248 | } | |
249 | } else { | |
250 | promise.fail(new TCompoundOperationException("All clients failed.", | |
251 | rpcExceptions)); | |
252 | return; | |
253 | } | |
254 | } | |
255 | ||
256 | auto client = clients.front; | |
257 | clients.popFront; | |
258 | ||
259 | if (reopenTransports) { | |
260 | if (!client.transport.isOpen) { | |
261 | try { | |
262 | client.transport.open(); | |
263 | } catch (Exception e) { | |
264 | pool_.recordFault(client); | |
265 | tryNext(); | |
266 | return; | |
267 | } | |
268 | } | |
269 | } | |
270 | ||
271 | auto future = mixin("client." ~ method)(args, cancellation); | |
272 | future.completion.addCallback({ | |
273 | if (future.status == TFutureStatus.CANCELLED) { | |
274 | promise.cancel(); | |
275 | return; | |
276 | } | |
277 | ||
278 | auto e = future.getException(); | |
279 | if (e) { | |
280 | if (rpcFaultFilter_ && rpcFaultFilter_(e)) { | |
281 | pool_.recordFault(client); | |
282 | rpcExceptions ~= e; | |
283 | tryNext(); | |
284 | return; | |
285 | } | |
286 | } | |
287 | pool_.recordSuccess(client); | |
288 | promise.complete(future); | |
289 | }); | |
290 | } | |
291 | ||
292 | tryNext(); | |
293 | return promise; | |
294 | } | |
295 | ||
296 | private: | |
297 | TResourcePool!Client pool_; | |
298 | bool delegate(Exception) rpcFaultFilter_; | |
299 | bool reopenTransports_; | |
300 | } | |
301 | ||
302 | /** | |
303 | * TAsyncClientPool construction helper to avoid having to explicitly | |
304 | * specify the interface type, i.e. to allow the constructor being called | |
305 | * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)). | |
306 | */ | |
307 | TAsyncClientPool!Interface tAsyncClientPool(Interface)( | |
308 | TAsyncClientBase!Interface[] clients | |
309 | ) if (isService!Interface) { | |
310 | return new typeof(return)(clients); | |
311 | } | |
312 | ||
313 | private { | |
314 | // Cannot use an anonymous delegate literal for this because they aren't | |
315 | // allowed in class scope. | |
316 | string fallbackPoolForwardCode(Interface)() { | |
317 | string code = ""; | |
318 | ||
319 | foreach (methodName; AllMemberMethodNames!Interface) { | |
320 | enum qn = "Interface." ~ methodName; | |
321 | code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~ | |
322 | "(ParameterTypeTuple!(" ~ qn ~ ") args, TCancellation cancellation = null) {\n"; | |
323 | code ~= "return executeOnPool!(`" ~ methodName ~ "`)(args, cancellation);\n"; | |
324 | code ~= "}\n"; | |
325 | } | |
326 | ||
327 | return code; | |
328 | } | |
329 | } | |
330 | ||
331 | /** | |
332 | * A TAsyncClientPoolBase implementation which queries multiple servers at | |
333 | * the same time and returns the first success response. | |
334 | * | |
335 | * The definition of »success« can be customized using the rpcFaultFilter() | |
336 | * delegate property. If it is non-null and calling it for an exception set by | |
337 | * a failed method invocation returns true, the error is considered to be | |
338 | * caused by the RPC layer rather than the application layer, and the next | |
339 | * server in the pool is tried. If all clients fail, the operation is marked | |
340 | * as failed with a TCompoundOperationException. | |
341 | */ | |
342 | final class TAsyncFastestClientPool(Interface) if (isService!Interface) : | |
343 | TAsyncClientPoolBase!Interface | |
344 | { | |
345 | /// | |
346 | this(Client[] clients) { | |
347 | clients_ = clients; | |
348 | rpcFaultFilter_ = defaultRpcFaultFilter; | |
349 | reopenTransports_ = true; | |
350 | } | |
351 | ||
352 | /+override+/ void addClient(Client client) { | |
353 | clients_ ~= client; | |
354 | } | |
355 | ||
356 | /+override+/ bool removeClient(Client client) { | |
357 | auto oldLength = clients_.length; | |
358 | clients_ = removeEqual(clients_, client); | |
359 | return clients_.length < oldLength; | |
360 | } | |
361 | ||
362 | ||
363 | /+override+/ bool delegate(Exception) rpcFaultFilter() const @property { | |
364 | return rpcFaultFilter_; | |
365 | } | |
366 | ||
367 | /+override+/ void rpcFaultFilter(bool delegate(Exception) value) @property { | |
368 | rpcFaultFilter_ = value; | |
369 | } | |
370 | ||
371 | /+override+/bool reopenTransports() const @property { | |
372 | return reopenTransports_; | |
373 | } | |
374 | ||
375 | /+override+/ void reopenTransports(bool value) @property { | |
376 | reopenTransports_ = value; | |
377 | } | |
378 | ||
379 | mixin(fastestPoolForwardCode!Interface()); | |
380 | ||
381 | private: | |
382 | Client[] clients_; | |
383 | bool delegate(Exception) rpcFaultFilter_; | |
384 | bool reopenTransports_; | |
385 | } | |
386 | ||
387 | /** | |
388 | * TAsyncFastestClientPool construction helper to avoid having to explicitly | |
389 | * specify the interface type, i.e. to allow the constructor being called | |
390 | * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)). | |
391 | */ | |
392 | TAsyncFastestClientPool!Interface tAsyncFastestClientPool(Interface)( | |
393 | TAsyncClientBase!Interface[] clients | |
394 | ) if (isService!Interface) { | |
395 | return new typeof(return)(clients); | |
396 | } | |
397 | ||
398 | private { | |
399 | // Cannot use an anonymous delegate literal for this because they aren't | |
400 | // allowed in class scope. | |
401 | string fastestPoolForwardCode(Interface)() { | |
402 | string code = ""; | |
403 | ||
404 | foreach (methodName; AllMemberMethodNames!Interface) { | |
405 | enum qn = "Interface." ~ methodName; | |
406 | code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~ | |
407 | "(ParameterTypeTuple!(" ~ qn ~ ") args, " ~ | |
408 | "TCancellation cancellation = null) {\n"; | |
409 | code ~= "enum methodName = `" ~ methodName ~ "`;\n"; | |
410 | code ~= q{ | |
411 | alias ReturnType!(MemberType!(Interface, methodName)) ResultType; | |
412 | ||
413 | auto childCancellation = new TCancellationOrigin; | |
414 | ||
415 | TFuture!ResultType[] futures; | |
416 | futures.reserve(clients_.length); | |
417 | ||
418 | foreach (c; clients_) { | |
419 | if (reopenTransports) { | |
420 | if (!c.transport.isOpen) { | |
421 | try { | |
422 | c.transport.open(); | |
423 | } catch (Exception e) { | |
424 | continue; | |
425 | } | |
426 | } | |
427 | } | |
428 | futures ~= mixin("c." ~ methodName)(args, childCancellation); | |
429 | } | |
430 | ||
431 | return new FastestPoolJob!(ResultType)( | |
432 | futures, rpcFaultFilter, cancellation, childCancellation); | |
433 | }; | |
434 | code ~= "}\n"; | |
435 | } | |
436 | ||
437 | return code; | |
438 | } | |
439 | ||
440 | final class FastestPoolJob(Result) : TFuture!Result { | |
441 | this(TFuture!Result[] poolFutures, bool delegate(Exception) rpcFaultFilter, | |
442 | TCancellation cancellation, TCancellationOrigin childCancellation | |
443 | ) { | |
444 | resultPromise_ = new TPromise!Result; | |
445 | poolFutures_ = poolFutures; | |
446 | rpcFaultFilter_ = rpcFaultFilter; | |
447 | childCancellation_ = childCancellation; | |
448 | ||
449 | foreach (future; poolFutures) { | |
450 | future.completion.addCallback({ | |
451 | auto f = future; | |
452 | return { completionCallback(f); }; | |
453 | }()); | |
454 | if (future.status != TFutureStatus.RUNNING) { | |
455 | // If the current future is already completed, we are done, don't | |
456 | // bother adding callbacks for the others (they would just return | |
457 | // immediately after acquiring the lock). | |
458 | return; | |
459 | } | |
460 | } | |
461 | ||
462 | if (cancellation) { | |
463 | cancellation.triggering.addCallback({ | |
464 | resultPromise_.cancel(); | |
465 | childCancellation.trigger(); | |
466 | }); | |
467 | } | |
468 | } | |
469 | ||
470 | TFutureStatus status() const @property { | |
471 | return resultPromise_.status; | |
472 | } | |
473 | ||
474 | TAwaitable completion() @property { | |
475 | return resultPromise_.completion; | |
476 | } | |
477 | ||
478 | Result get() { | |
479 | return resultPromise_.get(); | |
480 | } | |
481 | ||
482 | Exception getException() { | |
483 | return resultPromise_.getException(); | |
484 | } | |
485 | ||
486 | private: | |
487 | void completionCallback(TFuture!Result future) { | |
488 | synchronized { | |
489 | if (future.status == TFutureStatus.CANCELLED) { | |
490 | assert(resultPromise_.status != TFutureStatus.RUNNING); | |
491 | return; | |
492 | } | |
493 | ||
494 | if (resultPromise_.status != TFutureStatus.RUNNING) { | |
495 | // The operation has already been completed. This can happen if | |
496 | // another client completed first, but this callback was already | |
497 | // waiting for the lock when it called cancel(). | |
498 | return; | |
499 | } | |
500 | ||
501 | if (future.status == TFutureStatus.FAILED) { | |
502 | auto e = future.getException(); | |
503 | if (rpcFaultFilter_ && rpcFaultFilter_(e)) { | |
504 | rpcExceptions_ ~= e; | |
505 | ||
506 | if (rpcExceptions_.length == poolFutures_.length) { | |
507 | resultPromise_.fail(new TCompoundOperationException( | |
508 | "All child operations failed, unable to retrieve a result.", | |
509 | rpcExceptions_ | |
510 | )); | |
511 | } | |
512 | ||
513 | return; | |
514 | } | |
515 | } | |
516 | ||
517 | // Store the result to the target promise. | |
518 | resultPromise_.complete(future); | |
519 | ||
520 | // Cancel the other futures, we would just discard their results. | |
521 | // Note: We do this after we have stored the results to our promise, | |
522 | // see the assert at the top of the function. | |
523 | childCancellation_.trigger(); | |
524 | } | |
525 | } | |
526 | ||
527 | TPromise!Result resultPromise_; | |
528 | TFuture!Result[] poolFutures_; | |
529 | Exception[] rpcExceptions_; | |
530 | bool delegate(Exception) rpcFaultFilter_; | |
531 | TCancellationOrigin childCancellation_; | |
532 | } | |
533 | } | |
534 | ||
535 | /** | |
536 | * Allows easily aggregating results from a number of TAsyncClients. | |
537 | * | |
538 | * Contrary to TAsync{Fallback, Fastest}ClientPool, this class does not | |
539 | * simply implement TFutureInterface!Interface. It manages a pool of clients, | |
540 | * but allows the user to specify a custom accumulator function to use or to | |
541 | * iterate over the results using a TFutureAggregatorRange. | |
542 | * | |
543 | * For each service method, TAsyncAggregator offers a method | |
544 | * accepting the same arguments, and an optional TCancellation instance, just | |
545 | * like with TFutureInterface. The return type, however, is a proxy object | |
546 | * that offers the following methods: | |
547 | * --- | |
548 | * /++ | |
549 | * + Returns a thrift.util.future.TFutureAggregatorRange for the results of | |
550 | * + the client pool method invocations. | |
551 | * + | |
552 | * + The [] (slicing) operator can also be used to obtain the range. | |
553 | * + | |
554 | * + Params: | |
555 | * + timeout = A timeout to pass to the TFutureAggregatorRange constructor, | |
556 | * + defaults to zero (no timeout). | |
557 | * +/ | |
558 | * TFutureAggregatorRange!ReturnType range(Duration timeout = dur!"hnsecs"(0)); | |
559 | * auto opSlice() { return range(); } /// Ditto | |
560 | * | |
561 | * /++ | |
562 | * + Returns a future that gathers the results from the clients in the pool | |
563 | * + and invokes a user-supplied accumulator function on them, returning its | |
564 | * + return value to the client. | |
565 | * + | |
566 | * + In addition to the TFuture!AccumulatedType interface (where | |
567 | * + AccumulatedType is the return type of the accumulator function), the | |
568 | * + returned object also offers two additional methods, finish() and | |
569 | * + finishGet(): By default, the accumulator functions is called after all | |
570 | * + the results from the pool clients have become available. Calling finish() | |
571 | * + causes the accumulator future to stop waiting for other results and | |
572 | * + immediately invoking the accumulator function on the results currently | |
573 | * + available. If all results are already available, finish() is a no-op. | |
574 | * + finishGet() is a convenience shortcut for combining it with | |
575 | * + a call to get() immediately afterwards, like waitGet() is for wait(). | |
576 | * + | |
577 | * + The acc alias can point to any callable accepting either an array of | |
578 | * + return values or an array of return values and an array of exceptions; | |
579 | * + see isAccumulator!() for details. The default accumulator concatenates | |
580 | * + return values that can be concatenated with each others (e.g. arrays), | |
581 | * + and simply returns an array of values otherwise, failing with a | |
582 | * + TCompoundOperationException no values were returned. | |
583 | * + | |
584 | * + The accumulator function is not executed in any of the async manager | |
585 | * + worker threads associated with the async clients, but instead it is | |
586 | * + invoked when the actual result is requested for the first time after the | |
587 | * + operation has been completed. This also includes checking the status | |
588 | * + of the operation once it is no longer running, since the accumulator | |
589 | * + has to be run to determine whether the operation succeeded or failed. | |
590 | * +/ | |
591 | * auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!acc); | |
592 | * --- | |
593 | * | |
594 | * Example: | |
595 | * --- | |
596 | * // Some Thrift service. | |
597 | * interface Foo { | |
598 | * int foo(string name); | |
599 | * byte[] bar(); | |
600 | * } | |
601 | * | |
602 | * // Create the aggregator pool – client0, client1, client2 are some | |
603 | * // TAsyncClient!Foo instances, but in theory could also be other | |
604 | * // TFutureInterface!Foo implementations (e.g. some async client pool). | |
605 | * auto pool = new TAsyncAggregator!Foo([client0, client1, client2]); | |
606 | * | |
607 | * foreach (val; pool.foo("baz").range(dur!"seconds"(1))) { | |
608 | * // Process all the results that are available before a second has passed, | |
609 | * // in the order they arrive. | |
610 | * writeln(val); | |
611 | * } | |
612 | * | |
613 | * auto sumRoots = pool.foo("baz").accumulate!((int[] vals, Exceptions[] exs){ | |
614 | * if (vals.empty) { | |
615 | * throw new TCompoundOperationException("All clients failed", exs); | |
616 | * } | |
617 | * | |
618 | * // Just to illustrate that the type of the values can change, convert the | |
619 | * // numbers to double and sum up their roots. | |
620 | * double result = 0; | |
621 | * foreach (v; vals) result += sqrt(cast(double)v); | |
622 | * return result; | |
623 | * })(); | |
624 | * | |
625 | * // Wait up to three seconds for the result, and then accumulate what has | |
626 | * // arrived so far. | |
627 | * sumRoots.completion.wait(dur!"seconds"(3)); | |
628 | * writeln(sumRoots.finishGet()); | |
629 | * | |
630 | * // For scalars, the default accumulator returns an array of the values. | |
631 | * pragma(msg, typeof(pool.foo("").accumulate().get()); // int[]. | |
632 | * | |
633 | * // For lists, etc., it concatenates the results together. | |
634 | * pragma(msg, typeof(pool.bar().accumulate().get())); // byte[]. | |
635 | * --- | |
636 | * | |
637 | * Note: For the accumulate!() interface, you might currently hit a »cannot use | |
638 | * local '…' as parameter to non-global template accumulate«-error, see | |
639 | * $(DMDBUG 5710, DMD issue 5710). If your accumulator function does not need | |
640 | * to access the surrounding scope, you might want to use a function literal | |
641 | * instead of a delegate to avoid the issue. | |
642 | */ | |
643 | class TAsyncAggregator(Interface) if (isBaseService!Interface) { | |
644 | /// Shorthand for the client type this instance operates on. | |
645 | alias TAsyncClientBase!Interface Client; | |
646 | ||
647 | /// | |
648 | this(Client[] clients) { | |
649 | clients_ = clients; | |
650 | } | |
651 | ||
652 | /// Whether to open the underlying transports of a client before trying to | |
653 | /// execute a method if they are not open. This is usually desirable | |
654 | /// because it allows e.g. to automatically reconnect to a remote server | |
655 | /// if the network connection is dropped. | |
656 | /// | |
657 | /// Defaults to true. | |
658 | bool reopenTransports = true; | |
659 | ||
660 | mixin AggregatorOpDispatch!(); | |
661 | ||
662 | private: | |
663 | Client[] clients_; | |
664 | } | |
665 | ||
666 | /// Ditto | |
667 | class TAsyncAggregator(Interface) if (isDerivedService!Interface) : | |
668 | TAsyncAggregator!(BaseService!Interface) | |
669 | { | |
670 | /// Shorthand for the client type this instance operates on. | |
671 | alias TAsyncClientBase!Interface Client; | |
672 | ||
673 | /// | |
674 | this(Client[] clients) { | |
675 | super(cast(TAsyncClientBase!(BaseService!Interface)[])clients); | |
676 | } | |
677 | ||
678 | mixin AggregatorOpDispatch!(); | |
679 | } | |
680 | ||
681 | /** | |
682 | * Whether fun is a valid accumulator function for values of type ValueType. | |
683 | * | |
684 | * For this to be true, fun must be a callable matching one of the following | |
685 | * argument lists: | |
686 | * --- | |
687 | * fun(ValueType[] values); | |
688 | * fun(ValueType[] values, Exception[] exceptions); | |
689 | * --- | |
690 | * | |
691 | * The second version is passed the collected array exceptions from all the | |
692 | * clients in the pool. | |
693 | * | |
694 | * The return value of the accumulator function is passed to the client (via | |
695 | * the result future). If it throws an exception, the operation is marked as | |
696 | * failed with the given exception instead. | |
697 | */ | |
698 | template isAccumulator(ValueType, alias fun) { | |
699 | enum isAccumulator = is(typeof(fun(cast(ValueType[])[]))) || | |
700 | is(typeof(fun(cast(ValueType[])[], cast(Exception[])[]))); | |
701 | } | |
702 | ||
703 | /** | |
704 | * TAsyncAggregator construction helper to avoid having to explicitly | |
705 | * specify the interface type, i.e. to allow the constructor being called | |
706 | * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)). | |
707 | */ | |
708 | TAsyncAggregator!Interface tAsyncAggregator(Interface)( | |
709 | TAsyncClientBase!Interface[] clients | |
710 | ) if (isService!Interface) { | |
711 | return new typeof(return)(clients); | |
712 | } | |
713 | ||
714 | private { | |
715 | mixin template AggregatorOpDispatch() { | |
716 | auto opDispatch(string name, Args...)(Args args) if ( | |
717 | is(typeof(mixin("Interface.init." ~ name)(args))) | |
718 | ) { | |
719 | alias ReturnType!(MemberType!(Interface, name)) ResultType; | |
720 | ||
721 | auto childCancellation = new TCancellationOrigin; | |
722 | ||
723 | TFuture!ResultType[] futures; | |
724 | futures.reserve(clients_.length); | |
725 | ||
726 | foreach (c; cast(Client[])clients_) { | |
727 | if (reopenTransports) { | |
728 | if (!c.transport.isOpen) { | |
729 | try { | |
730 | c.transport.open(); | |
731 | } catch (Exception e) { | |
732 | continue; | |
733 | } | |
734 | } | |
735 | } | |
736 | futures ~= mixin("c." ~ name)(args, childCancellation); | |
737 | } | |
738 | ||
739 | return AggregationResult!ResultType(futures, childCancellation); | |
740 | } | |
741 | } | |
742 | ||
743 | struct AggregationResult(T) { | |
744 | auto opSlice() { | |
745 | return range(); | |
746 | } | |
747 | ||
748 | auto range(Duration timeout = dur!"hnsecs"(0)) { | |
749 | return tFutureAggregatorRange(futures_, childCancellation_, timeout); | |
750 | } | |
751 | ||
752 | auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!(T, acc)) { | |
753 | return new AccumulatorJob!(T, acc)(futures_, childCancellation_); | |
754 | } | |
755 | ||
756 | private: | |
757 | TFuture!T[] futures_; | |
758 | TCancellationOrigin childCancellation_; | |
759 | } | |
760 | ||
761 | auto defaultAccumulator(T)(T[] values, Exception[] exceptions) { | |
762 | if (values.empty) { | |
763 | throw new TCompoundOperationException("All clients failed", | |
764 | exceptions); | |
765 | } | |
766 | ||
767 | static if (is(typeof(T.init ~ T.init))) { | |
768 | import std.algorithm; | |
769 | return reduce!"a ~ b"(values); | |
770 | } else { | |
771 | return values; | |
772 | } | |
773 | } | |
774 | ||
775 | final class AccumulatorJob(T, alias accumulator) if ( | |
776 | isAccumulator!(T, accumulator) | |
777 | ) : TFuture!(AccumulatorResult!(T, accumulator)) { | |
778 | this(TFuture!T[] futures, TCancellationOrigin childCancellation) { | |
779 | futures_ = futures; | |
780 | childCancellation_ = childCancellation; | |
781 | resultMutex_ = new Mutex; | |
782 | completionEvent_ = new TOneshotEvent; | |
783 | ||
784 | foreach (future; futures) { | |
785 | future.completion.addCallback({ | |
786 | auto f = future; | |
787 | return { | |
788 | synchronized (resultMutex_) { | |
789 | if (f.status == TFutureStatus.CANCELLED) { | |
790 | if (!finished_) { | |
791 | status_ = TFutureStatus.CANCELLED; | |
792 | finished_ = true; | |
793 | } | |
794 | return; | |
795 | } | |
796 | ||
797 | if (f.status == TFutureStatus.FAILED) { | |
798 | exceptions_ ~= f.getException(); | |
799 | } else { | |
800 | results_ ~= f.get(); | |
801 | } | |
802 | ||
803 | if (results_.length + exceptions_.length == futures_.length) { | |
804 | finished_ = true; | |
805 | completionEvent_.trigger(); | |
806 | } | |
807 | } | |
808 | }; | |
809 | }()); | |
810 | } | |
811 | } | |
812 | ||
813 | TFutureStatus status() @property { | |
814 | synchronized (resultMutex_) { | |
815 | if (!finished_) return TFutureStatus.RUNNING; | |
816 | if (status_ != TFutureStatus.RUNNING) return status_; | |
817 | ||
818 | try { | |
819 | result_ = invokeAccumulator!accumulator(results_, exceptions_); | |
820 | status_ = TFutureStatus.SUCCEEDED; | |
821 | } catch (Exception e) { | |
822 | exception_ = e; | |
823 | status_ = TFutureStatus.FAILED; | |
824 | } | |
825 | ||
826 | return status_; | |
827 | } | |
828 | } | |
829 | ||
830 | TAwaitable completion() @property { | |
831 | return completionEvent_; | |
832 | } | |
833 | ||
834 | AccumulatorResult!(T, accumulator) get() { | |
835 | auto s = status; | |
836 | ||
837 | enforce(s != TFutureStatus.RUNNING, | |
838 | new TFutureException("Operation not yet completed.")); | |
839 | ||
840 | if (s == TFutureStatus.CANCELLED) throw new TCancelledException; | |
841 | if (s == TFutureStatus.FAILED) throw exception_; | |
842 | return result_; | |
843 | } | |
844 | ||
845 | Exception getException() { | |
846 | auto s = status; | |
847 | enforce(s != TFutureStatus.RUNNING, | |
848 | new TFutureException("Operation not yet completed.")); | |
849 | ||
850 | if (s == TFutureStatus.CANCELLED) throw new TCancelledException; | |
851 | ||
852 | if (s == TFutureStatus.SUCCEEDED) { | |
853 | return null; | |
854 | } | |
855 | return exception_; | |
856 | } | |
857 | ||
858 | void finish() { | |
859 | synchronized (resultMutex_) { | |
860 | if (!finished_) { | |
861 | finished_ = true; | |
862 | childCancellation_.trigger(); | |
863 | completionEvent_.trigger(); | |
864 | } | |
865 | } | |
866 | } | |
867 | ||
868 | auto finishGet() { | |
869 | finish(); | |
870 | return get(); | |
871 | } | |
872 | ||
873 | private: | |
874 | TFuture!T[] futures_; | |
875 | TCancellationOrigin childCancellation_; | |
876 | ||
877 | bool finished_; | |
878 | T[] results_; | |
879 | Exception[] exceptions_; | |
880 | ||
881 | TFutureStatus status_; | |
882 | Mutex resultMutex_; | |
883 | union { | |
884 | AccumulatorResult!(T, accumulator) result_; | |
885 | Exception exception_; | |
886 | } | |
887 | TOneshotEvent completionEvent_; | |
888 | } | |
889 | ||
890 | auto invokeAccumulator(alias accumulator, T)( | |
891 | T[] values, Exception[] exceptions | |
892 | ) if ( | |
893 | isAccumulator!(T, accumulator) | |
894 | ) { | |
895 | static if (is(typeof(accumulator(values, exceptions)))) { | |
896 | return accumulator(values, exceptions); | |
897 | } else { | |
898 | return accumulator(values); | |
899 | } | |
900 | } | |
901 | ||
902 | template AccumulatorResult(T, alias acc) { | |
903 | alias typeof(invokeAccumulator!acc(cast(T[])[], cast(Exception[])[])) | |
904 | AccumulatorResult; | |
905 | } | |
906 | } |