1 /*
2  * Notified.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #ifndef FDBCLIENT_NOTIFIED_H
22 #define FDBCLIENT_NOTIFIED_H
23 #pragma once
24 
25 #include "fdbclient/FDBTypes.h"
26 #include "flow/TDMetric.actor.h"
27 
28 struct NotifiedVersion {
valNotifiedVersion29 	NotifiedVersion( StringRef& name, StringRef const &id, Version version = 0 ) : val(name, id, version) { val = version; }
valNotifiedVersion30 	NotifiedVersion( Version version = 0 ) : val(StringRef(), StringRef(), version) {}
31 
initMetricNotifiedVersion32 	void initMetric(const StringRef& name, const StringRef &id) {
33 		Version version = val;
34 		val.init(name, id);
35 		val = version;
36 	}
37 
whenAtLeastNotifiedVersion38 	Future<Void> whenAtLeast( Version limit ) {
39 		if (val >= limit)
40 			return Void();
41 		Promise<Void> p;
42 		waiting.push( std::make_pair(limit,p) );
43 		return p.getFuture();
44 	}
45 
getNotifiedVersion46 	Version get() const { return val; }
47 
setNotifiedVersion48 	void set( Version v ) {
49 		ASSERT( v >= val );
50 		if (v != val) {
51 			val = v;
52 
53 			std::vector<Promise<Void>> toSend;
54 			while ( waiting.size() && v >= waiting.top().first ) {
55 				Promise<Void> p = std::move(waiting.top().second);
56 				waiting.pop();
57 				toSend.push_back(p);
58 			}
59 			for(auto& p : toSend) {
60 				p.send(Void());
61 			}
62 		}
63 	}
64 
65 	void operator=( Version v ) {
66 		set( v );
67 	}
68 
NotifiedVersionNotifiedVersion69 	NotifiedVersion(NotifiedVersion&& r) BOOST_NOEXCEPT : waiting(std::move(r.waiting)), val(std::move(r.val)) {}
70 	void operator=(NotifiedVersion&& r) BOOST_NOEXCEPT { waiting = std::move(r.waiting); val = std::move(r.val); }
71 
72 private:
73 	typedef std::pair<Version,Promise<Void>> Item;
74 	struct ItemCompare {
operatorNotifiedVersion::ItemCompare75 		bool operator()(const Item& a, const Item& b) { return a.first > b.first; }
76 	};
77 	std::priority_queue<Item, std::vector<Item>, ItemCompare> waiting;
78 	VersionMetricHandle val;
79 };
80 
81 #endif
82