1 /*
2  * Throttling.actor.cpp
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #include <boost/lexical_cast.hpp>
22 
23 #include "fdbclient/ReadYourWrites.h"
24 #include "fdbserver/workloads/workloads.actor.h"
25 #include "flow/actorcompiler.h" // This must be the last include
26 
27 struct TokenBucket {
28 	static constexpr const double addTokensInterval = 0.1;
29 	static constexpr const double maxSleepTime = 60.0;
30 
31 	double transactionRate;
32 	double maxBurst;
33 	double bucketSize;
34 	Future<Void> tokenAdderActor;
35 
tokenAdderTokenBucket36 	ACTOR static Future<Void> tokenAdder(TokenBucket* self) {
37 		loop {
38 			self->bucketSize = std::min(self->bucketSize + self->transactionRate * addTokensInterval, self->maxBurst);
39 			if (g_random->randomInt(0, 100) == 0)
40 				TraceEvent("AddingTokensx100")
41 				    .detail("BucketSize", self->bucketSize)
42 				    .detail("TransactionRate", self->transactionRate);
43 			wait(delay(addTokensInterval));
44 		}
45 	}
46 
TokenBucketTokenBucket47 	TokenBucket(double maxBurst = 1000) : transactionRate(0), maxBurst(maxBurst), bucketSize(maxBurst) {
48 		tokenAdderActor = tokenAdder(this);
49 	}
50 
startTransactionTokenBucket51 	ACTOR static Future<Void> startTransaction(TokenBucket* self) {
52 		state double sleepTime = addTokensInterval;
53 		loop {
54 			if (self->bucketSize >= 1.0) {
55 				--self->bucketSize;
56 				return Void();
57 			}
58 			if (g_random->randomInt(0, 100) == 0)
59 				TraceEvent("ThrottlingTransactionx100").detail("SleepTime", sleepTime);
60 			wait(delay(sleepTime));
61 			sleepTime = std::min(sleepTime * 2, maxSleepTime);
62 		}
63 	}
64 };
65 
// Out-of-class definitions of TokenBucket's static constexpr data members.
// Before C++17 a definition is required whenever such a member is odr-used,
// e.g. when it is bound to a reference parameter as maxSleepTime is in the
// std::min call inside TokenBucket::startTransaction.
constexpr const double TokenBucket::addTokensInterval;
constexpr const double TokenBucket::maxSleepTime;
68 
69 struct ThrottlingWorkload : KVWorkload {
70 
71 	double testDuration;
72 	double healthMetricsCheckInterval;
73 	int actorsPerClient;
74 	int writesPerTransaction;
75 	int readsPerTransaction;
76 	double throttlingMultiplier;
77 	int transactionsCommitted;
78 	int64_t worstStorageQueue;
79 	int64_t worstStorageDurabilityLag;
80 	int64_t worstTLogQueue;
81 	int64_t detailedWorstStorageQueue;
82 	int64_t detailedWorstStorageDurabilityLag;
83 	int64_t detailedWorstTLogQueue;
84 	double detailedWorstCpuUsage;
85 	double detailedWorstDiskUsage;
86 	bool sendDetailedHealthMetrics;
87 	double maxAllowedStaleness;
88 	TokenBucket tokenBucket;
89 	bool healthMetricsStoppedUpdating;
90 
ThrottlingWorkloadThrottlingWorkload91 	ThrottlingWorkload(WorkloadContext const& wcx)
92 	  : KVWorkload(wcx), transactionsCommitted(0), worstStorageQueue(0), worstStorageDurabilityLag(0), worstTLogQueue(0),
93 	    detailedWorstStorageQueue(0), detailedWorstStorageDurabilityLag(0), detailedWorstTLogQueue(0), detailedWorstCpuUsage(0.0),
94 	    detailedWorstDiskUsage(0.0), healthMetricsStoppedUpdating(false) {
95 		testDuration = getOption(options, LiteralStringRef("testDuration"), 60.0);
96 		healthMetricsCheckInterval = getOption(options, LiteralStringRef("healthMetricsCheckInterval"), 1.0);
97 		actorsPerClient = getOption(options, LiteralStringRef("actorsPerClient"), 10);
98 		writesPerTransaction = getOption(options, LiteralStringRef("writesPerTransaction"), 10);
99 		readsPerTransaction = getOption(options, LiteralStringRef("readsPerTransaction"), 10);
100 		throttlingMultiplier = getOption(options, LiteralStringRef("throttlingMultiplier"), 0.5);
101 		sendDetailedHealthMetrics = getOption(options, LiteralStringRef("sendDetailedHealthMetrics"), true);
102 		maxAllowedStaleness = getOption(options, LiteralStringRef("maxAllowedStaleness"), 10.0);
103 		int maxBurst = getOption(options, LiteralStringRef("maxBurst"), 1000);
104 		tokenBucket.maxBurst = maxBurst;
105 	}
106 
healthMetricsCheckerThrottlingWorkload107 	ACTOR static Future<Void> healthMetricsChecker(Database cx, ThrottlingWorkload* self) {
108 		state int repeated = 0;
109 		state HealthMetrics healthMetrics;
110 
111 		loop {
112 			wait(delay(self->healthMetricsCheckInterval));
113 			HealthMetrics newHealthMetrics = wait(cx->getHealthMetrics(self->sendDetailedHealthMetrics));
114 			if (healthMetrics == newHealthMetrics)
115 			{
116 				if (++repeated > self->maxAllowedStaleness / self->healthMetricsCheckInterval)
117 					self->healthMetricsStoppedUpdating = true;
118 			}
119 			else
120 				repeated = 0;
121 			healthMetrics = newHealthMetrics;
122 
123 			self->tokenBucket.transactionRate = healthMetrics.tpsLimit * self->throttlingMultiplier / self->clientCount;
124 			self->worstStorageQueue = std::max(self->worstStorageQueue, healthMetrics.worstStorageQueue);
125 			self->worstStorageDurabilityLag = std::max(self->worstStorageDurabilityLag, healthMetrics.worstStorageDurabilityLag);
126 			self->worstTLogQueue = std::max(self->worstTLogQueue, healthMetrics.worstTLogQueue);
127 
128 			TraceEvent("HealthMetrics")
129 			    .detail("WorstStorageQueue", healthMetrics.worstStorageQueue)
130 			    .detail("WorstStorageDurabilityLag", healthMetrics.worstStorageDurabilityLag)
131 			    .detail("WorstTLogQueue", healthMetrics.worstTLogQueue)
132 			    .detail("TpsLimit", healthMetrics.tpsLimit);
133 
134 			TraceEvent traceStorageQueue("StorageQueue");
135 			TraceEvent traceStorageDurabilityLag("StorageDurabilityLag");
136 			TraceEvent traceCpuUsage("CpuUsage");
137 			TraceEvent traceDiskUsage("DiskUsage");
138 			for (const auto& ss : healthMetrics.storageStats) {
139 				auto storageStats = ss.second;
140 				self->detailedWorstStorageQueue = std::max(self->detailedWorstStorageQueue, storageStats.storageQueue);
141 				traceStorageQueue.detail(format("Storage/%s", ss.first.toString().c_str()), storageStats.storageQueue);
142 				self->detailedWorstStorageDurabilityLag = std::max(self->detailedWorstStorageDurabilityLag, storageStats.storageDurabilityLag);
143 				traceStorageDurabilityLag.detail(format("Storage/%s", ss.first.toString().c_str()), storageStats.storageDurabilityLag);
144 				self->detailedWorstCpuUsage = std::max(self->detailedWorstCpuUsage, storageStats.cpuUsage);
145 				traceCpuUsage.detail(format("Storage/%s", ss.first.toString().c_str()), storageStats.cpuUsage);
146 				self->detailedWorstDiskUsage = std::max(self->detailedWorstDiskUsage, storageStats.diskUsage);
147 				traceDiskUsage.detail(format("Storage/%s", ss.first.toString().c_str()), storageStats.diskUsage);
148 			}
149 
150 			TraceEvent traceTLogQueue("TLogQueue");
151 			for (const auto& ss : healthMetrics.tLogQueue) {
152 				self->detailedWorstTLogQueue = std::max(self->detailedWorstTLogQueue, ss.second);
153 				traceTLogQueue.detail(format("TLog/%s", ss.first.toString().c_str()), ss.second);
154 			}
155 		}
156 	}
157 
getRandomValueThrottlingWorkload158 	static Value getRandomValue() { return Standalone<StringRef>(format("Value/%d", g_random->randomInt(0, 10e6))); }
159 
clientActorThrottlingWorkload160 	ACTOR static Future<Void> clientActor(Database cx, ThrottlingWorkload* self) {
161 		state ReadYourWritesTransaction tr(cx);
162 
163 		loop {
164 			wait(TokenBucket::startTransaction(&self->tokenBucket));
165 			tr.reset();
166 			try {
167 				state int i;
168 				for (i = 0; i < self->readsPerTransaction; ++i) {
169 					state Optional<Value> value = wait(tr.get(self->getRandomKey()));
170 				}
171 				for (i = 0; i < self->writesPerTransaction; ++i) {
172 					tr.set(self->getRandomKey(), getRandomValue());
173 				}
174 				wait(tr.commit());
175 				if (g_random->randomInt(0, 1000) == 0) TraceEvent("TransactionCommittedx1000");
176 				++self->transactionsCommitted;
177 			} catch (Error& e) {
178 				// ignore failing transactions
179 			}
180 		}
181 	}
182 
_setupThrottlingWorkload183 	ACTOR static Future<Void> _setup(Database cx, ThrottlingWorkload* self) {
184 		if (!self->sendDetailedHealthMetrics) {
185 			// Clear detailed health metrics that are already populated
186 			wait(delay(2 * CLIENT_KNOBS->DETAILED_HEALTH_METRICS_MAX_STALENESS));
187 			cx->healthMetrics.storageStats.clear();
188 			cx->healthMetrics.tLogQueue.clear();
189 		}
190 		return Void();
191 	}
192 
_startThrottlingWorkload193 	ACTOR static Future<Void> _start(Database cx, ThrottlingWorkload* self) {
194 		state Future<Void> hmChecker = timeout(healthMetricsChecker(cx, self), self->testDuration, Void());
195 
196 		state vector<Future<Void>> clientActors;
197 		state int actorId;
198 		for (actorId = 0; actorId < self->actorsPerClient; ++actorId) {
199 			clientActors.push_back(timeout(clientActor(cx, self), self->testDuration, Void()));
200 		}
201 		wait(hmChecker);
202 		return Void();
203 	}
204 
descriptionThrottlingWorkload205 	virtual std::string description() { return "Throttling"; }
setupThrottlingWorkload206 	virtual Future<Void> setup(Database const& cx) { return _setup(cx, this); }
startThrottlingWorkload207 	virtual Future<Void> start(Database const& cx) { return _start(cx, this); }
checkThrottlingWorkload208 	virtual Future<bool> check(Database const& cx) {
209 		if (healthMetricsStoppedUpdating) {
210 			TraceEvent(SevError, "HealthMetricsStoppedUpdating");
211 			return false;
212 		}
213 		if (transactionsCommitted == 0) {
214 			TraceEvent(SevError, "NoTransactionsCommitted");
215 			return false;
216 		}
217 		bool correctHealthMetricsState = true;
218 		if (worstStorageQueue == 0 || worstStorageDurabilityLag == 0 || worstTLogQueue == 0 || transactionsCommitted == 0)
219 			correctHealthMetricsState = false;
220 		if (sendDetailedHealthMetrics) {
221 			if (detailedWorstStorageQueue == 0 || detailedWorstStorageDurabilityLag == 0 || detailedWorstTLogQueue == 0 ||
222 			    detailedWorstCpuUsage == 0.0 || detailedWorstDiskUsage == 0.0)
223 				correctHealthMetricsState = false;
224 		} else {
225 			if (detailedWorstStorageQueue != 0 || detailedWorstStorageDurabilityLag != 0 || detailedWorstTLogQueue != 0 ||
226 			    detailedWorstCpuUsage != 0.0 || detailedWorstDiskUsage != 0.0)
227 				correctHealthMetricsState = false;
228 		}
229 		if (!correctHealthMetricsState) {
230 			TraceEvent(SevError, "IncorrectHealthMetricsState")
231 				.detail("WorstStorageQueue", worstStorageQueue)
232 				.detail("WorstStorageDurabilityLag", worstStorageDurabilityLag)
233 				.detail("WorstTLogQueue", worstTLogQueue)
234 				.detail("DetailedWorstStorageQueue", detailedWorstStorageQueue)
235 				.detail("DetailedWorstStorageDurabilityLag", detailedWorstStorageDurabilityLag)
236 				.detail("DetailedWorstTLogQueue", detailedWorstTLogQueue)
237 				.detail("DetailedWorstCpuUsage", detailedWorstCpuUsage)
238 				.detail("DetailedWorstDiskUsage", detailedWorstDiskUsage)
239 				.detail("SendingDetailedHealthMetrics", sendDetailedHealthMetrics);
240 		}
241 		return correctHealthMetricsState;
242 	}
243 
getMetricsThrottlingWorkload244 	virtual void getMetrics(vector<PerfMetric>& m) {
245 		m.push_back(PerfMetric("TransactionsCommitted", transactionsCommitted, false));
246 		m.push_back(PerfMetric("WorstStorageQueue", worstStorageQueue, true));
247 		m.push_back(PerfMetric("DetailedWorstStorageQueue", detailedWorstStorageQueue, true));
248 		m.push_back(PerfMetric("WorstStorageDurabilityLag", worstStorageDurabilityLag, true));
249 		m.push_back(PerfMetric("DetailedWorstStorageDurabilityLag", detailedWorstStorageDurabilityLag, true));
250 		m.push_back(PerfMetric("WorstTLogQueue", worstTLogQueue, true));
251 		m.push_back(PerfMetric("DetailedWorstTLogQueue", detailedWorstTLogQueue, true));
252 		m.push_back(PerfMetric("DetailedWorstCpuUsage", detailedWorstCpuUsage, true));
253 		m.push_back(PerfMetric("DetailedWorstDiskUsage", detailedWorstDiskUsage, true));
254 	}
255 };
256 
// File-scope factory object constructed with the name "Throttling".
// NOTE(review): presumably this registers the workload so test specs can select
// it by that name — confirm against WorkloadFactory.
WorkloadFactory<ThrottlingWorkload> ThrottlingWorkloadFactory("Throttling");
258