1 /*
2  * genericactors.actor.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #pragma once
22 
23 // When actually compiled (NO_INTELLISENSE), include the generated version of this file.  In intellisense use the source version.
24 #if defined(NO_INTELLISENSE) && !defined(FLOW_GENERICACTORS_ACTOR_G_H)
25 	#define FLOW_GENERICACTORS_ACTOR_G_H
26 	#include "flow/genericactors.actor.g.h"
27 #elif !defined(GENERICACTORS_ACTOR_H)
28 	#define GENERICACTORS_ACTOR_H
29 
30 #include <list>
31 
32 #include "flow/flow.h"
33 #include "flow/Knobs.h"
34 #include "flow/Util.h"
35 #include "flow/actorcompiler.h"  // This must be the last #include.
36 #pragma warning( disable: 4355 )	// 'this' : used in base member initializer list
37 
38 ACTOR template<class T, class X>
39 Future<T> traceAfter(Future<T> what, const char* type, const char* key, X value, bool traceErrors = false)
40 {
41 	try {
42 		T val = wait(what);
43 		TraceEvent(type).detail(key, value);
44 		return val;
catch(Error & e)45 	} catch( Error &e ) {
46 		if(traceErrors) TraceEvent(type).error(e,true).detail(key, value);
47 		throw;
48 	}
49 }
50 
51 ACTOR template<class T, class X>
52 Future<T> traceAfterCall(Future<T> what, const char* type, const char* key, X func, bool traceErrors = false)
53 {
54 	try {
55 		state T val = wait(what);
56 		try {
57 			TraceEvent(type).detail(key, func(val));
catch(Error & e)58 		} catch( Error &e ) {
59 			TraceEvent(SevError, "TraceAfterCallError").error(e);
60 		}
61 		return val;
catch(Error & e)62 	} catch( Error &e ) {
63 		if(traceErrors) TraceEvent(type).error(e,true);
64 		throw;
65 	}
66 }
67 
68 ACTOR template <class T>
stopAfter(Future<T> what)69 Future<Optional<T>> stopAfter( Future<T> what ) {
70 	state Optional<T> ret = T();
71 	try {
72 		T _ = wait(what);
73 		ret = Optional<T>(_);
74 	} catch (Error& e) {
75 		bool ok = e.code() == error_code_please_reboot || e.code() == error_code_please_reboot_delete || e.code() == error_code_actor_cancelled;
76 		TraceEvent(ok ? SevInfo : SevError, "StopAfterError").error(e);
77 		if(!ok) {
78 			fprintf(stderr, "Fatal Error: %s\n", e.what());
79 			ret = Optional<T>();
80 		}
81 	}
82 	g_network->stop();
83 	return ret;
84 }
85 
// Returns a sorted copy of `range` (ascending, using operator<).
// The argument is taken by value, so the caller's container is left untouched.
template <class T>
T sorted(T range) {
	auto first = range.begin();
	auto last = range.end();
	std::sort(first, last);
	return range;
}
91 
// Comma operator for vectors: `(v, a)` appends `a` to `v` and yields `v`, so several values
// can be appended in one chained expression.
template <class T>
inline std::vector<T>& operator , (std::vector<T>& v, T a) {
	v.push_back(a);
	return v;
}

// Same as above for a temporary vector. The returned reference refers to that temporary, so
// it is only usable within the full expression that created it.
template <class T>
inline std::vector<T>& operator , (std::vector<T> && v, T a) {
	std::vector<T>& target = v;  // a named rvalue reference is already an lvalue
	return (target, a);
}
102 
103 template <class T>
errorOr(T t)104 ErrorOr<T> errorOr( T t ) {
105 	return ErrorOr<T>(t);
106 }
107 
108 ACTOR template <class T>
errorOr(Future<T> f)109 Future<ErrorOr<T>> errorOr( Future<T> f ) {
110 	try {
111 		T t = wait(f);
112 		return ErrorOr<T>(t);
113 	} catch (Error& e) {
114 		return ErrorOr<T>(e);
115 	}
116 }
117 
118 ACTOR template <class T>
throwErrorOr(Future<ErrorOr<T>> f)119 Future<T> throwErrorOr( Future<ErrorOr<T>> f ) {
120 	ErrorOr<T> t = wait(f);
121 	if(t.isError()) throw t.getError();
122 	return t.get();
123 }
124 
125 ACTOR template <class T>
transformErrors(Future<T> f,Error err)126 Future<T> transformErrors( Future<T> f, Error err ) {
127 	try {
128 		T t = wait( f );
129 		return t;
130 	} catch( Error &e ) {
131 		if( e.code() == error_code_actor_cancelled )
132 			throw e;
133 		throw err;
134 	}
135 }
136 
137 ACTOR template <class T>
transformError(Future<T> f,Error inErr,Error outErr)138 Future<T> transformError( Future<T> f, Error inErr, Error outErr ) {
139 	try {
140 		T t = wait( f );
141 		return t;
142 	} catch( Error &e ) {
143 		if( e.code() == inErr.code() )
144 			throw outErr;
145 		throw e;
146 	}
147 }
148 
149 // Note that the RequestStream<T> version of forwardPromise doesn't exist, because what to do with errors?
150 
151 ACTOR template <class T>
forwardEvent(Event * ev,Future<T> input)152 void forwardEvent( Event* ev, Future<T> input ) {
153 	try {
154 		T value = wait(input);
155 	} catch (Error&) {
156 	}
157 	ev->set();
158 }
159 
160 ACTOR template <class T>
forwardEvent(Event * ev,T * t,Error * err,FutureStream<T> input)161 void forwardEvent( Event* ev, T* t, Error* err, FutureStream<T> input ) {
162 	try {
163 		T value = waitNext(input);
164 		*t = std::move(value);
165 		ev->set();
166 	} catch (Error& e) {
167 		*err = e;
168 		ev->set();
169 	}
170 }
171 
172 ACTOR template <class T>
waitForAllReady(std::vector<Future<T>> results)173 Future<Void> waitForAllReady( std::vector<Future<T>> results ) {
174 	state int i = 0;
175 	loop {
176 		if (i == results.size()) return Void();
177 		try {
178 			T r = wait( results[i] );
179 		} catch (...) {
180 		}
181 		i++;
182 	}
183 }
184 
185 ACTOR template <class T>
186 Future<T> timeout( Future<T> what, double time, T timedoutValue, int taskID = TaskDefaultDelay ) {
187 	Future<Void> end = delay( time, taskID );
188 	choose {
189 		when( T t = wait( what ) ) { return t; }
when(wait (end))190 		when( wait( end ) ) { return timedoutValue; }
191 	}
192 }
193 
194 ACTOR template <class T>
timeout(Future<T> what,double time)195 Future<Optional<T>> timeout( Future<T> what, double time ) {
196 	Future<Void> end = delay( time );
197 	choose {
198 		when( T t = wait( what ) ) { return t; }
199 		when( wait( end ) ) { return Optional<T>(); }
200 	}
201 }
202 
203 ACTOR template <class T>
204 Future<T> timeoutError( Future<T> what, double time, int taskID = TaskDefaultDelay ) {
205 	Future<Void> end = delay( time, taskID );
206 	choose {
207 		when( T t = wait( what ) ) { return t; }
when(wait (end))208 		when( wait( end ) ) { throw timed_out(); }
209 	}
210 }
211 
212 ACTOR template <class T>
213 Future<T> delayed( Future<T> what, double time = 0.0, int taskID = TaskDefaultDelay  ) {
214 	try {
215 		state T t = wait( what );
216 		wait( delay( time, taskID ) );
217 		return t;
catch(Error & e)218 	} catch( Error &e ) {
219 		state Error err = e;
220 		wait( delay( time, taskID ) );
221 		throw err;
222 	}
223 }
224 
225 ACTOR template<class Func>
226 Future<Void> recurring( Func what, double interval, int taskID = TaskDefaultDelay ) {
227 	loop choose {
when(wait (delay (interval,taskID)))228 		when ( wait( delay( interval, taskID ) ) ) { what(); }
229 	}
230 }
231 
232 ACTOR template<class Func>
trigger(Func what,Future<Void> signal)233 Future<Void> trigger( Func what, Future<Void> signal ) {
234 	wait( signal );
235 	what();
236 	return Void();
237 }
238 
239 ACTOR template<class Func>
triggerOnError(Func what,Future<Void> signal)240 Future<Void> triggerOnError( Func what, Future<Void> signal ) {
241 	try {
242 		wait( signal );
243 	}
244 	catch(Error &e) {
245 		what();
246 	}
247 
248 	return Void();
249 }
250 
251 //Waits for a future to complete and cannot be cancelled
252 //Most situations will use the overload below, which does not require a promise
253 ACTOR template<class T>
uncancellable(Future<T> what,Promise<T> result)254 void uncancellable(Future<T> what, Promise<T> result)
255 {
256 	try {
257 		T val = wait(what);
258 		result.send(val);
259 	} catch( Error &e ) {
260 		result.sendError(e);
261 	}
262 }
263 
264 //Waits for a future to complete and cannot be cancelled
265 ACTOR template<class T>
uncancellable(Future<T> what)266 Future<T> uncancellable(Future<T> what)
267 {
268 	Promise<T> resultPromise;
269 	Future<T> result = resultPromise.getFuture();
270 
271 	uncancellable(what, resultPromise);
272 	T val = wait(result);
273 
274 	return val;
275 }
276 
277 //Holds onto an object until a future either completes or is cancelled
278 //Used to prevent the object from being reclaimed
279 //
280 // NOTE: the order of the arguments is important. The arguments will be destructed in
281 // reverse order, and we need the object to be destructed last.
282 ACTOR template<class T, class X>
holdWhile(X object,Future<T> what)283 Future<T> holdWhile(X object, Future<T> what)
284 {
285 	T val = wait(what);
286 	return val;
287 }
288 
289 ACTOR template<class T, class X>
holdWhileVoid(X object,Future<T> what)290 Future<Void> holdWhileVoid(X object, Future<T> what)
291 {
292 	T val = wait(what);
293 	return Void();
294 }
295 
296 template<class T>
store(T & out,Future<T> what)297 Future<Void> store(T &out, Future<T> what) {
298 	return map(what, [&out](T const &v) { out = v; return Void(); });
299 }
300 
301 template<class T>
302 Future<Void> storeOrThrow(T &out, Future<Optional<T>> what, Error e = key_not_found()) {
303 	return map(what, [&out,e](Optional<T> const &o) {
304 		if(!o.present())
305 			throw e;
306 		out = o.get();
307 		return Void();
308 	});
309 }
310 
311 //Waits for a future to be ready, and then applies an asynchronous function to it.
312 ACTOR template<class T, class F, class U = decltype( fake<F>()(fake<T>()).getValue() )>
mapAsync(Future<T> what,F actorFunc)313 Future<U> mapAsync(Future<T> what, F actorFunc)
314 {
315 	T val = wait(what);
316 	U ret = wait(actorFunc(val));
317 	return ret;
318 }
319 
320 //maps a vector of futures with an asynchronous function
321 template<class T, class F>
mapAsync(std::vector<Future<T>> const & what,F const & actorFunc)322 std::vector<Future<decltype(actorFunc(T()).getValue())>> mapAsync(std::vector<Future<T>> const& what, F const& actorFunc)
323 {
324 	std::vector<typename std::result_of<F(T)>::type> ret;
325 	for(auto f : what)
326 		ret.push_back(mapAsync( f, actorFunc ));
327 	return ret;
328 }
329 
330 //maps a stream with an asynchronous function
331 ACTOR template<class T, class F, class U = decltype( fake<F>()(fake<T>()).getValue() )>
mapAsync(FutureStream<T> input,F actorFunc,PromiseStream<U> output)332 Future<Void> mapAsync( FutureStream<T> input, F actorFunc, PromiseStream<U> output ) {
333 	state Deque<Future<U>> futures;
334 
335 	loop {
336 		try {
337 			choose {
338 				when( T nextInput = waitNext( input ) ) {
339 					futures.push_back( actorFunc(nextInput) );
340 				}
341 				when( U nextOutput = wait( futures.size() == 0 ? Never() : futures.front() ) ) {
342 					output.send( nextOutput );
343 					futures.pop_front();
344 				}
345 
346 			}
347 		}
348 		catch ( Error& e ) {
349 			if( e.code() == error_code_end_of_stream ) {
350 				break;
351 			}
352 			else {
353 				output.sendError( e );
354 				throw e;
355 			}
356 		}
357 	}
358 
359 	while(futures.size()) {
360 		U nextOutput = wait( futures.front() );
361 		output.send( nextOutput );
362 		futures.pop_front();
363 	}
364 
365 	output.sendError(end_of_stream());
366 
367 	return Void();
368 
369 }
370 
371 //Waits for a future to be ready, and then applies a function to it.
372 ACTOR template<class T, class F>
map(Future<T> what,F func)373 Future<typename std::result_of<F(T)>::type> map(Future<T> what, F func)
374 {
375 	T val = wait(what);
376 	return func(val);
377 }
378 
379 //maps a vector of futures
380 template<class T, class F>
map(std::vector<Future<T>> const & what,F const & func)381 std::vector<Future<typename std::result_of<F(T)>>> map(std::vector<Future<T>> const& what, F const& func)
382 {
383 	std::vector<Future<typename std::result_of<F(T)>>> ret;
384 	for(auto f : what)
385 		ret.push_back(map( f, func ));
386 	return ret;
387 }
388 
389 //maps a stream
390 ACTOR template<class T, class F>
map(FutureStream<T> input,F func,PromiseStream<typename std::result_of<F (T)>> output)391 Future<Void> map( FutureStream<T> input, F func, PromiseStream<typename std::result_of<F(T)>> output )
392 {
393 	loop {
394 		try {
395 			T nextInput = waitNext( input );
396 			output.send(func(nextInput));
397 		}
398 		catch ( Error& e ) {
399 			if( e.code() == error_code_end_of_stream ) {
400 				break;
401 			}
402 			else
403 				throw;
404 		}
405 	}
406 
407 	output.sendError(end_of_stream());
408 
409 	return Void();
410 }
411 
412 //Returns if the future returns true, otherwise waits forever.
returnIfTrue(Future<bool> f)413 ACTOR static Future<Void> returnIfTrue( Future<bool> f )
414 {
415 	bool b = wait( f );
416 	if ( b ) {
417 		return Void();
418 	}
419 	wait( Never() );
420 	throw internal_error();
421 }
422 
423 //Returns if the future, when waited on and then evaluated with the predicate, returns true, otherwise waits forever
424 template<class T, class F>
returnIfTrue(Future<T> what,F pred)425 Future<Void> returnIfTrue( Future<T> what, F pred)
426 {
427 	return returnIfTrue( map( what, pred ) );
428 }
429 
430 //filters a stream
431 ACTOR template<class T, class F>
filter(FutureStream<T> input,F pred,PromiseStream<T> output)432 Future<Void> filter( FutureStream<T> input, F pred, PromiseStream<T> output )
433 {
434 	loop {
435 		try {
436 			T nextInput = waitNext( input );
437 			if(func(nextInput))
438 				output.send(nextInput);
439 		}
440 		catch ( Error& e ) {
441 			if( e.code() == error_code_end_of_stream ) {
442 				break;
443 			}
444 			else
445 				throw;
446 		}
447 	}
448 
449 	output.sendError(end_of_stream());
450 
451 	return Void();
452 }
453 
454 //filters a stream asynchronously
455 ACTOR template<class T, class F>
asyncFilter(FutureStream<T> input,F actorPred,PromiseStream<T> output)456 Future<Void> asyncFilter( FutureStream<T> input, F actorPred, PromiseStream<T> output )
457 {
458 	state Deque<std::pair<T, Future<bool>>> futures;
459 	state std::pair<T, Future<bool>> p;
460 
461 	loop {
462 		try {
463 			choose {
464 				when ( T nextInput = waitNext(input) ) {
465 					futures.push_back( std::pair<T, Future<bool>>(nextInput, actorPred(nextInput)) );
466 				}
467 				when ( bool pass = wait( futures.size() == 0 ? Never() : futures.front().second ) ) {
468 					if(pass) output.send(futures.front().first);
469 					futures.pop_front();
470 				}
471 			}
472 		}
473 		catch ( Error& e ) {
474 			if( e.code() == error_code_end_of_stream ) {
475 				break;
476 			}
477 			else {
478 				throw e;
479 			}
480 		}
481 	}
482 
483 	while(futures.size()) {
484 		p = futures.front();
485 		bool pass = wait( p.second );
486 		if(pass) output.send(p.first);
487 		futures.pop_front();
488 	}
489 
490 	output.sendError(end_of_stream());
491 
492 	return Void();
493 }
494 
495 template <class T>
496 struct WorkerCache {
497 	// SOMEDAY: Would we do better to use "unreliable" (at most once) transport for the initialize requests and get rid of this?
498 	// It doesn't provide true at most once behavior because things are removed from the cache after they have terminated.
existsWorkerCache499 	bool exists( UID id ) {
500 		return id_interface.count( id ) != 0;
501 	}
setWorkerCache502 	void set( UID id, const Future<T>& onReady ) {
503 		ASSERT( !exists( id ) );
504 		id_interface[ id ] = onReady;
505 	}
getWorkerCache506 	Future<T> get( UID id ) {
507 		ASSERT( exists( id ) );
508 		return id_interface[ id ];
509 	}
510 
removeOnReadyWorkerCache511 	Future<Void> removeOnReady( UID id, Future<Void> const& ready ) {
512 		return removeOnReady( this, id, ready );
513 	}
514 private:
removeOnReadyWorkerCache515 	ACTOR static Future<Void> removeOnReady( WorkerCache* self, UID id, Future<Void> ready ) {
516 		try {
517 			wait(ready);
518 			self->id_interface.erase(id);
519 			return Void();
520 		} catch ( Error &e ) {
521 			self->id_interface.erase(id);
522 			throw;
523 		}
524 	}
525 
526 	std::map<UID, Future<T>> id_interface;
527 };
528 
// A map from K to V in which each key can be watched for changes via onChange().
// Entries whose value equals the default value V() are normally not stored; see the
// invariant documented on struct P below.
template <class K, class V>
class AsyncMap : NonCopyable {
public:
	// Represents a complete function from keys to values (K -> V)
	// All values not explicitly inserted map to V()
	// If this isn't appropriate, use V=Optional<X>

	AsyncMap() : defaultValue(), destructing(false) {}

	// Sets `destructing` before clearing so that destroyOnCancel actors that get cancelled as
	// a consequence do not erase from `items` while it is being cleared.
	virtual ~AsyncMap() {
		destructing = true;
		items.clear();
	}

	// Sets the value for k and notifies watchers, but only if the value actually changes.
	// Note that items[k] creates a default entry when k is not present.
	void set( K const& k, V const& v ) {
		auto& i = items[k];
		if (i.value != v)
			setUnconditional(k,v,i);
	}
	// Sets the value for k and notifies watchers even if the value is unchanged.
	void setUnconditional( K const& k, V const& v ) {
		setUnconditional(k,v,items[k]);
	}
	// Fires the change promise of every stored item. Each item receives a fresh promise
	// before any of the old ones is sent.
	void triggerAll() {
		std::vector<Promise<Void>> ps;
		for(auto it = items.begin(); it != items.end(); ++it){
			ps.resize(ps.size()+1);
			ps.back().swap( it->second.change );
		}
		std::vector<Promise<Void>> noDestroy = ps;  // See explanation of noDestroy in setUnconditional()
		for(auto p=ps.begin(); p!=ps.end(); ++p)
			p->send(Void());
	}
	// Same as triggerAll(), restricted to stored keys in the half-open range [begin, end).
	void triggerRange( K const& begin, K const& end ) {
		std::vector<Promise<Void>> ps;
		for(auto it = items.lower_bound(begin); it != items.end() && it->first < end; ++it){
			ps.resize(ps.size()+1);
			ps.back().swap( it->second.change );
		}
		std::vector<Promise<Void>> noDestroy = ps;  // See explanation of noDestroy in setUnconditional()
		for(auto p=ps.begin(); p!=ps.end(); ++p)
			p->send(Void());
	}
	// Fires the change promise of `key` if the key is stored; does nothing otherwise.
	// An entry holding the default value is erased before the promise is sent.
	void trigger( K const& key ) {
		if( items.count(key) != 0 ) {
			auto& i = items[key];
			Promise<Void> trigger;
			i.change.swap(trigger);
			Promise<Void> noDestroy = trigger;  // See explanation of noDestroy in setUnconditional()

			// `i` must not be used after this erase.
			if (i.value == defaultValue)
				items.erase(key);

			trigger.send(Void());
		}
	}
	// Resets k to the default value V().
	void clear( K const& k ) { set(k, V()); }
	// Returns the value stored for k, or the default value if k is not stored.
	V const& get( K const& k ) {
		auto it = items.find(k);
		if (it != items.end())
			return it->second.value;
		else
			return defaultValue;
	}
	// Returns 1 if k is stored, 0 otherwise.
	int count( K const& k ) {
		auto it = items.find(k);
		if (it != items.end())
			return 1;
		return 0;
	}
	// Returns a future that becomes ready the next time the change promise of k is fired.
	// items[k] creates an entry when k is not stored; for an entry holding the default value
	// the future is wrapped in destroyOnCancel so the entry can be removed again when the
	// watcher goes away.
	virtual Future<Void> onChange( K const& k ) {	// throws broken_promise if this is destroyed
		auto &item = items[k];
		if (item.value == defaultValue)
			return destroyOnCancel( this, k, item.change.getFuture() );
		return item.change.getFuture();
	}
	// Returns the keys of all stored items, in map order.
	std::vector<K> getKeys() {
		std::vector<K> keys;
		for(auto i = items.begin(); i != items.end(); ++i)
			keys.push_back( i->first );
		return keys;
	}
	// Clears the map after asserting, for every item, that the future reference count of
	// its change promise is 1.
	void resetNoWaiting() {
		for(auto i = items.begin(); i != items.end(); ++i)
			ASSERT( i->second.change.getFuture().getFutureReferenceCount() == 1 );
		items.clear();
	}

protected:
	// Invariant: Every item in the map either has value!=defaultValue xor a destroyOnCancel actor waiting on change.getFuture()
	struct P {
		V value;               // current value for the key
		Promise<Void> change;  // fired, and replaced by a fresh promise, on each change/trigger
		P() : value() {}
	};
	std::map<K, P> items;
	const V defaultValue;
	bool destructing;  // set by the destructor; checked by destroyOnCancel

	// Common implementation of set()/setUnconditional(). `i` must be the entry items[k].
	void setUnconditional( K const& k, V const& v, P& i ) {
		Promise<Void> trigger;
		i.change.swap(trigger);
		Promise<Void> noDestroy = trigger;  // The send(Void()) or even V::operator= could cause destroyOnCancel,
			                                // which could undo the change to i.value here.  Keeping the promise reference count >= 2
			                                // prevents destroyOnCancel from erasing anything from the map.

		// `i` must not be used after the erase.
		if (v == defaultValue)
			items.erase(k);
		else
			i.value = v;

		trigger.send(Void());
	}

	// Forwards `change`. If this actor is cancelled while the map is not being destructed and
	// `change` has exactly one future reference and one promise reference, the entry for
	// `key` is erased from the map.
	ACTOR Future<Void> destroyOnCancel( AsyncMap* self, K key, Future<Void> change ) {
		try {
			wait(change);
			return Void();
		} catch (Error& e) {
			if (e.code() == error_code_actor_cancelled && !self->destructing && change.getFutureReferenceCount()==1 && change.getPromiseReferenceCount()==1) {
				if(EXPENSIVE_VALIDATION) {
					auto& p = self->items[key];
					ASSERT(p.change.getFuture() == change);
				}
				self->items.erase(key);
			}
			throw;
		}
	}
};
658 
659 template <class V>
660 class AsyncVar : NonCopyable, public ReferenceCounted<AsyncVar<V>> {
661 public:
AsyncVar()662 	AsyncVar() : value() {}
AsyncVar(V const & v)663 	AsyncVar( V const& v ) : value(v) {}
AsyncVar(AsyncVar && av)664 	AsyncVar(AsyncVar&& av) : value(std::move(av.value)), nextChange(std::move(av.nextChange)) {}
665 	void operator=(AsyncVar&& av) { value = std::move(av.value); nextChange = std::move(av.nextChange); }
666 
get()667 	V const& get() const {
668 		return value;
669 	}
onChange()670 	Future<Void> onChange() const {
671 		return nextChange.getFuture();
672 	}
set(V const & v)673 	void set( V const& v ) {
674 		if (v != value)
675 			setUnconditional(v);
676 	}
setUnconditional(V const & v)677 	void setUnconditional( V const& v ) {
678 		Promise<Void> t;
679 		this->nextChange.swap(t);
680 		this->value = v;
681 		t.send(Void());
682 	}
trigger()683 	void trigger() {
684 		Promise<Void> t;
685 		this->nextChange.swap(t);
686 		t.send(Void());
687 	}
688 
689 private:
690 	V value;
691 	Promise<Void> nextChange;
692 };
693 
694 class AsyncTrigger : NonCopyable {
695 public:
AsyncTrigger()696 	AsyncTrigger() {}
AsyncTrigger(AsyncTrigger && at)697 	AsyncTrigger(AsyncTrigger&& at) : v(std::move(at.v)) {}
698 	void operator=(AsyncTrigger&& at) { v = std::move(at.v); }
onTrigger()699 	Future<Void> onTrigger() {
700 		return v.onChange();
701 	}
trigger()702 	void trigger() {
703 		v.trigger();
704 	}
705 private:
706 	AsyncVar<Void> v;
707 };
708 
709 class Debouncer : NonCopyable {
710 public:
Debouncer(double delay)711 	explicit Debouncer( double delay ) { worker = debounceWorker(this, delay); }
Debouncer(Debouncer && at)712 	Debouncer(Debouncer&& at) : input(std::move(at.input)), output(std::move(at.output)) {}
713 	void operator=(Debouncer&& at) { input = std::move(at.input); output = std::move(at.output); }
onTrigger()714 	Future<Void> onTrigger() {
715 		return output.onChange();
716 	}
trigger()717 	void trigger() {
718 		input.setUnconditional(Void());
719 	}
720 private:
721 	AsyncVar<Void> input;
722 	AsyncVar<Void> output;
723 	Future<Void> worker;
724 
debounceWorker(Debouncer * self,double bounceTime)725 	ACTOR Future<Void> debounceWorker( Debouncer* self, double bounceTime ) {
726 		loop {
727 			wait( self->input.onChange() );
728 			loop {
729 				choose {
730 					when(wait( self->input.onChange() )) {}
731 					when(wait( delay(bounceTime) )) { break; }
732 				}
733 			}
734 			self->output.setUnconditional(Void());
735 		}
736 	}
737 };
738 
asyncDeserialize(Reference<AsyncVar<Standalone<StringRef>>> input,Reference<AsyncVar<Optional<T>>> output)739 ACTOR template <class T> Future<Void> asyncDeserialize( Reference<AsyncVar<Standalone<StringRef>>> input, Reference<AsyncVar<Optional<T>>> output ) {
740 	loop {
741 		if (input->get().size())
742 			output->set( BinaryReader::fromStringRef<T>( input->get(), IncludeVersion() ) );
743 		else
744 			output->set( Optional<T>() );
745 		wait( input->onChange() );
746 	}
747 }
748 
749 ACTOR template <class V, class T>
forwardVector(Future<V> values,std::vector<Promise<T>> out)750 void forwardVector( Future<V> values, std::vector<Promise<T>> out ) {
751 	V in = wait( values );
752 	ASSERT (in.size() == out.size());
753 	for(int i=0; i<out.size(); i++)
754 		out[i].send( in[i] );
755 }
756 
757 ACTOR template <class T>
delayedAsyncVar(Reference<AsyncVar<T>> in,Reference<AsyncVar<T>> out,double time)758 Future<Void> delayedAsyncVar( Reference<AsyncVar<T>> in, Reference<AsyncVar<T>> out, double time ) {
759 	try {
760 		loop {
761 			wait( delay( time ) );
762 			out->set( in->get() );
763 			wait( in->onChange() );
764 		}
765 	} catch (Error& e) {
766 		out->set( in->get() );
767 		throw;
768 	}
769 }
770 
771 ACTOR template <class T>
setAfter(Reference<AsyncVar<T>> var,double time,T val)772 Future<Void> setAfter( Reference<AsyncVar<T>> var, double time, T val ) {
773 	wait( delay( time ) );
774 	var->set( val );
775 	return Void();
776 }
777 
778 ACTOR template <class T>
779 Future<Void> resetAfter( Reference<AsyncVar<T>> var, double time, T val, int warningLimit = -1, double warningResetDelay = 0, const char* context = NULL ) {
780 	state bool isEqual = var->get() == val;
781 	state Future<Void> resetDelay = isEqual ? Never() : delay(time);
782 	state int resetCount = 0;
783 	state double lastReset = now();
784 	loop {
785 		choose {
when(wait (resetDelay))786 			when( wait( resetDelay ) ) {
787 				var->set( val );
788 				if(now() - lastReset > warningResetDelay) {
789 					resetCount = 0;
790 				}
791 				resetCount++;
792 				if(context && warningLimit >= 0 && resetCount > warningLimit) {
793 					TraceEvent(SevWarnAlways, context).detail("ResetCount", resetCount).detail("LastReset", now() - lastReset);
794 				}
795 				lastReset = now();
796 				isEqual = true;
797 				resetDelay = Never();
798 			}
799 			when( wait( var->onChange() ) ) {}
800 		}
801 		if( isEqual && var->get() != val ) {
802 			isEqual = false;
803 			resetDelay = delay(time);
804 		}
805 		if( !isEqual && var->get() == val ) {
806 			isEqual = true;
807 			resetDelay = Never();
808 		}
809 	}
810 }
811 
812 ACTOR template <class T>
setWhenDoneOrError(Future<Void> condition,Reference<AsyncVar<T>> var,T val)813 Future<Void> setWhenDoneOrError( Future<Void> condition, Reference<AsyncVar<T>> var, T val ) {
814 	try {
815 		wait( condition );
816 	}
817 	catch ( Error& e ) {
818 		if (e.code() == error_code_actor_cancelled) throw;
819 	}
820 	var->set( val );
821 	return Void();
822 }
823 
// Declarations only: these non-template helpers are defined outside this part of the file.
// Their exact semantics are not visible here; see the definitions.
Future<bool> allTrue( const std::vector<Future<bool>>& all );
Future<Void> anyTrue( std::vector<Reference<AsyncVar<bool>>> const& input, Reference<AsyncVar<bool>> const& output );
// Used by makeStream() below to wrap its vector of forwarder actors.
Future<Void> cancelOnly( std::vector<Future<Void>> const& futures );
Future<Void> timeoutWarningCollector( FutureStream<Void> const& input, double const& logDelay, const char* const& context, UID const& id );
Future<bool> quorumEqualsTrue( std::vector<Future<bool>> const& futures, int const& required );
829 
830 ACTOR template <class T>
streamHelper(PromiseStream<T> output,PromiseStream<Error> errors,Future<T> input)831 Future<Void> streamHelper( PromiseStream<T> output, PromiseStream<Error> errors, Future<T> input ) {
832 	try {
833 		T value = wait(input);
834 		output.send(value);
835 	} catch (Error& e) {
836 		if (e.code() == error_code_actor_cancelled)
837 			throw;
838 		errors.send(e);
839 	}
840 	return Void();
841 }
842 
843 template <class T>
makeStream(const std::vector<Future<T>> & futures,PromiseStream<T> & stream,PromiseStream<Error> & errors)844 Future<Void> makeStream( const std::vector<Future<T>>& futures, PromiseStream<T>& stream, PromiseStream<Error>& errors ) {
845 	std::vector<Future<Void>> forwarders;
846 	for(int f=0; f<futures.size(); f++)
847 		forwarders.push_back( streamHelper( stream, errors, futures[f] ) );
848 	return cancelOnly(forwarders);
849 }
850 
template <class T>
class QuorumCallback;

// Shared state behind quorum(): a SAV<Void> that is completed once enough of the watched
// futures have succeeded, or as soon as one of them fails.
//
// Memory layout: the object is followed directly, in the same allocation, by `count`
// QuorumCallback<T> slots (see sizeFor() and callbacks()).
// NOTE(review): the base is constructed as SAV<Void>(1, count) -- presumably one future
// reference and `count` promise references, one per watched future; confirm against SAV.
template <class T>
struct Quorum : SAV<Void> {
	int antiQuorum;  // count - quorum + 1; compared against the promise reference count in oneSuccess()
	int count;       // number of watched futures, i.e. number of trailing callback slots

	// Bytes needed for a Quorum followed by `count` callback slots.
	static inline int sizeFor(int count) {
		return sizeof(Quorum<T>) + sizeof(QuorumCallback<T>)*count;
	}

	// Runs the destructor and releases the whole allocation, including the callback slots.
	// The size is read before the destructor runs because it depends on `count`.
	virtual void destroy() {
		int size = sizeFor(this->count);
		this->~Quorum();
		freeFast(size, this);
	}
	// Unregisters every callback that is still linked (next != 0), sends actor_cancelled if
	// no result has been set yet, then drops one promise reference per unregistered callback.
	virtual void cancel() {
		int cancelled_callbacks = 0;
		for (int i = 0; i < count; i++)
			if (callbacks()[i].next) {
				callbacks()[i].remove();
				callbacks()[i].next = 0;
				++cancelled_callbacks;
			}
		if (canBeSet())
			sendError(actor_cancelled());
		for (int i = 0; i < cancelled_callbacks; i++)
			delPromiseRef();
	}
	// `quorum` successes out of `count` futures are required; a quorum of 0 completes immediately.
	explicit Quorum(int quorum, int count) : SAV<Void>(1, count), antiQuorum(count - quorum + 1), count(count) {
		if (!quorum) this->send(Void());
	}
	// Called once per successful future. Completes the quorum when the promise reference
	// count has dropped to antiQuorum and no result has been set yet; in every case one
	// promise reference is released.
	void oneSuccess() {
		if (getPromiseReferenceCount() == antiQuorum && canBeSet())
			this->sendAndDelPromiseRef(Void());
		else
			delPromiseRef();
	}
	// Called once per failed future. The first error completes the quorum with that error;
	// in every case one promise reference is released.
	void oneError(Error err) {
		if (canBeSet())
			this->sendErrorAndDelPromiseRef(err);
		else
			delPromiseRef();
	}

	// First of the callback slots stored immediately after this object.
	QuorumCallback<T>* callbacks() { return (QuorumCallback<T>*)(this + 1); }
};
899 
// Callback registered on one future watched by a Quorum. It reports the outcome of that
// future to the owning Quorum (`head`).
template <class T>
class QuorumCallback : public Callback<T> {
public:
	// Registers this callback on `future` via addCallbackAndClear().
	QuorumCallback(Future<T> future, Quorum<T>* head)
		: head(head)
	{
		future.addCallbackAndClear(this);
	}
	// The watched future produced a value (which is ignored). The callback unlinks itself
	// and clears `next` -- Quorum::cancel() uses next != 0 to find callbacks that are still
	// registered -- before reporting the success.
	virtual void fire(const T& value) {
		Callback<T>::remove();
		Callback<T>::next = 0;
		head->oneSuccess();
	}
	// The watched future failed: unlink as in fire(), then report the error.
	virtual void error(Error error) {
		Callback<T>::remove();
		Callback<T>::next = 0;
		head->oneError(error);
	}

private:
	Quorum<T>* head;  // owning Quorum; not reference counted by this callback
};
922 
// Returns a future that becomes ready once `n` of the futures in `results` have succeeded,
// or fails with the first error reported by any of them.
// Asserts 0 <= n <= results.size().
template <class T>
Future<Void> quorum(std::vector<Future<T>> const& results, int n) {
	ASSERT(n >= 0 && n <= results.size());

	// One allocation holds the Quorum followed by one callback slot per future.
	int size = Quorum<T>::sizeFor(results.size());
	Quorum<T>* q = new (allocateFast(size)) Quorum<T>(n, results.size());

	QuorumCallback<T>* nextCallback = q->callbacks();
	for (auto & r : results) {
		if (r.isReady()) {
			// Already decided: no callback is constructed in this slot. `next` is cleared so
			// that Quorum::cancel() treats the slot as not registered.
			nextCallback->next = 0;
			if (r.isError())
				q->oneError(r.getError());
			else
				q->oneSuccess();
		}
		else
			new (nextCallback) QuorumCallback<T>(r, q);
		++nextCallback;
	}
	return Future<Void>(q);
}
945 
946 ACTOR template <class T>
947 Future<Void> smartQuorum( std::vector<Future<T>> results, int required, double extraSeconds, int taskID = TaskDefaultDelay ) {
948 	if (results.empty() && required == 0) return Void();
949 	wait(quorum(results, required));
950 	choose {
951 		when (wait(quorum(results, (int)results.size()))) {return Void();}
when(wait (delay (extraSeconds,taskID)))952 		when (wait(delay(extraSeconds, taskID))) {return Void(); }
953 	}
954 }
955 
956 template <class T>
waitForAll(std::vector<Future<T>> const & results)957 Future<Void> waitForAll( std::vector<Future<T>> const& results ) {
958 	if (results.empty()) return Void();
959 	return quorum( results, (int)results.size() );
960 }
961 
962 template <class T>
waitForAny(std::vector<Future<T>> const & results)963 Future<Void> waitForAny( std::vector<Future<T>> const& results ) {
964 	if (results.empty()) return Void();
965 	return quorum( results, 1 );
966 }
967 
shortCircuitAny(std::vector<Future<bool>> f)968 ACTOR static Future<bool> shortCircuitAny( std::vector<Future<bool>> f )
969 {
970 	std::vector<Future<Void>> sc;
971 	for(Future<bool> fut : f) {
972 		sc.push_back(returnIfTrue(fut));
973 	}
974 
975 	choose {
976 		when( wait( waitForAll( f ) ) ) {
977 			// Handle a possible race condition? If the _last_ term to
978 			// be evaluated triggers the waitForAll before bubbling
979 			// out of the returnIfTrue quorum
980 			for ( auto fut : f ) {
981 				if ( fut.get() ) {
982 					return true;
983 				}
984 			}
985 			return false;
986 		}
987 		when( wait( waitForAny( sc ) ) ) {
988 			return true;
989 		}
990 	}
991 }
992 
993 ACTOR template <class T>
getAll(std::vector<Future<T>> input)994 Future<std::vector<T>> getAll( std::vector<Future<T>> input ) {
995 	if (input.empty()) return std::vector<T>();
996 	wait( quorum( input, input.size() ) );
997 
998 	std::vector<T> output;
999 	for(int i=0; i<input.size(); i++)
1000 		output.push_back( input[i].get() );
1001 	return output;
1002 }
1003 
1004 ACTOR template <class T>
appendAll(std::vector<Future<std::vector<T>>> input)1005 Future<std::vector<T>> appendAll( std::vector<Future<std::vector<T>>> input ) {
1006 	wait( quorum( input, input.size() ) );
1007 
1008 	std::vector<T> output;
1009 	for(int i=0; i<input.size(); i++) {
1010 		auto const& r = input[i].get();
1011 		output.insert( output.end(), r.begin(), r.end() );
1012 	}
1013 	return output;
1014 }
1015 
onEqual(Future<T> in,T equalTo)1016 ACTOR template <class T> Future<Void> onEqual( Future<T> in, T equalTo ) {
1017 	T t = wait(in);
1018 	if ( t == equalTo )
1019 		return Void();
1020 	wait(Never());  // never return
1021 	throw internal_error();  // does not happen
1022 }
1023 
1024 ACTOR template <class T>
success(Future<T> of)1025 Future<Void> success( Future<T> of ) {
1026 	T t = wait( of );
1027 	return Void();
1028 }
1029 
1030 ACTOR template <class T>
ready(Future<T> f)1031 Future<Void> ready( Future<T> f ) {
1032 	try {
1033 		T _ = wait( f );
1034 	} catch (...) {
1035 	}
1036 	return Void();
1037 }
1038 
1039 ACTOR template <class T>
waitAndForward(FutureStream<T> input)1040 Future<T> waitAndForward( FutureStream<T> input ) {
1041 	T output = waitNext( input );
1042 	return output;
1043 }
1044 
1045 ACTOR template <class T>
reportErrorsExcept(Future<T> in,const char * context,UID id,std::set<int> const * pExceptErrors)1046 Future<T> reportErrorsExcept( Future<T> in, const char* context, UID id, std::set<int> const* pExceptErrors ) {
1047 	try {
1048 		T t = wait( in );
1049 		return t;
1050 	} catch (Error& e) {
1051 		if (e.code() != error_code_actor_cancelled && (!pExceptErrors || !pExceptErrors->count(e.code())))
1052 			TraceEvent(SevError, context, id).error(e);
1053 		throw;
1054 	}
1055 }
1056 
1057 template <class T>
1058 Future<T> reportErrors( Future<T> const& in, const char* context, UID id = UID() ) {
1059 	return reportErrorsExcept(in, context, id, NULL);
1060 }
1061 
1062 ACTOR template <class T>
require(Future<Optional<T>> in,int errorCode)1063 Future<T> require( Future<Optional<T>> in, int errorCode ) {
1064 	Optional<T> o = wait(in);
1065 	if (o.present()) {
1066 		return o.get();
1067 	}
1068 	else {
1069 		throw Error(errorCode);
1070 	}
1071 }
1072 
1073 ACTOR template <class T>
waitForFirst(std::vector<Future<T>> items)1074 Future<T> waitForFirst( std::vector<Future<T>> items ) {
1075 	state PromiseStream<T> resultStream;
1076 	state PromiseStream<Error> errorStream;
1077 
1078 	state Future<Void> forCancellation = makeStream( items, resultStream, errorStream );
1079 
1080 	state FutureStream<T> resultFutureStream = resultStream.getFuture();
1081 	state FutureStream<Error> errorFutureStream = errorStream.getFuture();
1082 
1083 	choose {
1084 		when (T val = waitNext( resultFutureStream )) {
1085 			forCancellation = Future<Void>();
1086 			return val;
1087 		}
1088 		when (Error e = waitNext( errorFutureStream )) {
1089 			forCancellation = Future<Void>();
1090 			throw e;
1091 		}
1092 	}
1093 }
1094 
1095 ACTOR template <class T>
tag(Future<Void> future,T what)1096 Future<T> tag( Future<Void> future, T what ) {
1097 	wait(future);
1098 	return what;
1099 }
1100 
1101 ACTOR template <class T>
tag(Future<Void> future,T what,PromiseStream<T> stream)1102 Future<Void> tag( Future<Void> future, T what, PromiseStream<T> stream ) {
1103 	wait( future );
1104 	stream.send( what );
1105 	return Void();
1106 }
1107 
1108 ACTOR template <class T>
tagError(Future<Void> future,Error e)1109 Future<T> tagError( Future<Void> future, Error e) {
1110 	wait(future);
1111 	throw e;
1112 }
1113 
1114 //If the future is ready, yields and returns. Otherwise, returns when future is set.
1115 template <class T>
orYield(Future<T> f)1116 Future<T> orYield( Future<T> f ) {
1117 	if(f.isReady()) {
1118 		if(f.isError())
1119 			return tagError<T>(yield(), f.getError());
1120 		else
1121 			return tag(yield(), f.get());
1122 	}
1123 	else
1124 		return f;
1125 }
1126 
orYield(Future<Void> f)1127 static Future<Void> orYield( Future<Void> f ) {
1128 	if(f.isReady()) {
1129 		if(f.isError())
1130 			return tagError<Void>(yield(), f.getError());
1131 		else
1132 			return yield();
1133 	}
1134 	else
1135 		return f;
1136 }
1137 
chooseActor(Future<T> lhs,Future<T> rhs)1138 ACTOR template <class T> Future<T> chooseActor( Future<T> lhs, Future<T> rhs ) {
1139 	choose {
1140 		when ( T t = wait(lhs) ) { return t; }
1141 		when ( T t = wait(rhs) ) { return t; }
1142 	}
1143 }
1144 
1145 // set && set -> set
1146 // error && x -> error
1147 // all others -> unset
1148 static Future<Void> operator &&( Future<Void> const& lhs, Future<Void> const& rhs ) {
1149 	if(lhs.isReady()) {
1150 		if(lhs.isError()) return lhs;
1151 		else return rhs;
1152 	}
1153 	if(rhs.isReady()) {
1154 		if(rhs.isError()) return rhs;
1155 		else return lhs;
1156 	}
1157 
1158 	std::vector<Future<Void>> v;
1159 	v.push_back( lhs );
1160 	v.push_back( rhs );
1161 	return waitForAll(v);
1162 }
1163 
1164 // error || unset -> error
1165 // unset || unset -> unset
1166 // all others -> set
1167 inline Future<Void> operator ||( Future<Void> const& lhs, Future<Void> const& rhs ) {
1168 	if(lhs.isReady()) {
1169 		if(lhs.isError()) return lhs;
1170 		if(rhs.isReady()) return rhs;
1171 		return lhs;
1172 	}
1173 
1174 	return chooseActor( lhs, rhs );
1175 }
1176 
brokenPromiseToNever(Future<T> in)1177 ACTOR template <class T> Future<T> brokenPromiseToNever( Future<T> in ) {
1178 	try {
1179 		T t = wait(in);
1180 		return t;
1181 	} catch (Error& e) {
1182 		if (e.code() != error_code_broken_promise)
1183 			throw;
1184 		wait(Never());  // never return
1185 		throw internal_error();  // does not happen
1186 	}
1187 }
1188 
brokenPromiseToMaybeDelivered(Future<T> in)1189 ACTOR template <class T> Future<T> brokenPromiseToMaybeDelivered( Future<T> in ) {
1190 	try {
1191 		T t = wait(in);
1192 		return t;
1193 	} catch (Error& e) {
1194 		if (e.code() == error_code_broken_promise) {
1195 			throw request_maybe_delivered();
1196 		}
1197 		throw;
1198 	}
1199 }
1200 
tagAndForward(Promise<T> * pOutputPromise,T value,Future<Void> signal)1201 ACTOR template <class T> void tagAndForward( Promise<T>* pOutputPromise, T value, Future<Void> signal ) {
1202 	state Promise<T> out( std::move(*pOutputPromise) );
1203 	wait( signal );
1204 	out.send(value);
1205 }
1206 
tagAndForwardError(Promise<T> * pOutputPromise,Error value,Future<Void> signal)1207 ACTOR template <class T> void tagAndForwardError( Promise<T>* pOutputPromise, Error value, Future<Void> signal ) {
1208 	state Promise<T> out( std::move(*pOutputPromise) );
1209 	wait( signal );
1210 	out.sendError(value);
1211 }
1212 
waitOrError(Future<T> f,Future<Void> errorSignal)1213 ACTOR template <class T> Future<T> waitOrError(Future<T> f, Future<Void> errorSignal) {
1214 	choose {
1215 		when(T val = wait(f)) {
1216 			return val;
1217 		}
1218 		when(wait(errorSignal)) {
1219 			ASSERT(false);
1220 			throw internal_error();
1221 		}
1222 	}
1223 }
1224 
1225 struct FlowLock : NonCopyable, public ReferenceCounted<FlowLock> {
1226 	// FlowLock implements a nonblocking critical section: there can be only a limited number of clients executing code between
1227 	// wait(take()) and release(). Not thread safe. take() returns only when the number of holders of the lock is fewer than the
1228 	// number of permits, and release() makes the caller no longer a holder of the lock. release() only runs waiting take()rs
1229 	// after the caller wait()s
1230 
1231 	struct Releaser : NonCopyable {
1232 		FlowLock* lock;
1233 		int remaining;
ReleaserFlowLock::Releaser1234 		Releaser() : lock(0), remaining(0) {}
1235 		Releaser( FlowLock& lock, int64_t amount = 1 ) : lock(&lock), remaining(amount) {}
ReleaserFlowLock::Releaser1236 		Releaser(Releaser&& r) BOOST_NOEXCEPT : lock(r.lock), remaining(r.remaining) { r.remaining = 0; }
1237 		void operator=(Releaser&& r) { if (remaining) lock->release(remaining); lock = r.lock; remaining = r.remaining; r.remaining = 0; }
1238 
1239 		void release( int64_t amount = -1 ) {
1240 			if( amount == -1 || amount > remaining )
1241 				amount = remaining;
1242 
1243 			if (remaining)
1244 				lock->release( amount );
1245 			remaining -= amount;
1246 		}
1247 
~ReleaserFlowLock::Releaser1248 		~Releaser() { if (remaining) lock->release(remaining); }
1249 	};
1250 
FlowLockFlowLock1251 	FlowLock() : permits(1), active(0) {}
FlowLockFlowLock1252 	explicit FlowLock(int64_t permits) : permits(permits), active(0) {}
1253 
1254 	Future<Void> take(int taskID = TaskDefaultYield, int64_t amount = 1) {
1255 		if (active + amount <= permits || active == 0) {
1256 			active += amount;
1257 			return safeYieldActor(this, taskID, amount);
1258 		}
1259 		return takeActor(this, taskID, amount);
1260 	}
1261 	void release( int64_t amount = 1 ) {
1262 		ASSERT( (active > 0 || amount == 0) && active - amount >= 0 );
1263 		active -= amount;
1264 
1265 		while( !takers.empty() ) {
1266 			if( active + takers.begin()->second <= permits || active == 0 ) {
1267 				std::pair< Promise<Void>, int64_t > next = std::move( *takers.begin() );
1268 				active += next.second;
1269 				takers.pop_front();
1270 				next.first.send(Void());
1271 			} else {
1272 				break;
1273 			}
1274 		}
1275 	}
1276 
1277 	Future<Void> releaseWhen( Future<Void> const& signal, int amount = 1 ) { return releaseWhenActor( this, signal, amount ); }
1278 
1279 	// returns when any permits are available, having taken as many as possible up to the given amount, and modifies amount to the number of permits taken
takeUpToFlowLock1280 	Future<Void> takeUpTo(int64_t& amount) {
1281 		return takeMoreActor(this, &amount);
1282 	}
1283 
availableFlowLock1284 	int64_t available() const { return permits - active; }
activePermitsFlowLock1285 	int64_t activePermits() const { return active; }
waitersFlowLock1286 	int waiters() const { return takers.size(); }
1287 private:
1288 	std::list< std::pair< Promise<Void>, int64_t > > takers;
1289 	const int64_t permits;
1290 	int64_t active;
1291 	Promise<Void> broken_on_destruct;
1292 
takeActorFlowLock1293 	ACTOR static Future<Void> takeActor(FlowLock* lock, int taskID, int64_t amount) {
1294 		state std::list<std::pair<Promise<Void>, int64_t>>::iterator it = lock->takers.insert(lock->takers.end(), std::make_pair(Promise<Void>(), amount));
1295 
1296 		try {
1297 			wait( it->first.getFuture() );
1298 		} catch (Error& e) {
1299 			if (e.code() == error_code_actor_cancelled) {
1300 				lock->takers.erase(it);
1301 				lock->release(0);
1302 			}
1303 			throw;
1304 		}
1305 		try {
1306 			double duration = BUGGIFY_WITH_PROB(.001) ? g_random->random01()*FLOW_KNOBS->BUGGIFY_FLOW_LOCK_RELEASE_DELAY : 0.0;
1307 			choose{ when(wait(delay(duration, taskID))) {}  // So release()ing the lock doesn't cause arbitrary code to run on the stack
1308 					when(wait(lock->broken_on_destruct.getFuture())) {} }
1309 			return Void();
1310 		} catch (...) {
1311 			TEST(true); // If we get cancelled here, we are holding the lock but the caller doesn't know, so release it
1312 			lock->release(amount);
1313 			throw;
1314 		}
1315 	}
1316 
takeMoreActorFlowLock1317 	ACTOR static Future<Void> takeMoreActor(FlowLock* lock, int64_t* amount) {
1318 		wait(lock->take());
1319 		int64_t extra = std::min( lock->available(), *amount-1 );
1320 		lock->active += extra;
1321 		*amount = 1 + extra;
1322 		return Void();
1323 	}
1324 
safeYieldActorFlowLock1325 	ACTOR static Future<Void> safeYieldActor(FlowLock* lock, int taskID, int64_t amount) {
1326 		try {
1327 			choose{
1328 				when(wait(yield(taskID))) {}
1329 				when(wait(lock->broken_on_destruct.getFuture())) {}
1330 			}
1331 			return Void();
1332 		} catch (Error& e) {
1333 			lock->release(amount);
1334 			throw;
1335 		}
1336 	}
1337 
releaseWhenActorFlowLock1338 	ACTOR static Future<Void> releaseWhenActor( FlowLock* self, Future<Void> signal, int64_t amount ) {
1339 		wait(signal);
1340 		self->release(amount);
1341 		return Void();
1342 	}
1343 };
1344 
1345 ACTOR template <class T>
1346 Future<Void> yieldPromiseStream( FutureStream<T> input, PromiseStream<T> output, int taskID = TaskDefaultYield ) {
1347 	loop {
1348 		T f = waitNext( input );
1349 		output.send( f );
1350 		wait( yield( taskID ) );
1351 	}
1352 }
1353 
1354 
1355 struct YieldedFutureActor : SAV<Void>, ActorCallback<YieldedFutureActor, 1, Void>, FastAllocated<YieldedFutureActor> {
1356 	Error in_error_state;
1357 
1358 	typedef ActorCallback<YieldedFutureActor, 1, Void> CB1;
1359 
1360 	using FastAllocated<YieldedFutureActor>::operator new;
1361 	using FastAllocated<YieldedFutureActor>::operator delete;
1362 
YieldedFutureActorYieldedFutureActor1363 	YieldedFutureActor(Future<Void> && f) : SAV<Void>(1, 1), in_error_state(Error::fromCode(UNSET_ERROR_CODE)) {
1364 		f.addYieldedCallbackAndClear(static_cast< ActorCallback< YieldedFutureActor, 1, Void >* >(this));
1365 	}
1366 
cancelYieldedFutureActor1367 	void cancel()
1368 	{
1369 		if (!SAV<Void>::canBeSet()) return;  // Cancel could be invoked *by* a callback within finish().  Otherwise it's guaranteed that we are waiting either on the original future or on a delay().
1370 		ActorCallback<YieldedFutureActor, 1, Void>::remove();
1371 		SAV<Void>::sendErrorAndDelPromiseRef(actor_cancelled());
1372 	}
1373 
destroyYieldedFutureActor1374 	virtual void destroy() {
1375 		delete this;
1376 	}
1377 
a_callback_fireYieldedFutureActor1378 	void a_callback_fire(ActorCallback<YieldedFutureActor, 1, Void>*, Void) {
1379 		if (int16_t(in_error_state.code()) == UNSET_ERROR_CODE) {
1380 			in_error_state = Error::fromCode(SET_ERROR_CODE);
1381 			if (check_yield())
1382 				doYield();
1383 			else
1384 				finish();
1385 		} else {
1386 			// We hit this case when and only when the delay() created by a previous doYield() fires.  Then we want to get at least one task done, regardless of what check_yield() would say.
1387 			finish();
1388 		}
1389 	}
a_callback_errorYieldedFutureActor1390 	void a_callback_error(ActorCallback<YieldedFutureActor, 1, Void>*, Error const& err) {
1391 		ASSERT(int16_t(in_error_state.code()) == UNSET_ERROR_CODE);
1392 		in_error_state = err;
1393 		if (check_yield())
1394 			doYield();
1395 		else
1396 			finish();
1397 	}
finishYieldedFutureActor1398 	void finish() {
1399 		ActorCallback<YieldedFutureActor, 1, Void>::remove();
1400 		if (int16_t(in_error_state.code()) == SET_ERROR_CODE)
1401 			SAV<Void>::sendAndDelPromiseRef(Void());
1402 		else
1403 			SAV<Void>::sendErrorAndDelPromiseRef(in_error_state);
1404 	}
doYieldYieldedFutureActor1405 	void doYield() {
1406 		// Since we are being fired, we are the first callback in the ring, and `prev` is the source future
1407 		Callback<Void>* source = CB1::prev;
1408 		ASSERT(source->next == static_cast<CB1*>(this));
1409 
1410 		// Remove the source future from the ring.  All the remaining callbacks in the ring should be yielded, since yielded callbacks are installed at the end
1411 		CB1::prev = source->prev;
1412 		CB1::prev->next = static_cast<CB1*>(this);
1413 
1414 		// The source future's ring is now empty, since we have removed all the callbacks
1415 		source->next = source->prev = source;
1416 		source->unwait();
1417 
1418 		// Link all the callbacks, including this one, into the ring of a delay future so that after a short time they will be fired again
1419 		delay(0, g_network->getCurrentTask()).addCallbackChainAndClear(static_cast< CB1* >(this));
1420 	}
1421 };
1422 
yieldedFuture(Future<Void> f)1423 static Future<Void> yieldedFuture(Future<Void> f) {
1424 	if (f.isReady())
1425 		return yield();
1426 	else
1427 		return Future<Void>(new YieldedFutureActor(std::move(f)));
1428 }
1429 
1430 //An AsyncMap that uses a yieldedFuture in its onChange method.
1431 template <class K, class V>
1432 class YieldedAsyncMap : public AsyncMap<K, V> {
1433 public:
onChange(K const & k)1434 	Future<Void> onChange(K const& k) {	// throws broken_promise if this is destroyed
1435 		auto &item = AsyncMap<K, V>::items[k];
1436 		if (item.value == AsyncMap<K, V>::defaultValue)
1437 			return destroyOnCancelYield(this, k, item.change.getFuture());
1438 		return yieldedFuture(item.change.getFuture());
1439 	}
1440 
destroyOnCancelYield(YieldedAsyncMap * self,K key,Future<Void> change)1441 	ACTOR static Future<Void> destroyOnCancelYield( YieldedAsyncMap* self, K key, Future<Void> change ) {
1442 		try {
1443 			wait(yieldedFuture(change));
1444 			return Void();
1445 		} catch (Error& e) {
1446 			if (e.code() == error_code_actor_cancelled && !self->destructing && change.getFutureReferenceCount() == 1 && change.getPromiseReferenceCount() == 1) {
1447 				if(EXPENSIVE_VALIDATION) {
1448 					auto& p = self->items[key];
1449 					ASSERT(p.change.getFuture() == change);
1450 				}
1451 				self->items.erase(key);
1452 			}
1453 			throw;
1454 		}
1455 	}
1456 };
1457 
1458 ACTOR template <class T>
delayActionJittered(Future<T> what,double time)1459 Future<T> delayActionJittered( Future<T> what, double time ) {
1460 	wait( delayJittered( time ) );
1461 	T t = wait( what );
1462 	return t;
1463 }
1464 
1465 class AndFuture {
1466 public:
AndFuture()1467 	AndFuture() { }
1468 
AndFuture(AndFuture const & f)1469 	AndFuture(AndFuture const& f) {
1470 		futures = f.futures;
1471 	}
1472 
AndFuture(AndFuture && f)1473 	AndFuture(AndFuture&& f) BOOST_NOEXCEPT {
1474 		futures = std::move(f.futures);
1475 	}
1476 
AndFuture(Future<Void> const & f)1477 	AndFuture(Future<Void> const& f) {
1478 		futures.push_back(f);
1479 	}
1480 
AndFuture(Error const & e)1481 	AndFuture(Error const& e) {
1482 		futures.push_back(e);
1483 	}
1484 
1485 	operator Future<Void>() {
1486 		return getFuture();
1487 	}
1488 
1489 	void operator=(AndFuture const& f) {
1490 		futures = f.futures;
1491 	}
1492 
1493 	void operator=(AndFuture&& f) BOOST_NOEXCEPT {
1494 		futures = std::move(f.futures);
1495 	}
1496 
1497 	void operator=(Future<Void> const& f) {
1498 		futures.push_back(f);
1499 	}
1500 
1501 	void operator=(Error const& e) {
1502 		futures.push_back(e);
1503 	}
1504 
getFuture()1505 	Future<Void> getFuture() {
1506 		if(futures.empty())
1507 			return Void();
1508 
1509 		if(futures.size() == 1)
1510 			return futures[0];
1511 
1512 		Future<Void> f = waitForAll(futures);
1513 		futures = std::vector<Future<Void>>();
1514 		futures.push_back(f);
1515 		return f;
1516 	}
1517 
isReady()1518 	bool isReady() {
1519 		for( int i = futures.size() - 1; i >= 0; --i ) {
1520 			if( !futures[i].isReady() ) {
1521 				return false;
1522 			}
1523 			else if(!futures[i].isError()) {
1524 				swapAndPop(&futures, i);
1525 			}
1526 		}
1527 		return true;
1528 	}
1529 
isError()1530 	bool isError() {
1531 		for( int i = 0; i < futures.size(); i++ )
1532 			if( futures[i].isError() )
1533 				return true;
1534 		return false;
1535 	}
1536 
cleanup()1537 	void cleanup() {
1538 		for( int i = 0; i < futures.size(); i++ ) {
1539 			if( futures[i].isReady() && !futures[i].isError() ) {
1540 				swapAndPop(&futures, i--);
1541 			}
1542 		}
1543 	}
1544 
add(Future<Void> const & f)1545 	void add(Future<Void> const& f) {
1546 		if(!f.isReady() || f.isError())
1547 			futures.push_back(f);
1548 	}
1549 
add(AndFuture f)1550 	void add(AndFuture f) {
1551 		add(f.getFuture());
1552 	}
1553 
1554 private:
1555 	std::vector<Future<Void>> futures;
1556 };
1557 
1558 // Performs an unordered merge of a and b.
1559 ACTOR template <class T>
unorderedMergeStreams(FutureStream<T> a,FutureStream<T> b,PromiseStream<T> output)1560 Future<Void> unorderedMergeStreams(FutureStream<T> a, FutureStream<T> b, PromiseStream<T> output) {
1561 	state Future<T> aFuture = waitAndForward(a);
1562 	state Future<T> bFuture = waitAndForward(b);
1563 	state bool aOpen = true;
1564 	state bool bOpen = true;
1565 
1566 	loop{
1567 		try {
1568 			choose {
1569 				when(T val = wait(aFuture)) {
1570 					output.send(val);
1571 					aFuture = waitAndForward(a);
1572 				}
1573 				when(T val = wait(bFuture)) {
1574 					output.send(val);
1575 					bFuture = waitAndForward(b);
1576 				}
1577 			}
1578 		}
1579 		catch (Error &e) {
1580 			if (e.code() != error_code_end_of_stream) {
1581 				output.sendError(e);
1582 				break;
1583 			}
1584 
1585 			ASSERT(!aFuture.isError() || !bFuture.isError() || aFuture.getError().code() == bFuture.getError().code());
1586 
1587 			if (aFuture.isError()) {
1588 				aFuture = Never();
1589 				aOpen = false;
1590 			}
1591 			if (bFuture.isError()) {
1592 				bFuture = Never();
1593 				bOpen = false;
1594 			}
1595 
1596 			if (!aOpen && !bOpen) {
1597 				output.sendError(e);
1598 				break;
1599 			}
1600 		}
1601 	}
1602 
1603 	return Void();
1604 }
1605 
1606 // Returns the ordered merge of a and b, assuming that a and b are both already ordered (prefer a over b if keys are equal). T must be a class that implements compare()
1607 ACTOR template <class T>
orderedMergeStreams(FutureStream<T> a,FutureStream<T> b,PromiseStream<T> output)1608 Future<Void> orderedMergeStreams( FutureStream<T> a, FutureStream<T> b, PromiseStream<T> output ) {
1609 	state Optional<T> savedKVa;
1610 	state bool aOpen;
1611 	state Optional<T> savedKVb;
1612 	state bool bOpen;
1613 
1614 	aOpen = bOpen = true;
1615 
1616 	loop {
1617 		if ( aOpen && !savedKVa.present() ) {
1618 			try {
1619 				T KVa = waitNext( a );
1620 				savedKVa = Optional<T>( KVa );
1621 			} catch ( Error& e ) {
1622 				if ( e.code() == error_code_end_of_stream ) {
1623 					aOpen = false;
1624 					if (!bOpen) {
1625 						output.sendError(e);
1626 					}
1627 				} else {
1628 					output.sendError(e);
1629 					break;
1630 				}
1631 			}
1632 		}
1633 		if ( bOpen && !savedKVb.present() ) {
1634 			try {
1635 				T KVb = waitNext( b );
1636 				savedKVb = Optional<T>( KVb );
1637 			} catch ( Error& e ) {
1638 				if ( e.code() == error_code_end_of_stream ) {
1639 					bOpen = false;
1640 					if (!aOpen) {
1641 						output.sendError(e);
1642 					}
1643 				} else {
1644 					output.sendError(e);
1645 					break;
1646 				}
1647 			}
1648 		}
1649 
1650 		if (!aOpen) {
1651 			output.send( savedKVb.get() );
1652 			savedKVb = Optional<T>();
1653 		} else if (!bOpen) {
1654 			output.send( savedKVa.get() );
1655 			savedKVa = Optional<T>();
1656 		} else {
1657 			int cmp = savedKVa.get().compare( savedKVb.get() );
1658 
1659 			if ( cmp == 0 ) {
1660 				// prefer a
1661 				output.send( savedKVa.get() );
1662 				savedKVa = Optional<T>();
1663 				savedKVb = Optional<T>();
1664 			} else if ( cmp < 0 ) {
1665 				output.send( savedKVa.get() );
1666 				savedKVa = Optional<T>();
1667 			} else {
1668 				output.send( savedKVb.get() );
1669 				savedKVb = Optional<T>();
1670 			}
1671 		}
1672 	}
1673 
1674 	return Void();
1675 }
1676 
1677 
1678 ACTOR template<class T>
timeReply(Future<T> replyToTime,PromiseStream<double> timeOutput)1679 Future<Void> timeReply(Future<T> replyToTime, PromiseStream<double> timeOutput){
1680 	state double startTime = now();
1681 	try {
1682 		T _ = wait(replyToTime);
1683 		wait( delay(0) );
1684 		timeOutput.send(now() - startTime);
1685 	} catch( Error &e ) {
1686 		// Ignore broken promises.  They typically occur during shutdown and our callers don't want to have to create brokenPromiseToNever actors to ignore them.  For what it's worth we are breaking timeOutput to pass the pain along.
1687 		if( e.code() != error_code_broken_promise )
1688 			throw;
1689 	}
1690 	return Void();
1691 }
1692 
1693 #include "flow/unactorcompiler.h"
1694 
1695 #endif
1696