1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 /**
21 * Utilities for asynchronously querying multiple servers, building on
22 * TAsyncClient.
23 *
24 * Terminology note: The names of the artifacts defined in this module are
25 * derived from »client pool«, because they operate on a pool of
26 * TAsyncClients. However, from a architectural point of view, they often
27 * represent a pool of hosts a Thrift client application communicates with
28 * using RPC calls.
29 */
30 module thrift.codegen.async_client_pool;
31
32 import core.sync.mutex;
33 import core.time : Duration, dur;
34 import std.algorithm : map;
35 import std.array : array, empty;
36 import std.exception : enforce;
37 import std.traits : ParameterTypeTuple, ReturnType;
38 import thrift.base;
39 import thrift.codegen.base;
40 import thrift.codegen.async_client;
41 import thrift.internal.algorithm;
42 import thrift.internal.codegen;
43 import thrift.util.awaitable;
44 import thrift.util.cancellation;
45 import thrift.util.future;
46 import thrift.internal.resource_pool;
47
48 /**
49 * Represents a generic client pool which implements TFutureInterface!Interface
50 * using multiple TAsyncClients.
51 */
52 interface TAsyncClientPoolBase(Interface) if (isService!Interface) :
53 TFutureInterface!Interface
54 {
55 /// Shorthand for the client type this pool operates on.
56 alias TAsyncClientBase!Interface Client;
57
58 /**
59 * Adds a client to the pool.
60 */
61 void addClient(Client client);
62
63 /**
64 * Removes a client from the pool.
65 *
66 * Returns: Whether the client was found in the pool.
67 */
68 bool removeClient(Client client);
69
70 /**
71 * Called to determine whether an exception comes from a client from the
72 * pool not working properly, or if it an exception thrown at the
73 * application level.
74 *
75 * If the delegate returns true, the server/connection is considered to be
76 * at fault, if it returns false, the exception is just passed on to the
77 * caller.
78 *
79 * By default, returns true for instances of TTransportException and
80 * TApplicationException, false otherwise.
81 */
82 bool delegate(Exception) rpcFaultFilter() const @property;
83 void rpcFaultFilter(bool delegate(Exception)) @property; /// Ditto
84
85 /**
86 * Whether to open the underlying transports of a client before trying to
87 * execute a method if they are not open. This is usually desirable
88 * because it allows e.g. to automatically reconnect to a remote server
89 * if the network connection is dropped.
90 *
91 * Defaults to true.
92 */
93 bool reopenTransports() const @property;
94 void reopenTransports(bool) @property; /// Ditto
95 }
96
97 immutable bool delegate(Exception) defaultRpcFaultFilter;
this()98 shared static this() {
99 defaultRpcFaultFilter = (Exception e) {
100 import thrift.protocol.base;
101 import thrift.transport.base;
102 return (
103 (cast(TTransportException)e !is null) ||
104 (cast(TApplicationException)e !is null)
105 );
106 };
107 }
108
109 /**
110 * A TAsyncClientPoolBase implementation which queries multiple servers in a
111 * row until a request succeeds, the result of which is then returned.
112 *
113 * The definition of »success« can be customized using the rpcFaultFilter()
114 * delegate property. If it is non-null and calling it for an exception set by
115 * a failed method invocation returns true, the error is considered to be
116 * caused by the RPC layer rather than the application layer, and the next
117 * server in the pool is tried. If there are no more clients to try, the
118 * operation is marked as failed with a TCompoundOperationException.
119 *
120 * If a TAsyncClient in the pool fails with an RPC exception for a number of
121 * consecutive tries, it is temporarily disabled (not tried any longer) for
122 * a certain duration. Both the limit and the timeout can be configured. If all
123 * clients fail (and keepTrying is false), the operation fails with a
124 * TCompoundOperationException which contains the collected RPC exceptions.
125 */
126 final class TAsyncClientPool(Interface) if (isService!Interface) :
127 TAsyncClientPoolBase!Interface
128 {
129 ///
this(Client[]clients)130 this(Client[] clients) {
131 pool_ = new TResourcePool!Client(clients);
132 rpcFaultFilter_ = defaultRpcFaultFilter;
133 reopenTransports_ = true;
134 }
135
addClient(Client client)136 /+override+/ void addClient(Client client) {
137 pool_.add(client);
138 }
139
removeClient(Client client)140 /+override+/ bool removeClient(Client client) {
141 return pool_.remove(client);
142 }
143
144 /**
145 * Whether to keep trying to find a working client if all have failed in a
146 * row.
147 *
148 * Defaults to false.
149 */
keepTrying()150 bool keepTrying() const @property {
151 return pool_.cycle;
152 }
153
154 /// Ditto
keepTrying(bool value)155 void keepTrying(bool value) @property {
156 pool_.cycle = value;
157 }
158
159 /**
160 * Whether to use a random permutation of the client pool on every call to
161 * execute(). This can be used e.g. as a simple form of load balancing.
162 *
163 * Defaults to true.
164 */
permuteClients()165 bool permuteClients() const @property {
166 return pool_.permute;
167 }
168
169 /// Ditto
permuteClients(bool value)170 void permuteClients(bool value) @property {
171 pool_.permute = value;
172 }
173
174 /**
175 * The number of consecutive faults after which a client is disabled until
176 * faultDisableDuration has passed. 0 to never disable clients.
177 *
178 * Defaults to 0.
179 */
faultDisableCount()180 ushort faultDisableCount() const @property {
181 return pool_.faultDisableCount;
182 }
183
184 /// Ditto
faultDisableCount(ushort value)185 void faultDisableCount(ushort value) @property {
186 pool_.faultDisableCount = value;
187 }
188
189 /**
190 * The duration for which a client is no longer considered after it has
191 * failed too often.
192 *
193 * Defaults to one second.
194 */
faultDisableDuration()195 Duration faultDisableDuration() const @property {
196 return pool_.faultDisableDuration;
197 }
198
199 /// Ditto
faultDisableDuration(Duration value)200 void faultDisableDuration(Duration value) @property {
201 pool_.faultDisableDuration = value;
202 }
203
delegate(Exception)204 /+override+/ bool delegate(Exception) rpcFaultFilter() const @property {
205 return rpcFaultFilter_;
206 }
207
rpcFaultFilter(bool delegate (Exception)value)208 /+override+/ void rpcFaultFilter(bool delegate(Exception) value) @property {
209 rpcFaultFilter_ = value;
210 }
211
reopenTransports()212 /+override+/ bool reopenTransports() const @property {
213 return reopenTransports_;
214 }
215
reopenTransports(bool value)216 /+override+/ void reopenTransports(bool value) @property {
217 reopenTransports_ = value;
218 }
219
220 mixin(fallbackPoolForwardCode!Interface());
221
222 protected:
223 // The actual worker implementation to which RPC method calls are forwarded.
executeOnPool(string method,Args...)224 auto executeOnPool(string method, Args...)(Args args,
225 TCancellation cancellation
226 ) {
227 auto clients = pool_[];
228 if (clients.empty) {
229 throw new TException("No clients available to try.");
230 }
231
232 auto promise = new TPromise!(ReturnType!(MemberType!(Interface, method)));
233 Exception[] rpcExceptions;
234
235 void tryNext() {
236 while (clients.empty) {
237 Client next;
238 Duration waitTime;
239 if (clients.willBecomeNonempty(next, waitTime)) {
240 if (waitTime > dur!"hnsecs"(0)) {
241 if (waitTime < dur!"usecs"(10)) {
242 import core.thread;
243 Thread.sleep(waitTime);
244 } else {
245 next.transport.asyncManager.delay(waitTime, { tryNext(); });
246 return;
247 }
248 }
249 } else {
250 promise.fail(new TCompoundOperationException("All clients failed.",
251 rpcExceptions));
252 return;
253 }
254 }
255
256 auto client = clients.front;
257 clients.popFront;
258
259 if (reopenTransports) {
260 if (!client.transport.isOpen) {
261 try {
262 client.transport.open();
263 } catch (Exception e) {
264 pool_.recordFault(client);
265 tryNext();
266 return;
267 }
268 }
269 }
270
271 auto future = mixin("client." ~ method)(args, cancellation);
272 future.completion.addCallback({
273 if (future.status == TFutureStatus.CANCELLED) {
274 promise.cancel();
275 return;
276 }
277
278 auto e = future.getException();
279 if (e) {
280 if (rpcFaultFilter_ && rpcFaultFilter_(e)) {
281 pool_.recordFault(client);
282 rpcExceptions ~= e;
283 tryNext();
284 return;
285 }
286 }
287 pool_.recordSuccess(client);
288 promise.complete(future);
289 });
290 }
291
292 tryNext();
293 return promise;
294 }
295
296 private:
297 TResourcePool!Client pool_;
298 bool delegate(Exception) rpcFaultFilter_;
299 bool reopenTransports_;
300 }
301
302 /**
303 * TAsyncClientPool construction helper to avoid having to explicitly
304 * specify the interface type, i.e. to allow the constructor being called
305 * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
306 */
307 TAsyncClientPool!Interface tAsyncClientPool(Interface)(
308 TAsyncClientBase!Interface[] clients
309 ) if (isService!Interface) {
310 return new typeof(return)(clients);
311 }
312
313 private {
314 // Cannot use an anonymous delegate literal for this because they aren't
315 // allowed in class scope.
fallbackPoolForwardCode(Interface)316 string fallbackPoolForwardCode(Interface)() {
317 string code = "";
318
319 foreach (methodName; AllMemberMethodNames!Interface) {
320 enum qn = "Interface." ~ methodName;
321 code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~
322 "(ParameterTypeTuple!(" ~ qn ~ ") args, TCancellation cancellation = null) {\n";
323 code ~= "return executeOnPool!(`" ~ methodName ~ "`)(args, cancellation);\n";
324 code ~= "}\n";
325 }
326
327 return code;
328 }
329 }
330
331 /**
332 * A TAsyncClientPoolBase implementation which queries multiple servers at
333 * the same time and returns the first success response.
334 *
335 * The definition of »success« can be customized using the rpcFaultFilter()
336 * delegate property. If it is non-null and calling it for an exception set by
337 * a failed method invocation returns true, the error is considered to be
338 * caused by the RPC layer rather than the application layer, and the next
339 * server in the pool is tried. If all clients fail, the operation is marked
340 * as failed with a TCompoundOperationException.
341 */
342 final class TAsyncFastestClientPool(Interface) if (isService!Interface) :
343 TAsyncClientPoolBase!Interface
344 {
345 ///
this(Client[]clients)346 this(Client[] clients) {
347 clients_ = clients;
348 rpcFaultFilter_ = defaultRpcFaultFilter;
349 reopenTransports_ = true;
350 }
351
addClient(Client client)352 /+override+/ void addClient(Client client) {
353 clients_ ~= client;
354 }
355
removeClient(Client client)356 /+override+/ bool removeClient(Client client) {
357 auto oldLength = clients_.length;
358 clients_ = removeEqual(clients_, client);
359 return clients_.length < oldLength;
360 }
361
362
delegate(Exception)363 /+override+/ bool delegate(Exception) rpcFaultFilter() const @property {
364 return rpcFaultFilter_;
365 }
366
rpcFaultFilter(bool delegate (Exception)value)367 /+override+/ void rpcFaultFilter(bool delegate(Exception) value) @property {
368 rpcFaultFilter_ = value;
369 }
370
reopenTransports()371 /+override+/bool reopenTransports() const @property {
372 return reopenTransports_;
373 }
374
reopenTransports(bool value)375 /+override+/ void reopenTransports(bool value) @property {
376 reopenTransports_ = value;
377 }
378
379 mixin(fastestPoolForwardCode!Interface());
380
381 private:
382 Client[] clients_;
383 bool delegate(Exception) rpcFaultFilter_;
384 bool reopenTransports_;
385 }
386
387 /**
388 * TAsyncFastestClientPool construction helper to avoid having to explicitly
389 * specify the interface type, i.e. to allow the constructor being called
390 * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
391 */
392 TAsyncFastestClientPool!Interface tAsyncFastestClientPool(Interface)(
393 TAsyncClientBase!Interface[] clients
394 ) if (isService!Interface) {
395 return new typeof(return)(clients);
396 }
397
398 private {
399 // Cannot use an anonymous delegate literal for this because they aren't
400 // allowed in class scope.
fastestPoolForwardCode(Interface)401 string fastestPoolForwardCode(Interface)() {
402 string code = "";
403
404 foreach (methodName; AllMemberMethodNames!Interface) {
405 enum qn = "Interface." ~ methodName;
406 code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~
407 "(ParameterTypeTuple!(" ~ qn ~ ") args, " ~
408 "TCancellation cancellation = null) {\n";
409 code ~= "enum methodName = `" ~ methodName ~ "`;\n";
410 code ~= q{
411 alias ReturnType!(MemberType!(Interface, methodName)) ResultType;
412
413 auto childCancellation = new TCancellationOrigin;
414
415 TFuture!ResultType[] futures;
416 futures.reserve(clients_.length);
417
418 foreach (c; clients_) {
419 if (reopenTransports) {
420 if (!c.transport.isOpen) {
421 try {
422 c.transport.open();
423 } catch (Exception e) {
424 continue;
425 }
426 }
427 }
428 futures ~= mixin("c." ~ methodName)(args, childCancellation);
429 }
430
431 return new FastestPoolJob!(ResultType)(
432 futures, rpcFaultFilter, cancellation, childCancellation);
433 };
434 code ~= "}\n";
435 }
436
437 return code;
438 }
439
440 final class FastestPoolJob(Result) : TFuture!Result {
441 this(TFuture!Result[] poolFutures, bool delegate(Exception) rpcFaultFilter,
442 TCancellation cancellation, TCancellationOrigin childCancellation
443 ) {
444 resultPromise_ = new TPromise!Result;
445 poolFutures_ = poolFutures;
446 rpcFaultFilter_ = rpcFaultFilter;
447 childCancellation_ = childCancellation;
448
foreach(future;poolFutures)449 foreach (future; poolFutures) {
450 future.completion.addCallback({
451 auto f = future;
452 return { completionCallback(f); };
453 }());
454 if (future.status != TFutureStatus.RUNNING) {
455 // If the current future is already completed, we are done, don't
456 // bother adding callbacks for the others (they would just return
457 // immediately after acquiring the lock).
458 return;
459 }
460 }
461
462 if (cancellation) {
463 cancellation.triggering.addCallback({
464 resultPromise_.cancel();
465 childCancellation.trigger();
466 });
467 }
468 }
469
status()470 TFutureStatus status() const @property {
471 return resultPromise_.status;
472 }
473
completion()474 TAwaitable completion() @property {
475 return resultPromise_.completion;
476 }
477
get()478 Result get() {
479 return resultPromise_.get();
480 }
481
getException()482 Exception getException() {
483 return resultPromise_.getException();
484 }
485
486 private:
487 void completionCallback(TFuture!Result future) {
488 synchronized {
489 if (future.status == TFutureStatus.CANCELLED) {
490 assert(resultPromise_.status != TFutureStatus.RUNNING);
491 return;
492 }
493
494 if (resultPromise_.status != TFutureStatus.RUNNING) {
495 // The operation has already been completed. This can happen if
496 // another client completed first, but this callback was already
497 // waiting for the lock when it called cancel().
498 return;
499 }
500
501 if (future.status == TFutureStatus.FAILED) {
502 auto e = future.getException();
503 if (rpcFaultFilter_ && rpcFaultFilter_(e)) {
504 rpcExceptions_ ~= e;
505
506 if (rpcExceptions_.length == poolFutures_.length) {
507 resultPromise_.fail(new TCompoundOperationException(
508 "All child operations failed, unable to retrieve a result.",
509 rpcExceptions_
510 ));
511 }
512
513 return;
514 }
515 }
516
517 // Store the result to the target promise.
518 resultPromise_.complete(future);
519
520 // Cancel the other futures, we would just discard their results.
521 // Note: We do this after we have stored the results to our promise,
522 // see the assert at the top of the function.
523 childCancellation_.trigger();
524 }
525 }
526
527 TPromise!Result resultPromise_;
528 TFuture!Result[] poolFutures_;
529 Exception[] rpcExceptions_;
530 bool delegate(Exception) rpcFaultFilter_;
531 TCancellationOrigin childCancellation_;
532 }
533 }
534
535 /**
536 * Allows easily aggregating results from a number of TAsyncClients.
537 *
538 * Contrary to TAsync{Fallback, Fastest}ClientPool, this class does not
539 * simply implement TFutureInterface!Interface. It manages a pool of clients,
540 * but allows the user to specify a custom accumulator function to use or to
541 * iterate over the results using a TFutureAggregatorRange.
542 *
543 * For each service method, TAsyncAggregator offers a method
544 * accepting the same arguments, and an optional TCancellation instance, just
545 * like with TFutureInterface. The return type, however, is a proxy object
546 * that offers the following methods:
547 * ---
548 * /++
549 * + Returns a thrift.util.future.TFutureAggregatorRange for the results of
550 * + the client pool method invocations.
551 * +
552 * + The [] (slicing) operator can also be used to obtain the range.
553 * +
554 * + Params:
555 * + timeout = A timeout to pass to the TFutureAggregatorRange constructor,
556 * + defaults to zero (no timeout).
557 * +/
558 * TFutureAggregatorRange!ReturnType range(Duration timeout = dur!"hnsecs"(0));
559 * auto opSlice() { return range(); } /// Ditto
560 *
561 * /++
562 * + Returns a future that gathers the results from the clients in the pool
563 * + and invokes a user-supplied accumulator function on them, returning its
564 * + return value to the client.
565 * +
566 * + In addition to the TFuture!AccumulatedType interface (where
567 * + AccumulatedType is the return type of the accumulator function), the
568 * + returned object also offers two additional methods, finish() and
569 * + finishGet(): By default, the accumulator functions is called after all
570 * + the results from the pool clients have become available. Calling finish()
571 * + causes the accumulator future to stop waiting for other results and
572 * + immediately invoking the accumulator function on the results currently
573 * + available. If all results are already available, finish() is a no-op.
574 * + finishGet() is a convenience shortcut for combining it with
575 * + a call to get() immediately afterwards, like waitGet() is for wait().
576 * +
577 * + The acc alias can point to any callable accepting either an array of
578 * + return values or an array of return values and an array of exceptions;
579 * + see isAccumulator!() for details. The default accumulator concatenates
580 * + return values that can be concatenated with each others (e.g. arrays),
581 * + and simply returns an array of values otherwise, failing with a
582 * + TCompoundOperationException no values were returned.
583 * +
584 * + The accumulator function is not executed in any of the async manager
585 * + worker threads associated with the async clients, but instead it is
586 * + invoked when the actual result is requested for the first time after the
587 * + operation has been completed. This also includes checking the status
588 * + of the operation once it is no longer running, since the accumulator
589 * + has to be run to determine whether the operation succeeded or failed.
590 * +/
591 * auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!acc);
592 * ---
593 *
594 * Example:
595 * ---
596 * // Some Thrift service.
597 * interface Foo {
598 * int foo(string name);
599 * byte[] bar();
600 * }
601 *
602 * // Create the aggregator pool – client0, client1, client2 are some
603 * // TAsyncClient!Foo instances, but in theory could also be other
604 * // TFutureInterface!Foo implementations (e.g. some async client pool).
605 * auto pool = new TAsyncAggregator!Foo([client0, client1, client2]);
606 *
607 * foreach (val; pool.foo("baz").range(dur!"seconds"(1))) {
608 * // Process all the results that are available before a second has passed,
609 * // in the order they arrive.
610 * writeln(val);
611 * }
612 *
613 * auto sumRoots = pool.foo("baz").accumulate!((int[] vals, Exceptions[] exs){
614 * if (vals.empty) {
615 * throw new TCompoundOperationException("All clients failed", exs);
616 * }
617 *
618 * // Just to illustrate that the type of the values can change, convert the
619 * // numbers to double and sum up their roots.
620 * double result = 0;
621 * foreach (v; vals) result += sqrt(cast(double)v);
622 * return result;
623 * })();
624 *
625 * // Wait up to three seconds for the result, and then accumulate what has
626 * // arrived so far.
627 * sumRoots.completion.wait(dur!"seconds"(3));
628 * writeln(sumRoots.finishGet());
629 *
630 * // For scalars, the default accumulator returns an array of the values.
631 * pragma(msg, typeof(pool.foo("").accumulate().get()); // int[].
632 *
633 * // For lists, etc., it concatenates the results together.
634 * pragma(msg, typeof(pool.bar().accumulate().get())); // byte[].
635 * ---
636 *
637 * Note: For the accumulate!() interface, you might currently hit a »cannot use
638 * local '…' as parameter to non-global template accumulate«-error, see
639 * $(DMDBUG 5710, DMD issue 5710). If your accumulator function does not need
640 * to access the surrounding scope, you might want to use a function literal
641 * instead of a delegate to avoid the issue.
642 */
643 class TAsyncAggregator(Interface) if (isBaseService!Interface) {
644 /// Shorthand for the client type this instance operates on.
645 alias TAsyncClientBase!Interface Client;
646
647 ///
this(Client[]clients)648 this(Client[] clients) {
649 clients_ = clients;
650 }
651
652 /// Whether to open the underlying transports of a client before trying to
653 /// execute a method if they are not open. This is usually desirable
654 /// because it allows e.g. to automatically reconnect to a remote server
655 /// if the network connection is dropped.
656 ///
657 /// Defaults to true.
658 bool reopenTransports = true;
659
660 mixin AggregatorOpDispatch!();
661
662 private:
663 Client[] clients_;
664 }
665
666 /// Ditto
667 class TAsyncAggregator(Interface) if (isDerivedService!Interface) :
668 TAsyncAggregator!(BaseService!Interface)
669 {
670 /// Shorthand for the client type this instance operates on.
671 alias TAsyncClientBase!Interface Client;
672
673 ///
this(Client[]clients)674 this(Client[] clients) {
675 super(cast(TAsyncClientBase!(BaseService!Interface)[])clients);
676 }
677
678 mixin AggregatorOpDispatch!();
679 }
680
681 /**
682 * Whether fun is a valid accumulator function for values of type ValueType.
683 *
684 * For this to be true, fun must be a callable matching one of the following
685 * argument lists:
686 * ---
687 * fun(ValueType[] values);
688 * fun(ValueType[] values, Exception[] exceptions);
689 * ---
690 *
691 * The second version is passed the collected array exceptions from all the
692 * clients in the pool.
693 *
694 * The return value of the accumulator function is passed to the client (via
695 * the result future). If it throws an exception, the operation is marked as
696 * failed with the given exception instead.
697 */
isAccumulator(ValueType,alias fun)698 template isAccumulator(ValueType, alias fun) {
699 enum isAccumulator = is(typeof(fun(cast(ValueType[])[]))) ||
700 is(typeof(fun(cast(ValueType[])[], cast(Exception[])[])));
701 }
702
703 /**
704 * TAsyncAggregator construction helper to avoid having to explicitly
705 * specify the interface type, i.e. to allow the constructor being called
706 * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
707 */
708 TAsyncAggregator!Interface tAsyncAggregator(Interface)(
709 TAsyncClientBase!Interface[] clients
710 ) if (isService!Interface) {
711 return new typeof(return)(clients);
712 }
713
714 private {
AggregatorOpDispatch()715 mixin template AggregatorOpDispatch() {
716 auto opDispatch(string name, Args...)(Args args) if (
717 is(typeof(mixin("Interface.init." ~ name)(args)))
718 ) {
719 alias ReturnType!(MemberType!(Interface, name)) ResultType;
720
721 auto childCancellation = new TCancellationOrigin;
722
723 TFuture!ResultType[] futures;
724 futures.reserve(clients_.length);
725
726 foreach (c; cast(Client[])clients_) {
727 if (reopenTransports) {
728 if (!c.transport.isOpen) {
729 try {
730 c.transport.open();
731 } catch (Exception e) {
732 continue;
733 }
734 }
735 }
736 futures ~= mixin("c." ~ name)(args, childCancellation);
737 }
738
739 return AggregationResult!ResultType(futures, childCancellation);
740 }
741 }
742
AggregationResult(T)743 struct AggregationResult(T) {
744 auto opSlice() {
745 return range();
746 }
747
748 auto range(Duration timeout = dur!"hnsecs"(0)) {
749 return tFutureAggregatorRange(futures_, childCancellation_, timeout);
750 }
751
752 auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!(T, acc)) {
753 return new AccumulatorJob!(T, acc)(futures_, childCancellation_);
754 }
755
756 private:
757 TFuture!T[] futures_;
758 TCancellationOrigin childCancellation_;
759 }
760
defaultAccumulator(T)761 auto defaultAccumulator(T)(T[] values, Exception[] exceptions) {
762 if (values.empty) {
763 throw new TCompoundOperationException("All clients failed",
764 exceptions);
765 }
766
767 static if (is(typeof(T.init ~ T.init))) {
768 import std.algorithm;
769 return reduce!"a ~ b"(values);
770 } else {
771 return values;
772 }
773 }
774
775 final class AccumulatorJob(T, alias accumulator) if (
776 isAccumulator!(T, accumulator)
777 ) : TFuture!(AccumulatorResult!(T, accumulator)) {
778 this(TFuture!T[] futures, TCancellationOrigin childCancellation) {
779 futures_ = futures;
780 childCancellation_ = childCancellation;
781 resultMutex_ = new Mutex;
782 completionEvent_ = new TOneshotEvent;
783
foreach(future;futures)784 foreach (future; futures) {
785 future.completion.addCallback({
786 auto f = future;
787 return {
788 synchronized (resultMutex_) {
789 if (f.status == TFutureStatus.CANCELLED) {
790 if (!finished_) {
791 status_ = TFutureStatus.CANCELLED;
792 finished_ = true;
793 }
794 return;
795 }
796
797 if (f.status == TFutureStatus.FAILED) {
798 exceptions_ ~= f.getException();
799 } else {
800 results_ ~= f.get();
801 }
802
803 if (results_.length + exceptions_.length == futures_.length) {
804 finished_ = true;
805 completionEvent_.trigger();
806 }
807 }
808 };
809 }());
810 }
811 }
812
status()813 TFutureStatus status() @property {
814 synchronized (resultMutex_) {
815 if (!finished_) return TFutureStatus.RUNNING;
816 if (status_ != TFutureStatus.RUNNING) return status_;
817
818 try {
819 result_ = invokeAccumulator!accumulator(results_, exceptions_);
820 status_ = TFutureStatus.SUCCEEDED;
821 } catch (Exception e) {
822 exception_ = e;
823 status_ = TFutureStatus.FAILED;
824 }
825
826 return status_;
827 }
828 }
829
completion()830 TAwaitable completion() @property {
831 return completionEvent_;
832 }
833
834 AccumulatorResult!(T, accumulator) get() {
835 auto s = status;
836
837 enforce(s != TFutureStatus.RUNNING,
838 new TFutureException("Operation not yet completed."));
839
840 if (s == TFutureStatus.CANCELLED) throw new TCancelledException;
841 if (s == TFutureStatus.FAILED) throw exception_;
842 return result_;
843 }
844
getException()845 Exception getException() {
846 auto s = status;
847 enforce(s != TFutureStatus.RUNNING,
848 new TFutureException("Operation not yet completed."));
849
850 if (s == TFutureStatus.CANCELLED) throw new TCancelledException;
851
852 if (s == TFutureStatus.SUCCEEDED) {
853 return null;
854 }
855 return exception_;
856 }
857
finish()858 void finish() {
859 synchronized (resultMutex_) {
860 if (!finished_) {
861 finished_ = true;
862 childCancellation_.trigger();
863 completionEvent_.trigger();
864 }
865 }
866 }
867
finishGet()868 auto finishGet() {
869 finish();
870 return get();
871 }
872
873 private:
874 TFuture!T[] futures_;
875 TCancellationOrigin childCancellation_;
876
877 bool finished_;
878 T[] results_;
879 Exception[] exceptions_;
880
881 TFutureStatus status_;
882 Mutex resultMutex_;
883 union {
884 AccumulatorResult!(T, accumulator) result_;
885 Exception exception_;
886 }
887 TOneshotEvent completionEvent_;
888 }
889
890 auto invokeAccumulator(alias accumulator, T)(
891 T[] values, Exception[] exceptions
892 ) if (
893 isAccumulator!(T, accumulator)
894 ) {
895 static if (is(typeof(accumulator(values, exceptions)))) {
896 return accumulator(values, exceptions);
897 } else {
898 return accumulator(values);
899 }
900 }
901
AccumulatorResult(T,alias acc)902 template AccumulatorResult(T, alias acc) {
903 alias typeof(invokeAccumulator!acc(cast(T[])[], cast(Exception[])[]))
904 AccumulatorResult;
905 }
906 }
907