1 /*
2  * genericactors.actor.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
21 #pragma once
23 // When actually compiled (NO_INTELLISENSE), include the generated version of this file.  In intellisense use the source version.
26 	#include "flow/genericactors.actor.g.h"
27 #elif !defined(GENERICACTORS_ACTOR_H)
30 #include <list>
32 #include "flow/flow.h"
33 #include "flow/Knobs.h"
34 #include "flow/Util.h"
35 #include "flow/actorcompiler.h"  // This must be the last #include.
36 #pragma warning( disable: 4355 )	// 'this' : used in base member initializer list
38 ACTOR template<class T, class X>
39 Future<T> traceAfter(Future<T> what, const char* type, const char* key, X value, bool traceErrors = false)
40 {
41 	try {
42 		T val = wait(what);
43 		TraceEvent(type).detail(key, value);
44 		return val;
catch(Error & e)45 	} catch( Error &e ) {
46 		if(traceErrors) TraceEvent(type).error(e,true).detail(key, value);
47 		throw;
48 	}
49 }
51 ACTOR template<class T, class X>
52 Future<T> traceAfterCall(Future<T> what, const char* type, const char* key, X func, bool traceErrors = false)
53 {
54 	try {
55 		state T val = wait(what);
56 		try {
57 			TraceEvent(type).detail(key, func(val));
catch(Error & e)58 		} catch( Error &e ) {
59 			TraceEvent(SevError, "TraceAfterCallError").error(e);
60 		}
61 		return val;
catch(Error & e)62 	} catch( Error &e ) {
63 		if(traceErrors) TraceEvent(type).error(e,true);
64 		throw;
65 	}
66 }
68 ACTOR template <class T>
stopAfter(Future<T> what)69 Future<Optional<T>> stopAfter( Future<T> what ) {
70 	state Optional<T> ret = T();
71 	try {
72 		T _ = wait(what);
73 		ret = Optional<T>(_);
74 	} catch (Error& e) {
75 		bool ok = e.code() == error_code_please_reboot || e.code() == error_code_please_reboot_delete || e.code() == error_code_actor_cancelled;
76 		TraceEvent(ok ? SevInfo : SevError, "StopAfterError").error(e);
77 		if(!ok) {
78 			fprintf(stderr, "Fatal Error: %s\n", e.what());
79 			ret = Optional<T>();
80 		}
81 	}
82 	g_network->stop();
83 	return ret;
84 }
// Returns a sorted copy of `range` (taken by value, ordered with std::sort's default comparison).
template <class T>
T sorted(T range) {
	auto first = range.begin();
	auto last = range.end();
	std::sort(first, last);
	return range;
}
// Comma-append for vectors: `(v, a, b)` appends a and then b to v and yields v.
template <class T>
inline std::vector<T>& operator , (std::vector<T>& v, T a) {
	v.insert(v.end(), a);
	return v;
}

// Same, for a temporary vector on the left-hand side. The returned reference refers to that
// temporary, so it must be consumed within the same full expression.
template <class T>
inline std::vector<T>& operator , (std::vector<T> && v, T a) {
	std::vector<T>& target = v;  // a named rvalue reference is an lvalue; forward to the overload above
	return (target, a);
}
103 template <class T>
errorOr(T t)104 ErrorOr<T> errorOr( T t ) {
105 	return ErrorOr<T>(t);
106 }
108 ACTOR template <class T>
errorOr(Future<T> f)109 Future<ErrorOr<T>> errorOr( Future<T> f ) {
110 	try {
111 		T t = wait(f);
112 		return ErrorOr<T>(t);
113 	} catch (Error& e) {
114 		return ErrorOr<T>(e);
115 	}
116 }
118 ACTOR template <class T>
throwErrorOr(Future<ErrorOr<T>> f)119 Future<T> throwErrorOr( Future<ErrorOr<T>> f ) {
120 	ErrorOr<T> t = wait(f);
121 	if(t.isError()) throw t.getError();
122 	return t.get();
123 }
125 ACTOR template <class T>
transformErrors(Future<T> f,Error err)126 Future<T> transformErrors( Future<T> f, Error err ) {
127 	try {
128 		T t = wait( f );
129 		return t;
130 	} catch( Error &e ) {
131 		if( e.code() == error_code_actor_cancelled )
132 			throw e;
133 		throw err;
134 	}
135 }
137 ACTOR template <class T>
transformError(Future<T> f,Error inErr,Error outErr)138 Future<T> transformError( Future<T> f, Error inErr, Error outErr ) {
139 	try {
140 		T t = wait( f );
141 		return t;
142 	} catch( Error &e ) {
143 		if( e.code() == inErr.code() )
144 			throw outErr;
145 		throw e;
146 	}
147 }
149 // Note that the RequestStream<T> version of forwardPromise doesn't exist, because what to do with errors?
151 ACTOR template <class T>
forwardEvent(Event * ev,Future<T> input)152 void forwardEvent( Event* ev, Future<T> input ) {
153 	try {
154 		T value = wait(input);
155 	} catch (Error&) {
156 	}
157 	ev->set();
158 }
160 ACTOR template <class T>
forwardEvent(Event * ev,T * t,Error * err,FutureStream<T> input)161 void forwardEvent( Event* ev, T* t, Error* err, FutureStream<T> input ) {
162 	try {
163 		T value = waitNext(input);
164 		*t = std::move(value);
165 		ev->set();
166 	} catch (Error& e) {
167 		*err = e;
168 		ev->set();
169 	}
170 }
172 ACTOR template <class T>
waitForAllReady(std::vector<Future<T>> results)173 Future<Void> waitForAllReady( std::vector<Future<T>> results ) {
174 	state int i = 0;
175 	loop {
176 		if (i == results.size()) return Void();
177 		try {
178 			T r = wait( results[i] );
179 		} catch (...) {
180 		}
181 		i++;
182 	}
183 }
185 ACTOR template <class T>
186 Future<T> timeout( Future<T> what, double time, T timedoutValue, int taskID = TaskDefaultDelay ) {
187 	Future<Void> end = delay( time, taskID );
188 	choose {
189 		when( T t = wait( what ) ) { return t; }
when(wait (end))190 		when( wait( end ) ) { return timedoutValue; }
191 	}
192 }
194 ACTOR template <class T>
timeout(Future<T> what,double time)195 Future<Optional<T>> timeout( Future<T> what, double time ) {
196 	Future<Void> end = delay( time );
197 	choose {
198 		when( T t = wait( what ) ) { return t; }
199 		when( wait( end ) ) { return Optional<T>(); }
200 	}
201 }
203 ACTOR template <class T>
204 Future<T> timeoutError( Future<T> what, double time, int taskID = TaskDefaultDelay ) {
205 	Future<Void> end = delay( time, taskID );
206 	choose {
207 		when( T t = wait( what ) ) { return t; }
when(wait (end))208 		when( wait( end ) ) { throw timed_out(); }
209 	}
210 }
212 ACTOR template <class T>
213 Future<T> delayed( Future<T> what, double time = 0.0, int taskID = TaskDefaultDelay  ) {
214 	try {
215 		state T t = wait( what );
216 		wait( delay( time, taskID ) );
217 		return t;
catch(Error & e)218 	} catch( Error &e ) {
219 		state Error err = e;
220 		wait( delay( time, taskID ) );
221 		throw err;
222 	}
223 }
225 ACTOR template<class Func>
226 Future<Void> recurring( Func what, double interval, int taskID = TaskDefaultDelay ) {
227 	loop choose {
when(wait (delay (interval,taskID)))228 		when ( wait( delay( interval, taskID ) ) ) { what(); }
229 	}
230 }
232 ACTOR template<class Func>
trigger(Func what,Future<Void> signal)233 Future<Void> trigger( Func what, Future<Void> signal ) {
234 	wait( signal );
235 	what();
236 	return Void();
237 }
239 ACTOR template<class Func>
triggerOnError(Func what,Future<Void> signal)240 Future<Void> triggerOnError( Func what, Future<Void> signal ) {
241 	try {
242 		wait( signal );
243 	}
244 	catch(Error &e) {
245 		what();
246 	}
248 	return Void();
249 }
251 //Waits for a future to complete and cannot be cancelled
252 //Most situations will use the overload below, which does not require a promise
253 ACTOR template<class T>
uncancellable(Future<T> what,Promise<T> result)254 void uncancellable(Future<T> what, Promise<T> result)
255 {
256 	try {
257 		T val = wait(what);
258 		result.send(val);
259 	} catch( Error &e ) {
260 		result.sendError(e);
261 	}
262 }
264 //Waits for a future to complete and cannot be cancelled
265 ACTOR template<class T>
uncancellable(Future<T> what)266 Future<T> uncancellable(Future<T> what)
267 {
268 	Promise<T> resultPromise;
269 	Future<T> result = resultPromise.getFuture();
271 	uncancellable(what, resultPromise);
272 	T val = wait(result);
274 	return val;
275 }
277 //Holds onto an object until a future either completes or is cancelled
278 //Used to prevent the object from being reclaimed
279 //
280 // NOTE: the order of the arguments is important. The arguments will be destructed in
281 // reverse order, and we need the object to be destructed last.
282 ACTOR template<class T, class X>
holdWhile(X object,Future<T> what)283 Future<T> holdWhile(X object, Future<T> what)
284 {
285 	T val = wait(what);
286 	return val;
287 }
289 ACTOR template<class T, class X>
holdWhileVoid(X object,Future<T> what)290 Future<Void> holdWhileVoid(X object, Future<T> what)
291 {
292 	T val = wait(what);
293 	return Void();
294 }
296 template<class T>
store(T & out,Future<T> what)297 Future<Void> store(T &out, Future<T> what) {
298 	return map(what, [&out](T const &v) { out = v; return Void(); });
299 }
301 template<class T>
302 Future<Void> storeOrThrow(T &out, Future<Optional<T>> what, Error e = key_not_found()) {
303 	return map(what, [&out,e](Optional<T> const &o) {
304 		if(!o.present())
305 			throw e;
306 		out = o.get();
307 		return Void();
308 	});
309 }
311 //Waits for a future to be ready, and then applies an asynchronous function to it.
312 ACTOR template<class T, class F, class U = decltype( fake<F>()(fake<T>()).getValue() )>
mapAsync(Future<T> what,F actorFunc)313 Future<U> mapAsync(Future<T> what, F actorFunc)
314 {
315 	T val = wait(what);
316 	U ret = wait(actorFunc(val));
317 	return ret;
318 }
320 //maps a vector of futures with an asynchronous function
321 template<class T, class F>
mapAsync(std::vector<Future<T>> const & what,F const & actorFunc)322 std::vector<Future<decltype(actorFunc(T()).getValue())>> mapAsync(std::vector<Future<T>> const& what, F const& actorFunc)
323 {
324 	std::vector<typename std::result_of<F(T)>::type> ret;
325 	for(auto f : what)
326 		ret.push_back(mapAsync( f, actorFunc ));
327 	return ret;
328 }
330 //maps a stream with an asynchronous function
331 ACTOR template<class T, class F, class U = decltype( fake<F>()(fake<T>()).getValue() )>
mapAsync(FutureStream<T> input,F actorFunc,PromiseStream<U> output)332 Future<Void> mapAsync( FutureStream<T> input, F actorFunc, PromiseStream<U> output ) {
333 	state Deque<Future<U>> futures;
335 	loop {
336 		try {
337 			choose {
338 				when( T nextInput = waitNext( input ) ) {
339 					futures.push_back( actorFunc(nextInput) );
340 				}
341 				when( U nextOutput = wait( futures.size() == 0 ? Never() : futures.front() ) ) {
342 					output.send( nextOutput );
343 					futures.pop_front();
344 				}
346 			}
347 		}
348 		catch ( Error& e ) {
349 			if( e.code() == error_code_end_of_stream ) {
350 				break;
351 			}
352 			else {
353 				output.sendError( e );
354 				throw e;
355 			}
356 		}
357 	}
359 	while(futures.size()) {
360 		U nextOutput = wait( futures.front() );
361 		output.send( nextOutput );
362 		futures.pop_front();
363 	}
365 	output.sendError(end_of_stream());
367 	return Void();
369 }
371 //Waits for a future to be ready, and then applies a function to it.
372 ACTOR template<class T, class F>
map(Future<T> what,F func)373 Future<typename std::result_of<F(T)>::type> map(Future<T> what, F func)
374 {
375 	T val = wait(what);
376 	return func(val);
377 }
379 //maps a vector of futures
380 template<class T, class F>
map(std::vector<Future<T>> const & what,F const & func)381 std::vector<Future<typename std::result_of<F(T)>>> map(std::vector<Future<T>> const& what, F const& func)
382 {
383 	std::vector<Future<typename std::result_of<F(T)>>> ret;
384 	for(auto f : what)
385 		ret.push_back(map( f, func ));
386 	return ret;
387 }
389 //maps a stream
390 ACTOR template<class T, class F>
map(FutureStream<T> input,F func,PromiseStream<typename std::result_of<F (T)>> output)391 Future<Void> map( FutureStream<T> input, F func, PromiseStream<typename std::result_of<F(T)>> output )
392 {
393 	loop {
394 		try {
395 			T nextInput = waitNext( input );
396 			output.send(func(nextInput));
397 		}
398 		catch ( Error& e ) {
399 			if( e.code() == error_code_end_of_stream ) {
400 				break;
401 			}
402 			else
403 				throw;
404 		}
405 	}
407 	output.sendError(end_of_stream());
409 	return Void();
410 }
412 //Returns if the future returns true, otherwise waits forever.
returnIfTrue(Future<bool> f)413 ACTOR static Future<Void> returnIfTrue( Future<bool> f )
414 {
415 	bool b = wait( f );
416 	if ( b ) {
417 		return Void();
418 	}
419 	wait( Never() );
420 	throw internal_error();
421 }
423 //Returns if the future, when waited on and then evaluated with the predicate, returns true, otherwise waits forever
424 template<class T, class F>
returnIfTrue(Future<T> what,F pred)425 Future<Void> returnIfTrue( Future<T> what, F pred)
426 {
427 	return returnIfTrue( map( what, pred ) );
428 }
430 //filters a stream
431 ACTOR template<class T, class F>
filter(FutureStream<T> input,F pred,PromiseStream<T> output)432 Future<Void> filter( FutureStream<T> input, F pred, PromiseStream<T> output )
433 {
434 	loop {
435 		try {
436 			T nextInput = waitNext( input );
437 			if(func(nextInput))
438 				output.send(nextInput);
439 		}
440 		catch ( Error& e ) {
441 			if( e.code() == error_code_end_of_stream ) {
442 				break;
443 			}
444 			else
445 				throw;
446 		}
447 	}
449 	output.sendError(end_of_stream());
451 	return Void();
452 }
454 //filters a stream asynchronously
455 ACTOR template<class T, class F>
asyncFilter(FutureStream<T> input,F actorPred,PromiseStream<T> output)456 Future<Void> asyncFilter( FutureStream<T> input, F actorPred, PromiseStream<T> output )
457 {
458 	state Deque<std::pair<T, Future<bool>>> futures;
459 	state std::pair<T, Future<bool>> p;
461 	loop {
462 		try {
463 			choose {
464 				when ( T nextInput = waitNext(input) ) {
465 					futures.push_back( std::pair<T, Future<bool>>(nextInput, actorPred(nextInput)) );
466 				}
467 				when ( bool pass = wait( futures.size() == 0 ? Never() : futures.front().second ) ) {
468 					if(pass) output.send(futures.front().first);
469 					futures.pop_front();
470 				}
471 			}
472 		}
473 		catch ( Error& e ) {
474 			if( e.code() == error_code_end_of_stream ) {
475 				break;
476 			}
477 			else {
478 				throw e;
479 			}
480 		}
481 	}
483 	while(futures.size()) {
484 		p = futures.front();
485 		bool pass = wait( p.second );
486 		if(pass) output.send(p.first);
487 		futures.pop_front();
488 	}
490 	output.sendError(end_of_stream());
492 	return Void();
493 }
495 template <class T>
496 struct WorkerCache {
497 	// SOMEDAY: Would we do better to use "unreliable" (at most once) transport for the initialize requests and get rid of this?
498 	// It doesn't provide true at most once behavior because things are removed from the cache after they have terminated.
existsWorkerCache499 	bool exists( UID id ) {
500 		return id_interface.count( id ) != 0;
501 	}
setWorkerCache502 	void set( UID id, const Future<T>& onReady ) {
503 		ASSERT( !exists( id ) );
504 		id_interface[ id ] = onReady;
505 	}
getWorkerCache506 	Future<T> get( UID id ) {
507 		ASSERT( exists( id ) );
508 		return id_interface[ id ];
509 	}
removeOnReadyWorkerCache511 	Future<Void> removeOnReady( UID id, Future<Void> const& ready ) {
512 		return removeOnReady( this, id, ready );
513 	}
514 private:
removeOnReadyWorkerCache515 	ACTOR static Future<Void> removeOnReady( WorkerCache* self, UID id, Future<Void> ready ) {
516 		try {
517 			wait(ready);
518 			self->id_interface.erase(id);
519 			return Void();
520 		} catch ( Error &e ) {
521 			self->id_interface.erase(id);
522 			throw;
523 		}
524 	}
526 	std::map<UID, Future<T>> id_interface;
527 };
529 template <class K, class V>
530 class AsyncMap : NonCopyable {
531 public:
532 	// Represents a complete function from keys to values (K -> V)
533 	// All values not explicitly inserted map to V()
534 	// If this isn't appropriate, use V=Optional<X>
AsyncMap()536 	AsyncMap() : defaultValue(), destructing(false) {}
~AsyncMap()538 	virtual ~AsyncMap() {
539 		destructing = true;
540 		items.clear();
541 	}
set(K const & k,V const & v)543 	void set( K const& k, V const& v ) {
544 		auto& i = items[k];
545 		if (i.value != v)
546 			setUnconditional(k,v,i);
547 	}
setUnconditional(K const & k,V const & v)548 	void setUnconditional( K const& k, V const& v ) {
549 		setUnconditional(k,v,items[k]);
550 	}
triggerAll()551 	void triggerAll() {
552 		std::vector<Promise<Void>> ps;
553 		for(auto it = items.begin(); it != items.end(); ++it){
554 			ps.resize(ps.size()+1);
555 			ps.back().swap( it->second.change );
556 		}
557 		std::vector<Promise<Void>> noDestroy = ps;  // See explanation of noDestroy in setUnconditional()
558 		for(auto p=ps.begin(); p!=ps.end(); ++p)
559 			p->send(Void());
560 	}
triggerRange(K const & begin,K const & end)561 	void triggerRange( K const& begin, K const& end ) {
562 		std::vector<Promise<Void>> ps;
563 		for(auto it = items.lower_bound(begin); it != items.end() && it->first < end; ++it){
564 			ps.resize(ps.size()+1);
565 			ps.back().swap( it->second.change );
566 		}
567 		std::vector<Promise<Void>> noDestroy = ps;  // See explanation of noDestroy in setUnconditional()
568 		for(auto p=ps.begin(); p!=ps.end(); ++p)
569 			p->send(Void());
570 	}
trigger(K const & key)571 	void trigger( K const& key ) {
572 		if( items.count(key) != 0 ) {
573 			auto& i = items[key];
574 			Promise<Void> trigger;
575 			i.change.swap(trigger);
576 			Promise<Void> noDestroy = trigger;  // See explanation of noDestroy in setUnconditional()
578 			if (i.value == defaultValue)
579 				items.erase(key);
581 			trigger.send(Void());
582 		}
583 	}
clear(K const & k)584 	void clear( K const& k ) { set(k, V()); }
get(K const & k)585 	V const& get( K const& k ) {
586 		auto it = items.find(k);
587 		if (it != items.end())
588 			return it->second.value;
589 		else
590 			return defaultValue;
591 	}
count(K const & k)592 	int count( K const& k ) {
593 		auto it = items.find(k);
594 		if (it != items.end())
595 			return 1;
596 		return 0;
597 	}
onChange(K const & k)598 	virtual Future<Void> onChange( K const& k ) {	// throws broken_promise if this is destroyed
599 		auto &item = items[k];
600 		if (item.value == defaultValue)
601 			return destroyOnCancel( this, k, item.change.getFuture() );
602 		return item.change.getFuture();
603 	}
getKeys()604 	std::vector<K> getKeys() {
605 		std::vector<K> keys;
606 		for(auto i = items.begin(); i != items.end(); ++i)
607 			keys.push_back( i->first );
608 		return keys;
609 	}
resetNoWaiting()610 	void resetNoWaiting() {
611 		for(auto i = items.begin(); i != items.end(); ++i)
612 			ASSERT( i->second.change.getFuture().getFutureReferenceCount() == 1 );
613 		items.clear();
614 	}
616 protected:
617 	// Invariant: Every item in the map either has value!=defaultValue xor a destroyOnCancel actor waiting on change.getFuture()
618 	struct P {
619 		V value;
620 		Promise<Void> change;
PP621 		P() : value() {}
622 	};
623 	std::map<K, P> items;
624 	const V defaultValue;
625 	bool destructing;
setUnconditional(K const & k,V const & v,P & i)627 	void setUnconditional( K const& k, V const& v, P& i ) {
628 		Promise<Void> trigger;
629 		i.change.swap(trigger);
630 		Promise<Void> noDestroy = trigger;  // The send(Void()) or even V::operator= could cause destroyOnCancel,
631 			                                // which could undo the change to i.value here.  Keeping the promise reference count >= 2
632 			                                // prevents destroyOnCancel from erasing anything from the map.
634 		if (v == defaultValue)
635 			items.erase(k);
636 		else
637 			i.value = v;
639 		trigger.send(Void());
640 	}
destroyOnCancel(AsyncMap * self,K key,Future<Void> change)642 	ACTOR Future<Void> destroyOnCancel( AsyncMap* self, K key, Future<Void> change ) {
643 		try {
644 			wait(change);
645 			return Void();
646 		} catch (Error& e) {
647 			if (e.code() == error_code_actor_cancelled && !self->destructing && change.getFutureReferenceCount()==1 && change.getPromiseReferenceCount()==1) {
649 					auto& p = self->items[key];
650 					ASSERT(p.change.getFuture() == change);
651 				}
652 				self->items.erase(key);
653 			}
654 			throw;
655 		}
656 	}
657 };
659 template <class V>
660 class AsyncVar : NonCopyable, public ReferenceCounted<AsyncVar<V>> {
661 public:
AsyncVar()662 	AsyncVar() : value() {}
AsyncVar(V const & v)663 	AsyncVar( V const& v ) : value(v) {}
AsyncVar(AsyncVar && av)664 	AsyncVar(AsyncVar&& av) : value(std::move(av.value)), nextChange(std::move(av.nextChange)) {}
665 	void operator=(AsyncVar&& av) { value = std::move(av.value); nextChange = std::move(av.nextChange); }
get()667 	V const& get() const {
668 		return value;
669 	}
onChange()670 	Future<Void> onChange() const {
671 		return nextChange.getFuture();
672 	}
set(V const & v)673 	void set( V const& v ) {
674 		if (v != value)
675 			setUnconditional(v);
676 	}
setUnconditional(V const & v)677 	void setUnconditional( V const& v ) {
678 		Promise<Void> t;
679 		this->nextChange.swap(t);
680 		this->value = v;
681 		t.send(Void());
682 	}
trigger()683 	void trigger() {
684 		Promise<Void> t;
685 		this->nextChange.swap(t);
686 		t.send(Void());
687 	}
689 private:
690 	V value;
691 	Promise<Void> nextChange;
692 };
694 class AsyncTrigger : NonCopyable {
695 public:
AsyncTrigger()696 	AsyncTrigger() {}
AsyncTrigger(AsyncTrigger && at)697 	AsyncTrigger(AsyncTrigger&& at) : v(std::move(at.v)) {}
698 	void operator=(AsyncTrigger&& at) { v = std::move(at.v); }
onTrigger()699 	Future<Void> onTrigger() {
700 		return v.onChange();
701 	}
trigger()702 	void trigger() {
703 		v.trigger();
704 	}
705 private:
706 	AsyncVar<Void> v;
707 };
709 class Debouncer : NonCopyable {
710 public:
Debouncer(double delay)711 	explicit Debouncer( double delay ) { worker = debounceWorker(this, delay); }
Debouncer(Debouncer && at)712 	Debouncer(Debouncer&& at) : input(std::move(at.input)), output(std::move(at.output)) {}
713 	void operator=(Debouncer&& at) { input = std::move(at.input); output = std::move(at.output); }
onTrigger()714 	Future<Void> onTrigger() {
715 		return output.onChange();
716 	}
trigger()717 	void trigger() {
718 		input.setUnconditional(Void());
719 	}
720 private:
721 	AsyncVar<Void> input;
722 	AsyncVar<Void> output;
723 	Future<Void> worker;
debounceWorker(Debouncer * self,double bounceTime)725 	ACTOR Future<Void> debounceWorker( Debouncer* self, double bounceTime ) {
726 		loop {
727 			wait( self->input.onChange() );
728 			loop {
729 				choose {
730 					when(wait( self->input.onChange() )) {}
731 					when(wait( delay(bounceTime) )) { break; }
732 				}
733 			}
734 			self->output.setUnconditional(Void());
735 		}
736 	}
737 };
asyncDeserialize(Reference<AsyncVar<Standalone<StringRef>>> input,Reference<AsyncVar<Optional<T>>> output)739 ACTOR template <class T> Future<Void> asyncDeserialize( Reference<AsyncVar<Standalone<StringRef>>> input, Reference<AsyncVar<Optional<T>>> output ) {
740 	loop {
741 		if (input->get().size())
742 			output->set( BinaryReader::fromStringRef<T>( input->get(), IncludeVersion() ) );
743 		else
744 			output->set( Optional<T>() );
745 		wait( input->onChange() );
746 	}
747 }
749 ACTOR template <class V, class T>
forwardVector(Future<V> values,std::vector<Promise<T>> out)750 void forwardVector( Future<V> values, std::vector<Promise<T>> out ) {
751 	V in = wait( values );
752 	ASSERT (in.size() == out.size());
753 	for(int i=0; i<out.size(); i++)
754 		out[i].send( in[i] );
755 }
757 ACTOR template <class T>
delayedAsyncVar(Reference<AsyncVar<T>> in,Reference<AsyncVar<T>> out,double time)758 Future<Void> delayedAsyncVar( Reference<AsyncVar<T>> in, Reference<AsyncVar<T>> out, double time ) {
759 	try {
760 		loop {
761 			wait( delay( time ) );
762 			out->set( in->get() );
763 			wait( in->onChange() );
764 		}
765 	} catch (Error& e) {
766 		out->set( in->get() );
767 		throw;
768 	}
769 }
771 ACTOR template <class T>
setAfter(Reference<AsyncVar<T>> var,double time,T val)772 Future<Void> setAfter( Reference<AsyncVar<T>> var, double time, T val ) {
773 	wait( delay( time ) );
774 	var->set( val );
775 	return Void();
776 }
778 ACTOR template <class T>
779 Future<Void> resetAfter( Reference<AsyncVar<T>> var, double time, T val, int warningLimit = -1, double warningResetDelay = 0, const char* context = NULL ) {
780 	state bool isEqual = var->get() == val;
781 	state Future<Void> resetDelay = isEqual ? Never() : delay(time);
782 	state int resetCount = 0;
783 	state double lastReset = now();
784 	loop {
785 		choose {
when(wait (resetDelay))786 			when( wait( resetDelay ) ) {
787 				var->set( val );
788 				if(now() - lastReset > warningResetDelay) {
789 					resetCount = 0;
790 				}
791 				resetCount++;
792 				if(context && warningLimit >= 0 && resetCount > warningLimit) {
793 					TraceEvent(SevWarnAlways, context).detail("ResetCount", resetCount).detail("LastReset", now() - lastReset);
794 				}
795 				lastReset = now();
796 				isEqual = true;
797 				resetDelay = Never();
798 			}
799 			when( wait( var->onChange() ) ) {}
800 		}
801 		if( isEqual && var->get() != val ) {
802 			isEqual = false;
803 			resetDelay = delay(time);
804 		}
805 		if( !isEqual && var->get() == val ) {
806 			isEqual = true;
807 			resetDelay = Never();
808 		}
809 	}
810 }
812 ACTOR template <class T>
setWhenDoneOrError(Future<Void> condition,Reference<AsyncVar<T>> var,T val)813 Future<Void> setWhenDoneOrError( Future<Void> condition, Reference<AsyncVar<T>> var, T val ) {
814 	try {
815 		wait( condition );
816 	}
817 	catch ( Error& e ) {
818 		if (e.code() == error_code_actor_cancelled) throw;
819 	}
820 	var->set( val );
821 	return Void();
822 }
824 Future<bool> allTrue( const std::vector<Future<bool>>& all );
825 Future<Void> anyTrue( std::vector<Reference<AsyncVar<bool>>> const& input, Reference<AsyncVar<bool>> const& output );
826 Future<Void> cancelOnly( std::vector<Future<Void>> const& futures );
827 Future<Void> timeoutWarningCollector( FutureStream<Void> const& input, double const& logDelay, const char* const& context, UID const& id );
828 Future<bool> quorumEqualsTrue( std::vector<Future<bool>> const& futures, int const& required );
830 ACTOR template <class T>
streamHelper(PromiseStream<T> output,PromiseStream<Error> errors,Future<T> input)831 Future<Void> streamHelper( PromiseStream<T> output, PromiseStream<Error> errors, Future<T> input ) {
832 	try {
833 		T value = wait(input);
834 		output.send(value);
835 	} catch (Error& e) {
836 		if (e.code() == error_code_actor_cancelled)
837 			throw;
838 		errors.send(e);
839 	}
840 	return Void();
841 }
843 template <class T>
makeStream(const std::vector<Future<T>> & futures,PromiseStream<T> & stream,PromiseStream<Error> & errors)844 Future<Void> makeStream( const std::vector<Future<T>>& futures, PromiseStream<T>& stream, PromiseStream<Error>& errors ) {
845 	std::vector<Future<Void>> forwarders;
846 	for(int f=0; f<futures.size(); f++)
847 		forwarders.push_back( streamHelper( stream, errors, futures[f] ) );
848 	return cancelOnly(forwarders);
849 }
851 template <class T>
852 class QuorumCallback;
854 template <class T>
855 struct Quorum : SAV<Void> {
856 	int antiQuorum;
857 	int count;
sizeForQuorum859 	static inline int sizeFor(int count) {
860 		return sizeof(Quorum<T>) + sizeof(QuorumCallback<T>)*count;
861 	}
destroyQuorum863 	virtual void destroy() {
864 		int size = sizeFor(this->count);
865 		this->~Quorum();
866 		freeFast(size, this);
867 	}
cancelQuorum868 	virtual void cancel() {
869 		int cancelled_callbacks = 0;
870 		for (int i = 0; i < count; i++)
871 			if (callbacks()[i].next) {
872 				callbacks()[i].remove();
873 				callbacks()[i].next = 0;
874 				++cancelled_callbacks;
875 			}
876 		if (canBeSet())
877 			sendError(actor_cancelled());
878 		for (int i = 0; i < cancelled_callbacks; i++)
879 			delPromiseRef();
880 	}
QuorumQuorum881 	explicit Quorum(int quorum, int count) : SAV<Void>(1, count), antiQuorum(count - quorum + 1), count(count) {
882 		if (!quorum) this->send(Void());
883 	}
oneSuccessQuorum884 	void oneSuccess() {
885 		if (getPromiseReferenceCount() == antiQuorum && canBeSet())
886 			this->sendAndDelPromiseRef(Void());
887 		else
888 			delPromiseRef();
889 	}
oneErrorQuorum890 	void oneError(Error err) {
891 		if (canBeSet())
892 			this->sendErrorAndDelPromiseRef(err);
893 		else
894 			delPromiseRef();
895 	}
callbacksQuorum897 	QuorumCallback<T>* callbacks() { return (QuorumCallback<T>*)(this + 1); }
898 };
900 template <class T>
901 class QuorumCallback : public Callback<T> {
902 public:
QuorumCallback(Future<T> future,Quorum<T> * head)903 	QuorumCallback(Future<T> future, Quorum<T>* head)
904 		: head(head)
905 	{
906 		future.addCallbackAndClear(this);
907 	}
fire(const T & value)908 	virtual void fire(const T& value) {
909 		Callback<T>::remove();
910 		Callback<T>::next = 0;
911 		head->oneSuccess();
912 	}
error(Error error)913 	virtual void error(Error error) {
914 		Callback<T>::remove();
915 		Callback<T>::next = 0;
916 		head->oneError(error);
917 	}
919 private:
920 	Quorum<T>* head;
921 };
923 template <class T>
quorum(std::vector<Future<T>> const & results,int n)924 Future<Void> quorum(std::vector<Future<T>> const& results, int n) {
925 	ASSERT(n >= 0 && n <= results.size());
927 	int size = Quorum<T>::sizeFor(results.size());
928 	Quorum<T>* q = new (allocateFast(size)) Quorum<T>(n, results.size());
930 	QuorumCallback<T>* nextCallback = q->callbacks();
931 	for (auto & r : results) {
932 		if (r.isReady()) {
933 			nextCallback->next = 0;
934 			if (r.isError())
935 				q->oneError(r.getError());
936 			else
937 				q->oneSuccess();
938 		}
939 		else
940 			new (nextCallback) QuorumCallback<T>(r, q);
941 		++nextCallback;
942 	}
943 	return Future<Void>(q);
944 }
946 ACTOR template <class T>
947 Future<Void> smartQuorum( std::vector<Future<T>> results, int required, double extraSeconds, int taskID = TaskDefaultDelay ) {
948 	if (results.empty() && required == 0) return Void();
949 	wait(quorum(results, required));
950 	choose {
951 		when (wait(quorum(results, (int)results.size()))) {return Void();}
when(wait (delay (extraSeconds,taskID)))952 		when (wait(delay(extraSeconds, taskID))) {return Void(); }
953 	}
954 }
956 template <class T>
waitForAll(std::vector<Future<T>> const & results)957 Future<Void> waitForAll( std::vector<Future<T>> const& results ) {
958 	if (results.empty()) return Void();
959 	return quorum( results, (int)results.size() );
960 }
962 template <class T>
waitForAny(std::vector<Future<T>> const & results)963 Future<Void> waitForAny( std::vector<Future<T>> const& results ) {
964 	if (results.empty()) return Void();
965 	return quorum( results, 1 );
966 }
shortCircuitAny(std::vector<Future<bool>> f)968 ACTOR static Future<bool> shortCircuitAny( std::vector<Future<bool>> f )
969 {
970 	std::vector<Future<Void>> sc;
971 	for(Future<bool> fut : f) {
972 		sc.push_back(returnIfTrue(fut));
973 	}
975 	choose {
976 		when( wait( waitForAll( f ) ) ) {
977 			// Handle a possible race condition? If the _last_ term to
978 			// be evaluated triggers the waitForAll before bubbling
979 			// out of the returnIfTrue quorum
980 			for ( auto fut : f ) {
981 				if ( fut.get() ) {
982 					return true;
983 				}
984 			}
985 			return false;
986 		}
987 		when( wait( waitForAny( sc ) ) ) {
988 			return true;
989 		}
990 	}
991 }
993 ACTOR template <class T>
getAll(std::vector<Future<T>> input)994 Future<std::vector<T>> getAll( std::vector<Future<T>> input ) {
995 	if (input.empty()) return std::vector<T>();
996 	wait( quorum( input, input.size() ) );
998 	std::vector<T> output;
999 	for(int i=0; i<input.size(); i++)
1000 		output.push_back( input[i].get() );
1001 	return output;
1002 }
1004 ACTOR template <class T>
appendAll(std::vector<Future<std::vector<T>>> input)1005 Future<std::vector<T>> appendAll( std::vector<Future<std::vector<T>>> input ) {
1006 	wait( quorum( input, input.size() ) );
1008 	std::vector<T> output;
1009 	for(int i=0; i<input.size(); i++) {
1010 		auto const& r = input[i].get();
1011 		output.insert( output.end(), r.begin(), r.end() );
1012 	}
1013 	return output;
1014 }
onEqual(Future<T> in,T equalTo)1016 ACTOR template <class T> Future<Void> onEqual( Future<T> in, T equalTo ) {
1017 	T t = wait(in);
1018 	if ( t == equalTo )
1019 		return Void();
1020 	wait(Never());  // never return
1021 	throw internal_error();  // does not happen
1022 }
1024 ACTOR template <class T>
success(Future<T> of)1025 Future<Void> success( Future<T> of ) {
1026 	T t = wait( of );
1027 	return Void();
1028 }
1030 ACTOR template <class T>
ready(Future<T> f)1031 Future<Void> ready( Future<T> f ) {
1032 	try {
1033 		T _ = wait( f );
1034 	} catch (...) {
1035 	}
1036 	return Void();
1037 }
1039 ACTOR template <class T>
waitAndForward(FutureStream<T> input)1040 Future<T> waitAndForward( FutureStream<T> input ) {
1041 	T output = waitNext( input );
1042 	return output;
1043 }
1045 ACTOR template <class T>
reportErrorsExcept(Future<T> in,const char * context,UID id,std::set<int> const * pExceptErrors)1046 Future<T> reportErrorsExcept( Future<T> in, const char* context, UID id, std::set<int> const* pExceptErrors ) {
1047 	try {
1048 		T t = wait( in );
1049 		return t;
1050 	} catch (Error& e) {
1051 		if (e.code() != error_code_actor_cancelled && (!pExceptErrors || !pExceptErrors->count(e.code())))
1052 			TraceEvent(SevError, context, id).error(e);
1053 		throw;
1054 	}
1055 }
1057 template <class T>
1058 Future<T> reportErrors( Future<T> const& in, const char* context, UID id = UID() ) {
1059 	return reportErrorsExcept(in, context, id, NULL);
1060 }
1062 ACTOR template <class T>
require(Future<Optional<T>> in,int errorCode)1063 Future<T> require( Future<Optional<T>> in, int errorCode ) {
1064 	Optional<T> o = wait(in);
1065 	if (o.present()) {
1066 		return o.get();
1067 	}
1068 	else {
1069 		throw Error(errorCode);
1070 	}
1071 }
// Returns the first value, or rethrows the first error, delivered through the streams that
// makeStream() is given for `items`.
// NOTE(review): makeStream() is defined elsewhere; presumably it forwards each item's value into
// resultStream and each item's error into errorStream -- confirm.
ACTOR template <class T>
Future<T> waitForFirst( std::vector<Future<T>> items ) {
	state PromiseStream<T> resultStream;
	state PromiseStream<Error> errorStream;
	// Held as state so the makeStream() future stays referenced while we wait.
	state Future<Void> forCancellation = makeStream( items, resultStream, errorStream );
	state FutureStream<T> resultFutureStream = resultStream.getFuture();
	state FutureStream<Error> errorFutureStream = errorStream.getFuture();
	choose {
		when (T val = waitNext( resultFutureStream )) {
			// Drop our reference to the makeStream() future before finishing.
			forCancellation = Future<Void>();
			return val;
		}
		when (Error e = waitNext( errorFutureStream )) {
			// Drop our reference to the makeStream() future before rethrowing.
			forCancellation = Future<Void>();
			throw e;
		}
	}
}
// Waits for `future`, then returns `what`.  An error from `future` is not caught here.
ACTOR template <class T>
Future<T> tag( Future<Void> future, T what ) {
	wait(future);
	return what;
}
// Waits for `future`, then sends `what` into `stream` and returns Void.  An error from `future`
// is not caught here, in which case nothing is sent.
ACTOR template <class T>
Future<Void> tag( Future<Void> future, T what, PromiseStream<T> stream ) {
	wait( future );
	stream.send( what );
	return Void();
}
// Waits for `future`, then throws `e`; the returned Future<T> therefore never carries a value.
// An error from `future` itself is not caught here.
ACTOR template <class T>
Future<T> tagError( Future<Void> future, Error e) {
	wait(future);
	throw e;
}
1114 //If the future is ready, yields and returns. Otherwise, returns when future is set.
1115 template <class T>
orYield(Future<T> f)1116 Future<T> orYield( Future<T> f ) {
1117 	if(f.isReady()) {
1118 		if(f.isError())
1119 			return tagError<T>(yield(), f.getError());
1120 		else
1121 			return tag(yield(), f.get());
1122 	}
1123 	else
1124 		return f;
1125 }
orYield(Future<Void> f)1127 static Future<Void> orYield( Future<Void> f ) {
1128 	if(f.isReady()) {
1129 		if(f.isError())
1130 			return tagError<Void>(yield(), f.getError());
1131 		else
1132 			return yield();
1133 	}
1134 	else
1135 		return f;
1136 }
chooseActor(Future<T> lhs,Future<T> rhs)1138 ACTOR template <class T> Future<T> chooseActor( Future<T> lhs, Future<T> rhs ) {
1139 	choose {
1140 		when ( T t = wait(lhs) ) { return t; }
1141 		when ( T t = wait(rhs) ) { return t; }
1142 	}
1143 }
1145 // set && set -> set
1146 // error && x -> error
1147 // all others -> unset
1148 static Future<Void> operator &&( Future<Void> const& lhs, Future<Void> const& rhs ) {
1149 	if(lhs.isReady()) {
1150 		if(lhs.isError()) return lhs;
1151 		else return rhs;
1152 	}
1153 	if(rhs.isReady()) {
1154 		if(rhs.isError()) return rhs;
1155 		else return lhs;
1156 	}
1158 	std::vector<Future<Void>> v;
1159 	v.push_back( lhs );
1160 	v.push_back( rhs );
1161 	return waitForAll(v);
1162 }
1164 // error || unset -> error
1165 // unset || unset -> unset
1166 // all others -> set
1167 inline Future<Void> operator ||( Future<Void> const& lhs, Future<Void> const& rhs ) {
1168 	if(lhs.isReady()) {
1169 		if(lhs.isError()) return lhs;
1170 		if(rhs.isReady()) return rhs;
1171 		return lhs;
1172 	}
1174 	return chooseActor( lhs, rhs );
1175 }
brokenPromiseToNever(Future<T> in)1177 ACTOR template <class T> Future<T> brokenPromiseToNever( Future<T> in ) {
1178 	try {
1179 		T t = wait(in);
1180 		return t;
1181 	} catch (Error& e) {
1182 		if (e.code() != error_code_broken_promise)
1183 			throw;
1184 		wait(Never());  // never return
1185 		throw internal_error();  // does not happen
1186 	}
1187 }
brokenPromiseToMaybeDelivered(Future<T> in)1189 ACTOR template <class T> Future<T> brokenPromiseToMaybeDelivered( Future<T> in ) {
1190 	try {
1191 		T t = wait(in);
1192 		return t;
1193 	} catch (Error& e) {
1194 		if (e.code() == error_code_broken_promise) {
1195 			throw request_maybe_delivered();
1196 		}
1197 		throw;
1198 	}
1199 }
tagAndForward(Promise<T> * pOutputPromise,T value,Future<Void> signal)1201 ACTOR template <class T> void tagAndForward( Promise<T>* pOutputPromise, T value, Future<Void> signal ) {
1202 	state Promise<T> out( std::move(*pOutputPromise) );
1203 	wait( signal );
1204 	out.send(value);
1205 }
tagAndForwardError(Promise<T> * pOutputPromise,Error value,Future<Void> signal)1207 ACTOR template <class T> void tagAndForwardError( Promise<T>* pOutputPromise, Error value, Future<Void> signal ) {
1208 	state Promise<T> out( std::move(*pOutputPromise) );
1209 	wait( signal );
1210 	out.sendError(value);
1211 }
waitOrError(Future<T> f,Future<Void> errorSignal)1213 ACTOR template <class T> Future<T> waitOrError(Future<T> f, Future<Void> errorSignal) {
1214 	choose {
1215 		when(T val = wait(f)) {
1216 			return val;
1217 		}
1218 		when(wait(errorSignal)) {
1219 			ASSERT(false);
1220 			throw internal_error();
1221 		}
1222 	}
1223 }
1225 struct FlowLock : NonCopyable, public ReferenceCounted<FlowLock> {
1226 	// FlowLock implements a nonblocking critical section: there can be only a limited number of clients executing code between
1227 	// wait(take()) and release(). Not thread safe. take() returns only when the number of holders of the lock is fewer than the
1228 	// number of permits, and release() makes the caller no longer a holder of the lock. release() only runs waiting take()rs
1229 	// after the caller wait()s
1231 	struct Releaser : NonCopyable {
1232 		FlowLock* lock;
1233 		int remaining;
ReleaserFlowLock::Releaser1234 		Releaser() : lock(0), remaining(0) {}
1235 		Releaser( FlowLock& lock, int64_t amount = 1 ) : lock(&lock), remaining(amount) {}
ReleaserFlowLock::Releaser1236 		Releaser(Releaser&& r) BOOST_NOEXCEPT : lock(r.lock), remaining(r.remaining) { r.remaining = 0; }
1237 		void operator=(Releaser&& r) { if (remaining) lock->release(remaining); lock = r.lock; remaining = r.remaining; r.remaining = 0; }
1239 		void release( int64_t amount = -1 ) {
1240 			if( amount == -1 || amount > remaining )
1241 				amount = remaining;
1243 			if (remaining)
1244 				lock->release( amount );
1245 			remaining -= amount;
1246 		}
~ReleaserFlowLock::Releaser1248 		~Releaser() { if (remaining) lock->release(remaining); }
1249 	};
FlowLockFlowLock1251 	FlowLock() : permits(1), active(0) {}
FlowLockFlowLock1252 	explicit FlowLock(int64_t permits) : permits(permits), active(0) {}
1254 	Future<Void> take(int taskID = TaskDefaultYield, int64_t amount = 1) {
1255 		if (active + amount <= permits || active == 0) {
1256 			active += amount;
1257 			return safeYieldActor(this, taskID, amount);
1258 		}
1259 		return takeActor(this, taskID, amount);
1260 	}
1261 	void release( int64_t amount = 1 ) {
1262 		ASSERT( (active > 0 || amount == 0) && active - amount >= 0 );
1263 		active -= amount;
1265 		while( !takers.empty() ) {
1266 			if( active + takers.begin()->second <= permits || active == 0 ) {
1267 				std::pair< Promise<Void>, int64_t > next = std::move( *takers.begin() );
1268 				active += next.second;
1269 				takers.pop_front();
1270 				next.first.send(Void());
1271 			} else {
1272 				break;
1273 			}
1274 		}
1275 	}
1277 	Future<Void> releaseWhen( Future<Void> const& signal, int amount = 1 ) { return releaseWhenActor( this, signal, amount ); }
1279 	// returns when any permits are available, having taken as many as possible up to the given amount, and modifies amount to the number of permits taken
takeUpToFlowLock1280 	Future<Void> takeUpTo(int64_t& amount) {
1281 		return takeMoreActor(this, &amount);
1282 	}
availableFlowLock1284 	int64_t available() const { return permits - active; }
activePermitsFlowLock1285 	int64_t activePermits() const { return active; }
waitersFlowLock1286 	int waiters() const { return takers.size(); }
1287 private:
1288 	std::list< std::pair< Promise<Void>, int64_t > > takers;
1289 	const int64_t permits;
1290 	int64_t active;
1291 	Promise<Void> broken_on_destruct;
takeActorFlowLock1293 	ACTOR static Future<Void> takeActor(FlowLock* lock, int taskID, int64_t amount) {
1294 		state std::list<std::pair<Promise<Void>, int64_t>>::iterator it = lock->takers.insert(lock->takers.end(), std::make_pair(Promise<Void>(), amount));
1296 		try {
1297 			wait( it->first.getFuture() );
1298 		} catch (Error& e) {
1299 			if (e.code() == error_code_actor_cancelled) {
1300 				lock->takers.erase(it);
1301 				lock->release(0);
1302 			}
1303 			throw;
1304 		}
1305 		try {
1306 			double duration = BUGGIFY_WITH_PROB(.001) ? g_random->random01()*FLOW_KNOBS->BUGGIFY_FLOW_LOCK_RELEASE_DELAY : 0.0;
1307 			choose{ when(wait(delay(duration, taskID))) {}  // So release()ing the lock doesn't cause arbitrary code to run on the stack
1308 					when(wait(lock->broken_on_destruct.getFuture())) {} }
1309 			return Void();
1310 		} catch (...) {
1311 			TEST(true); // If we get cancelled here, we are holding the lock but the caller doesn't know, so release it
1312 			lock->release(amount);
1313 			throw;
1314 		}
1315 	}
takeMoreActorFlowLock1317 	ACTOR static Future<Void> takeMoreActor(FlowLock* lock, int64_t* amount) {
1318 		wait(lock->take());
1319 		int64_t extra = std::min( lock->available(), *amount-1 );
1320 		lock->active += extra;
1321 		*amount = 1 + extra;
1322 		return Void();
1323 	}
safeYieldActorFlowLock1325 	ACTOR static Future<Void> safeYieldActor(FlowLock* lock, int taskID, int64_t amount) {
1326 		try {
1327 			choose{
1328 				when(wait(yield(taskID))) {}
1329 				when(wait(lock->broken_on_destruct.getFuture())) {}
1330 			}
1331 			return Void();
1332 		} catch (Error& e) {
1333 			lock->release(amount);
1334 			throw;
1335 		}
1336 	}
releaseWhenActorFlowLock1338 	ACTOR static Future<Void> releaseWhenActor( FlowLock* self, Future<Void> signal, int64_t amount ) {
1339 		wait(signal);
1340 		self->release(amount);
1341 		return Void();
1342 	}
1343 };
1345 ACTOR template <class T>
1346 Future<Void> yieldPromiseStream( FutureStream<T> input, PromiseStream<T> output, int taskID = TaskDefaultYield ) {
1347 	loop {
1348 		T f = waitNext( input );
1349 		output.send( f );
1350 		wait( yield( taskID ) );
1351 	}
1352 }
// Hand-written actor behind yieldedFuture(): mirrors a source Future<Void>, but when the source
// fires and check_yield() says to yield, delivery is postponed through a delay(0) first.
struct YieldedFutureActor : SAV<Void>, ActorCallback<YieldedFutureActor, 1, Void>, FastAllocated<YieldedFutureActor> {
	// Doubles as the state machine:
	//   UNSET_ERROR_CODE -> the source has not fired yet
	//   SET_ERROR_CODE   -> the source fired with a value (set in a_callback_fire)
	//   anything else    -> the error the source fired with (set in a_callback_error)
	Error in_error_state;
	typedef ActorCallback<YieldedFutureActor, 1, Void> CB1;
	using FastAllocated<YieldedFutureActor>::operator new;
	using FastAllocated<YieldedFutureActor>::operator delete;
	// Registers this actor as a yielded callback on `f`.
	YieldedFutureActor(Future<Void> && f) : SAV<Void>(1, 1), in_error_state(Error::fromCode(UNSET_ERROR_CODE)) {
		f.addYieldedCallbackAndClear(static_cast< ActorCallback< YieldedFutureActor, 1, Void >* >(this));
	}
	// Detaches from whatever we are waiting on and completes with actor_cancelled.
	void cancel()
	{
		if (!SAV<Void>::canBeSet()) return;  // Cancel could be invoked *by* a callback within finish().  Otherwise it's guaranteed that we are waiting either on the original future or on a delay().
		ActorCallback<YieldedFutureActor, 1, Void>::remove();
		SAV<Void>::sendErrorAndDelPromiseRef(actor_cancelled());
	}
	virtual void destroy() {
		delete this;
	}
	// Called when the awaited future fires with a value: first for the source, and possibly a second
	// time for the delay() installed by doYield().
	void a_callback_fire(ActorCallback<YieldedFutureActor, 1, Void>*, Void) {
		if (int16_t(in_error_state.code()) == UNSET_ERROR_CODE) {
			// First firing: remember that the source succeeded, then either postpone or deliver now.
			in_error_state = Error::fromCode(SET_ERROR_CODE);
			if (check_yield())
				doYield();
			else
				finish();
		} else {
			// We hit this case when and only when the delay() created by a previous doYield() fires.  Then we want to get at least one task done, regardless of what check_yield() would say.
			finish();
		}
	}
	// Called when the source fires with an error: remember it, then either postpone or deliver now.
	void a_callback_error(ActorCallback<YieldedFutureActor, 1, Void>*, Error const& err) {
		ASSERT(int16_t(in_error_state.code()) == UNSET_ERROR_CODE);
		in_error_state = err;
		if (check_yield())
			doYield();
		else
			finish();
	}
	// Unlinks this callback and delivers the remembered outcome (value or error) to our own future.
	void finish() {
		ActorCallback<YieldedFutureActor, 1, Void>::remove();
		if (int16_t(in_error_state.code()) == SET_ERROR_CODE)
			SAV<Void>::sendAndDelPromiseRef(Void());
		else
			SAV<Void>::sendErrorAndDelPromiseRef(in_error_state);
	}
	// Postpones delivery: moves this callback, and the rest of the source's callback ring, onto a
	// delay(0) at the current task priority.
	void doYield() {
		// Since we are being fired, we are the first callback in the ring, and `prev` is the source future
		Callback<Void>* source = CB1::prev;
		ASSERT(source->next == static_cast<CB1*>(this));
		// Remove the source future from the ring.  All the remaining callbacks in the ring should be yielded, since yielded callbacks are installed at the end
		CB1::prev = source->prev;
		CB1::prev->next = static_cast<CB1*>(this);
		// The source future's ring is now empty, since we have removed all the callbacks
		source->next = source->prev = source;
		source->unwait();
		// Link all the callbacks, including this one, into the ring of a delay future so that after a short time they will be fired again
		delay(0, g_network->getCurrentTask()).addCallbackChainAndClear(static_cast< CB1* >(this));
	}
};
yieldedFuture(Future<Void> f)1423 static Future<Void> yieldedFuture(Future<Void> f) {
1424 	if (f.isReady())
1425 		return yield();
1426 	else
1427 		return Future<Void>(new YieldedFutureActor(std::move(f)));
1428 }
1430 //An AsyncMap that uses a yieldedFuture in its onChange method.
1431 template <class K, class V>
1432 class YieldedAsyncMap : public AsyncMap<K, V> {
1433 public:
	// Returns a yielded future for the next change of key `k`.  items[k] is looked up with
	// operator[] on AsyncMap's `items`.
	// NOTE(review): this presumably creates the entry when `k` is absent -- confirm against AsyncMap.
	Future<Void> onChange(K const& k) {	// throws broken_promise if this is destroyed
		auto &item = AsyncMap<K, V>::items[k];
		// An entry that still holds the default value is watched through destroyOnCancelYield(),
		// which may erase the entry again if the watcher is cancelled.
		if (item.value == AsyncMap<K, V>::defaultValue)
			return destroyOnCancelYield(this, k, item.change.getFuture());
		return yieldedFuture(item.change.getFuture());
	}
destroyOnCancelYield(YieldedAsyncMap * self,K key,Future<Void> change)1441 	ACTOR static Future<Void> destroyOnCancelYield( YieldedAsyncMap* self, K key, Future<Void> change ) {
1442 		try {
1443 			wait(yieldedFuture(change));
1444 			return Void();
1445 		} catch (Error& e) {
1446 			if (e.code() == error_code_actor_cancelled && !self->destructing && change.getFutureReferenceCount() == 1 && change.getPromiseReferenceCount() == 1) {
1448 					auto& p = self->items[key];
1449 					ASSERT(p.change.getFuture() == change);
1450 				}
1451 				self->items.erase(key);
1452 			}
1453 			throw;
1454 		}
1455 	}
1456 };
1458 ACTOR template <class T>
delayActionJittered(Future<T> what,double time)1459 Future<T> delayActionJittered( Future<T> what, double time ) {
1460 	wait( delayJittered( time ) );
1461 	T t = wait( what );
1462 	return t;
1463 }
1465 class AndFuture {
1466 public:
AndFuture()1467 	AndFuture() { }
AndFuture(AndFuture const & f)1469 	AndFuture(AndFuture const& f) {
1470 		futures = f.futures;
1471 	}
AndFuture(AndFuture && f)1473 	AndFuture(AndFuture&& f) BOOST_NOEXCEPT {
1474 		futures = std::move(f.futures);
1475 	}
AndFuture(Future<Void> const & f)1477 	AndFuture(Future<Void> const& f) {
1478 		futures.push_back(f);
1479 	}
AndFuture(Error const & e)1481 	AndFuture(Error const& e) {
1482 		futures.push_back(e);
1483 	}
1485 	operator Future<Void>() {
1486 		return getFuture();
1487 	}
1489 	void operator=(AndFuture const& f) {
1490 		futures = f.futures;
1491 	}
1493 	void operator=(AndFuture&& f) BOOST_NOEXCEPT {
1494 		futures = std::move(f.futures);
1495 	}
1497 	void operator=(Future<Void> const& f) {
1498 		futures.push_back(f);
1499 	}
1501 	void operator=(Error const& e) {
1502 		futures.push_back(e);
1503 	}
getFuture()1505 	Future<Void> getFuture() {
1506 		if(futures.empty())
1507 			return Void();
1509 		if(futures.size() == 1)
1510 			return futures[0];
1512 		Future<Void> f = waitForAll(futures);
1513 		futures = std::vector<Future<Void>>();
1514 		futures.push_back(f);
1515 		return f;
1516 	}
isReady()1518 	bool isReady() {
1519 		for( int i = futures.size() - 1; i >= 0; --i ) {
1520 			if( !futures[i].isReady() ) {
1521 				return false;
1522 			}
1523 			else if(!futures[i].isError()) {
1524 				swapAndPop(&futures, i);
1525 			}
1526 		}
1527 		return true;
1528 	}
isError()1530 	bool isError() {
1531 		for( int i = 0; i < futures.size(); i++ )
1532 			if( futures[i].isError() )
1533 				return true;
1534 		return false;
1535 	}
cleanup()1537 	void cleanup() {
1538 		for( int i = 0; i < futures.size(); i++ ) {
1539 			if( futures[i].isReady() && !futures[i].isError() ) {
1540 				swapAndPop(&futures, i--);
1541 			}
1542 		}
1543 	}
add(Future<Void> const & f)1545 	void add(Future<Void> const& f) {
1546 		if(!f.isReady() || f.isError())
1547 			futures.push_back(f);
1548 	}
add(AndFuture f)1550 	void add(AndFuture f) {
1551 		add(f.getFuture());
1552 	}
1554 private:
1555 	std::vector<Future<Void>> futures;
1556 };
// Performs an unordered merge of a and b.
// Elements are forwarded to `output` as they arrive.  A stream that ends (end_of_stream) is marked
// closed and no longer read; once both are closed, end_of_stream is sent to `output`.  Any other
// error is sent to `output` immediately.  In both cases the actor then returns Void.
ACTOR template <class T>
Future<Void> unorderedMergeStreams(FutureStream<T> a, FutureStream<T> b, PromiseStream<T> output) {
	// One outstanding "next element" future per input stream.
	state Future<T> aFuture = waitAndForward(a);
	state Future<T> bFuture = waitAndForward(b);
	state bool aOpen = true;
	state bool bOpen = true;
	loop{
		try {
			choose {
				when(T val = wait(aFuture)) {
					output.send(val);
					aFuture = waitAndForward(a);  // re-arm for the next element of a
				}
				when(T val = wait(bFuture)) {
					output.send(val);
					bFuture = waitAndForward(b);  // re-arm for the next element of b
				}
			}
		}
		catch (Error &e) {
			if (e.code() != error_code_end_of_stream) {
				// A real error on either input ends the merge.
				output.sendError(e);
				break;
			}
			// If both futures are errors at this point they must carry the same code.
			ASSERT(!aFuture.isError() || !bFuture.isError() || aFuture.getError().code() == bFuture.getError().code());
			// Replace the future of an ended stream with Never() so the choose ignores it from now on.
			if (aFuture.isError()) {
				aFuture = Never();
				aOpen = false;
			}
			if (bFuture.isError()) {
				bFuture = Never();
				bOpen = false;
			}
			if (!aOpen && !bOpen) {
				// Both inputs have ended: propagate end_of_stream and stop.
				output.sendError(e);
				break;
			}
		}
	}
	return Void();
}
1606 // Returns the ordered merge of a and b, assuming that a and b are both already ordered (prefer a over b if keys are equal). T must be a class that implements compare()
1607 ACTOR template <class T>
orderedMergeStreams(FutureStream<T> a,FutureStream<T> b,PromiseStream<T> output)1608 Future<Void> orderedMergeStreams( FutureStream<T> a, FutureStream<T> b, PromiseStream<T> output ) {
1609 	state Optional<T> savedKVa;
1610 	state bool aOpen;
1611 	state Optional<T> savedKVb;
1612 	state bool bOpen;
1614 	aOpen = bOpen = true;
1616 	loop {
1617 		if ( aOpen && !savedKVa.present() ) {
1618 			try {
1619 				T KVa = waitNext( a );
1620 				savedKVa = Optional<T>( KVa );
1621 			} catch ( Error& e ) {
1622 				if ( e.code() == error_code_end_of_stream ) {
1623 					aOpen = false;
1624 					if (!bOpen) {
1625 						output.sendError(e);
1626 					}
1627 				} else {
1628 					output.sendError(e);
1629 					break;
1630 				}
1631 			}
1632 		}
1633 		if ( bOpen && !savedKVb.present() ) {
1634 			try {
1635 				T KVb = waitNext( b );
1636 				savedKVb = Optional<T>( KVb );
1637 			} catch ( Error& e ) {
1638 				if ( e.code() == error_code_end_of_stream ) {
1639 					bOpen = false;
1640 					if (!aOpen) {
1641 						output.sendError(e);
1642 					}
1643 				} else {
1644 					output.sendError(e);
1645 					break;
1646 				}
1647 			}
1648 		}
1650 		if (!aOpen) {
1651 			output.send( savedKVb.get() );
1652 			savedKVb = Optional<T>();
1653 		} else if (!bOpen) {
1654 			output.send( savedKVa.get() );
1655 			savedKVa = Optional<T>();
1656 		} else {
1657 			int cmp = savedKVa.get().compare( savedKVb.get() );
1659 			if ( cmp == 0 ) {
1660 				// prefer a
1661 				output.send( savedKVa.get() );
1662 				savedKVa = Optional<T>();
1663 				savedKVb = Optional<T>();
1664 			} else if ( cmp < 0 ) {
1665 				output.send( savedKVa.get() );
1666 				savedKVa = Optional<T>();
1667 			} else {
1668 				output.send( savedKVb.get() );
1669 				savedKVb = Optional<T>();
1670 			}
1671 		}
1672 	}
1674 	return Void();
1675 }
1678 ACTOR template<class T>
timeReply(Future<T> replyToTime,PromiseStream<double> timeOutput)1679 Future<Void> timeReply(Future<T> replyToTime, PromiseStream<double> timeOutput){
1680 	state double startTime = now();
1681 	try {
1682 		T _ = wait(replyToTime);
1683 		wait( delay(0) );
1684 		timeOutput.send(now() - startTime);
1685 	} catch( Error &e ) {
1686 		// Ignore broken promises.  They typically occur during shutdown and our callers don't want to have to create brokenPromiseToNever actors to ignore them.  For what it's worth we are breaking timeOutput to pass the pain along.
1687 		if( e.code() != error_code_broken_promise )
1688 			throw;
1689 	}
1690 	return Void();
1691 }
1693 #include "flow/unactorcompiler.h"
1695 #endif