1 /*
2  * AsyncFileCorrectness.actor.cpp
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #include "fdbserver/workloads/workloads.actor.h"
22 #include "flow/ActorCollection.h"
23 #include "flow/SystemMonitor.h"
24 #include "fdbserver/workloads/AsyncFile.actor.h"
25 #include "flow/actorcompiler.h"  // This must be the last #include.
26 
//The kind of operation a single step of the correctness test performs on the file.  The numeric values matter:
//generateOperation converts an index into its cumulative distribution table directly into an OperationType.
enum OperationType
{
	READ = 0,     //Read a range of bytes
	WRITE = 1,    //Write random bytes to a range
	SYNC = 2,     //Sync the file
	REOPEN = 3,   //Reopen the file and check its size
	TRUNCATE = 4  //Truncate the file to a random size
};
36 
37 //Stores information about an operation that is executed on the file
38 struct OperationInfo
39 {
40 	Reference<AsyncFileBuffer> data;
41 
42 	uint64_t offset;
43 	uint64_t length;
44 
45 	bool flushOperations;
46 	OperationType operation;
47 	int index;
48 };
49 
50 struct AsyncFileCorrectnessWorkload : public AsyncFileWorkload
51 {
52 	//Maximum number of bytes operated on by a file operation
53 	int maxOperationSize;
54 
55 	//The number of simultaneous outstanding operations on a file
56 	int numSimultaneousOperations;
57 
58 	//The futures for asynchronous IO operations
59 	vector<Future<OperationInfo> > operations;
60 
61 	//Our in memory representation of what the file should be
62 	Reference<AsyncFileBuffer> memoryFile;
63 
64 	//A vector holding a lock for each byte in the file. 0xFFFFFFFF means that the byte is being written, any other number means that it is being read that many times
65 	vector<uint32_t> fileLock;
66 
67 	//A mask designating whether each byte in the file has been explicitly written (bytes which weren't explicitly written have no guarantees about content)
68 	vector<unsigned char> fileValidityMask;
69 
70 	//Whether or not the correctness test succeeds
71 	bool success;
72 
73 	//The targetted size of the file (the actual file can be anywhere in size from 1 byte to 2 * targetFileSize)
74 	int64_t targetFileSize;
75 
76 	double averageCpuUtilization;
77 	PerfIntCounter numOperations;
78 
AsyncFileCorrectnessWorkloadAsyncFileCorrectnessWorkload79 	AsyncFileCorrectnessWorkload(WorkloadContext const& wcx)
80 		: AsyncFileWorkload(wcx), success(true), numOperations("Num Operations"), memoryFile(NULL)
81 	{
82 		maxOperationSize = getOption(options, LiteralStringRef("maxOperationSize"), 4096);
83 		numSimultaneousOperations = getOption(options, LiteralStringRef("numSimultaneousOperations"), 10);
84 		targetFileSize = getOption(options, LiteralStringRef("targetFileSize"), (uint64_t)163840);
85 
86 		if(unbufferedIO)
87 			maxOperationSize = std::max(_PAGE_SIZE, maxOperationSize);
88 
89 		if(maxOperationSize * numSimultaneousOperations > targetFileSize * 0.25)
90 		{
91 			targetFileSize *= (int)ceil((maxOperationSize * numSimultaneousOperations * 4.0) / targetFileSize);
92 			printf("Target file size is insufficient to support %d simultaneous operations of size %d; changing to %lld\n", numSimultaneousOperations, maxOperationSize, targetFileSize);
93 		}
94 	}
95 
~AsyncFileCorrectnessWorkloadAsyncFileCorrectnessWorkload96 	virtual ~AsyncFileCorrectnessWorkload(){ }
97 
descriptionAsyncFileCorrectnessWorkload98 	virtual std::string description()
99 	{
100 		return "AsyncFileCorrectness";
101 	}
102 
setupAsyncFileCorrectnessWorkload103 	Future<Void> setup(Database const& cx)
104 	{
105 		if(enabled)
106 			return _setup(this);
107 
108 		return Void();
109 	}
110 
_setupAsyncFileCorrectnessWorkload111 	ACTOR Future<Void> _setup(AsyncFileCorrectnessWorkload *self)
112 	{
113 		//Create the memory version of the file, the file locks, and the valid mask
114 		self->memoryFile = self->allocateBuffer(self->targetFileSize);
115 		self->fileLock.resize(self->targetFileSize, 0);
116 		self->fileValidityMask.resize(self->targetFileSize, 0);
117 		self->fileSize = 0;
118 
119 		//Create or open the file being used for testing
120 		wait(self->openFile(self, IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_CREATE, 0666, self->fileSize, true));
121 
122 		return Void();
123 	}
124 
125 	//Updates the memory buffer, locks, and validity mask to a new file size
updateMemoryBufferAsyncFileCorrectnessWorkload126 	void updateMemoryBuffer(int64_t newFileSize)
127 	{
128 		int64_t oldBufferSize = std::max(fileSize, targetFileSize);
129 		int64_t newBufferSize = std::max(newFileSize, targetFileSize);
130 
131 		if(oldBufferSize != newBufferSize)
132 		{
133 			Reference<AsyncFileBuffer> newFile = allocateBuffer(newBufferSize);
134 			memcpy(newFile->buffer, memoryFile->buffer, std::min(newBufferSize, oldBufferSize));
135 
136 			if(newBufferSize > oldBufferSize)
137 				memset(&newFile->buffer[oldBufferSize], 0, newBufferSize - oldBufferSize);
138 
139 			memoryFile = newFile;
140 
141 			fileLock.resize(newBufferSize, 0);
142 			fileValidityMask.resize(newBufferSize, 0xFF);
143 		}
144 
145 		fileSize = newFileSize;
146 	}
147 
startAsyncFileCorrectnessWorkload148 	Future<Void> start(Database const& cx)
149 	{
150 		if(enabled)
151 			return _start(this);
152 
153 		return Void();
154 	}
155 
_startAsyncFileCorrectnessWorkload156 	ACTOR Future<Void> _start(AsyncFileCorrectnessWorkload *self)
157 	{
158 		state StatisticsState statState;
159 		customSystemMonitor("AsyncFile Metrics", &statState);
160 
161 		wait(timeout(self->runCorrectnessTest(self), self->testDuration, Void()));
162 
163 		SystemStatistics stats = customSystemMonitor("AsyncFile Metrics", &statState);
164 		self->averageCpuUtilization = stats.processCPUSeconds / stats.elapsed;
165 
166 		//Try to let the IO operations finish so we can clean up after them
167 		wait(timeout(waitForAll(self->operations), 10, Void()));
168 
169 		return Void();
170 	}
171 
runCorrectnessTestAsyncFileCorrectnessWorkload172 	ACTOR Future<Void> runCorrectnessTest(AsyncFileCorrectnessWorkload *self)
173 	{
174 		state vector<OperationInfo> postponedOperations;
175 		state int validOperations = 0;
176 
177 		loop
178 		{
179 			wait(delay(0));
180 
181 			//Fill the operations buffer with random operations
182 			while(self->operations.size() < self->numSimultaneousOperations && postponedOperations.size() == 0)
183 			{
184 				self->operations.push_back(self->processOperation(self, self->generateOperation(self->operations.size(), false)));
185 				validOperations++;
186 			}
187 
188 			//Get the first operation that finishes
189 			OperationInfo info = wait(waitForFirst(self->operations));
190 
191 			//If it is a read, check that it matches what our memory representation has
192 			if(info.operation == READ)
193 			{
194 				int start = 0;
195 				bool isValid = true;
196 				int length = std::min(info.length, self->fileLock.size() - info.offset);
197 
198 				//Scan the entire read range for sections that we know (fileValidityMask > 0) and those that we don't
199 				for(int i = 0; i < length; i++)
200 				{
201 					bool currentValid = self->fileValidityMask[i] > 0;
202 					if(start == 0)
203 						isValid = currentValid;
204 					else if(isValid != currentValid || i == length - 1)
205 					{
206 						//If we know what data should be in a particular range, then compare the result with what we know
207 						if(isValid && memcmp(&self->fileValidityMask[info.offset + start], &info.data->buffer[start], i - start))
208 						{
209 							printf("Read returned incorrect results at %llu of length %llu\n", info.offset, info.length);
210 
211 							self->success = false;
212 							return Void();
213 						}
214 						//Otherwise, skip the comparison and just update what we know
215 						else if(!isValid)
216 						{
217 							memcpy(&self->memoryFile->buffer[info.offset + start], &info.data->buffer[start], i - start);
218 							memset(&self->fileValidityMask[info.offset + start], 0xFF, i - start);
219 						}
220 
221 						start = i;
222 					}
223 
224 					isValid = currentValid;
225 				}
226 
227 				//Decrement the read count for each byte that was read
228 				int lockEnd = std::min(info.offset + info.length, (uint64_t)self->fileLock.size());
229 				if(lockEnd > self->fileSize)
230 					lockEnd = self->fileLock.size();
231 
232 				for(int i = info.offset; i < lockEnd; i++)
233 					self->fileLock[i]--;
234 			}
235 
236 			//If it is a write, clear the write locks
237 			else if(info.operation == WRITE)
238 				memset(&self->fileLock[info.offset], 0, info.length * sizeof(uint32_t));
239 
240 			//Only generate new operations if we don't have a postponed operation in queue
241 			if(postponedOperations.size() == 0)
242 			{
243 				//Insert a new operation into the operations buffer
244 				OperationInfo newOperation = self->generateOperation(info.index);
245 
246 				//If we need to flush existing operations, postpone this operation
247 				if(newOperation.flushOperations)
248 					postponedOperations.push_back(newOperation);
249 				//Otherwise, add it to our operations queue
250 				else
251 					self->operations[info.index] = self->processOperation(self, newOperation);
252 			}
253 
254 			//If there is a postponed operation, clear the queue so that we can run it
255 			if(postponedOperations.size() > 0)
256 			{
257 				self->operations[info.index] = Never();
258 				validOperations--;
259 			}
260 
261 			//If there are no operations being processed and postponed operations are waiting, run them now
262 			while(validOperations == 0 && postponedOperations.size() > 0)
263 			{
264 				self->operations.clear();
265 				self->operations.push_back(self->processOperation(self, postponedOperations.front()));
266 				OperationInfo info = wait(self->operations.front());
267 				postponedOperations.erase(postponedOperations.begin());
268 				self->operations.clear();
269 			}
270 		}
271 	}
272 
273 	//Generates a random operation
generateOperationAsyncFileCorrectnessWorkload274 	OperationInfo generateOperation(int index, bool allowFlushingOperations = true)
275 	{
276 		OperationInfo info;
277 
278 		do
279 		{
280 			info.flushOperations = false;
281 
282 			//Cumulative density function for the different operations
283 			int cdfArray[] = { 0, 1000, 2000, 2100, 2101, 2102 };
284 			vector<int> cdf = vector<int>(cdfArray, cdfArray + 6);
285 
286 			//Choose a random operation type (READ, WRITE, SYNC, REOPEN, TRUNCATE).
287 			int random = g_random->randomInt(0, cdf.back());
288 			for(int i = 0; i < cdf.size() - 1; i++)
289 			{
290 				if(cdf[i] <= random && random < cdf[i + 1])
291 				{
292 					info.operation = (OperationType)i;
293 					break;
294 				}
295 			}
296 
297 			if(info.operation == READ || info.operation == WRITE)
298 			{
299 				int64_t maxOffset;
300 
301 				//Reads should not exceed the extent of written data
302 				if(info.operation == READ)
303 				{
304 					maxOffset = fileSize - 1;
305 					if(maxOffset < 0)
306 						info.operation = WRITE;
307 				}
308 
309 				//Only allow reads once the file has gotten large enough (to prevent blocking on locks)
310 				if(maxOffset < targetFileSize / 2)
311 					info.operation = WRITE;
312 
313 				//Writes can be up to the target file size or the current file size (the current file size could be larger than the target as a result of a truncate)
314 				if(info.operation == WRITE)
315 					maxOffset = std::max(fileSize, targetFileSize) - 1;
316 
317 				//Choose a random offset and length, retrying if that section is already locked
318 				do
319 				{
320 					//Generate random length and offset
321 					if(unbufferedIO)
322 					{
323 						info.length = g_random->randomInt(1, maxOperationSize / _PAGE_SIZE + 1) * _PAGE_SIZE;
324 						info.offset = (int64_t)(g_random->random01() * maxOffset / _PAGE_SIZE) * _PAGE_SIZE;
325 					}
326 					else
327 					{
328 						info.length = g_random->randomInt(1, maxOperationSize);
329 						info.offset = (int64_t)(g_random->random01() * maxOffset);
330 					}
331 
332 				} while(checkFileLocked(info.operation, info.offset, info.length));
333 
334 				//If the operation is a read, increment the read count for each byte
335 				if(info.operation == READ)
336 				{
337 					//If the read extends past the end of the file, then we have to lock all bytes beyond the end of the file
338 					//This is so that we can accurately determine if the read count is correct
339 					int lockEnd = std::min(info.offset + info.length, (uint64_t)fileLock.size());
340 					if(lockEnd > fileSize)
341 						lockEnd = fileLock.size();
342 
343 					for(int i = info.offset; i < lockEnd; i++)
344 						fileLock[i]++;
345 				}
346 
347 				//If the operation is a write, set the write lock for each byte
348 				else if(info.operation == WRITE)
349 				{
350 					//Don't write past the end of the file
351 					info.length = std::min(info.length, std::max(targetFileSize, fileSize) - info.offset);
352 					memset(&fileLock[info.offset], 0xFF, info.length * sizeof(uint32_t));
353 				}
354 			}
355 			else if(info.operation == REOPEN)
356 				info.flushOperations = true;
357 			else if(info.operation == TRUNCATE)
358 			{
359 				info.flushOperations = true;
360 
361 				//Choose a random length to truncate to
362 				if(unbufferedIO)
363 					info.offset = (int64_t)(g_random->random01() * (2 * targetFileSize) / _PAGE_SIZE) * _PAGE_SIZE;
364 				else
365 					info.offset = (int64_t)(g_random->random01() * (2 * targetFileSize));
366 			}
367 
368 		} while(!allowFlushingOperations && info.flushOperations);
369 
370 		info.index = index;
371 		return info;
372 	}
373 
374 	//Checks if a file is already locked for a given set of bytes.  The file is locked if it is being written (fileLock[i] = 0xFFFFFFFF)
375 	//or if we are trying to perform a write and the read count is nonzero (fileLock[i] != 0)
checkFileLockedAsyncFileCorrectnessWorkload376 	bool checkFileLocked(int operation, int offset, int length)
377 	{
378 		for(int i = offset; i < offset + length && i < fileLock.size(); i++)
379 			if(fileLock[i] == 0xFFFFFFFF || (fileLock[i] != 0 && operation == WRITE))
380 				return true;
381 
382 		return false;
383 	}
384 
385 	//Populates a buffer with a random sequence of bytes
generateRandomDataAsyncFileCorrectnessWorkload386 	void generateRandomData(unsigned char *buffer, int length)
387 	{
388 		for(int i = 0; i < length; i+= sizeof(uint32_t))
389 		{
390 			uint32_t val = g_random->randomUInt32();
391 			memcpy(&buffer[i], &val, std::min(length - i, (int)sizeof(uint32_t)));
392 		}
393 	}
394 
395 	//Performs an operation on a file and the memory representation of that file
processOperationAsyncFileCorrectnessWorkload396 	ACTOR Future<OperationInfo> processOperation(AsyncFileCorrectnessWorkload *self, OperationInfo info)
397 	{
398 		if(info.operation == READ)
399 		{
400 			info.data = self->allocateBuffer(info.length);
401 
402 			//Perform the read.  Don't allow it to be cancelled (because the underlying IO may not be cancellable) and don't allow
403 			//objects that the read uses to be deleted
404 			int numRead = wait
405 			(
406 				uncancellable
407 				(
408 					holdWhile
409 					(
410 						self->fileHandle,
411 						holdWhile(info, self->fileHandle->file->read(info.data->buffer, info.length, info.offset))
412 					)
413 				)
414 			);
415 
416 			if(numRead != std::min(info.length, self->fileSize - info.offset))
417 			{
418 				printf("Read reported incorrect number of bytes at %llu of length %llu\n", info.offset, info.length);
419 				self->success = false;
420 			}
421 		}
422 		else if(info.operation == WRITE)
423 		{
424 			info.data = self->allocateBuffer(info.length);
425 			self->generateRandomData(info.data->buffer, info.length);
426 			memcpy(&self->memoryFile->buffer[info.offset], info.data->buffer, info.length);
427 			memset(&self->fileValidityMask[info.offset], 0xFF, info.length);
428 
429 			//Perform the write.  Don't allow it to be cancelled (because the underlying IO may not be cancellable) and don't allow
430 			//objects that the write uses to be deleted
431 			wait
432 			(
433 				uncancellable
434 				(
435 					holdWhile
436 					(
437 						self->fileHandle,
438 						holdWhile(info, self->fileHandle->file->write(info.data->buffer, info.length, info.offset))
439 					)
440 				)
441 			);
442 
443 			//If we wrote past the end of the file, update the size of the file
444 			self->fileSize = std::max((int64_t)(info.offset + info.length), self->fileSize);
445 		}
446 		else if(info.operation == SYNC)
447 		{
448 			info.data = Reference<AsyncFileBuffer>(NULL);
449 			wait(self->fileHandle->file->sync());
450 		}
451 		else if(info.operation == REOPEN)
452 		{
453 			// Will fail if the file does not exist
454 			wait(self->openFile(self, IAsyncFile::OPEN_READWRITE, 0666, 0, false));
455 			int64_t fileSize = wait(self->fileHandle->file->size());
456 			int64_t fileSizeChange = fileSize - self->fileSize;
457 			if(fileSizeChange >= _PAGE_SIZE)
458 			{
459 				printf("Reopened file increased in size by %lld bytes (at most %d allowed)\n", fileSizeChange, _PAGE_SIZE - 1);
460 				self->success = false;
461 			}
462 			else if(fileSizeChange < 0)
463 			{
464 				printf("Reopened file decreased in size by %lld bytes\n", -fileSizeChange);
465 				self->success = false;
466 			}
467 
468 			self->updateMemoryBuffer(fileSize);
469 		}
470 		else if(info.operation == TRUNCATE)
471 		{
472 			//Perform the truncate.  Don't allow it to be cancelled (because the underlying IO may not be cancellable) and don't allow
473 			//file handle to be deleted
474 			wait
475 			(
476 				uncancellable
477 				(
478 					holdWhile(self->fileHandle, self->fileHandle->file->truncate(info.offset))
479 				)
480 			);
481 
482 			int64_t fileSize = wait(self->fileHandle->file->size());
483 			if(fileSize != info.offset)
484 			{
485 				printf("Incorrect file size reported after truncate\n");
486 				self->success = false;
487 			}
488 
489 			self->updateMemoryBuffer(fileSize);
490 		}
491 
492 		++self->numOperations;
493 		return info;
494 	}
495 
checkAsyncFileCorrectnessWorkload496 	virtual Future<bool> check(Database const& cx)
497 	{
498 		return success;
499 	}
500 
getMetricsAsyncFileCorrectnessWorkload501 	virtual void getMetrics(vector<PerfMetric>& m)
502 	{
503 		if(enabled)
504 		{
505 			m.push_back(PerfMetric("Number of Operations Performed", numOperations.getValue(), false));
506 			m.push_back(PerfMetric("Average CPU Utilization (Percentage)", averageCpuUtilization * 100, false));
507 		}
508 	}
509  };
510 
511 WorkloadFactory<AsyncFileCorrectnessWorkload> AsyncFileCorrectnessWorkloadFactory("AsyncFileCorrectness");
512