1 /*
2  * StorageMetrics.actor.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 // Included via StorageMetrics.h
22 #include "fdbclient/FDBTypes.h"
23 #include "fdbrpc/simulator.h"
24 #include "flow/UnitTest.h"
25 #include "fdbclient/StorageServerInterface.h"
26 #include "fdbclient/KeyRangeMap.h"
27 #include "fdbserver/Knobs.h"
28 #include "flow/actorcompiler.h"  // This must be the last #include.
29 
// A set of sampled keys, each weighted by an int64_t metric. Used to estimate the total metric of a key range
// (getEstimate) and to pick a key at which to split a range (splitEstimate).
struct StorageMetricSample {
	// Sampled keys and their metric values.
	IndexedSet<Key, int64_t> sample;
	// Sampling granularity. Not read by this struct; TransientStorageMetricSample uses it as the threshold below which
	// a metric is only recorded probabilistically.
	int64_t metricUnitsPerSample;

	StorageMetricSample( int64_t metricUnitsPerSample ) : metricUnitsPerSample(metricUnitsPerSample) {}

	// Returns the sum of the metrics of the sampled keys between keys.begin and keys.end
	// (presumably the half-open range [begin, end) - confirm against IndexedSet::sumRange).
	int64_t getEstimate( KeyRangeRef keys ) const {
		return sample.sumRange( keys.begin, keys.end );
	}

	// Chooses a key at which to split `range`, aiming for roughly `offset` metric units measured from range.begin
	// (front == true) or backwards from range.end (front == false).
	// Returns range.end or range.begin when no usable split key exists inside the range.
	KeyRef splitEstimate( KeyRangeRef range, int64_t offset, bool front = true ) const {
		// Locate the sample entry at the target cumulative metric.
		// NOTE(review): assumes sumTo(it) is the total metric of the entries before `it` and index(m) maps a cumulative
		// metric back to an entry - confirm against IndexedSet.
		auto fwd_split = sample.index( front ? sample.sumTo(sample.lower_bound(range.begin)) + offset : sample.sumTo(sample.lower_bound(range.end)) - offset );

		// The target lies beyond the last sample or at/after the end of the range.
		if( fwd_split == sample.end() || *fwd_split >= range.end )
			return range.end;

		// A backwards split that lands at or before the start of the range.
		if( !front && *fwd_split <= range.begin )
			return range.begin;

		auto bck_split = fwd_split;

		// Butterfly search - start at midpoint then go in both directions.
		// Each iteration takes at most one step backwards and one step forwards, as long as the respective
		// cursor is still inside the range.
		while ((fwd_split != sample.end() && *fwd_split < range.end) ||
			   (bck_split != sample.begin() && *bck_split > range.begin)) {
			if (bck_split != sample.begin() && *bck_split > range.begin) {
				auto it = bck_split;
				bck_split.decrementNonEnd();

				// Candidate between the preceding sample (clamped to range.begin) and the sample just left.
				// keyBetween is defined elsewhere; presumably it returns a key inside the given range.
				KeyRef split = keyBetween(KeyRangeRef(bck_split != sample.begin() ? std::max<KeyRef>(*bck_split,range.begin) : range.begin, *it));
				// For a backwards split any candidate on this side is accepted. For a front split the candidate must
				// leave a non-zero estimate before it and respect the split key size limit.
				if(!front || (getEstimate(KeyRangeRef(range.begin, split)) > 0 && split.size() <= CLIENT_KNOBS->SPLIT_KEY_SIZE_LIMIT))
					return split;
			}

			if (fwd_split != sample.end() && *fwd_split < range.end) {
				auto it = fwd_split;
				++it;

				// Candidate between the current sample and the following one (clamped to range.end).
				KeyRef split = keyBetween(KeyRangeRef(*fwd_split, it != sample.end() ? std::min<KeyRef>(*it, range.end) : range.end));
				// Mirror image of the backwards check: a front split accepts any candidate on this side, a backwards
				// split requires a non-zero estimate after the candidate and a key within the size limit.
				if(front || (getEstimate(KeyRangeRef(split, range.end)) > 0 && split.size() <= CLIENT_KNOBS->SPLIT_KEY_SIZE_LIMIT))
					return split;

				fwd_split = it;
			}

		}

		// If we didn't return above, we didn't find anything.
		TraceEvent(SevWarn, "CannotSplitLastSampleKey").detail("Range", range).detail("Offset", offset);
		return front ? range.end : range.begin;
	}
};
80 
// Unit test: getEstimate() sums the metrics of exactly the sampled keys that fall inside the requested range.
TEST_CASE("/fdbserver/StorageMetricSample/simple") {
	StorageMetricSample s( 1000 );
	s.sample.insert(LiteralStringRef("Apple"), 1000);
	s.sample.insert(LiteralStringRef("Banana"), 2000);
	s.sample.insert(LiteralStringRef("Cat"), 1000);
	s.sample.insert(LiteralStringRef("Cathode"), 1000);
	s.sample.insert(LiteralStringRef("Dog"), 1000);

	// Apple + Banana + Cat + Cathode; "Dog" sorts after "D"
	ASSERT(s.getEstimate(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("D"))) == 5000);
	// All five keys
	ASSERT(s.getEstimate(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("E"))) == 6000);
	// Banana only
	ASSERT(s.getEstimate(KeyRangeRef(LiteralStringRef("B"), LiteralStringRef("C"))) == 2000);

	// splitEstimate() check, currently disabled
	//ASSERT(s.splitEstimate(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("D")), 3500) == LiteralStringRef("Cat"));

	return Void();
}
97 
98 struct TransientStorageMetricSample : StorageMetricSample {
99 	Deque< std::pair<double, std::pair<Key, int64_t>> > queue;
100 
TransientStorageMetricSampleTransientStorageMetricSample101 	TransientStorageMetricSample( int64_t metricUnitsPerSample ) : StorageMetricSample(metricUnitsPerSample) {}
102 
rollTransientStorageMetricSample103 	bool roll( KeyRef key, int64_t metric ) {
104 		return g_random->random01() < (double)metric / metricUnitsPerSample;	//< SOMEDAY: Better randomInt64?
105 	}
106 
107 	// Returns the sampled metric value (possibly 0, possibly increased by the sampling factor)
addAndExpireTransientStorageMetricSample108 	int64_t addAndExpire( KeyRef key, int64_t metric, double expiration ) {
109 		int64_t x = add( key, metric );
110 		if (x)
111 			queue.push_back( std::make_pair( expiration, std::make_pair( *sample.find(key), -x ) ) );
112 		return x;
113 	}
114 
115 	//FIXME: both versions of erase are broken, because they do not remove items in the queue with will subtract a metric from the value sometime in the future
eraseTransientStorageMetricSample116 	int64_t erase( KeyRef key ) {
117 		auto it = sample.find(key);
118 		if (it == sample.end()) return 0;
119 		int64_t x = sample.getMetric(it);
120 		sample.erase(it);
121 		return x;
122 	}
eraseTransientStorageMetricSample123 	void erase( KeyRangeRef keys ) {
124 		sample.erase( keys.begin, keys.end );
125 	}
126 
pollTransientStorageMetricSample127 	void poll(KeyRangeMap< vector< PromiseStream< StorageMetrics > > > & waitMap, StorageMetrics m) {
128 		double now = ::now();
129 		while (queue.size() &&
130 				queue.front().first <= now )
131 		{
132 			KeyRef key = queue.front().second.first;
133 			int64_t delta = queue.front().second.second;
134 			ASSERT( delta != 0 );
135 
136 			if( sample.addMetric( key, delta ) == 0 )
137 				sample.erase( key );
138 
139 			StorageMetrics deltaM = m * delta;
140 			auto v = waitMap[key];
141 			for(int i=0; i<v.size(); i++) {
142 				TEST( true ); // TransientStorageMetricSample poll update
143 				v[i].send( deltaM );
144 			}
145 
146 			queue.pop_front();
147 		}
148 	}
149 
pollTransientStorageMetricSample150 	void poll() {
151 		double now = ::now();
152 		while (queue.size() &&
153 				queue.front().first <= now )
154 		{
155 			KeyRef key = queue.front().second.first;
156 			int64_t delta = queue.front().second.second;
157 			ASSERT( delta != 0 );
158 
159 			if( sample.addMetric( key, delta ) == 0 )
160 				sample.erase( key );
161 
162 			queue.pop_front();
163 		}
164 	}
165 
166 private:
addTransientStorageMetricSample167 	int64_t add( KeyRef key, int64_t metric ) {
168 		if (!metric) return 0;
169 		int64_t mag = metric<0 ? -metric : metric;
170 
171 		if (mag < metricUnitsPerSample) {
172 			if ( !roll(key, mag) )
173 				return 0;
174 			metric = metric<0 ? -metricUnitsPerSample : metricUnitsPerSample;
175 		}
176 
177 		if( sample.addMetric( key, metric ) == 0 )
178 			sample.erase( key );
179 
180 		return metric;
181 	}
182 };
183 
// Holds the byte, iops and bandwidth samples of a storage server, together with the streams of requests waiting
// for metric changes, and answers metric, split and physical-metric requests from them.
struct StorageServerMetrics {
	// For each key range, the streams that receive metric changes affecting that range.
	KeyRangeMap< vector< PromiseStream< StorageMetrics > > > waitMetricsMap;
	StorageMetricSample byteSample;
	TransientStorageMetricSample iopsSample, bandwidthSample;	// FIXME: iops and bandwidth calculations are not effectively tested, since they aren't currently used by data distribution

	// byteSample is constructed with metricUnitsPerSample == 0; the transient samples take their granularity from knobs.
	StorageServerMetrics()
		: byteSample( 0 ), iopsSample( SERVER_KNOBS->IOPS_UNITS_PER_SAMPLE ), bandwidthSample( SERVER_KNOBS->BANDWIDTH_UNITS_PER_SAMPLE )
	{
	}

	// Get the current estimated metrics for the given keys
	// The bandwidth and iops estimates are scaled by STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS.
	StorageMetrics getMetrics( KeyRangeRef const& keys ) {
		StorageMetrics result;
		result.bytes = byteSample.getEstimate( keys );
		result.bytesPerKSecond = bandwidthSample.getEstimate( keys ) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
		result.iosPerKSecond = iopsSample.getEstimate( keys ) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
		return result;
	}

	// Called when metrics should change (IO for a given key)
	// Records the change in bandwidthSample and iopsSample (expiring after STORAGE_METRICS_AVERAGE_INTERVAL) and
	// notifies waiting WaitMetricsRequests through waitMetricsMap.
	// metrics.bytes must be 0; byte changes go through notifyBytes().
	void notify( KeyRef key, StorageMetrics& metrics ) {
		ASSERT (metrics.bytes == 0); // ShardNotifyMetrics
		TEST (metrics.bytesPerKSecond != 0); // ShardNotifyMetrics
		TEST (metrics.iosPerKSecond != 0); // ShardNotifyMetrics

		double expire = now() + SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL;

		StorageMetrics notifyMetrics;

		// addAndExpire() returns the sampled amount, which may be 0 when the change was not sampled.
		if (metrics.bytesPerKSecond)
			notifyMetrics.bytesPerKSecond = bandwidthSample.addAndExpire( key, metrics.bytesPerKSecond, expire ) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
		if (metrics.iosPerKSecond)
			notifyMetrics.iosPerKSecond = iopsSample.addAndExpire( key, metrics.iosPerKSecond, expire ) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
		// Only notify when something was actually recorded.
		if (!notifyMetrics.allZero()) {
			auto& v = waitMetricsMap[key];
			for(int i=0; i<v.size(); i++) {
				TEST( true ); // ShardNotifyMetrics
				v[i].send( notifyMetrics );
			}
		}
	}

	// Called by StorageServerDisk when the size of a key in byteSample changes, to notify WaitMetricsRequest
	// Should not be called for keys past allKeys.end
	// Sends a StorageMetrics carrying only `bytes` to every stream registered on `shard`.
	void notifyBytes( RangeMap<Key, std::vector<PromiseStream<StorageMetrics>>, KeyRangeRef>::Iterator shard, int64_t bytes ) {
		ASSERT(shard.end() <= allKeys.end);

		StorageMetrics notifyMetrics;
		notifyMetrics.bytes = bytes;
		for(int i=0; i < shard.value().size(); i++) {
			TEST( true ); // notifyBytes
			shard.value()[i].send( notifyMetrics );
		}
	}

	// Called by StorageServerDisk when the size of a key in byteSample changes, to notify WaitMetricsRequest
	// Looks up the range containing `key` and forwards to the overload above.
	void notifyBytes( KeyRef key, int64_t bytes ) {
		if( key >= allKeys.end ) //Do not notify on changes to internal storage server state
			return;

		notifyBytes(waitMetricsMap.rangeContaining(key), bytes);
	}

	// Called when a range of keys becomes unassigned (and therefore not readable), to notify waiting WaitMetricsRequests (also other types of wait
	//   requests in the future?)
	// Every stream on a range intersecting `keys` receives wrong_shard_server().
	void notifyNotReadable( KeyRangeRef keys ) {
		auto rs = waitMetricsMap.intersectingRanges(keys);
		for (auto r = rs.begin(); r != rs.end(); ++r){
			auto &v = r->value();
			TEST( v.size() );  // notifyNotReadable() sending errors to intersecting ranges
			for (int n=0; n<v.size(); n++)
				v[n].sendError( wrong_shard_server() );
		}
	}

	// Called periodically (~1 sec intervals) to remove older IOs from the averages
	// Removes expired entries from the queues of bandwidthSample and iopsSample, updates those samples accordingly,
	//   and notifies WaitMetricsRequests through waitMetricsMap.
	void poll() {
		{ StorageMetrics m; m.bytesPerKSecond = SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS; bandwidthSample.poll(waitMetricsMap, m); }
		{ StorageMetrics m; m.iosPerKSecond = SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS; iopsSample.poll(waitMetricsMap, m); }
		// byteSample doesn't need polling because we never call addAndExpire() on it
	}

	//static void waitMetrics( StorageServerMetrics* const& self, WaitMetricsRequest const& req );

	// This function can run on untrusted user data.  We must validate all divisions carefully.
	// Computes the next split key for one metric dimension.
	//   remaining, estimated, limits, used: values of that dimension, in its native units
	//   infinity: a limit of infinity / 2 or more is treated as "no limit" and `key` is returned unchanged
	//   sample, divisor: the sample for this dimension and the factor converting native units to sample units
	//   lastKey, key: the range [lastKey, key) to split
	//   hasUsed: selects the result (lastKey rather than key) when `used` already covers the expected size
	// Returns `key` when no split is needed for this dimension.
	KeyRef getSplitKey( int64_t remaining, int64_t estimated, int64_t limits, int64_t used, int64_t infinity,
		bool isLastShard, StorageMetricSample& sample, double divisor, KeyRef const& lastKey, KeyRef const& key, bool hasUsed )
	{
		ASSERT(remaining >= 0);
		ASSERT(limits > 0);
		ASSERT(divisor > 0);

		if( limits < infinity / 2 ) {
			int64_t expectedSize;
			// Both divisors below are at least 0.5 because the numerator is non-negative and limits > 0.
			if( isLastShard || remaining > estimated ) {
				double remaining_divisor = ( double( remaining ) / limits ) + 0.5;
				expectedSize = remaining / remaining_divisor;
			} else {
				// If we are here, then estimated >= remaining >= 0
				double estimated_divisor = ( double( estimated ) / limits ) + 0.5;
				expectedSize = remaining / estimated_divisor;
			}

			if( remaining > expectedSize ) {
				// This does the conversion from native units to bytes using the divisor.
				double offset = (expectedSize - used) / divisor;
				if( offset <= 0 )
					return hasUsed ? lastKey : key;
				// The offset is scaled by a random factor of 1 +/- SPLIT_JITTER_AMOUNT (assuming random01() is in [0, 1)).
				return sample.splitEstimate( KeyRangeRef(lastKey, key), offset * ( ( 1.0 - SERVER_KNOBS->SPLIT_JITTER_AMOUNT ) + 2 * g_random->random01() * SERVER_KNOBS->SPLIT_JITTER_AMOUNT ) );
			}
		}

		return key;
	}

	// Answers a SplitMetricsRequest: repeatedly picks split keys inside req.keys until fewer than
	// 2 * MIN_SHARD_BYTES remain or no further split key is found, then replies with the split points and the
	// metrics of the final piece (reply.used). Any Error thrown is forwarded to the requester.
	void splitMetrics( SplitMetricsRequest req ) {
		try {
			SplitMetricsReply reply;
			KeyRef lastKey = req.keys.begin;
			StorageMetrics used = req.used;
			StorageMetrics estimated = req.estimated;
			StorageMetrics remaining = getMetrics( req.keys ) + used;

			//TraceEvent("SplitMetrics").detail("Begin", req.keys.begin).detail("End", req.keys.end).detail("Remaining", remaining.bytes).detail("Used", used.bytes);

			while( true ) {
				if( remaining.bytes < 2*SERVER_KNOBS->MIN_SHARD_BYTES )
					break;
				KeyRef key = req.keys.end;
				bool hasUsed = used.bytes != 0 || used.bytesPerKSecond != 0 || used.iosPerKSecond != 0;
				// Split by bytes first; each later call receives the previous result as the end of its range.
				key = getSplitKey( remaining.bytes, estimated.bytes, req.limits.bytes, used.bytes,
					req.limits.infinity, req.isLastShard, byteSample, 1, lastKey, key, hasUsed );
				// Do not split before MIN_SHARD_BYTES (counting what is already used) has been accumulated.
				if( used.bytes < SERVER_KNOBS->MIN_SHARD_BYTES )
					key = std::max( key, byteSample.splitEstimate( KeyRangeRef(lastKey, req.keys.end), SERVER_KNOBS->MIN_SHARD_BYTES - used.bytes ) );
				key = getSplitKey( remaining.iosPerKSecond, estimated.iosPerKSecond, req.limits.iosPerKSecond, used.iosPerKSecond,
					req.limits.infinity, req.isLastShard, iopsSample, SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS, lastKey, key, hasUsed );
				key = getSplitKey( remaining.bytesPerKSecond, estimated.bytesPerKSecond, req.limits.bytesPerKSecond, used.bytesPerKSecond,
					req.limits.infinity, req.isLastShard, bandwidthSample, SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS, lastKey, key, hasUsed );
				// A split at lastKey is only legitimate when carried-over `used` metrics already fill a piece.
				ASSERT( key != lastKey || hasUsed);
				if( key == req.keys.end )
					break;
				reply.splits.push_back_deep( reply.splits.arena(), key );

				// Account for the piece that was just cut off.
				StorageMetrics diff = (getMetrics( KeyRangeRef(lastKey, key) ) + used);
				remaining -= diff;
				estimated -= diff;

				// `used` only applies to the first piece.
				used = StorageMetrics();
				lastKey = key;
			}

			reply.used = getMetrics( KeyRangeRef(lastKey, req.keys.end) ) + used;
			req.reply.send(reply);
		} catch (Error& e) {
			req.reply.sendError(e);
		}
	}

	// Replies with the current load over allKeys plus free and total capacity derived from `sb`.
	// The iops and bandwidth figures for both free and capacity are fixed constants.
	void getPhysicalMetrics( GetPhysicalMetricsRequest req, StorageBytes sb ){
		GetPhysicalMetricsReply rep;

		// SOMEDAY: make bytes dynamic with hard disk space
		rep.load = getMetrics(allKeys);

		// When free space is below 1e9, log a warning on a random fraction of calls (random01() < 0.1).
		if (sb.free < 1e9 && g_random->random01() < 0.1)
			TraceEvent(SevWarn, "PhysicalDiskMetrics")
				.detail("Free", sb.free)
				.detail("Total", sb.total)
				.detail("Available", sb.available)
				.detail("Load", rep.load.bytes);

		rep.free.bytes = sb.free;
		rep.free.iosPerKSecond = 10e6;
		rep.free.bytesPerKSecond = 100e9;

		rep.capacity.bytes = sb.total;
		rep.capacity.iosPerKSecond = 10e6;
		rep.capacity.bytesPerKSecond = 100e9;

		req.reply.send(rep);
	}

	// Declared here, defined elsewhere.
	Future<Void> waitMetrics(WaitMetricsRequest req, Future<Void> delay);

private:
	// Merges the range containing `key` into the range before it when both hold the same value.
	// Does nothing when that range is the first one or when the lookup yields the end of the map.
	static void collapse( KeyRangeMap<int>& map, KeyRef const& key ) {
		auto range = map.rangeContaining(key);
		if (range == map.ranges().begin() || range == map.ranges().end()) return;
		int value = range->value();
		auto prev = range; --prev;
		if (prev->value() != value) return;
		// Copy the bounds into an owning KeyRange before inserting over the two ranges.
		KeyRange keys = KeyRangeRef( prev->begin(), range->end() );
		map.insert( keys, value );
	}

	// Adds `delta` to the value of every range in `keys`, then tries to collapse at both boundaries of `keys`.
	static void add( KeyRangeMap<int>& map, KeyRangeRef const& keys, int delta ) {
		auto rs = map.modify(keys);
		for(auto r = rs.begin(); r != rs.end(); ++r)
			r->value() += delta;
		collapse( map, keys.begin );
		collapse( map, keys.end );
	}
};
390 
//Contains information about whether or not a key-value pair should be included in a byte sample
//Also contains size information about the byte sample
//Returned by isKeyValueInSample()
struct ByteSampleInfo {
	//True if the key-value pair should be included in the byte sample
	bool inSample;

	//Actual size of the key value pair
	int64_t size;

	//The recorded size of the sample (max of bytesPerSample, size)
	int64_t sampledSize;
};
402 
//Determines whether a key-value pair should be included in a byte sample
//Also returns size information about the sample
//Only declared in this header; the definition lives elsewhere
ByteSampleInfo isKeyValueInSample(KeyValueRef keyValue);
406 
407 #include "flow/unactorcompiler.h"
408