1 /* 2 * Throttling.actor.cpp 3 * 4 * This source file is part of the FoundationDB open source project 5 * 6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors 7 * 8 * Licensed under the Apache License, Version 2.0 (the "License"); 9 * you may not use this file except in compliance with the License. 10 * You may obtain a copy of the License at 11 * 12 * http://www.apache.org/licenses/LICENSE-2.0 13 * 14 * Unless required by applicable law or agreed to in writing, software 15 * distributed under the License is distributed on an "AS IS" BASIS, 16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17 * See the License for the specific language governing permissions and 18 * limitations under the License. 19 */ 20 21 #include <boost/lexical_cast.hpp> 22 23 #include "fdbclient/ReadYourWrites.h" 24 #include "fdbserver/workloads/workloads.actor.h" 25 #include "flow/actorcompiler.h" // This must be the last include 26 27 struct TokenBucket { 28 static constexpr const double addTokensInterval = 0.1; 29 static constexpr const double maxSleepTime = 60.0; 30 31 double transactionRate; 32 double maxBurst; 33 double bucketSize; 34 Future<Void> tokenAdderActor; 35 tokenAdderTokenBucket36 ACTOR static Future<Void> tokenAdder(TokenBucket* self) { 37 loop { 38 self->bucketSize = std::min(self->bucketSize + self->transactionRate * addTokensInterval, self->maxBurst); 39 if (g_random->randomInt(0, 100) == 0) 40 TraceEvent("AddingTokensx100") 41 .detail("BucketSize", self->bucketSize) 42 .detail("TransactionRate", self->transactionRate); 43 wait(delay(addTokensInterval)); 44 } 45 } 46 TokenBucketTokenBucket47 TokenBucket(double maxBurst = 1000) : transactionRate(0), maxBurst(maxBurst), bucketSize(maxBurst) { 48 tokenAdderActor = tokenAdder(this); 49 } 50 startTransactionTokenBucket51 ACTOR static Future<Void> startTransaction(TokenBucket* self) { 52 state double sleepTime = addTokensInterval; 53 loop { 54 if (self->bucketSize >= 1.0) { 55 --self->bucketSize; 56 return Void(); 57 } 58 if (g_random->randomInt(0, 100) == 0) 59 TraceEvent("ThrottlingTransactionx100").detail("SleepTime", sleepTime); 60 wait(delay(sleepTime)); 61 sleepTime = std::min(sleepTime * 2, maxSleepTime); 62 } 63 } 64 }; 65 66 constexpr const double TokenBucket::addTokensInterval; 67 constexpr const double TokenBucket::maxSleepTime; 68 69 struct ThrottlingWorkload : KVWorkload { 70 71 double testDuration; 72 double healthMetricsCheckInterval; 73 int actorsPerClient; 74 int writesPerTransaction; 75 int readsPerTransaction; 76 double throttlingMultiplier; 77 int transactionsCommitted; 78 int64_t worstStorageQueue; 79 int64_t worstStorageDurabilityLag; 80 int64_t worstTLogQueue; 81 int64_t detailedWorstStorageQueue; 82 int64_t detailedWorstStorageDurabilityLag; 83 int64_t detailedWorstTLogQueue; 84 double detailedWorstCpuUsage; 85 double detailedWorstDiskUsage; 86 bool sendDetailedHealthMetrics; 87 double maxAllowedStaleness; 88 TokenBucket tokenBucket; 89 bool healthMetricsStoppedUpdating; 90 ThrottlingWorkloadThrottlingWorkload91 ThrottlingWorkload(WorkloadContext const& wcx) 92 : KVWorkload(wcx), transactionsCommitted(0), worstStorageQueue(0), worstStorageDurabilityLag(0), worstTLogQueue(0), 93 detailedWorstStorageQueue(0), detailedWorstStorageDurabilityLag(0), detailedWorstTLogQueue(0), detailedWorstCpuUsage(0.0), 94 detailedWorstDiskUsage(0.0), healthMetricsStoppedUpdating(false) { 95 testDuration = getOption(options, LiteralStringRef("testDuration"), 60.0); 96 healthMetricsCheckInterval = getOption(options, LiteralStringRef("healthMetricsCheckInterval"), 1.0); 97 actorsPerClient = getOption(options, LiteralStringRef("actorsPerClient"), 10); 98 writesPerTransaction = getOption(options, LiteralStringRef("writesPerTransaction"), 10); 99 readsPerTransaction = getOption(options, LiteralStringRef("readsPerTransaction"), 10); 100 throttlingMultiplier = getOption(options, LiteralStringRef("throttlingMultiplier"), 0.5); 101 sendDetailedHealthMetrics = getOption(options, LiteralStringRef("sendDetailedHealthMetrics"), true); 102 maxAllowedStaleness = getOption(options, LiteralStringRef("maxAllowedStaleness"), 10.0); 103 int maxBurst = getOption(options, LiteralStringRef("maxBurst"), 1000); 104 tokenBucket.maxBurst = maxBurst; 105 } 106 healthMetricsCheckerThrottlingWorkload107 ACTOR static Future<Void> healthMetricsChecker(Database cx, ThrottlingWorkload* self) { 108 state int repeated = 0; 109 state HealthMetrics healthMetrics; 110 111 loop { 112 wait(delay(self->healthMetricsCheckInterval)); 113 HealthMetrics newHealthMetrics = wait(cx->getHealthMetrics(self->sendDetailedHealthMetrics)); 114 if (healthMetrics == newHealthMetrics) 115 { 116 if (++repeated > self->maxAllowedStaleness / self->healthMetricsCheckInterval) 117 self->healthMetricsStoppedUpdating = true; 118 } 119 else 120 repeated = 0; 121 healthMetrics = newHealthMetrics; 122 123 self->tokenBucket.transactionRate = healthMetrics.tpsLimit * self->throttlingMultiplier / self->clientCount; 124 self->worstStorageQueue = std::max(self->worstStorageQueue, healthMetrics.worstStorageQueue); 125 self->worstStorageDurabilityLag = std::max(self->worstStorageDurabilityLag, healthMetrics.worstStorageDurabilityLag); 126 self->worstTLogQueue = std::max(self->worstTLogQueue, healthMetrics.worstTLogQueue); 127 128 TraceEvent("HealthMetrics") 129 .detail("WorstStorageQueue", healthMetrics.worstStorageQueue) 130 .detail("WorstStorageDurabilityLag", healthMetrics.worstStorageDurabilityLag) 131 .detail("WorstTLogQueue", healthMetrics.worstTLogQueue) 132 .detail("TpsLimit", healthMetrics.tpsLimit); 133 134 TraceEvent traceStorageQueue("StorageQueue"); 135 TraceEvent traceStorageDurabilityLag("StorageDurabilityLag"); 136 TraceEvent traceCpuUsage("CpuUsage"); 137 TraceEvent traceDiskUsage("DiskUsage"); 138 for (const auto& ss : healthMetrics.storageStats) { 139 auto storageStats = ss.second; 140 self->detailedWorstStorageQueue = std::max(self->detailedWorstStorageQueue, storageStats.storageQueue); 141 traceStorageQueue.detail(format("Storage/%s", ss.first.toString().c_str()), storageStats.storageQueue); 142 self->detailedWorstStorageDurabilityLag = std::max(self->detailedWorstStorageDurabilityLag, storageStats.storageDurabilityLag); 143 traceStorageDurabilityLag.detail(format("Storage/%s", ss.first.toString().c_str()), storageStats.storageDurabilityLag); 144 self->detailedWorstCpuUsage = std::max(self->detailedWorstCpuUsage, storageStats.cpuUsage); 145 traceCpuUsage.detail(format("Storage/%s", ss.first.toString().c_str()), storageStats.cpuUsage); 146 self->detailedWorstDiskUsage = std::max(self->detailedWorstDiskUsage, storageStats.diskUsage); 147 traceDiskUsage.detail(format("Storage/%s", ss.first.toString().c_str()), storageStats.diskUsage); 148 } 149 150 TraceEvent traceTLogQueue("TLogQueue"); 151 for (const auto& ss : healthMetrics.tLogQueue) { 152 self->detailedWorstTLogQueue = std::max(self->detailedWorstTLogQueue, ss.second); 153 traceTLogQueue.detail(format("TLog/%s", ss.first.toString().c_str()), ss.second); 154 } 155 } 156 } 157 getRandomValueThrottlingWorkload158 static Value getRandomValue() { return Standalone<StringRef>(format("Value/%d", g_random->randomInt(0, 10e6))); } 159 clientActorThrottlingWorkload160 ACTOR static Future<Void> clientActor(Database cx, ThrottlingWorkload* self) { 161 state ReadYourWritesTransaction tr(cx); 162 163 loop { 164 wait(TokenBucket::startTransaction(&self->tokenBucket)); 165 tr.reset(); 166 try { 167 state int i; 168 for (i = 0; i < self->readsPerTransaction; ++i) { 169 state Optional<Value> value = wait(tr.get(self->getRandomKey())); 170 } 171 for (i = 0; i < self->writesPerTransaction; ++i) { 172 tr.set(self->getRandomKey(), getRandomValue()); 173 } 174 wait(tr.commit()); 175 if (g_random->randomInt(0, 1000) == 0) TraceEvent("TransactionCommittedx1000"); 176 ++self->transactionsCommitted; 177 } catch (Error& e) { 178 // ignore failing transactions 179 } 180 } 181 } 182 _setupThrottlingWorkload183 ACTOR static Future<Void> _setup(Database cx, ThrottlingWorkload* self) { 184 if (!self->sendDetailedHealthMetrics) { 185 // Clear detailed health metrics that are already populated 186 wait(delay(2 * CLIENT_KNOBS->DETAILED_HEALTH_METRICS_MAX_STALENESS)); 187 cx->healthMetrics.storageStats.clear(); 188 cx->healthMetrics.tLogQueue.clear(); 189 } 190 return Void(); 191 } 192 _startThrottlingWorkload193 ACTOR static Future<Void> _start(Database cx, ThrottlingWorkload* self) { 194 state Future<Void> hmChecker = timeout(healthMetricsChecker(cx, self), self->testDuration, Void()); 195 196 state vector<Future<Void>> clientActors; 197 state int actorId; 198 for (actorId = 0; actorId < self->actorsPerClient; ++actorId) { 199 clientActors.push_back(timeout(clientActor(cx, self), self->testDuration, Void())); 200 } 201 wait(hmChecker); 202 return Void(); 203 } 204 descriptionThrottlingWorkload205 virtual std::string description() { return "Throttling"; } setupThrottlingWorkload206 virtual Future<Void> setup(Database const& cx) { return _setup(cx, this); } startThrottlingWorkload207 virtual Future<Void> start(Database const& cx) { return _start(cx, this); } checkThrottlingWorkload208 virtual Future<bool> check(Database const& cx) { 209 if (healthMetricsStoppedUpdating) { 210 TraceEvent(SevError, "HealthMetricsStoppedUpdating"); 211 return false; 212 } 213 if (transactionsCommitted == 0) { 214 TraceEvent(SevError, "NoTransactionsCommitted"); 215 return false; 216 } 217 bool correctHealthMetricsState = true; 218 if (worstStorageQueue == 0 || worstStorageDurabilityLag == 0 || worstTLogQueue == 0 || transactionsCommitted == 0) 219 correctHealthMetricsState = false; 220 if (sendDetailedHealthMetrics) { 221 if (detailedWorstStorageQueue == 0 || detailedWorstStorageDurabilityLag == 0 || detailedWorstTLogQueue == 0 || 222 detailedWorstCpuUsage == 0.0 || detailedWorstDiskUsage == 0.0) 223 correctHealthMetricsState = false; 224 } else { 225 if (detailedWorstStorageQueue != 0 || detailedWorstStorageDurabilityLag != 0 || detailedWorstTLogQueue != 0 || 226 detailedWorstCpuUsage != 0.0 || detailedWorstDiskUsage != 0.0) 227 correctHealthMetricsState = false; 228 } 229 if (!correctHealthMetricsState) { 230 TraceEvent(SevError, "IncorrectHealthMetricsState") 231 .detail("WorstStorageQueue", worstStorageQueue) 232 .detail("WorstStorageDurabilityLag", worstStorageDurabilityLag) 233 .detail("WorstTLogQueue", worstTLogQueue) 234 .detail("DetailedWorstStorageQueue", detailedWorstStorageQueue) 235 .detail("DetailedWorstStorageDurabilityLag", detailedWorstStorageDurabilityLag) 236 .detail("DetailedWorstTLogQueue", detailedWorstTLogQueue) 237 .detail("DetailedWorstCpuUsage", detailedWorstCpuUsage) 238 .detail("DetailedWorstDiskUsage", detailedWorstDiskUsage) 239 .detail("SendingDetailedHealthMetrics", sendDetailedHealthMetrics); 240 } 241 return correctHealthMetricsState; 242 } 243 getMetricsThrottlingWorkload244 virtual void getMetrics(vector<PerfMetric>& m) { 245 m.push_back(PerfMetric("TransactionsCommitted", transactionsCommitted, false)); 246 m.push_back(PerfMetric("WorstStorageQueue", worstStorageQueue, true)); 247 m.push_back(PerfMetric("DetailedWorstStorageQueue", detailedWorstStorageQueue, true)); 248 m.push_back(PerfMetric("WorstStorageDurabilityLag", worstStorageDurabilityLag, true)); 249 m.push_back(PerfMetric("DetailedWorstStorageDurabilityLag", detailedWorstStorageDurabilityLag, true)); 250 m.push_back(PerfMetric("WorstTLogQueue", worstTLogQueue, true)); 251 m.push_back(PerfMetric("DetailedWorstTLogQueue", detailedWorstTLogQueue, true)); 252 m.push_back(PerfMetric("DetailedWorstCpuUsage", detailedWorstCpuUsage, true)); 253 m.push_back(PerfMetric("DetailedWorstDiskUsage", detailedWorstDiskUsage, true)); 254 } 255 }; 256 257 WorkloadFactory<ThrottlingWorkload> ThrottlingWorkloadFactory("Throttling"); 258