1 /*
2  * ActorCollection.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #ifndef FDBSERVER_ACTORCOLLECTION_H
22 #define FDBSERVER_ACTORCOLLECTION_H
23 #pragma once
24 
25 #include "flow/flow.h"
26 
27 // actorCollection
28 //   - Can add a future at any time
29 //   - Cancels all futures in deterministic order if cancelled
30 //   - Throws an error immediately if any future throws an error
31 //   - Never returns otherwise, unless returnWhenEmptied=true in which case returns the first time it goes from count 1 to count 0 futures
32 //   - Uses memory proportional to the number of unready futures added (i.e. memory
33 //     is freed promptly when an actor in the collection returns)
34 Future<Void> actorCollection( FutureStream<Future<Void>> const& addActor, int* const& optionalCountPtr = NULL, double* const& lastChangeTime = NULL, double* const& idleTime = NULL, double* const& allTime = NULL, bool const& returnWhenEmptied=false );
35 
36 // ActorCollectionNoErrors is an easy-to-use wrapper for actorCollection() when you know that no errors will
37 // be thrown by the actors (e.g. because they are wrapped with individual error reporters).
38 struct ActorCollectionNoErrors : NonCopyable {
39 private:
40 	Future<Void> m_ac;
41 	PromiseStream<Future<Void>> m_add;
42 	int m_size;
initActorCollectionNoErrors43 	void init() { m_size = 0; m_ac = actorCollection(m_add.getFuture(), &m_size); }
44 public:
ActorCollectionNoErrorsActorCollectionNoErrors45 	ActorCollectionNoErrors() { init(); }
clearActorCollectionNoErrors46 	void clear() { m_ac=Future<Void>(); init(); }
addActorCollectionNoErrors47 	void add( Future<Void> actor ) { m_add.send(actor); }
sizeActorCollectionNoErrors48 	int size() const { return m_size; }
49 };
50 
51 // Easy-to-use wrapper that permits getting the result (error or returnWhenEmptied) from actorCollection
52 class ActorCollection : NonCopyable {
53 	PromiseStream<Future<Void>> m_add;
54 	Future<Void> m_out;
55 
56 public:
ActorCollection(bool returnWhenEmptied)57 	explicit ActorCollection( bool returnWhenEmptied ) {
58 		m_out = actorCollection(m_add.getFuture(), NULL, NULL, NULL, NULL, returnWhenEmptied );
59 	}
60 
add(Future<Void> a)61 	void add( Future<Void> a ) { m_add.send(a); }
getResult()62 	Future<Void> getResult() { return m_out; }
clear(bool returnWhenEmptied)63 	void clear( bool returnWhenEmptied ) { m_out.cancel(); m_out = actorCollection(m_add.getFuture(), NULL, NULL, NULL, NULL, returnWhenEmptied ); }
64 };
65 
66 class SignalableActorCollection : NonCopyable {
67 	PromiseStream<Future<Void>> m_add;
68 	Promise<Void> stopSignal;
69 	Future<Void> m_out;
70 
init()71 	void init() {
72 		PromiseStream<Future<Void>> addStream;
73 		m_out = actorCollection(addStream.getFuture(), NULL, NULL, NULL, NULL, true);
74 		m_add = addStream;
75 		stopSignal = Promise<Void>();
76 		m_add.send(stopSignal.getFuture());
77 	}
78 
79 public:
SignalableActorCollection()80 	explicit SignalableActorCollection() {
81 		init();
82 	}
83 
signal()84 	Future<Void> signal() {
85 		stopSignal.send(Void());
86 		Future<Void> result = holdWhile(m_add, m_out);
87 		return result;
88 	}
89 
signalAndReset()90 	Future<Void> signalAndReset() {
91 		Future<Void> result = signal();
92 		clear();
93 		return result;
94 	}
95 
signalAndCollapse()96 	Future<Void> signalAndCollapse() {
97 		Future<Void> result = signalAndReset();
98 		add(result);
99 		return result;
100 	}
101 
add(Future<Void> a)102 	void add(Future<Void> a) { m_add.send(a); }
clear()103 	void clear() { init(); }
104 };
105 
106 #endif