]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | module thrift.util.future; | |
20 | ||
21 | import core.atomic; | |
22 | import core.sync.condition; | |
23 | import core.sync.mutex; | |
24 | import core.time : Duration; | |
25 | import std.array : empty, front, popFront; | |
26 | import std.conv : to; | |
27 | import std.exception : enforce; | |
28 | import std.traits : BaseTypeTuple, isSomeFunction, ParameterTypeTuple, ReturnType; | |
29 | import thrift.base; | |
30 | import thrift.util.awaitable; | |
31 | import thrift.util.cancellation; | |
32 | ||
33 | /** | |
34 | * Represents an operation which is executed asynchronously and the result of | |
35 | * which will become available at some point in the future. | |
36 | * | |
37 | * Once a operation is completed, the result of the operation can be fetched | |
38 | * via the get() family of methods. There are three possible cases: Either the | |
39 | * operation succeeded, then its return value is returned, or it failed by | |
40 | * throwing, in which case the exception is rethrown, or it was cancelled | |
41 | * before, then a TCancelledException is thrown. There might be TFuture | |
42 | * implementations which never possibly enter the cancelled state. | |
43 | * | |
44 | * All methods are thread-safe, but keep in mind that any exception object or | |
45 | * result (if it is a reference type, of course) is shared between all | |
46 | * get()-family invocations. | |
47 | */ | |
48 | interface TFuture(ResultType) { | |
49 | /** | |
50 | * The status the operation is currently in. | |
51 | * | |
52 | * An operation starts out in RUNNING status, and changes state to one of the | |
53 | * others at most once afterwards. | |
54 | */ | |
55 | TFutureStatus status() @property; | |
56 | ||
57 | /** | |
58 | * A TAwaitable triggered when the operation leaves the RUNNING status. | |
59 | */ | |
60 | TAwaitable completion() @property; | |
61 | ||
62 | /** | |
63 | * Convenience shorthand for waiting until the result is available and then | |
64 | * get()ing it. | |
65 | * | |
66 | * If the operation has already completed, the result is immediately | |
67 | * returned. | |
68 | * | |
69 | * The result of this method is »alias this«'d to the interface, so that | |
70 | * TFuture can be used as a drop-in replacement for a simple value in | |
71 | * synchronous code. | |
72 | */ | |
73 | final ResultType waitGet() { | |
74 | completion.wait(); | |
75 | return get(); | |
76 | } | |
77 | final @property auto waitGetProperty() { return waitGet(); } | |
78 | alias waitGetProperty this; | |
79 | ||
80 | /** | |
81 | * Convenience shorthand for waiting until the result is available and then | |
82 | * get()ing it. | |
83 | * | |
84 | * If the operation completes in time, returns its result (resp. throws an | |
85 | * exception for the failed/cancelled cases). If not, throws a | |
86 | * TFutureException. | |
87 | */ | |
88 | final ResultType waitGet(Duration timeout) { | |
89 | enforce(completion.wait(timeout), new TFutureException( | |
90 | "Operation did not complete in time.")); | |
91 | return get(); | |
92 | } | |
93 | ||
94 | /** | |
95 | * Returns the result of the operation. | |
96 | * | |
97 | * Throws: TFutureException if the operation has been cancelled, | |
98 | * TCancelledException if it is not yet done; the set exception if it | |
99 | * failed. | |
100 | */ | |
101 | ResultType get(); | |
102 | ||
103 | /** | |
104 | * Returns the captured exception if the operation failed, or null otherwise. | |
105 | * | |
106 | * Throws: TFutureException if not yet done, TCancelledException if the | |
107 | * operation has been cancelled. | |
108 | */ | |
109 | Exception getException(); | |
110 | } | |
111 | ||
112 | /** | |
113 | * The states the operation offering a future interface can be in. | |
114 | */ | |
115 | enum TFutureStatus : byte { | |
116 | RUNNING, /// The operation is still running. | |
117 | SUCCEEDED, /// The operation completed without throwing an exception. | |
118 | FAILED, /// The operation completed by throwing an exception. | |
119 | CANCELLED /// The operation was cancelled. | |
120 | } | |
121 | ||
122 | /** | |
123 | * A TFuture covering the simple but common case where the result is simply | |
124 | * set by a call to succeed()/fail(). | |
125 | * | |
126 | * All methods are thread-safe, but usually, succeed()/fail() are only called | |
127 | * from a single thread (different from the thread(s) waiting for the result | |
128 | * using the TFuture interface, though). | |
129 | */ | |
130 | class TPromise(ResultType) : TFuture!ResultType { | |
131 | this() { | |
132 | statusMutex_ = new Mutex; | |
133 | completionEvent_ = new TOneshotEvent; | |
134 | } | |
135 | ||
136 | override S status() const @property { | |
137 | return atomicLoad(status_); | |
138 | } | |
139 | ||
140 | override TAwaitable completion() @property { | |
141 | return completionEvent_; | |
142 | } | |
143 | ||
144 | override ResultType get() { | |
145 | auto s = atomicLoad(status_); | |
146 | enforce(s != S.RUNNING, | |
147 | new TFutureException("Operation not yet completed.")); | |
148 | ||
149 | if (s == S.CANCELLED) throw new TCancelledException; | |
150 | if (s == S.FAILED) throw exception_; | |
151 | ||
152 | static if (!is(ResultType == void)) { | |
153 | return result_; | |
154 | } | |
155 | } | |
156 | ||
157 | override Exception getException() { | |
158 | auto s = atomicLoad(status_); | |
159 | enforce(s != S.RUNNING, | |
160 | new TFutureException("Operation not yet completed.")); | |
161 | ||
162 | if (s == S.CANCELLED) throw new TCancelledException; | |
163 | if (s == S.SUCCEEDED) return null; | |
164 | ||
165 | return exception_; | |
166 | } | |
167 | ||
168 | static if (!is(ResultType == void)) { | |
169 | /** | |
170 | * Sets the result of the operation, marks it as done, and notifies any | |
171 | * waiters. | |
172 | * | |
173 | * If the operation has been cancelled before, nothing happens. | |
174 | * | |
175 | * Throws: TFutureException if the operation is already completed. | |
176 | */ | |
177 | void succeed(ResultType result) { | |
178 | synchronized (statusMutex_) { | |
179 | auto s = atomicLoad(status_); | |
180 | if (s == S.CANCELLED) return; | |
181 | ||
182 | enforce(s == S.RUNNING, | |
183 | new TFutureException("Operation already completed.")); | |
184 | result_ = result; | |
185 | ||
186 | atomicStore(status_, S.SUCCEEDED); | |
187 | } | |
188 | ||
189 | completionEvent_.trigger(); | |
190 | } | |
191 | } else { | |
192 | void succeed() { | |
193 | synchronized (statusMutex_) { | |
194 | auto s = atomicLoad(status_); | |
195 | if (s == S.CANCELLED) return; | |
196 | ||
197 | enforce(s == S.RUNNING, | |
198 | new TFutureException("Operation already completed.")); | |
199 | ||
200 | atomicStore(status_, S.SUCCEEDED); | |
201 | } | |
202 | ||
203 | completionEvent_.trigger(); | |
204 | } | |
205 | } | |
206 | ||
207 | /** | |
208 | * Marks the operation as failed with the specified exception and notifies | |
209 | * any waiters. | |
210 | * | |
211 | * If the operation was already cancelled, nothing happens. | |
212 | * | |
213 | * Throws: TFutureException if the operation is already completed. | |
214 | */ | |
215 | void fail(Exception exception) { | |
216 | synchronized (statusMutex_) { | |
217 | auto status = atomicLoad(status_); | |
218 | if (status == S.CANCELLED) return; | |
219 | ||
220 | enforce(status == S.RUNNING, | |
221 | new TFutureException("Operation already completed.")); | |
222 | exception_ = exception; | |
223 | ||
224 | atomicStore(status_, S.FAILED); | |
225 | } | |
226 | ||
227 | completionEvent_.trigger(); | |
228 | } | |
229 | ||
230 | ||
231 | /** | |
232 | * Marks this operation as completed and takes over the outcome of another | |
233 | * TFuture of the same type. | |
234 | * | |
235 | * If this operation was already cancelled, nothing happens. If the other | |
236 | * operation was cancelled, this operation is marked as failed with a | |
237 | * TCancelledException. | |
238 | * | |
239 | * Throws: TFutureException if the passed in future was not completed or | |
240 | * this operation is already completed. | |
241 | */ | |
242 | void complete(TFuture!ResultType future) { | |
243 | synchronized (statusMutex_) { | |
244 | auto status = atomicLoad(status_); | |
245 | if (status == S.CANCELLED) return; | |
246 | enforce(status == S.RUNNING, | |
247 | new TFutureException("Operation already completed.")); | |
248 | ||
249 | enforce(future.status != S.RUNNING, new TFutureException( | |
250 | "The passed TFuture is not yet completed.")); | |
251 | ||
252 | status = future.status; | |
253 | if (status == S.CANCELLED) { | |
254 | status = S.FAILED; | |
255 | exception_ = new TCancelledException; | |
256 | } else if (status == S.FAILED) { | |
257 | exception_ = future.getException(); | |
258 | } else static if (!is(ResultType == void)) { | |
259 | result_ = future.get(); | |
260 | } | |
261 | ||
262 | atomicStore(status_, status); | |
263 | } | |
264 | ||
265 | completionEvent_.trigger(); | |
266 | } | |
267 | ||
268 | /** | |
269 | * Marks this operation as cancelled and notifies any waiters. | |
270 | * | |
271 | * If the operation is already completed, nothing happens. | |
272 | */ | |
273 | void cancel() { | |
274 | synchronized (statusMutex_) { | |
275 | auto status = atomicLoad(status_); | |
276 | if (status == S.RUNNING) atomicStore(status_, S.CANCELLED); | |
277 | } | |
278 | ||
279 | completionEvent_.trigger(); | |
280 | } | |
281 | ||
282 | private: | |
283 | // Convenience alias because TFutureStatus is ubiquitous in this class. | |
284 | alias TFutureStatus S; | |
285 | ||
286 | // The status the promise is currently in. | |
287 | shared S status_; | |
288 | ||
289 | union { | |
290 | static if (!is(ResultType == void)) { | |
291 | // Set if status_ is SUCCEEDED. | |
292 | ResultType result_; | |
293 | } | |
294 | // Set if status_ is FAILED. | |
295 | Exception exception_; | |
296 | } | |
297 | ||
298 | // Protects status_. | |
299 | // As for result_ and exception_: They are only set once, while status_ is | |
300 | // still RUNNING, so given that the operation has already completed, reading | |
301 | // them is safe without holding some kind of lock. | |
302 | Mutex statusMutex_; | |
303 | ||
304 | // Triggered when the event completes. | |
305 | TOneshotEvent completionEvent_; | |
306 | } | |
307 | ||
308 | /// | |
309 | class TFutureException : TException { | |
310 | /// | |
311 | this(string msg = "", string file = __FILE__, size_t line = __LINE__, | |
312 | Throwable next = null) | |
313 | { | |
314 | super(msg, file, line, next); | |
315 | } | |
316 | } | |
317 | ||
318 | /** | |
319 | * Creates an interface that is similar to a given one, but accepts an | |
320 | * additional, optional TCancellation parameter each method, and returns | |
321 | * TFutures instead of plain return values. | |
322 | * | |
323 | * For example, given the following declarations: | |
324 | * --- | |
325 | * interface Foo { | |
326 | * void bar(); | |
327 | * string baz(int a); | |
328 | * } | |
329 | * alias TFutureInterface!Foo FutureFoo; | |
330 | * --- | |
331 | * | |
332 | * FutureFoo would be equivalent to: | |
333 | * --- | |
334 | * interface FutureFoo { | |
335 | * TFuture!void bar(TCancellation cancellation = null); | |
336 | * TFuture!string baz(int a, TCancellation cancellation = null); | |
337 | * } | |
338 | * --- | |
339 | */ | |
340 | template TFutureInterface(Interface) if (is(Interface _ == interface)) { | |
341 | mixin({ | |
342 | string code = "interface TFutureInterface \n"; | |
343 | ||
344 | static if (is(Interface Bases == super) && Bases.length > 0) { | |
345 | code ~= ": "; | |
346 | foreach (i; 0 .. Bases.length) { | |
347 | if (i > 0) code ~= ", "; | |
348 | code ~= "TFutureInterface!(BaseTypeTuple!Interface[" ~ to!string(i) ~ "]) "; | |
349 | } | |
350 | } | |
351 | ||
352 | code ~= "{\n"; | |
353 | ||
354 | foreach (methodName; __traits(derivedMembers, Interface)) { | |
355 | enum qn = "Interface." ~ methodName; | |
356 | static if (isSomeFunction!(mixin(qn))) { | |
357 | code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~ | |
358 | "(ParameterTypeTuple!(" ~ qn ~ "), TCancellation cancellation = null);\n"; | |
359 | } | |
360 | } | |
361 | ||
362 | code ~= "}\n"; | |
363 | return code; | |
364 | }()); | |
365 | } | |
366 | ||
367 | /** | |
368 | * An input range that aggregates results from multiple asynchronous operations, | |
369 | * returning them in the order they arrive. | |
370 | * | |
371 | * Additionally, a timeout can be set after which results from not yet finished | |
372 | * futures will no longer be waited for, e.g. to ensure the time it takes to | |
373 | * iterate over a set of results is limited. | |
374 | */ | |
375 | final class TFutureAggregatorRange(T) { | |
376 | /** | |
377 | * Constructs a new instance. | |
378 | * | |
379 | * Params: | |
380 | * futures = The set of futures to collect results from. | |
381 | * timeout = If positive, not yet finished futures will be cancelled and | |
382 | * their results will not be taken into account. | |
383 | */ | |
384 | this(TFuture!T[] futures, TCancellationOrigin childCancellation, | |
385 | Duration timeout = dur!"hnsecs"(0) | |
386 | ) { | |
387 | if (timeout > dur!"hnsecs"(0)) { | |
388 | timeoutSysTick_ = TickDuration.currSystemTick + | |
389 | TickDuration.from!"hnsecs"(timeout.total!"hnsecs"); | |
390 | } else { | |
391 | timeoutSysTick_ = TickDuration(0); | |
392 | } | |
393 | ||
394 | queueMutex_ = new Mutex; | |
395 | queueNonEmptyCondition_ = new Condition(queueMutex_); | |
396 | futures_ = futures; | |
397 | childCancellation_ = childCancellation; | |
398 | ||
399 | foreach (future; futures_) { | |
400 | future.completion.addCallback({ | |
401 | auto f = future; | |
402 | return { | |
403 | if (f.status == TFutureStatus.CANCELLED) return; | |
404 | assert(f.status != TFutureStatus.RUNNING); | |
405 | ||
406 | synchronized (queueMutex_) { | |
407 | completedQueue_ ~= f; | |
408 | ||
409 | if (completedQueue_.length == 1) { | |
410 | queueNonEmptyCondition_.notifyAll(); | |
411 | } | |
412 | } | |
413 | }; | |
414 | }()); | |
415 | } | |
416 | } | |
417 | ||
418 | /** | |
419 | * Whether the range is empty. | |
420 | * | |
421 | * This is the case if the results from the completed futures not having | |
422 | * failed have already been popped and either all future have been finished | |
423 | * or the timeout has expired. | |
424 | * | |
425 | * Potentially blocks until a new result is available or the timeout has | |
426 | * expired. | |
427 | */ | |
428 | bool empty() @property { | |
429 | if (finished_) return true; | |
430 | if (bufferFilled_) return false; | |
431 | ||
432 | while (true) { | |
433 | TFuture!T future; | |
434 | synchronized (queueMutex_) { | |
435 | // The while loop is just being cautious about spurious wakeups, in | |
436 | // case they should be possible. | |
437 | while (completedQueue_.empty) { | |
438 | auto remaining = to!Duration(timeoutSysTick_ - | |
439 | TickDuration.currSystemTick); | |
440 | ||
441 | if (remaining <= dur!"hnsecs"(0)) { | |
442 | // No time left, but still no element received – we are empty now. | |
443 | finished_ = true; | |
444 | childCancellation_.trigger(); | |
445 | return true; | |
446 | } | |
447 | ||
448 | queueNonEmptyCondition_.wait(remaining); | |
449 | } | |
450 | ||
451 | future = completedQueue_.front; | |
452 | completedQueue_.popFront(); | |
453 | } | |
454 | ||
455 | ++completedCount_; | |
456 | if (completedCount_ == futures_.length) { | |
457 | // This was the last future in the list, there is no possibility | |
458 | // another result could ever become available. | |
459 | finished_ = true; | |
460 | } | |
461 | ||
462 | if (future.status == TFutureStatus.FAILED) { | |
463 | // This one failed, loop again and try getting another item from | |
464 | // the queue. | |
465 | exceptions_ ~= future.getException(); | |
466 | } else { | |
467 | resultBuffer_ = future.get(); | |
468 | bufferFilled_ = true; | |
469 | return false; | |
470 | } | |
471 | } | |
472 | } | |
473 | ||
474 | /** | |
475 | * Returns the first element from the range. | |
476 | * | |
477 | * Potentially blocks until a new result is available or the timeout has | |
478 | * expired. | |
479 | * | |
480 | * Throws: TException if the range is empty. | |
481 | */ | |
482 | T front() { | |
483 | enforce(!empty, new TException( | |
484 | "Cannot get front of an empty future aggregator range.")); | |
485 | return resultBuffer_; | |
486 | } | |
487 | ||
488 | /** | |
489 | * Removes the first element from the range. | |
490 | * | |
491 | * Potentially blocks until a new result is available or the timeout has | |
492 | * expired. | |
493 | * | |
494 | * Throws: TException if the range is empty. | |
495 | */ | |
496 | void popFront() { | |
497 | enforce(!empty, new TException( | |
498 | "Cannot pop front of an empty future aggregator range.")); | |
499 | bufferFilled_ = false; | |
500 | } | |
501 | ||
502 | /** | |
503 | * The number of futures the result of which has been returned or which have | |
504 | * failed so far. | |
505 | */ | |
506 | size_t completedCount() @property const { | |
507 | return completedCount_; | |
508 | } | |
509 | ||
510 | /** | |
511 | * The exceptions collected from failed TFutures so far. | |
512 | */ | |
513 | Exception[] exceptions() @property { | |
514 | return exceptions_; | |
515 | } | |
516 | ||
517 | private: | |
518 | TFuture!T[] futures_; | |
519 | TCancellationOrigin childCancellation_; | |
520 | ||
521 | // The system tick this operation will time out, or zero if no timeout has | |
522 | // been set. | |
523 | TickDuration timeoutSysTick_; | |
524 | ||
525 | bool finished_; | |
526 | ||
527 | bool bufferFilled_; | |
528 | T resultBuffer_; | |
529 | ||
530 | Exception[] exceptions_; | |
531 | size_t completedCount_; | |
532 | ||
533 | // The queue of completed futures. This (and the associated condition) are | |
534 | // the only parts of this class that are accessed by multiple threads. | |
535 | TFuture!T[] completedQueue_; | |
536 | Mutex queueMutex_; | |
537 | Condition queueNonEmptyCondition_; | |
538 | } | |
539 | ||
540 | /** | |
541 | * TFutureAggregatorRange construction helper to avoid having to explicitly | |
542 | * specify the value type, i.e. to allow the constructor being called using IFTI | |
543 | * (see $(DMDBUG 6082, D Bugzilla enhancement requet 6082)). | |
544 | */ | |
545 | TFutureAggregatorRange!T tFutureAggregatorRange(T)(TFuture!T[] futures, | |
546 | TCancellationOrigin childCancellation, Duration timeout = dur!"hnsecs"(0) | |
547 | ) { | |
548 | return new TFutureAggregatorRange!T(futures, childCancellation, timeout); | |
549 | } |