1 /* 2 * Notified.h 3 * 4 * This source file is part of the FoundationDB open source project 5 * 6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors 7 * 8 * Licensed under the Apache License, Version 2.0 (the "License"); 9 * you may not use this file except in compliance with the License. 10 * You may obtain a copy of the License at 11 * 12 * http://www.apache.org/licenses/LICENSE-2.0 13 * 14 * Unless required by applicable law or agreed to in writing, software 15 * distributed under the License is distributed on an "AS IS" BASIS, 16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17 * See the License for the specific language governing permissions and 18 * limitations under the License. 19 */ 20 21 #ifndef FDBCLIENT_NOTIFIED_H 22 #define FDBCLIENT_NOTIFIED_H 23 #pragma once 24 25 #include "fdbclient/FDBTypes.h" 26 #include "flow/TDMetric.actor.h" 27 28 struct NotifiedVersion { valNotifiedVersion29 NotifiedVersion( StringRef& name, StringRef const &id, Version version = 0 ) : val(name, id, version) { val = version; } valNotifiedVersion30 NotifiedVersion( Version version = 0 ) : val(StringRef(), StringRef(), version) {} 31 initMetricNotifiedVersion32 void initMetric(const StringRef& name, const StringRef &id) { 33 Version version = val; 34 val.init(name, id); 35 val = version; 36 } 37 whenAtLeastNotifiedVersion38 Future<Void> whenAtLeast( Version limit ) { 39 if (val >= limit) 40 return Void(); 41 Promise<Void> p; 42 waiting.push( std::make_pair(limit,p) ); 43 return p.getFuture(); 44 } 45 getNotifiedVersion46 Version get() const { return val; } 47 setNotifiedVersion48 void set( Version v ) { 49 ASSERT( v >= val ); 50 if (v != val) { 51 val = v; 52 53 std::vector<Promise<Void>> toSend; 54 while ( waiting.size() && v >= waiting.top().first ) { 55 Promise<Void> p = std::move(waiting.top().second); 56 waiting.pop(); 57 toSend.push_back(p); 58 } 59 for(auto& p : toSend) { 60 p.send(Void()); 61 } 62 } 63 } 64 65 void operator=( Version v ) { 66 set( v ); 67 } 68 NotifiedVersionNotifiedVersion69 NotifiedVersion(NotifiedVersion&& r) BOOST_NOEXCEPT : waiting(std::move(r.waiting)), val(std::move(r.val)) {} 70 void operator=(NotifiedVersion&& r) BOOST_NOEXCEPT { waiting = std::move(r.waiting); val = std::move(r.val); } 71 72 private: 73 typedef std::pair<Version,Promise<Void>> Item; 74 struct ItemCompare { operatorNotifiedVersion::ItemCompare75 bool operator()(const Item& a, const Item& b) { return a.first > b.first; } 76 }; 77 std::priority_queue<Item, std::vector<Item>, ItemCompare> waiting; 78 VersionMetricHandle val; 79 }; 80 81 #endif 82