1 /*
2 * genericactors.actor.h
3 *
4 * This source file is part of the FoundationDB open source project
5 *
6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #pragma once
22
23 // When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source version.
24 #if defined(NO_INTELLISENSE) && !defined(FLOW_GENERICACTORS_ACTOR_G_H)
25 #define FLOW_GENERICACTORS_ACTOR_G_H
26 #include "flow/genericactors.actor.g.h"
27 #elif !defined(GENERICACTORS_ACTOR_H)
28 #define GENERICACTORS_ACTOR_H
29
30 #include <list>
31
32 #include "flow/flow.h"
33 #include "flow/Knobs.h"
34 #include "flow/Util.h"
35 #include "flow/actorcompiler.h" // This must be the last #include.
36 #pragma warning( disable: 4355 ) // 'this' : used in base member initializer list
37
38 ACTOR template<class T, class X>
39 Future<T> traceAfter(Future<T> what, const char* type, const char* key, X value, bool traceErrors = false)
40 {
41 try {
42 T val = wait(what);
43 TraceEvent(type).detail(key, value);
44 return val;
catch(Error & e)45 } catch( Error &e ) {
46 if(traceErrors) TraceEvent(type).error(e,true).detail(key, value);
47 throw;
48 }
49 }
50
51 ACTOR template<class T, class X>
52 Future<T> traceAfterCall(Future<T> what, const char* type, const char* key, X func, bool traceErrors = false)
53 {
54 try {
55 state T val = wait(what);
56 try {
57 TraceEvent(type).detail(key, func(val));
catch(Error & e)58 } catch( Error &e ) {
59 TraceEvent(SevError, "TraceAfterCallError").error(e);
60 }
61 return val;
catch(Error & e)62 } catch( Error &e ) {
63 if(traceErrors) TraceEvent(type).error(e,true);
64 throw;
65 }
66 }
67
68 ACTOR template <class T>
stopAfter(Future<T> what)69 Future<Optional<T>> stopAfter( Future<T> what ) {
70 state Optional<T> ret = T();
71 try {
72 T _ = wait(what);
73 ret = Optional<T>(_);
74 } catch (Error& e) {
75 bool ok = e.code() == error_code_please_reboot || e.code() == error_code_please_reboot_delete || e.code() == error_code_actor_cancelled;
76 TraceEvent(ok ? SevInfo : SevError, "StopAfterError").error(e);
77 if(!ok) {
78 fprintf(stderr, "Fatal Error: %s\n", e.what());
79 ret = Optional<T>();
80 }
81 }
82 g_network->stop();
83 return ret;
84 }
85
// Returns a copy of `range` with its elements sorted by std::sort (operator<).
// `range` is taken by value, so the caller's container is left untouched.
template <class T>
T sorted(T range) {
	auto first = range.begin();
	auto last = range.end();
	std::sort(first, last);
	return range;
}
91
// Comma operator for building vectors inline: `(v, a, b)` appends a and then b to v.
// Returns a reference to the vector so that further elements can be chained.
template <class T>
inline std::vector<T>& operator,(std::vector<T>& v, T a) {
	v.push_back(a);
	return v;
}

// Overload for temporaries, e.g. `(std::vector<int>(), 1, 2)`. It forwards to the lvalue overload
// above. The returned reference refers to the temporary and is therefore only valid within the
// full expression that created it.
template <class T>
inline std::vector<T>& operator,(std::vector<T>&& v, T a) {
	std::vector<T>& target = v; // a named rvalue reference is already an lvalue
	return (target, a);
}
102
103 template <class T>
errorOr(T t)104 ErrorOr<T> errorOr( T t ) {
105 return ErrorOr<T>(t);
106 }
107
108 ACTOR template <class T>
errorOr(Future<T> f)109 Future<ErrorOr<T>> errorOr( Future<T> f ) {
110 try {
111 T t = wait(f);
112 return ErrorOr<T>(t);
113 } catch (Error& e) {
114 return ErrorOr<T>(e);
115 }
116 }
117
118 ACTOR template <class T>
throwErrorOr(Future<ErrorOr<T>> f)119 Future<T> throwErrorOr( Future<ErrorOr<T>> f ) {
120 ErrorOr<T> t = wait(f);
121 if(t.isError()) throw t.getError();
122 return t.get();
123 }
124
125 ACTOR template <class T>
transformErrors(Future<T> f,Error err)126 Future<T> transformErrors( Future<T> f, Error err ) {
127 try {
128 T t = wait( f );
129 return t;
130 } catch( Error &e ) {
131 if( e.code() == error_code_actor_cancelled )
132 throw e;
133 throw err;
134 }
135 }
136
137 ACTOR template <class T>
transformError(Future<T> f,Error inErr,Error outErr)138 Future<T> transformError( Future<T> f, Error inErr, Error outErr ) {
139 try {
140 T t = wait( f );
141 return t;
142 } catch( Error &e ) {
143 if( e.code() == inErr.code() )
144 throw outErr;
145 throw e;
146 }
147 }
148
149 // Note that the RequestStream<T> version of forwardPromise doesn't exist, because what to do with errors?
150
151 ACTOR template <class T>
forwardEvent(Event * ev,Future<T> input)152 void forwardEvent( Event* ev, Future<T> input ) {
153 try {
154 T value = wait(input);
155 } catch (Error&) {
156 }
157 ev->set();
158 }
159
160 ACTOR template <class T>
forwardEvent(Event * ev,T * t,Error * err,FutureStream<T> input)161 void forwardEvent( Event* ev, T* t, Error* err, FutureStream<T> input ) {
162 try {
163 T value = waitNext(input);
164 *t = std::move(value);
165 ev->set();
166 } catch (Error& e) {
167 *err = e;
168 ev->set();
169 }
170 }
171
172 ACTOR template <class T>
waitForAllReady(std::vector<Future<T>> results)173 Future<Void> waitForAllReady( std::vector<Future<T>> results ) {
174 state int i = 0;
175 loop {
176 if (i == results.size()) return Void();
177 try {
178 T r = wait( results[i] );
179 } catch (...) {
180 }
181 i++;
182 }
183 }
184
185 ACTOR template <class T>
186 Future<T> timeout( Future<T> what, double time, T timedoutValue, int taskID = TaskDefaultDelay ) {
187 Future<Void> end = delay( time, taskID );
188 choose {
189 when( T t = wait( what ) ) { return t; }
when(wait (end))190 when( wait( end ) ) { return timedoutValue; }
191 }
192 }
193
194 ACTOR template <class T>
timeout(Future<T> what,double time)195 Future<Optional<T>> timeout( Future<T> what, double time ) {
196 Future<Void> end = delay( time );
197 choose {
198 when( T t = wait( what ) ) { return t; }
199 when( wait( end ) ) { return Optional<T>(); }
200 }
201 }
202
203 ACTOR template <class T>
204 Future<T> timeoutError( Future<T> what, double time, int taskID = TaskDefaultDelay ) {
205 Future<Void> end = delay( time, taskID );
206 choose {
207 when( T t = wait( what ) ) { return t; }
when(wait (end))208 when( wait( end ) ) { throw timed_out(); }
209 }
210 }
211
212 ACTOR template <class T>
213 Future<T> delayed( Future<T> what, double time = 0.0, int taskID = TaskDefaultDelay ) {
214 try {
215 state T t = wait( what );
216 wait( delay( time, taskID ) );
217 return t;
catch(Error & e)218 } catch( Error &e ) {
219 state Error err = e;
220 wait( delay( time, taskID ) );
221 throw err;
222 }
223 }
224
225 ACTOR template<class Func>
226 Future<Void> recurring( Func what, double interval, int taskID = TaskDefaultDelay ) {
227 loop choose {
when(wait (delay (interval,taskID)))228 when ( wait( delay( interval, taskID ) ) ) { what(); }
229 }
230 }
231
232 ACTOR template<class Func>
trigger(Func what,Future<Void> signal)233 Future<Void> trigger( Func what, Future<Void> signal ) {
234 wait( signal );
235 what();
236 return Void();
237 }
238
239 ACTOR template<class Func>
triggerOnError(Func what,Future<Void> signal)240 Future<Void> triggerOnError( Func what, Future<Void> signal ) {
241 try {
242 wait( signal );
243 }
244 catch(Error &e) {
245 what();
246 }
247
248 return Void();
249 }
250
251 //Waits for a future to complete and cannot be cancelled
252 //Most situations will use the overload below, which does not require a promise
253 ACTOR template<class T>
uncancellable(Future<T> what,Promise<T> result)254 void uncancellable(Future<T> what, Promise<T> result)
255 {
256 try {
257 T val = wait(what);
258 result.send(val);
259 } catch( Error &e ) {
260 result.sendError(e);
261 }
262 }
263
264 //Waits for a future to complete and cannot be cancelled
265 ACTOR template<class T>
uncancellable(Future<T> what)266 Future<T> uncancellable(Future<T> what)
267 {
268 Promise<T> resultPromise;
269 Future<T> result = resultPromise.getFuture();
270
271 uncancellable(what, resultPromise);
272 T val = wait(result);
273
274 return val;
275 }
276
277 //Holds onto an object until a future either completes or is cancelled
278 //Used to prevent the object from being reclaimed
279 //
280 // NOTE: the order of the arguments is important. The arguments will be destructed in
281 // reverse order, and we need the object to be destructed last.
282 ACTOR template<class T, class X>
holdWhile(X object,Future<T> what)283 Future<T> holdWhile(X object, Future<T> what)
284 {
285 T val = wait(what);
286 return val;
287 }
288
289 ACTOR template<class T, class X>
holdWhileVoid(X object,Future<T> what)290 Future<Void> holdWhileVoid(X object, Future<T> what)
291 {
292 T val = wait(what);
293 return Void();
294 }
295
296 template<class T>
store(T & out,Future<T> what)297 Future<Void> store(T &out, Future<T> what) {
298 return map(what, [&out](T const &v) { out = v; return Void(); });
299 }
300
301 template<class T>
302 Future<Void> storeOrThrow(T &out, Future<Optional<T>> what, Error e = key_not_found()) {
303 return map(what, [&out,e](Optional<T> const &o) {
304 if(!o.present())
305 throw e;
306 out = o.get();
307 return Void();
308 });
309 }
310
311 //Waits for a future to be ready, and then applies an asynchronous function to it.
312 ACTOR template<class T, class F, class U = decltype( fake<F>()(fake<T>()).getValue() )>
mapAsync(Future<T> what,F actorFunc)313 Future<U> mapAsync(Future<T> what, F actorFunc)
314 {
315 T val = wait(what);
316 U ret = wait(actorFunc(val));
317 return ret;
318 }
319
320 //maps a vector of futures with an asynchronous function
321 template<class T, class F>
mapAsync(std::vector<Future<T>> const & what,F const & actorFunc)322 std::vector<Future<decltype(actorFunc(T()).getValue())>> mapAsync(std::vector<Future<T>> const& what, F const& actorFunc)
323 {
324 std::vector<typename std::result_of<F(T)>::type> ret;
325 for(auto f : what)
326 ret.push_back(mapAsync( f, actorFunc ));
327 return ret;
328 }
329
330 //maps a stream with an asynchronous function
331 ACTOR template<class T, class F, class U = decltype( fake<F>()(fake<T>()).getValue() )>
mapAsync(FutureStream<T> input,F actorFunc,PromiseStream<U> output)332 Future<Void> mapAsync( FutureStream<T> input, F actorFunc, PromiseStream<U> output ) {
333 state Deque<Future<U>> futures;
334
335 loop {
336 try {
337 choose {
338 when( T nextInput = waitNext( input ) ) {
339 futures.push_back( actorFunc(nextInput) );
340 }
341 when( U nextOutput = wait( futures.size() == 0 ? Never() : futures.front() ) ) {
342 output.send( nextOutput );
343 futures.pop_front();
344 }
345
346 }
347 }
348 catch ( Error& e ) {
349 if( e.code() == error_code_end_of_stream ) {
350 break;
351 }
352 else {
353 output.sendError( e );
354 throw e;
355 }
356 }
357 }
358
359 while(futures.size()) {
360 U nextOutput = wait( futures.front() );
361 output.send( nextOutput );
362 futures.pop_front();
363 }
364
365 output.sendError(end_of_stream());
366
367 return Void();
368
369 }
370
371 //Waits for a future to be ready, and then applies a function to it.
372 ACTOR template<class T, class F>
map(Future<T> what,F func)373 Future<typename std::result_of<F(T)>::type> map(Future<T> what, F func)
374 {
375 T val = wait(what);
376 return func(val);
377 }
378
379 //maps a vector of futures
380 template<class T, class F>
map(std::vector<Future<T>> const & what,F const & func)381 std::vector<Future<typename std::result_of<F(T)>>> map(std::vector<Future<T>> const& what, F const& func)
382 {
383 std::vector<Future<typename std::result_of<F(T)>>> ret;
384 for(auto f : what)
385 ret.push_back(map( f, func ));
386 return ret;
387 }
388
389 //maps a stream
390 ACTOR template<class T, class F>
map(FutureStream<T> input,F func,PromiseStream<typename std::result_of<F (T)>> output)391 Future<Void> map( FutureStream<T> input, F func, PromiseStream<typename std::result_of<F(T)>> output )
392 {
393 loop {
394 try {
395 T nextInput = waitNext( input );
396 output.send(func(nextInput));
397 }
398 catch ( Error& e ) {
399 if( e.code() == error_code_end_of_stream ) {
400 break;
401 }
402 else
403 throw;
404 }
405 }
406
407 output.sendError(end_of_stream());
408
409 return Void();
410 }
411
412 //Returns if the future returns true, otherwise waits forever.
returnIfTrue(Future<bool> f)413 ACTOR static Future<Void> returnIfTrue( Future<bool> f )
414 {
415 bool b = wait( f );
416 if ( b ) {
417 return Void();
418 }
419 wait( Never() );
420 throw internal_error();
421 }
422
423 //Returns if the future, when waited on and then evaluated with the predicate, returns true, otherwise waits forever
424 template<class T, class F>
returnIfTrue(Future<T> what,F pred)425 Future<Void> returnIfTrue( Future<T> what, F pred)
426 {
427 return returnIfTrue( map( what, pred ) );
428 }
429
430 //filters a stream
431 ACTOR template<class T, class F>
filter(FutureStream<T> input,F pred,PromiseStream<T> output)432 Future<Void> filter( FutureStream<T> input, F pred, PromiseStream<T> output )
433 {
434 loop {
435 try {
436 T nextInput = waitNext( input );
437 if(func(nextInput))
438 output.send(nextInput);
439 }
440 catch ( Error& e ) {
441 if( e.code() == error_code_end_of_stream ) {
442 break;
443 }
444 else
445 throw;
446 }
447 }
448
449 output.sendError(end_of_stream());
450
451 return Void();
452 }
453
454 //filters a stream asynchronously
455 ACTOR template<class T, class F>
asyncFilter(FutureStream<T> input,F actorPred,PromiseStream<T> output)456 Future<Void> asyncFilter( FutureStream<T> input, F actorPred, PromiseStream<T> output )
457 {
458 state Deque<std::pair<T, Future<bool>>> futures;
459 state std::pair<T, Future<bool>> p;
460
461 loop {
462 try {
463 choose {
464 when ( T nextInput = waitNext(input) ) {
465 futures.push_back( std::pair<T, Future<bool>>(nextInput, actorPred(nextInput)) );
466 }
467 when ( bool pass = wait( futures.size() == 0 ? Never() : futures.front().second ) ) {
468 if(pass) output.send(futures.front().first);
469 futures.pop_front();
470 }
471 }
472 }
473 catch ( Error& e ) {
474 if( e.code() == error_code_end_of_stream ) {
475 break;
476 }
477 else {
478 throw e;
479 }
480 }
481 }
482
483 while(futures.size()) {
484 p = futures.front();
485 bool pass = wait( p.second );
486 if(pass) output.send(p.first);
487 futures.pop_front();
488 }
489
490 output.sendError(end_of_stream());
491
492 return Void();
493 }
494
495 template <class T>
496 struct WorkerCache {
497 // SOMEDAY: Would we do better to use "unreliable" (at most once) transport for the initialize requests and get rid of this?
498 // It doesn't provide true at most once behavior because things are removed from the cache after they have terminated.
existsWorkerCache499 bool exists( UID id ) {
500 return id_interface.count( id ) != 0;
501 }
setWorkerCache502 void set( UID id, const Future<T>& onReady ) {
503 ASSERT( !exists( id ) );
504 id_interface[ id ] = onReady;
505 }
getWorkerCache506 Future<T> get( UID id ) {
507 ASSERT( exists( id ) );
508 return id_interface[ id ];
509 }
510
removeOnReadyWorkerCache511 Future<Void> removeOnReady( UID id, Future<Void> const& ready ) {
512 return removeOnReady( this, id, ready );
513 }
514 private:
removeOnReadyWorkerCache515 ACTOR static Future<Void> removeOnReady( WorkerCache* self, UID id, Future<Void> ready ) {
516 try {
517 wait(ready);
518 self->id_interface.erase(id);
519 return Void();
520 } catch ( Error &e ) {
521 self->id_interface.erase(id);
522 throw;
523 }
524 }
525
526 std::map<UID, Future<T>> id_interface;
527 };
528
529 template <class K, class V>
530 class AsyncMap : NonCopyable {
531 public:
532 // Represents a complete function from keys to values (K -> V)
533 // All values not explicitly inserted map to V()
534 // If this isn't appropriate, use V=Optional<X>
535
AsyncMap()536 AsyncMap() : defaultValue(), destructing(false) {}
537
~AsyncMap()538 virtual ~AsyncMap() {
539 destructing = true;
540 items.clear();
541 }
542
set(K const & k,V const & v)543 void set( K const& k, V const& v ) {
544 auto& i = items[k];
545 if (i.value != v)
546 setUnconditional(k,v,i);
547 }
setUnconditional(K const & k,V const & v)548 void setUnconditional( K const& k, V const& v ) {
549 setUnconditional(k,v,items[k]);
550 }
triggerAll()551 void triggerAll() {
552 std::vector<Promise<Void>> ps;
553 for(auto it = items.begin(); it != items.end(); ++it){
554 ps.resize(ps.size()+1);
555 ps.back().swap( it->second.change );
556 }
557 std::vector<Promise<Void>> noDestroy = ps; // See explanation of noDestroy in setUnconditional()
558 for(auto p=ps.begin(); p!=ps.end(); ++p)
559 p->send(Void());
560 }
triggerRange(K const & begin,K const & end)561 void triggerRange( K const& begin, K const& end ) {
562 std::vector<Promise<Void>> ps;
563 for(auto it = items.lower_bound(begin); it != items.end() && it->first < end; ++it){
564 ps.resize(ps.size()+1);
565 ps.back().swap( it->second.change );
566 }
567 std::vector<Promise<Void>> noDestroy = ps; // See explanation of noDestroy in setUnconditional()
568 for(auto p=ps.begin(); p!=ps.end(); ++p)
569 p->send(Void());
570 }
trigger(K const & key)571 void trigger( K const& key ) {
572 if( items.count(key) != 0 ) {
573 auto& i = items[key];
574 Promise<Void> trigger;
575 i.change.swap(trigger);
576 Promise<Void> noDestroy = trigger; // See explanation of noDestroy in setUnconditional()
577
578 if (i.value == defaultValue)
579 items.erase(key);
580
581 trigger.send(Void());
582 }
583 }
clear(K const & k)584 void clear( K const& k ) { set(k, V()); }
get(K const & k)585 V const& get( K const& k ) {
586 auto it = items.find(k);
587 if (it != items.end())
588 return it->second.value;
589 else
590 return defaultValue;
591 }
count(K const & k)592 int count( K const& k ) {
593 auto it = items.find(k);
594 if (it != items.end())
595 return 1;
596 return 0;
597 }
onChange(K const & k)598 virtual Future<Void> onChange( K const& k ) { // throws broken_promise if this is destroyed
599 auto &item = items[k];
600 if (item.value == defaultValue)
601 return destroyOnCancel( this, k, item.change.getFuture() );
602 return item.change.getFuture();
603 }
getKeys()604 std::vector<K> getKeys() {
605 std::vector<K> keys;
606 for(auto i = items.begin(); i != items.end(); ++i)
607 keys.push_back( i->first );
608 return keys;
609 }
resetNoWaiting()610 void resetNoWaiting() {
611 for(auto i = items.begin(); i != items.end(); ++i)
612 ASSERT( i->second.change.getFuture().getFutureReferenceCount() == 1 );
613 items.clear();
614 }
615
616 protected:
617 // Invariant: Every item in the map either has value!=defaultValue xor a destroyOnCancel actor waiting on change.getFuture()
618 struct P {
619 V value;
620 Promise<Void> change;
PP621 P() : value() {}
622 };
623 std::map<K, P> items;
624 const V defaultValue;
625 bool destructing;
626
setUnconditional(K const & k,V const & v,P & i)627 void setUnconditional( K const& k, V const& v, P& i ) {
628 Promise<Void> trigger;
629 i.change.swap(trigger);
630 Promise<Void> noDestroy = trigger; // The send(Void()) or even V::operator= could cause destroyOnCancel,
631 // which could undo the change to i.value here. Keeping the promise reference count >= 2
632 // prevents destroyOnCancel from erasing anything from the map.
633
634 if (v == defaultValue)
635 items.erase(k);
636 else
637 i.value = v;
638
639 trigger.send(Void());
640 }
641
destroyOnCancel(AsyncMap * self,K key,Future<Void> change)642 ACTOR Future<Void> destroyOnCancel( AsyncMap* self, K key, Future<Void> change ) {
643 try {
644 wait(change);
645 return Void();
646 } catch (Error& e) {
647 if (e.code() == error_code_actor_cancelled && !self->destructing && change.getFutureReferenceCount()==1 && change.getPromiseReferenceCount()==1) {
648 if(EXPENSIVE_VALIDATION) {
649 auto& p = self->items[key];
650 ASSERT(p.change.getFuture() == change);
651 }
652 self->items.erase(key);
653 }
654 throw;
655 }
656 }
657 };
658
659 template <class V>
660 class AsyncVar : NonCopyable, public ReferenceCounted<AsyncVar<V>> {
661 public:
AsyncVar()662 AsyncVar() : value() {}
AsyncVar(V const & v)663 AsyncVar( V const& v ) : value(v) {}
AsyncVar(AsyncVar && av)664 AsyncVar(AsyncVar&& av) : value(std::move(av.value)), nextChange(std::move(av.nextChange)) {}
665 void operator=(AsyncVar&& av) { value = std::move(av.value); nextChange = std::move(av.nextChange); }
666
get()667 V const& get() const {
668 return value;
669 }
onChange()670 Future<Void> onChange() const {
671 return nextChange.getFuture();
672 }
set(V const & v)673 void set( V const& v ) {
674 if (v != value)
675 setUnconditional(v);
676 }
setUnconditional(V const & v)677 void setUnconditional( V const& v ) {
678 Promise<Void> t;
679 this->nextChange.swap(t);
680 this->value = v;
681 t.send(Void());
682 }
trigger()683 void trigger() {
684 Promise<Void> t;
685 this->nextChange.swap(t);
686 t.send(Void());
687 }
688
689 private:
690 V value;
691 Promise<Void> nextChange;
692 };
693
694 class AsyncTrigger : NonCopyable {
695 public:
AsyncTrigger()696 AsyncTrigger() {}
AsyncTrigger(AsyncTrigger && at)697 AsyncTrigger(AsyncTrigger&& at) : v(std::move(at.v)) {}
698 void operator=(AsyncTrigger&& at) { v = std::move(at.v); }
onTrigger()699 Future<Void> onTrigger() {
700 return v.onChange();
701 }
trigger()702 void trigger() {
703 v.trigger();
704 }
705 private:
706 AsyncVar<Void> v;
707 };
708
709 class Debouncer : NonCopyable {
710 public:
Debouncer(double delay)711 explicit Debouncer( double delay ) { worker = debounceWorker(this, delay); }
Debouncer(Debouncer && at)712 Debouncer(Debouncer&& at) : input(std::move(at.input)), output(std::move(at.output)) {}
713 void operator=(Debouncer&& at) { input = std::move(at.input); output = std::move(at.output); }
onTrigger()714 Future<Void> onTrigger() {
715 return output.onChange();
716 }
trigger()717 void trigger() {
718 input.setUnconditional(Void());
719 }
720 private:
721 AsyncVar<Void> input;
722 AsyncVar<Void> output;
723 Future<Void> worker;
724
debounceWorker(Debouncer * self,double bounceTime)725 ACTOR Future<Void> debounceWorker( Debouncer* self, double bounceTime ) {
726 loop {
727 wait( self->input.onChange() );
728 loop {
729 choose {
730 when(wait( self->input.onChange() )) {}
731 when(wait( delay(bounceTime) )) { break; }
732 }
733 }
734 self->output.setUnconditional(Void());
735 }
736 }
737 };
738
asyncDeserialize(Reference<AsyncVar<Standalone<StringRef>>> input,Reference<AsyncVar<Optional<T>>> output)739 ACTOR template <class T> Future<Void> asyncDeserialize( Reference<AsyncVar<Standalone<StringRef>>> input, Reference<AsyncVar<Optional<T>>> output ) {
740 loop {
741 if (input->get().size())
742 output->set( BinaryReader::fromStringRef<T>( input->get(), IncludeVersion() ) );
743 else
744 output->set( Optional<T>() );
745 wait( input->onChange() );
746 }
747 }
748
749 ACTOR template <class V, class T>
forwardVector(Future<V> values,std::vector<Promise<T>> out)750 void forwardVector( Future<V> values, std::vector<Promise<T>> out ) {
751 V in = wait( values );
752 ASSERT (in.size() == out.size());
753 for(int i=0; i<out.size(); i++)
754 out[i].send( in[i] );
755 }
756
757 ACTOR template <class T>
delayedAsyncVar(Reference<AsyncVar<T>> in,Reference<AsyncVar<T>> out,double time)758 Future<Void> delayedAsyncVar( Reference<AsyncVar<T>> in, Reference<AsyncVar<T>> out, double time ) {
759 try {
760 loop {
761 wait( delay( time ) );
762 out->set( in->get() );
763 wait( in->onChange() );
764 }
765 } catch (Error& e) {
766 out->set( in->get() );
767 throw;
768 }
769 }
770
771 ACTOR template <class T>
setAfter(Reference<AsyncVar<T>> var,double time,T val)772 Future<Void> setAfter( Reference<AsyncVar<T>> var, double time, T val ) {
773 wait( delay( time ) );
774 var->set( val );
775 return Void();
776 }
777
778 ACTOR template <class T>
779 Future<Void> resetAfter( Reference<AsyncVar<T>> var, double time, T val, int warningLimit = -1, double warningResetDelay = 0, const char* context = NULL ) {
780 state bool isEqual = var->get() == val;
781 state Future<Void> resetDelay = isEqual ? Never() : delay(time);
782 state int resetCount = 0;
783 state double lastReset = now();
784 loop {
785 choose {
when(wait (resetDelay))786 when( wait( resetDelay ) ) {
787 var->set( val );
788 if(now() - lastReset > warningResetDelay) {
789 resetCount = 0;
790 }
791 resetCount++;
792 if(context && warningLimit >= 0 && resetCount > warningLimit) {
793 TraceEvent(SevWarnAlways, context).detail("ResetCount", resetCount).detail("LastReset", now() - lastReset);
794 }
795 lastReset = now();
796 isEqual = true;
797 resetDelay = Never();
798 }
799 when( wait( var->onChange() ) ) {}
800 }
801 if( isEqual && var->get() != val ) {
802 isEqual = false;
803 resetDelay = delay(time);
804 }
805 if( !isEqual && var->get() == val ) {
806 isEqual = true;
807 resetDelay = Never();
808 }
809 }
810 }
811
812 ACTOR template <class T>
setWhenDoneOrError(Future<Void> condition,Reference<AsyncVar<T>> var,T val)813 Future<Void> setWhenDoneOrError( Future<Void> condition, Reference<AsyncVar<T>> var, T val ) {
814 try {
815 wait( condition );
816 }
817 catch ( Error& e ) {
818 if (e.code() == error_code_actor_cancelled) throw;
819 }
820 var->set( val );
821 return Void();
822 }
823
824 Future<bool> allTrue( const std::vector<Future<bool>>& all );
825 Future<Void> anyTrue( std::vector<Reference<AsyncVar<bool>>> const& input, Reference<AsyncVar<bool>> const& output );
826 Future<Void> cancelOnly( std::vector<Future<Void>> const& futures );
827 Future<Void> timeoutWarningCollector( FutureStream<Void> const& input, double const& logDelay, const char* const& context, UID const& id );
828 Future<bool> quorumEqualsTrue( std::vector<Future<bool>> const& futures, int const& required );
829
830 ACTOR template <class T>
streamHelper(PromiseStream<T> output,PromiseStream<Error> errors,Future<T> input)831 Future<Void> streamHelper( PromiseStream<T> output, PromiseStream<Error> errors, Future<T> input ) {
832 try {
833 T value = wait(input);
834 output.send(value);
835 } catch (Error& e) {
836 if (e.code() == error_code_actor_cancelled)
837 throw;
838 errors.send(e);
839 }
840 return Void();
841 }
842
843 template <class T>
makeStream(const std::vector<Future<T>> & futures,PromiseStream<T> & stream,PromiseStream<Error> & errors)844 Future<Void> makeStream( const std::vector<Future<T>>& futures, PromiseStream<T>& stream, PromiseStream<Error>& errors ) {
845 std::vector<Future<Void>> forwarders;
846 for(int f=0; f<futures.size(); f++)
847 forwarders.push_back( streamHelper( stream, errors, futures[f] ) );
848 return cancelOnly(forwarders);
849 }
850
851 template <class T>
852 class QuorumCallback;
853
854 template <class T>
855 struct Quorum : SAV<Void> {
856 int antiQuorum;
857 int count;
858
sizeForQuorum859 static inline int sizeFor(int count) {
860 return sizeof(Quorum<T>) + sizeof(QuorumCallback<T>)*count;
861 }
862
destroyQuorum863 virtual void destroy() {
864 int size = sizeFor(this->count);
865 this->~Quorum();
866 freeFast(size, this);
867 }
cancelQuorum868 virtual void cancel() {
869 int cancelled_callbacks = 0;
870 for (int i = 0; i < count; i++)
871 if (callbacks()[i].next) {
872 callbacks()[i].remove();
873 callbacks()[i].next = 0;
874 ++cancelled_callbacks;
875 }
876 if (canBeSet())
877 sendError(actor_cancelled());
878 for (int i = 0; i < cancelled_callbacks; i++)
879 delPromiseRef();
880 }
QuorumQuorum881 explicit Quorum(int quorum, int count) : SAV<Void>(1, count), antiQuorum(count - quorum + 1), count(count) {
882 if (!quorum) this->send(Void());
883 }
oneSuccessQuorum884 void oneSuccess() {
885 if (getPromiseReferenceCount() == antiQuorum && canBeSet())
886 this->sendAndDelPromiseRef(Void());
887 else
888 delPromiseRef();
889 }
oneErrorQuorum890 void oneError(Error err) {
891 if (canBeSet())
892 this->sendErrorAndDelPromiseRef(err);
893 else
894 delPromiseRef();
895 }
896
callbacksQuorum897 QuorumCallback<T>* callbacks() { return (QuorumCallback<T>*)(this + 1); }
898 };
899
900 template <class T>
901 class QuorumCallback : public Callback<T> {
902 public:
QuorumCallback(Future<T> future,Quorum<T> * head)903 QuorumCallback(Future<T> future, Quorum<T>* head)
904 : head(head)
905 {
906 future.addCallbackAndClear(this);
907 }
fire(const T & value)908 virtual void fire(const T& value) {
909 Callback<T>::remove();
910 Callback<T>::next = 0;
911 head->oneSuccess();
912 }
error(Error error)913 virtual void error(Error error) {
914 Callback<T>::remove();
915 Callback<T>::next = 0;
916 head->oneError(error);
917 }
918
919 private:
920 Quorum<T>* head;
921 };
922
923 template <class T>
quorum(std::vector<Future<T>> const & results,int n)924 Future<Void> quorum(std::vector<Future<T>> const& results, int n) {
925 ASSERT(n >= 0 && n <= results.size());
926
927 int size = Quorum<T>::sizeFor(results.size());
928 Quorum<T>* q = new (allocateFast(size)) Quorum<T>(n, results.size());
929
930 QuorumCallback<T>* nextCallback = q->callbacks();
931 for (auto & r : results) {
932 if (r.isReady()) {
933 nextCallback->next = 0;
934 if (r.isError())
935 q->oneError(r.getError());
936 else
937 q->oneSuccess();
938 }
939 else
940 new (nextCallback) QuorumCallback<T>(r, q);
941 ++nextCallback;
942 }
943 return Future<Void>(q);
944 }
945
946 ACTOR template <class T>
947 Future<Void> smartQuorum( std::vector<Future<T>> results, int required, double extraSeconds, int taskID = TaskDefaultDelay ) {
948 if (results.empty() && required == 0) return Void();
949 wait(quorum(results, required));
950 choose {
951 when (wait(quorum(results, (int)results.size()))) {return Void();}
when(wait (delay (extraSeconds,taskID)))952 when (wait(delay(extraSeconds, taskID))) {return Void(); }
953 }
954 }
955
956 template <class T>
waitForAll(std::vector<Future<T>> const & results)957 Future<Void> waitForAll( std::vector<Future<T>> const& results ) {
958 if (results.empty()) return Void();
959 return quorum( results, (int)results.size() );
960 }
961
962 template <class T>
waitForAny(std::vector<Future<T>> const & results)963 Future<Void> waitForAny( std::vector<Future<T>> const& results ) {
964 if (results.empty()) return Void();
965 return quorum( results, 1 );
966 }
967
shortCircuitAny(std::vector<Future<bool>> f)968 ACTOR static Future<bool> shortCircuitAny( std::vector<Future<bool>> f )
969 {
970 std::vector<Future<Void>> sc;
971 for(Future<bool> fut : f) {
972 sc.push_back(returnIfTrue(fut));
973 }
974
975 choose {
976 when( wait( waitForAll( f ) ) ) {
977 // Handle a possible race condition? If the _last_ term to
978 // be evaluated triggers the waitForAll before bubbling
979 // out of the returnIfTrue quorum
980 for ( auto fut : f ) {
981 if ( fut.get() ) {
982 return true;
983 }
984 }
985 return false;
986 }
987 when( wait( waitForAny( sc ) ) ) {
988 return true;
989 }
990 }
991 }
992
993 ACTOR template <class T>
getAll(std::vector<Future<T>> input)994 Future<std::vector<T>> getAll( std::vector<Future<T>> input ) {
995 if (input.empty()) return std::vector<T>();
996 wait( quorum( input, input.size() ) );
997
998 std::vector<T> output;
999 for(int i=0; i<input.size(); i++)
1000 output.push_back( input[i].get() );
1001 return output;
1002 }
1003
1004 ACTOR template <class T>
appendAll(std::vector<Future<std::vector<T>>> input)1005 Future<std::vector<T>> appendAll( std::vector<Future<std::vector<T>>> input ) {
1006 wait( quorum( input, input.size() ) );
1007
1008 std::vector<T> output;
1009 for(int i=0; i<input.size(); i++) {
1010 auto const& r = input[i].get();
1011 output.insert( output.end(), r.begin(), r.end() );
1012 }
1013 return output;
1014 }
1015
onEqual(Future<T> in,T equalTo)1016 ACTOR template <class T> Future<Void> onEqual( Future<T> in, T equalTo ) {
1017 T t = wait(in);
1018 if ( t == equalTo )
1019 return Void();
1020 wait(Never()); // never return
1021 throw internal_error(); // does not happen
1022 }
1023
1024 ACTOR template <class T>
success(Future<T> of)1025 Future<Void> success( Future<T> of ) {
1026 T t = wait( of );
1027 return Void();
1028 }
1029
1030 ACTOR template <class T>
ready(Future<T> f)1031 Future<Void> ready( Future<T> f ) {
1032 try {
1033 T _ = wait( f );
1034 } catch (...) {
1035 }
1036 return Void();
1037 }
1038
1039 ACTOR template <class T>
waitAndForward(FutureStream<T> input)1040 Future<T> waitAndForward( FutureStream<T> input ) {
1041 T output = waitNext( input );
1042 return output;
1043 }
1044
1045 ACTOR template <class T>
reportErrorsExcept(Future<T> in,const char * context,UID id,std::set<int> const * pExceptErrors)1046 Future<T> reportErrorsExcept( Future<T> in, const char* context, UID id, std::set<int> const* pExceptErrors ) {
1047 try {
1048 T t = wait( in );
1049 return t;
1050 } catch (Error& e) {
1051 if (e.code() != error_code_actor_cancelled && (!pExceptErrors || !pExceptErrors->count(e.code())))
1052 TraceEvent(SevError, context, id).error(e);
1053 throw;
1054 }
1055 }
1056
1057 template <class T>
1058 Future<T> reportErrors( Future<T> const& in, const char* context, UID id = UID() ) {
1059 return reportErrorsExcept(in, context, id, NULL);
1060 }
1061
1062 ACTOR template <class T>
require(Future<Optional<T>> in,int errorCode)1063 Future<T> require( Future<Optional<T>> in, int errorCode ) {
1064 Optional<T> o = wait(in);
1065 if (o.present()) {
1066 return o.get();
1067 }
1068 else {
1069 throw Error(errorCode);
1070 }
1071 }
1072
1073 ACTOR template <class T>
waitForFirst(std::vector<Future<T>> items)1074 Future<T> waitForFirst( std::vector<Future<T>> items ) {
1075 state PromiseStream<T> resultStream;
1076 state PromiseStream<Error> errorStream;
1077
1078 state Future<Void> forCancellation = makeStream( items, resultStream, errorStream );
1079
1080 state FutureStream<T> resultFutureStream = resultStream.getFuture();
1081 state FutureStream<Error> errorFutureStream = errorStream.getFuture();
1082
1083 choose {
1084 when (T val = waitNext( resultFutureStream )) {
1085 forCancellation = Future<Void>();
1086 return val;
1087 }
1088 when (Error e = waitNext( errorFutureStream )) {
1089 forCancellation = Future<Void>();
1090 throw e;
1091 }
1092 }
1093 }
1094
1095 ACTOR template <class T>
tag(Future<Void> future,T what)1096 Future<T> tag( Future<Void> future, T what ) {
1097 wait(future);
1098 return what;
1099 }
1100
1101 ACTOR template <class T>
tag(Future<Void> future,T what,PromiseStream<T> stream)1102 Future<Void> tag( Future<Void> future, T what, PromiseStream<T> stream ) {
1103 wait( future );
1104 stream.send( what );
1105 return Void();
1106 }
1107
1108 ACTOR template <class T>
tagError(Future<Void> future,Error e)1109 Future<T> tagError( Future<Void> future, Error e) {
1110 wait(future);
1111 throw e;
1112 }
1113
1114 //If the future is ready, yields and returns. Otherwise, returns when future is set.
1115 template <class T>
orYield(Future<T> f)1116 Future<T> orYield( Future<T> f ) {
1117 if(f.isReady()) {
1118 if(f.isError())
1119 return tagError<T>(yield(), f.getError());
1120 else
1121 return tag(yield(), f.get());
1122 }
1123 else
1124 return f;
1125 }
1126
orYield(Future<Void> f)1127 static Future<Void> orYield( Future<Void> f ) {
1128 if(f.isReady()) {
1129 if(f.isError())
1130 return tagError<Void>(yield(), f.getError());
1131 else
1132 return yield();
1133 }
1134 else
1135 return f;
1136 }
1137
chooseActor(Future<T> lhs,Future<T> rhs)1138 ACTOR template <class T> Future<T> chooseActor( Future<T> lhs, Future<T> rhs ) {
1139 choose {
1140 when ( T t = wait(lhs) ) { return t; }
1141 when ( T t = wait(rhs) ) { return t; }
1142 }
1143 }
1144
1145 // set && set -> set
1146 // error && x -> error
1147 // all others -> unset
1148 static Future<Void> operator &&( Future<Void> const& lhs, Future<Void> const& rhs ) {
1149 if(lhs.isReady()) {
1150 if(lhs.isError()) return lhs;
1151 else return rhs;
1152 }
1153 if(rhs.isReady()) {
1154 if(rhs.isError()) return rhs;
1155 else return lhs;
1156 }
1157
1158 std::vector<Future<Void>> v;
1159 v.push_back( lhs );
1160 v.push_back( rhs );
1161 return waitForAll(v);
1162 }
1163
1164 // error || unset -> error
1165 // unset || unset -> unset
1166 // all others -> set
1167 inline Future<Void> operator ||( Future<Void> const& lhs, Future<Void> const& rhs ) {
1168 if(lhs.isReady()) {
1169 if(lhs.isError()) return lhs;
1170 if(rhs.isReady()) return rhs;
1171 return lhs;
1172 }
1173
1174 return chooseActor( lhs, rhs );
1175 }
1176
brokenPromiseToNever(Future<T> in)1177 ACTOR template <class T> Future<T> brokenPromiseToNever( Future<T> in ) {
1178 try {
1179 T t = wait(in);
1180 return t;
1181 } catch (Error& e) {
1182 if (e.code() != error_code_broken_promise)
1183 throw;
1184 wait(Never()); // never return
1185 throw internal_error(); // does not happen
1186 }
1187 }
1188
brokenPromiseToMaybeDelivered(Future<T> in)1189 ACTOR template <class T> Future<T> brokenPromiseToMaybeDelivered( Future<T> in ) {
1190 try {
1191 T t = wait(in);
1192 return t;
1193 } catch (Error& e) {
1194 if (e.code() == error_code_broken_promise) {
1195 throw request_maybe_delivered();
1196 }
1197 throw;
1198 }
1199 }
1200
tagAndForward(Promise<T> * pOutputPromise,T value,Future<Void> signal)1201 ACTOR template <class T> void tagAndForward( Promise<T>* pOutputPromise, T value, Future<Void> signal ) {
1202 state Promise<T> out( std::move(*pOutputPromise) );
1203 wait( signal );
1204 out.send(value);
1205 }
1206
tagAndForwardError(Promise<T> * pOutputPromise,Error value,Future<Void> signal)1207 ACTOR template <class T> void tagAndForwardError( Promise<T>* pOutputPromise, Error value, Future<Void> signal ) {
1208 state Promise<T> out( std::move(*pOutputPromise) );
1209 wait( signal );
1210 out.sendError(value);
1211 }
1212
waitOrError(Future<T> f,Future<Void> errorSignal)1213 ACTOR template <class T> Future<T> waitOrError(Future<T> f, Future<Void> errorSignal) {
1214 choose {
1215 when(T val = wait(f)) {
1216 return val;
1217 }
1218 when(wait(errorSignal)) {
1219 ASSERT(false);
1220 throw internal_error();
1221 }
1222 }
1223 }
1224
1225 struct FlowLock : NonCopyable, public ReferenceCounted<FlowLock> {
1226 // FlowLock implements a nonblocking critical section: there can be only a limited number of clients executing code between
1227 // wait(take()) and release(). Not thread safe. take() returns only when the number of holders of the lock is fewer than the
1228 // number of permits, and release() makes the caller no longer a holder of the lock. release() only runs waiting take()rs
1229 // after the caller wait()s
1230
1231 struct Releaser : NonCopyable {
1232 FlowLock* lock;
1233 int remaining;
ReleaserFlowLock::Releaser1234 Releaser() : lock(0), remaining(0) {}
1235 Releaser( FlowLock& lock, int64_t amount = 1 ) : lock(&lock), remaining(amount) {}
ReleaserFlowLock::Releaser1236 Releaser(Releaser&& r) BOOST_NOEXCEPT : lock(r.lock), remaining(r.remaining) { r.remaining = 0; }
1237 void operator=(Releaser&& r) { if (remaining) lock->release(remaining); lock = r.lock; remaining = r.remaining; r.remaining = 0; }
1238
1239 void release( int64_t amount = -1 ) {
1240 if( amount == -1 || amount > remaining )
1241 amount = remaining;
1242
1243 if (remaining)
1244 lock->release( amount );
1245 remaining -= amount;
1246 }
1247
~ReleaserFlowLock::Releaser1248 ~Releaser() { if (remaining) lock->release(remaining); }
1249 };
1250
FlowLockFlowLock1251 FlowLock() : permits(1), active(0) {}
FlowLockFlowLock1252 explicit FlowLock(int64_t permits) : permits(permits), active(0) {}
1253
1254 Future<Void> take(int taskID = TaskDefaultYield, int64_t amount = 1) {
1255 if (active + amount <= permits || active == 0) {
1256 active += amount;
1257 return safeYieldActor(this, taskID, amount);
1258 }
1259 return takeActor(this, taskID, amount);
1260 }
1261 void release( int64_t amount = 1 ) {
1262 ASSERT( (active > 0 || amount == 0) && active - amount >= 0 );
1263 active -= amount;
1264
1265 while( !takers.empty() ) {
1266 if( active + takers.begin()->second <= permits || active == 0 ) {
1267 std::pair< Promise<Void>, int64_t > next = std::move( *takers.begin() );
1268 active += next.second;
1269 takers.pop_front();
1270 next.first.send(Void());
1271 } else {
1272 break;
1273 }
1274 }
1275 }
1276
1277 Future<Void> releaseWhen( Future<Void> const& signal, int amount = 1 ) { return releaseWhenActor( this, signal, amount ); }
1278
1279 // returns when any permits are available, having taken as many as possible up to the given amount, and modifies amount to the number of permits taken
takeUpToFlowLock1280 Future<Void> takeUpTo(int64_t& amount) {
1281 return takeMoreActor(this, &amount);
1282 }
1283
availableFlowLock1284 int64_t available() const { return permits - active; }
activePermitsFlowLock1285 int64_t activePermits() const { return active; }
waitersFlowLock1286 int waiters() const { return takers.size(); }
1287 private:
1288 std::list< std::pair< Promise<Void>, int64_t > > takers;
1289 const int64_t permits;
1290 int64_t active;
1291 Promise<Void> broken_on_destruct;
1292
takeActorFlowLock1293 ACTOR static Future<Void> takeActor(FlowLock* lock, int taskID, int64_t amount) {
1294 state std::list<std::pair<Promise<Void>, int64_t>>::iterator it = lock->takers.insert(lock->takers.end(), std::make_pair(Promise<Void>(), amount));
1295
1296 try {
1297 wait( it->first.getFuture() );
1298 } catch (Error& e) {
1299 if (e.code() == error_code_actor_cancelled) {
1300 lock->takers.erase(it);
1301 lock->release(0);
1302 }
1303 throw;
1304 }
1305 try {
1306 double duration = BUGGIFY_WITH_PROB(.001) ? g_random->random01()*FLOW_KNOBS->BUGGIFY_FLOW_LOCK_RELEASE_DELAY : 0.0;
1307 choose{ when(wait(delay(duration, taskID))) {} // So release()ing the lock doesn't cause arbitrary code to run on the stack
1308 when(wait(lock->broken_on_destruct.getFuture())) {} }
1309 return Void();
1310 } catch (...) {
1311 TEST(true); // If we get cancelled here, we are holding the lock but the caller doesn't know, so release it
1312 lock->release(amount);
1313 throw;
1314 }
1315 }
1316
takeMoreActorFlowLock1317 ACTOR static Future<Void> takeMoreActor(FlowLock* lock, int64_t* amount) {
1318 wait(lock->take());
1319 int64_t extra = std::min( lock->available(), *amount-1 );
1320 lock->active += extra;
1321 *amount = 1 + extra;
1322 return Void();
1323 }
1324
safeYieldActorFlowLock1325 ACTOR static Future<Void> safeYieldActor(FlowLock* lock, int taskID, int64_t amount) {
1326 try {
1327 choose{
1328 when(wait(yield(taskID))) {}
1329 when(wait(lock->broken_on_destruct.getFuture())) {}
1330 }
1331 return Void();
1332 } catch (Error& e) {
1333 lock->release(amount);
1334 throw;
1335 }
1336 }
1337
releaseWhenActorFlowLock1338 ACTOR static Future<Void> releaseWhenActor( FlowLock* self, Future<Void> signal, int64_t amount ) {
1339 wait(signal);
1340 self->release(amount);
1341 return Void();
1342 }
1343 };
1344
1345 ACTOR template <class T>
1346 Future<Void> yieldPromiseStream( FutureStream<T> input, PromiseStream<T> output, int taskID = TaskDefaultYield ) {
1347 loop {
1348 T f = waitNext( input );
1349 output.send( f );
1350 wait( yield( taskID ) );
1351 }
1352 }
1353
1354
1355 struct YieldedFutureActor : SAV<Void>, ActorCallback<YieldedFutureActor, 1, Void>, FastAllocated<YieldedFutureActor> {
1356 Error in_error_state;
1357
1358 typedef ActorCallback<YieldedFutureActor, 1, Void> CB1;
1359
1360 using FastAllocated<YieldedFutureActor>::operator new;
1361 using FastAllocated<YieldedFutureActor>::operator delete;
1362
YieldedFutureActorYieldedFutureActor1363 YieldedFutureActor(Future<Void> && f) : SAV<Void>(1, 1), in_error_state(Error::fromCode(UNSET_ERROR_CODE)) {
1364 f.addYieldedCallbackAndClear(static_cast< ActorCallback< YieldedFutureActor, 1, Void >* >(this));
1365 }
1366
cancelYieldedFutureActor1367 void cancel()
1368 {
1369 if (!SAV<Void>::canBeSet()) return; // Cancel could be invoked *by* a callback within finish(). Otherwise it's guaranteed that we are waiting either on the original future or on a delay().
1370 ActorCallback<YieldedFutureActor, 1, Void>::remove();
1371 SAV<Void>::sendErrorAndDelPromiseRef(actor_cancelled());
1372 }
1373
destroyYieldedFutureActor1374 virtual void destroy() {
1375 delete this;
1376 }
1377
a_callback_fireYieldedFutureActor1378 void a_callback_fire(ActorCallback<YieldedFutureActor, 1, Void>*, Void) {
1379 if (int16_t(in_error_state.code()) == UNSET_ERROR_CODE) {
1380 in_error_state = Error::fromCode(SET_ERROR_CODE);
1381 if (check_yield())
1382 doYield();
1383 else
1384 finish();
1385 } else {
1386 // We hit this case when and only when the delay() created by a previous doYield() fires. Then we want to get at least one task done, regardless of what check_yield() would say.
1387 finish();
1388 }
1389 }
a_callback_errorYieldedFutureActor1390 void a_callback_error(ActorCallback<YieldedFutureActor, 1, Void>*, Error const& err) {
1391 ASSERT(int16_t(in_error_state.code()) == UNSET_ERROR_CODE);
1392 in_error_state = err;
1393 if (check_yield())
1394 doYield();
1395 else
1396 finish();
1397 }
finishYieldedFutureActor1398 void finish() {
1399 ActorCallback<YieldedFutureActor, 1, Void>::remove();
1400 if (int16_t(in_error_state.code()) == SET_ERROR_CODE)
1401 SAV<Void>::sendAndDelPromiseRef(Void());
1402 else
1403 SAV<Void>::sendErrorAndDelPromiseRef(in_error_state);
1404 }
doYieldYieldedFutureActor1405 void doYield() {
1406 // Since we are being fired, we are the first callback in the ring, and `prev` is the source future
1407 Callback<Void>* source = CB1::prev;
1408 ASSERT(source->next == static_cast<CB1*>(this));
1409
1410 // Remove the source future from the ring. All the remaining callbacks in the ring should be yielded, since yielded callbacks are installed at the end
1411 CB1::prev = source->prev;
1412 CB1::prev->next = static_cast<CB1*>(this);
1413
1414 // The source future's ring is now empty, since we have removed all the callbacks
1415 source->next = source->prev = source;
1416 source->unwait();
1417
1418 // Link all the callbacks, including this one, into the ring of a delay future so that after a short time they will be fired again
1419 delay(0, g_network->getCurrentTask()).addCallbackChainAndClear(static_cast< CB1* >(this));
1420 }
1421 };
1422
yieldedFuture(Future<Void> f)1423 static Future<Void> yieldedFuture(Future<Void> f) {
1424 if (f.isReady())
1425 return yield();
1426 else
1427 return Future<Void>(new YieldedFutureActor(std::move(f)));
1428 }
1429
1430 //An AsyncMap that uses a yieldedFuture in its onChange method.
1431 template <class K, class V>
1432 class YieldedAsyncMap : public AsyncMap<K, V> {
1433 public:
onChange(K const & k)1434 Future<Void> onChange(K const& k) { // throws broken_promise if this is destroyed
1435 auto &item = AsyncMap<K, V>::items[k];
1436 if (item.value == AsyncMap<K, V>::defaultValue)
1437 return destroyOnCancelYield(this, k, item.change.getFuture());
1438 return yieldedFuture(item.change.getFuture());
1439 }
1440
destroyOnCancelYield(YieldedAsyncMap * self,K key,Future<Void> change)1441 ACTOR static Future<Void> destroyOnCancelYield( YieldedAsyncMap* self, K key, Future<Void> change ) {
1442 try {
1443 wait(yieldedFuture(change));
1444 return Void();
1445 } catch (Error& e) {
1446 if (e.code() == error_code_actor_cancelled && !self->destructing && change.getFutureReferenceCount() == 1 && change.getPromiseReferenceCount() == 1) {
1447 if(EXPENSIVE_VALIDATION) {
1448 auto& p = self->items[key];
1449 ASSERT(p.change.getFuture() == change);
1450 }
1451 self->items.erase(key);
1452 }
1453 throw;
1454 }
1455 }
1456 };
1457
1458 ACTOR template <class T>
delayActionJittered(Future<T> what,double time)1459 Future<T> delayActionJittered( Future<T> what, double time ) {
1460 wait( delayJittered( time ) );
1461 T t = wait( what );
1462 return t;
1463 }
1464
1465 class AndFuture {
1466 public:
AndFuture()1467 AndFuture() { }
1468
AndFuture(AndFuture const & f)1469 AndFuture(AndFuture const& f) {
1470 futures = f.futures;
1471 }
1472
AndFuture(AndFuture && f)1473 AndFuture(AndFuture&& f) BOOST_NOEXCEPT {
1474 futures = std::move(f.futures);
1475 }
1476
AndFuture(Future<Void> const & f)1477 AndFuture(Future<Void> const& f) {
1478 futures.push_back(f);
1479 }
1480
AndFuture(Error const & e)1481 AndFuture(Error const& e) {
1482 futures.push_back(e);
1483 }
1484
1485 operator Future<Void>() {
1486 return getFuture();
1487 }
1488
1489 void operator=(AndFuture const& f) {
1490 futures = f.futures;
1491 }
1492
1493 void operator=(AndFuture&& f) BOOST_NOEXCEPT {
1494 futures = std::move(f.futures);
1495 }
1496
1497 void operator=(Future<Void> const& f) {
1498 futures.push_back(f);
1499 }
1500
1501 void operator=(Error const& e) {
1502 futures.push_back(e);
1503 }
1504
getFuture()1505 Future<Void> getFuture() {
1506 if(futures.empty())
1507 return Void();
1508
1509 if(futures.size() == 1)
1510 return futures[0];
1511
1512 Future<Void> f = waitForAll(futures);
1513 futures = std::vector<Future<Void>>();
1514 futures.push_back(f);
1515 return f;
1516 }
1517
isReady()1518 bool isReady() {
1519 for( int i = futures.size() - 1; i >= 0; --i ) {
1520 if( !futures[i].isReady() ) {
1521 return false;
1522 }
1523 else if(!futures[i].isError()) {
1524 swapAndPop(&futures, i);
1525 }
1526 }
1527 return true;
1528 }
1529
isError()1530 bool isError() {
1531 for( int i = 0; i < futures.size(); i++ )
1532 if( futures[i].isError() )
1533 return true;
1534 return false;
1535 }
1536
cleanup()1537 void cleanup() {
1538 for( int i = 0; i < futures.size(); i++ ) {
1539 if( futures[i].isReady() && !futures[i].isError() ) {
1540 swapAndPop(&futures, i--);
1541 }
1542 }
1543 }
1544
add(Future<Void> const & f)1545 void add(Future<Void> const& f) {
1546 if(!f.isReady() || f.isError())
1547 futures.push_back(f);
1548 }
1549
add(AndFuture f)1550 void add(AndFuture f) {
1551 add(f.getFuture());
1552 }
1553
1554 private:
1555 std::vector<Future<Void>> futures;
1556 };
1557
1558 // Performs an unordered merge of a and b.
1559 ACTOR template <class T>
unorderedMergeStreams(FutureStream<T> a,FutureStream<T> b,PromiseStream<T> output)1560 Future<Void> unorderedMergeStreams(FutureStream<T> a, FutureStream<T> b, PromiseStream<T> output) {
1561 state Future<T> aFuture = waitAndForward(a);
1562 state Future<T> bFuture = waitAndForward(b);
1563 state bool aOpen = true;
1564 state bool bOpen = true;
1565
1566 loop{
1567 try {
1568 choose {
1569 when(T val = wait(aFuture)) {
1570 output.send(val);
1571 aFuture = waitAndForward(a);
1572 }
1573 when(T val = wait(bFuture)) {
1574 output.send(val);
1575 bFuture = waitAndForward(b);
1576 }
1577 }
1578 }
1579 catch (Error &e) {
1580 if (e.code() != error_code_end_of_stream) {
1581 output.sendError(e);
1582 break;
1583 }
1584
1585 ASSERT(!aFuture.isError() || !bFuture.isError() || aFuture.getError().code() == bFuture.getError().code());
1586
1587 if (aFuture.isError()) {
1588 aFuture = Never();
1589 aOpen = false;
1590 }
1591 if (bFuture.isError()) {
1592 bFuture = Never();
1593 bOpen = false;
1594 }
1595
1596 if (!aOpen && !bOpen) {
1597 output.sendError(e);
1598 break;
1599 }
1600 }
1601 }
1602
1603 return Void();
1604 }
1605
1606 // Returns the ordered merge of a and b, assuming that a and b are both already ordered (prefer a over b if keys are equal). T must be a class that implements compare()
1607 ACTOR template <class T>
orderedMergeStreams(FutureStream<T> a,FutureStream<T> b,PromiseStream<T> output)1608 Future<Void> orderedMergeStreams( FutureStream<T> a, FutureStream<T> b, PromiseStream<T> output ) {
1609 state Optional<T> savedKVa;
1610 state bool aOpen;
1611 state Optional<T> savedKVb;
1612 state bool bOpen;
1613
1614 aOpen = bOpen = true;
1615
1616 loop {
1617 if ( aOpen && !savedKVa.present() ) {
1618 try {
1619 T KVa = waitNext( a );
1620 savedKVa = Optional<T>( KVa );
1621 } catch ( Error& e ) {
1622 if ( e.code() == error_code_end_of_stream ) {
1623 aOpen = false;
1624 if (!bOpen) {
1625 output.sendError(e);
1626 }
1627 } else {
1628 output.sendError(e);
1629 break;
1630 }
1631 }
1632 }
1633 if ( bOpen && !savedKVb.present() ) {
1634 try {
1635 T KVb = waitNext( b );
1636 savedKVb = Optional<T>( KVb );
1637 } catch ( Error& e ) {
1638 if ( e.code() == error_code_end_of_stream ) {
1639 bOpen = false;
1640 if (!aOpen) {
1641 output.sendError(e);
1642 }
1643 } else {
1644 output.sendError(e);
1645 break;
1646 }
1647 }
1648 }
1649
1650 if (!aOpen) {
1651 output.send( savedKVb.get() );
1652 savedKVb = Optional<T>();
1653 } else if (!bOpen) {
1654 output.send( savedKVa.get() );
1655 savedKVa = Optional<T>();
1656 } else {
1657 int cmp = savedKVa.get().compare( savedKVb.get() );
1658
1659 if ( cmp == 0 ) {
1660 // prefer a
1661 output.send( savedKVa.get() );
1662 savedKVa = Optional<T>();
1663 savedKVb = Optional<T>();
1664 } else if ( cmp < 0 ) {
1665 output.send( savedKVa.get() );
1666 savedKVa = Optional<T>();
1667 } else {
1668 output.send( savedKVb.get() );
1669 savedKVb = Optional<T>();
1670 }
1671 }
1672 }
1673
1674 return Void();
1675 }
1676
1677
1678 ACTOR template<class T>
timeReply(Future<T> replyToTime,PromiseStream<double> timeOutput)1679 Future<Void> timeReply(Future<T> replyToTime, PromiseStream<double> timeOutput){
1680 state double startTime = now();
1681 try {
1682 T _ = wait(replyToTime);
1683 wait( delay(0) );
1684 timeOutput.send(now() - startTime);
1685 } catch( Error &e ) {
1686 // Ignore broken promises. They typically occur during shutdown and our callers don't want to have to create brokenPromiseToNever actors to ignore them. For what it's worth we are breaking timeOutput to pass the pain along.
1687 if( e.code() != error_code_broken_promise )
1688 throw;
1689 }
1690 return Void();
1691 }
1692
1693 #include "flow/unactorcompiler.h"
1694
1695 #endif
1696