1 /* 2 * StorageMetrics.actor.h 3 * 4 * This source file is part of the FoundationDB open source project 5 * 6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors 7 * 8 * Licensed under the Apache License, Version 2.0 (the "License"); 9 * you may not use this file except in compliance with the License. 10 * You may obtain a copy of the License at 11 * 12 * http://www.apache.org/licenses/LICENSE-2.0 13 * 14 * Unless required by applicable law or agreed to in writing, software 15 * distributed under the License is distributed on an "AS IS" BASIS, 16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17 * See the License for the specific language governing permissions and 18 * limitations under the License. 19 */ 20 21 // Included via StorageMetrics.h 22 #include "fdbclient/FDBTypes.h" 23 #include "fdbrpc/simulator.h" 24 #include "flow/UnitTest.h" 25 #include "fdbclient/StorageServerInterface.h" 26 #include "fdbclient/KeyRangeMap.h" 27 #include "fdbserver/Knobs.h" 28 #include "flow/actorcompiler.h" // This must be the last #include. 29 30 struct StorageMetricSample { 31 IndexedSet<Key, int64_t> sample; 32 int64_t metricUnitsPerSample; 33 StorageMetricSampleStorageMetricSample34 StorageMetricSample( int64_t metricUnitsPerSample ) : metricUnitsPerSample(metricUnitsPerSample) {} 35 getEstimateStorageMetricSample36 int64_t getEstimate( KeyRangeRef keys ) const { 37 return sample.sumRange( keys.begin, keys.end ); 38 } 39 KeyRef splitEstimate( KeyRangeRef range, int64_t offset, bool front = true ) const { 40 auto fwd_split = sample.index( front ? sample.sumTo(sample.lower_bound(range.begin)) + offset : sample.sumTo(sample.lower_bound(range.end)) - offset ); 41 42 if( fwd_split == sample.end() || *fwd_split >= range.end ) 43 return range.end; 44 45 if( !front && *fwd_split <= range.begin ) 46 return range.begin; 47 48 auto bck_split = fwd_split; 49 50 // Butterfly search - start at midpoint then go in both directions. 51 while ((fwd_split != sample.end() && *fwd_split < range.end) || 52 (bck_split != sample.begin() && *bck_split > range.begin)) { 53 if (bck_split != sample.begin() && *bck_split > range.begin) { 54 auto it = bck_split; 55 bck_split.decrementNonEnd(); 56 57 KeyRef split = keyBetween(KeyRangeRef(bck_split != sample.begin() ? std::max<KeyRef>(*bck_split,range.begin) : range.begin, *it)); 58 if(!front || (getEstimate(KeyRangeRef(range.begin, split)) > 0 && split.size() <= CLIENT_KNOBS->SPLIT_KEY_SIZE_LIMIT)) 59 return split; 60 } 61 62 if (fwd_split != sample.end() && *fwd_split < range.end) { 63 auto it = fwd_split; 64 ++it; 65 66 KeyRef split = keyBetween(KeyRangeRef(*fwd_split, it != sample.end() ? std::min<KeyRef>(*it, range.end) : range.end)); 67 if(front || (getEstimate(KeyRangeRef(split, range.end)) > 0 && split.size() <= CLIENT_KNOBS->SPLIT_KEY_SIZE_LIMIT)) 68 return split; 69 70 fwd_split = it; 71 } 72 73 } 74 75 // If we didn't return above, we didn't find anything. 76 TraceEvent(SevWarn, "CannotSplitLastSampleKey").detail("Range", range).detail("Offset", offset); 77 return front ? range.end : range.begin; 78 } 79 }; 80 81 TEST_CASE("/fdbserver/StorageMetricSample/simple") { 82 StorageMetricSample s( 1000 ); 83 s.sample.insert(LiteralStringRef("Apple"), 1000); 84 s.sample.insert(LiteralStringRef("Banana"), 2000); 85 s.sample.insert(LiteralStringRef("Cat"), 1000); 86 s.sample.insert(LiteralStringRef("Cathode"), 1000); 87 s.sample.insert(LiteralStringRef("Dog"), 1000); 88 89 ASSERT(s.getEstimate(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("D"))) == 5000); 90 ASSERT(s.getEstimate(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("E"))) == 6000); 91 ASSERT(s.getEstimate(KeyRangeRef(LiteralStringRef("B"), LiteralStringRef("C"))) == 2000); 92 93 //ASSERT(s.splitEstimate(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("D")), 3500) == LiteralStringRef("Cat")); 94 95 return Void(); 96 } 97 98 struct TransientStorageMetricSample : StorageMetricSample { 99 Deque< std::pair<double, std::pair<Key, int64_t>> > queue; 100 TransientStorageMetricSampleTransientStorageMetricSample101 TransientStorageMetricSample( int64_t metricUnitsPerSample ) : StorageMetricSample(metricUnitsPerSample) {} 102 rollTransientStorageMetricSample103 bool roll( KeyRef key, int64_t metric ) { 104 return g_random->random01() < (double)metric / metricUnitsPerSample; //< SOMEDAY: Better randomInt64? 105 } 106 107 // Returns the sampled metric value (possibly 0, possibly increased by the sampling factor) addAndExpireTransientStorageMetricSample108 int64_t addAndExpire( KeyRef key, int64_t metric, double expiration ) { 109 int64_t x = add( key, metric ); 110 if (x) 111 queue.push_back( std::make_pair( expiration, std::make_pair( *sample.find(key), -x ) ) ); 112 return x; 113 } 114 115 //FIXME: both versions of erase are broken, because they do not remove items in the queue with will subtract a metric from the value sometime in the future eraseTransientStorageMetricSample116 int64_t erase( KeyRef key ) { 117 auto it = sample.find(key); 118 if (it == sample.end()) return 0; 119 int64_t x = sample.getMetric(it); 120 sample.erase(it); 121 return x; 122 } eraseTransientStorageMetricSample123 void erase( KeyRangeRef keys ) { 124 sample.erase( keys.begin, keys.end ); 125 } 126 pollTransientStorageMetricSample127 void poll(KeyRangeMap< vector< PromiseStream< StorageMetrics > > > & waitMap, StorageMetrics m) { 128 double now = ::now(); 129 while (queue.size() && 130 queue.front().first <= now ) 131 { 132 KeyRef key = queue.front().second.first; 133 int64_t delta = queue.front().second.second; 134 ASSERT( delta != 0 ); 135 136 if( sample.addMetric( key, delta ) == 0 ) 137 sample.erase( key ); 138 139 StorageMetrics deltaM = m * delta; 140 auto v = waitMap[key]; 141 for(int i=0; i<v.size(); i++) { 142 TEST( true ); // TransientStorageMetricSample poll update 143 v[i].send( deltaM ); 144 } 145 146 queue.pop_front(); 147 } 148 } 149 pollTransientStorageMetricSample150 void poll() { 151 double now = ::now(); 152 while (queue.size() && 153 queue.front().first <= now ) 154 { 155 KeyRef key = queue.front().second.first; 156 int64_t delta = queue.front().second.second; 157 ASSERT( delta != 0 ); 158 159 if( sample.addMetric( key, delta ) == 0 ) 160 sample.erase( key ); 161 162 queue.pop_front(); 163 } 164 } 165 166 private: addTransientStorageMetricSample167 int64_t add( KeyRef key, int64_t metric ) { 168 if (!metric) return 0; 169 int64_t mag = metric<0 ? -metric : metric; 170 171 if (mag < metricUnitsPerSample) { 172 if ( !roll(key, mag) ) 173 return 0; 174 metric = metric<0 ? -metricUnitsPerSample : metricUnitsPerSample; 175 } 176 177 if( sample.addMetric( key, metric ) == 0 ) 178 sample.erase( key ); 179 180 return metric; 181 } 182 }; 183 184 struct StorageServerMetrics { 185 KeyRangeMap< vector< PromiseStream< StorageMetrics > > > waitMetricsMap; 186 StorageMetricSample byteSample; 187 TransientStorageMetricSample iopsSample, bandwidthSample; // FIXME: iops and bandwidth calculations are not effectively tested, since they aren't currently used by data distribution 188 StorageServerMetricsStorageServerMetrics189 StorageServerMetrics() 190 : byteSample( 0 ), iopsSample( SERVER_KNOBS->IOPS_UNITS_PER_SAMPLE ), bandwidthSample( SERVER_KNOBS->BANDWIDTH_UNITS_PER_SAMPLE ) 191 { 192 } 193 194 // Get the current estimated metrics for the given keys getMetricsStorageServerMetrics195 StorageMetrics getMetrics( KeyRangeRef const& keys ) { 196 StorageMetrics result; 197 result.bytes = byteSample.getEstimate( keys ); 198 result.bytesPerKSecond = bandwidthSample.getEstimate( keys ) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS; 199 result.iosPerKSecond = iopsSample.getEstimate( keys ) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS; 200 return result; 201 } 202 203 // Called when metrics should change (IO for a given key) 204 // Notifies waiting WaitMetricsRequests through waitMetricsMap, and updates metricsAverageQueue and metricsSampleMap notifyStorageServerMetrics205 void notify( KeyRef key, StorageMetrics& metrics ) { 206 ASSERT (metrics.bytes == 0); // ShardNotifyMetrics 207 TEST (metrics.bytesPerKSecond != 0); // ShardNotifyMetrics 208 TEST (metrics.iosPerKSecond != 0); // ShardNotifyMetrics 209 210 double expire = now() + SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL; 211 212 StorageMetrics notifyMetrics; 213 214 if (metrics.bytesPerKSecond) 215 notifyMetrics.bytesPerKSecond = bandwidthSample.addAndExpire( key, metrics.bytesPerKSecond, expire ) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS; 216 if (metrics.iosPerKSecond) 217 notifyMetrics.iosPerKSecond = iopsSample.addAndExpire( key, metrics.iosPerKSecond, expire ) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS; 218 if (!notifyMetrics.allZero()) { 219 auto& v = waitMetricsMap[key]; 220 for(int i=0; i<v.size(); i++) { 221 TEST( true ); // ShardNotifyMetrics 222 v[i].send( notifyMetrics ); 223 } 224 } 225 } 226 227 // Called by StorageServerDisk when the size of a key in byteSample changes, to notify WaitMetricsRequest 228 // Should not be called for keys past allKeys.end notifyBytesStorageServerMetrics229 void notifyBytes( RangeMap<Key, std::vector<PromiseStream<StorageMetrics>>, KeyRangeRef>::Iterator shard, int64_t bytes ) { 230 ASSERT(shard.end() <= allKeys.end); 231 232 StorageMetrics notifyMetrics; 233 notifyMetrics.bytes = bytes; 234 for(int i=0; i < shard.value().size(); i++) { 235 TEST( true ); // notifyBytes 236 shard.value()[i].send( notifyMetrics ); 237 } 238 } 239 240 // Called by StorageServerDisk when the size of a key in byteSample changes, to notify WaitMetricsRequest notifyBytesStorageServerMetrics241 void notifyBytes( KeyRef key, int64_t bytes ) { 242 if( key >= allKeys.end ) //Do not notify on changes to internal storage server state 243 return; 244 245 notifyBytes(waitMetricsMap.rangeContaining(key), bytes); 246 } 247 248 // Called when a range of keys becomes unassigned (and therefore not readable), to notify waiting WaitMetricsRequests (also other types of wait 249 // requests in the future?) notifyNotReadableStorageServerMetrics250 void notifyNotReadable( KeyRangeRef keys ) { 251 auto rs = waitMetricsMap.intersectingRanges(keys); 252 for (auto r = rs.begin(); r != rs.end(); ++r){ 253 auto &v = r->value(); 254 TEST( v.size() ); // notifyNotReadable() sending errors to intersecting ranges 255 for (int n=0; n<v.size(); n++) 256 v[n].sendError( wrong_shard_server() ); 257 } 258 } 259 260 // Called periodically (~1 sec intervals) to remove older IOs from the averages 261 // Removes old entries from metricsAverageQueue, updates metricsSampleMap accordingly, and notifies 262 // WaitMetricsRequests through waitMetricsMap. pollStorageServerMetrics263 void poll() { 264 { StorageMetrics m; m.bytesPerKSecond = SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS; bandwidthSample.poll(waitMetricsMap, m); } 265 { StorageMetrics m; m.iosPerKSecond = SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS; iopsSample.poll(waitMetricsMap, m); } 266 // bytesSample doesn't need polling because we never call addExpire() on it 267 } 268 269 //static void waitMetrics( StorageServerMetrics* const& self, WaitMetricsRequest const& req ); 270 271 // This function can run on untrusted user data. We must validate all divisions carefully. getSplitKeyStorageServerMetrics272 KeyRef getSplitKey( int64_t remaining, int64_t estimated, int64_t limits, int64_t used, int64_t infinity, 273 bool isLastShard, StorageMetricSample& sample, double divisor, KeyRef const& lastKey, KeyRef const& key, bool hasUsed ) 274 { 275 ASSERT(remaining >= 0); 276 ASSERT(limits > 0); 277 ASSERT(divisor > 0); 278 279 if( limits < infinity / 2 ) { 280 int64_t expectedSize; 281 if( isLastShard || remaining > estimated ) { 282 double remaining_divisor = ( double( remaining ) / limits ) + 0.5; 283 expectedSize = remaining / remaining_divisor; 284 } else { 285 // If we are here, then estimated >= remaining >= 0 286 double estimated_divisor = ( double( estimated ) / limits ) + 0.5; 287 expectedSize = remaining / estimated_divisor; 288 } 289 290 if( remaining > expectedSize ) { 291 // This does the conversion from native units to bytes using the divisor. 292 double offset = (expectedSize - used) / divisor; 293 if( offset <= 0 ) 294 return hasUsed ? lastKey : key; 295 return sample.splitEstimate( KeyRangeRef(lastKey, key), offset * ( ( 1.0 - SERVER_KNOBS->SPLIT_JITTER_AMOUNT ) + 2 * g_random->random01() * SERVER_KNOBS->SPLIT_JITTER_AMOUNT ) ); 296 } 297 } 298 299 return key; 300 } 301 splitMetricsStorageServerMetrics302 void splitMetrics( SplitMetricsRequest req ) { 303 try { 304 SplitMetricsReply reply; 305 KeyRef lastKey = req.keys.begin; 306 StorageMetrics used = req.used; 307 StorageMetrics estimated = req.estimated; 308 StorageMetrics remaining = getMetrics( req.keys ) + used; 309 310 //TraceEvent("SplitMetrics").detail("Begin", req.keys.begin).detail("End", req.keys.end).detail("Remaining", remaining.bytes).detail("Used", used.bytes); 311 312 while( true ) { 313 if( remaining.bytes < 2*SERVER_KNOBS->MIN_SHARD_BYTES ) 314 break; 315 KeyRef key = req.keys.end; 316 bool hasUsed = used.bytes != 0 || used.bytesPerKSecond != 0 || used.iosPerKSecond != 0; 317 key = getSplitKey( remaining.bytes, estimated.bytes, req.limits.bytes, used.bytes, 318 req.limits.infinity, req.isLastShard, byteSample, 1, lastKey, key, hasUsed ); 319 if( used.bytes < SERVER_KNOBS->MIN_SHARD_BYTES ) 320 key = std::max( key, byteSample.splitEstimate( KeyRangeRef(lastKey, req.keys.end), SERVER_KNOBS->MIN_SHARD_BYTES - used.bytes ) ); 321 key = getSplitKey( remaining.iosPerKSecond, estimated.iosPerKSecond, req.limits.iosPerKSecond, used.iosPerKSecond, 322 req.limits.infinity, req.isLastShard, iopsSample, SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS, lastKey, key, hasUsed ); 323 key = getSplitKey( remaining.bytesPerKSecond, estimated.bytesPerKSecond, req.limits.bytesPerKSecond, used.bytesPerKSecond, 324 req.limits.infinity, req.isLastShard, bandwidthSample, SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS, lastKey, key, hasUsed ); 325 ASSERT( key != lastKey || hasUsed); 326 if( key == req.keys.end ) 327 break; 328 reply.splits.push_back_deep( reply.splits.arena(), key ); 329 330 StorageMetrics diff = (getMetrics( KeyRangeRef(lastKey, key) ) + used); 331 remaining -= diff; 332 estimated -= diff; 333 334 used = StorageMetrics(); 335 lastKey = key; 336 } 337 338 reply.used = getMetrics( KeyRangeRef(lastKey, req.keys.end) ) + used; 339 req.reply.send(reply); 340 } catch (Error& e) { 341 req.reply.sendError(e); 342 } 343 } 344 getPhysicalMetricsStorageServerMetrics345 void getPhysicalMetrics( GetPhysicalMetricsRequest req, StorageBytes sb ){ 346 GetPhysicalMetricsReply rep; 347 348 // SOMEDAY: make bytes dynamic with hard disk space 349 rep.load = getMetrics(allKeys); 350 351 if (sb.free < 1e9 && g_random->random01() < 0.1) 352 TraceEvent(SevWarn, "PhysicalDiskMetrics") 353 .detail("Free", sb.free) 354 .detail("Total", sb.total) 355 .detail("Available", sb.available) 356 .detail("Load", rep.load.bytes); 357 358 rep.free.bytes = sb.free; 359 rep.free.iosPerKSecond = 10e6; 360 rep.free.bytesPerKSecond = 100e9; 361 362 rep.capacity.bytes = sb.total; 363 rep.capacity.iosPerKSecond = 10e6; 364 rep.capacity.bytesPerKSecond = 100e9; 365 366 req.reply.send(rep); 367 } 368 369 Future<Void> waitMetrics(WaitMetricsRequest req, Future<Void> delay); 370 371 private: collapseStorageServerMetrics372 static void collapse( KeyRangeMap<int>& map, KeyRef const& key ) { 373 auto range = map.rangeContaining(key); 374 if (range == map.ranges().begin() || range == map.ranges().end()) return; 375 int value = range->value(); 376 auto prev = range; --prev; 377 if (prev->value() != value) return; 378 KeyRange keys = KeyRangeRef( prev->begin(), range->end() ); 379 map.insert( keys, value ); 380 } 381 addStorageServerMetrics382 static void add( KeyRangeMap<int>& map, KeyRangeRef const& keys, int delta ) { 383 auto rs = map.modify(keys); 384 for(auto r = rs.begin(); r != rs.end(); ++r) 385 r->value() += delta; 386 collapse( map, keys.begin ); 387 collapse( map, keys.end ); 388 } 389 }; 390 391 //Contains information about whether or not a key-value pair should be included in a byte sample 392 //Also contains size information about the byte sample 393 struct ByteSampleInfo { 394 bool inSample; 395 396 //Actual size of the key value pair 397 int64_t size; 398 399 //The recorded size of the sample (max of bytesPerSample, size) 400 int64_t sampledSize; 401 }; 402 403 //Determines whether a key-value pair should be included in a byte sample 404 //Also returns size information about the sample 405 ByteSampleInfo isKeyValueInSample(KeyValueRef keyValue); 406 407 #include "flow/unactorcompiler.h" 408