1 /*
2  * genericactors.actor.cpp
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #include "flow/flow.h"
22 #include "flow/actorcompiler.h"  // This must be the last #include.
23 
allTrue(std::vector<Future<bool>> all)24 ACTOR Future<bool> allTrue( std::vector<Future<bool>> all ) {
25 	state int i=0;
26 	while (i != all.size()) {
27 		bool r = wait( all[i] );
28 		if (!r) return false;
29 		i++;
30 	}
31 	return true;
32 }
33 
anyTrue(std::vector<Reference<AsyncVar<bool>>> input,Reference<AsyncVar<bool>> output)34 ACTOR Future<Void> anyTrue( std::vector<Reference<AsyncVar<bool>>> input, Reference<AsyncVar<bool>> output ) {
35 	loop {
36 		bool oneTrue = false;
37 		std::vector<Future<Void>> changes;
38 		for(auto it : input) {
39 			if( it->get() ) oneTrue = true;
40 			changes.push_back( it->onChange() );
41 		}
42 		output->set( oneTrue );
43 		wait( waitForAny(changes) );
44 	}
45 }
46 
cancelOnly(std::vector<Future<Void>> futures)47 ACTOR Future<Void> cancelOnly( std::vector<Future<Void>> futures ) {
48 	// We don't do anything with futures except hold them, we never return, but if we are cancelled we (naturally) drop the futures
49 	wait( Never() );
50 	return Void();
51 }
52 
timeoutWarningCollector(FutureStream<Void> input,double logDelay,const char * context,UID id)53 ACTOR Future<Void> timeoutWarningCollector( FutureStream<Void> input, double logDelay, const char* context, UID id ) {
54 	state uint64_t counter = 0;
55 	state Future<Void> end = delay( logDelay );
56 	loop choose {
57 		when ( waitNext( input ) ) {
58 			counter++;
59 		}
60 		when ( wait( end ) ) {
61 			if( counter )
62 				TraceEvent(SevWarn, context, id).detail("LateProcessCount", counter).detail("LoggingDelay", logDelay);
63 			end = delay( logDelay );
64 			counter = 0;
65 		}
66 	}
67 }
68 
quorumEqualsTrue(std::vector<Future<bool>> futures,int required)69 ACTOR Future<bool> quorumEqualsTrue( std::vector<Future<bool>> futures, int required ) {
70 	state std::vector< Future<Void> > true_futures;
71 	state std::vector< Future<Void> > false_futures;
72 	for(int i=0; i<futures.size(); i++) {
73 		true_futures.push_back( onEqual( futures[i], true ) );
74 		false_futures.push_back( onEqual( futures[i], false ) );
75 	}
76 
77 	choose {
78 		when( wait( quorum( true_futures, required ) ) ) {
79 			return true;
80 		}
81 		when( wait( quorum( false_futures, futures.size() - required + 1 ) ) ) {
82 			return false;
83 		}
84 	}
85 }
86