1 /* 2 * AsyncFileCorrectness.actor.cpp 3 * 4 * This source file is part of the FoundationDB open source project 5 * 6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors 7 * 8 * Licensed under the Apache License, Version 2.0 (the "License"); 9 * you may not use this file except in compliance with the License. 10 * You may obtain a copy of the License at 11 * 12 * http://www.apache.org/licenses/LICENSE-2.0 13 * 14 * Unless required by applicable law or agreed to in writing, software 15 * distributed under the License is distributed on an "AS IS" BASIS, 16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17 * See the License for the specific language governing permissions and 18 * limitations under the License. 19 */ 20 21 #include "fdbserver/workloads/workloads.actor.h" 22 #include "flow/ActorCollection.h" 23 #include "flow/SystemMonitor.h" 24 #include "fdbserver/workloads/AsyncFile.actor.h" 25 #include "flow/actorcompiler.h" // This must be the last #include. 26 27 //An enumeration representing the type of operation to be performed in a correctness test operation 28 enum OperationType 29 { 30 READ, 31 WRITE, 32 SYNC, 33 REOPEN, 34 TRUNCATE 35 }; 36 37 //Stores information about an operation that is executed on the file 38 struct OperationInfo 39 { 40 Reference<AsyncFileBuffer> data; 41 42 uint64_t offset; 43 uint64_t length; 44 45 bool flushOperations; 46 OperationType operation; 47 int index; 48 }; 49 50 struct AsyncFileCorrectnessWorkload : public AsyncFileWorkload 51 { 52 //Maximum number of bytes operated on by a file operation 53 int maxOperationSize; 54 55 //The number of simultaneous outstanding operations on a file 56 int numSimultaneousOperations; 57 58 //The futures for asynchronous IO operations 59 vector<Future<OperationInfo> > operations; 60 61 //Our in memory representation of what the file should be 62 Reference<AsyncFileBuffer> memoryFile; 63 64 //A vector holding a lock for each byte in the file. 0xFFFFFFFF means that the byte is being written, any other number means that it is being read that many times 65 vector<uint32_t> fileLock; 66 67 //A mask designating whether each byte in the file has been explicitly written (bytes which weren't explicitly written have no guarantees about content) 68 vector<unsigned char> fileValidityMask; 69 70 //Whether or not the correctness test succeeds 71 bool success; 72 73 //The targetted size of the file (the actual file can be anywhere in size from 1 byte to 2 * targetFileSize) 74 int64_t targetFileSize; 75 76 double averageCpuUtilization; 77 PerfIntCounter numOperations; 78 AsyncFileCorrectnessWorkloadAsyncFileCorrectnessWorkload79 AsyncFileCorrectnessWorkload(WorkloadContext const& wcx) 80 : AsyncFileWorkload(wcx), success(true), numOperations("Num Operations"), memoryFile(NULL) 81 { 82 maxOperationSize = getOption(options, LiteralStringRef("maxOperationSize"), 4096); 83 numSimultaneousOperations = getOption(options, LiteralStringRef("numSimultaneousOperations"), 10); 84 targetFileSize = getOption(options, LiteralStringRef("targetFileSize"), (uint64_t)163840); 85 86 if(unbufferedIO) 87 maxOperationSize = std::max(_PAGE_SIZE, maxOperationSize); 88 89 if(maxOperationSize * numSimultaneousOperations > targetFileSize * 0.25) 90 { 91 targetFileSize *= (int)ceil((maxOperationSize * numSimultaneousOperations * 4.0) / targetFileSize); 92 printf("Target file size is insufficient to support %d simultaneous operations of size %d; changing to %lld\n", numSimultaneousOperations, maxOperationSize, targetFileSize); 93 } 94 } 95 ~AsyncFileCorrectnessWorkloadAsyncFileCorrectnessWorkload96 virtual ~AsyncFileCorrectnessWorkload(){ } 97 descriptionAsyncFileCorrectnessWorkload98 virtual std::string description() 99 { 100 return "AsyncFileCorrectness"; 101 } 102 setupAsyncFileCorrectnessWorkload103 Future<Void> setup(Database const& cx) 104 { 105 if(enabled) 106 return _setup(this); 107 108 return Void(); 109 } 110 _setupAsyncFileCorrectnessWorkload111 ACTOR Future<Void> _setup(AsyncFileCorrectnessWorkload *self) 112 { 113 //Create the memory version of the file, the file locks, and the valid mask 114 self->memoryFile = self->allocateBuffer(self->targetFileSize); 115 self->fileLock.resize(self->targetFileSize, 0); 116 self->fileValidityMask.resize(self->targetFileSize, 0); 117 self->fileSize = 0; 118 119 //Create or open the file being used for testing 120 wait(self->openFile(self, IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_CREATE, 0666, self->fileSize, true)); 121 122 return Void(); 123 } 124 125 //Updates the memory buffer, locks, and validity mask to a new file size updateMemoryBufferAsyncFileCorrectnessWorkload126 void updateMemoryBuffer(int64_t newFileSize) 127 { 128 int64_t oldBufferSize = std::max(fileSize, targetFileSize); 129 int64_t newBufferSize = std::max(newFileSize, targetFileSize); 130 131 if(oldBufferSize != newBufferSize) 132 { 133 Reference<AsyncFileBuffer> newFile = allocateBuffer(newBufferSize); 134 memcpy(newFile->buffer, memoryFile->buffer, std::min(newBufferSize, oldBufferSize)); 135 136 if(newBufferSize > oldBufferSize) 137 memset(&newFile->buffer[oldBufferSize], 0, newBufferSize - oldBufferSize); 138 139 memoryFile = newFile; 140 141 fileLock.resize(newBufferSize, 0); 142 fileValidityMask.resize(newBufferSize, 0xFF); 143 } 144 145 fileSize = newFileSize; 146 } 147 startAsyncFileCorrectnessWorkload148 Future<Void> start(Database const& cx) 149 { 150 if(enabled) 151 return _start(this); 152 153 return Void(); 154 } 155 _startAsyncFileCorrectnessWorkload156 ACTOR Future<Void> _start(AsyncFileCorrectnessWorkload *self) 157 { 158 state StatisticsState statState; 159 customSystemMonitor("AsyncFile Metrics", &statState); 160 161 wait(timeout(self->runCorrectnessTest(self), self->testDuration, Void())); 162 163 SystemStatistics stats = customSystemMonitor("AsyncFile Metrics", &statState); 164 self->averageCpuUtilization = stats.processCPUSeconds / stats.elapsed; 165 166 //Try to let the IO operations finish so we can clean up after them 167 wait(timeout(waitForAll(self->operations), 10, Void())); 168 169 return Void(); 170 } 171 runCorrectnessTestAsyncFileCorrectnessWorkload172 ACTOR Future<Void> runCorrectnessTest(AsyncFileCorrectnessWorkload *self) 173 { 174 state vector<OperationInfo> postponedOperations; 175 state int validOperations = 0; 176 177 loop 178 { 179 wait(delay(0)); 180 181 //Fill the operations buffer with random operations 182 while(self->operations.size() < self->numSimultaneousOperations && postponedOperations.size() == 0) 183 { 184 self->operations.push_back(self->processOperation(self, self->generateOperation(self->operations.size(), false))); 185 validOperations++; 186 } 187 188 //Get the first operation that finishes 189 OperationInfo info = wait(waitForFirst(self->operations)); 190 191 //If it is a read, check that it matches what our memory representation has 192 if(info.operation == READ) 193 { 194 int start = 0; 195 bool isValid = true; 196 int length = std::min(info.length, self->fileLock.size() - info.offset); 197 198 //Scan the entire read range for sections that we know (fileValidityMask > 0) and those that we don't 199 for(int i = 0; i < length; i++) 200 { 201 bool currentValid = self->fileValidityMask[i] > 0; 202 if(start == 0) 203 isValid = currentValid; 204 else if(isValid != currentValid || i == length - 1) 205 { 206 //If we know what data should be in a particular range, then compare the result with what we know 207 if(isValid && memcmp(&self->fileValidityMask[info.offset + start], &info.data->buffer[start], i - start)) 208 { 209 printf("Read returned incorrect results at %llu of length %llu\n", info.offset, info.length); 210 211 self->success = false; 212 return Void(); 213 } 214 //Otherwise, skip the comparison and just update what we know 215 else if(!isValid) 216 { 217 memcpy(&self->memoryFile->buffer[info.offset + start], &info.data->buffer[start], i - start); 218 memset(&self->fileValidityMask[info.offset + start], 0xFF, i - start); 219 } 220 221 start = i; 222 } 223 224 isValid = currentValid; 225 } 226 227 //Decrement the read count for each byte that was read 228 int lockEnd = std::min(info.offset + info.length, (uint64_t)self->fileLock.size()); 229 if(lockEnd > self->fileSize) 230 lockEnd = self->fileLock.size(); 231 232 for(int i = info.offset; i < lockEnd; i++) 233 self->fileLock[i]--; 234 } 235 236 //If it is a write, clear the write locks 237 else if(info.operation == WRITE) 238 memset(&self->fileLock[info.offset], 0, info.length * sizeof(uint32_t)); 239 240 //Only generate new operations if we don't have a postponed operation in queue 241 if(postponedOperations.size() == 0) 242 { 243 //Insert a new operation into the operations buffer 244 OperationInfo newOperation = self->generateOperation(info.index); 245 246 //If we need to flush existing operations, postpone this operation 247 if(newOperation.flushOperations) 248 postponedOperations.push_back(newOperation); 249 //Otherwise, add it to our operations queue 250 else 251 self->operations[info.index] = self->processOperation(self, newOperation); 252 } 253 254 //If there is a postponed operation, clear the queue so that we can run it 255 if(postponedOperations.size() > 0) 256 { 257 self->operations[info.index] = Never(); 258 validOperations--; 259 } 260 261 //If there are no operations being processed and postponed operations are waiting, run them now 262 while(validOperations == 0 && postponedOperations.size() > 0) 263 { 264 self->operations.clear(); 265 self->operations.push_back(self->processOperation(self, postponedOperations.front())); 266 OperationInfo info = wait(self->operations.front()); 267 postponedOperations.erase(postponedOperations.begin()); 268 self->operations.clear(); 269 } 270 } 271 } 272 273 //Generates a random operation generateOperationAsyncFileCorrectnessWorkload274 OperationInfo generateOperation(int index, bool allowFlushingOperations = true) 275 { 276 OperationInfo info; 277 278 do 279 { 280 info.flushOperations = false; 281 282 //Cumulative density function for the different operations 283 int cdfArray[] = { 0, 1000, 2000, 2100, 2101, 2102 }; 284 vector<int> cdf = vector<int>(cdfArray, cdfArray + 6); 285 286 //Choose a random operation type (READ, WRITE, SYNC, REOPEN, TRUNCATE). 287 int random = g_random->randomInt(0, cdf.back()); 288 for(int i = 0; i < cdf.size() - 1; i++) 289 { 290 if(cdf[i] <= random && random < cdf[i + 1]) 291 { 292 info.operation = (OperationType)i; 293 break; 294 } 295 } 296 297 if(info.operation == READ || info.operation == WRITE) 298 { 299 int64_t maxOffset; 300 301 //Reads should not exceed the extent of written data 302 if(info.operation == READ) 303 { 304 maxOffset = fileSize - 1; 305 if(maxOffset < 0) 306 info.operation = WRITE; 307 } 308 309 //Only allow reads once the file has gotten large enough (to prevent blocking on locks) 310 if(maxOffset < targetFileSize / 2) 311 info.operation = WRITE; 312 313 //Writes can be up to the target file size or the current file size (the current file size could be larger than the target as a result of a truncate) 314 if(info.operation == WRITE) 315 maxOffset = std::max(fileSize, targetFileSize) - 1; 316 317 //Choose a random offset and length, retrying if that section is already locked 318 do 319 { 320 //Generate random length and offset 321 if(unbufferedIO) 322 { 323 info.length = g_random->randomInt(1, maxOperationSize / _PAGE_SIZE + 1) * _PAGE_SIZE; 324 info.offset = (int64_t)(g_random->random01() * maxOffset / _PAGE_SIZE) * _PAGE_SIZE; 325 } 326 else 327 { 328 info.length = g_random->randomInt(1, maxOperationSize); 329 info.offset = (int64_t)(g_random->random01() * maxOffset); 330 } 331 332 } while(checkFileLocked(info.operation, info.offset, info.length)); 333 334 //If the operation is a read, increment the read count for each byte 335 if(info.operation == READ) 336 { 337 //If the read extends past the end of the file, then we have to lock all bytes beyond the end of the file 338 //This is so that we can accurately determine if the read count is correct 339 int lockEnd = std::min(info.offset + info.length, (uint64_t)fileLock.size()); 340 if(lockEnd > fileSize) 341 lockEnd = fileLock.size(); 342 343 for(int i = info.offset; i < lockEnd; i++) 344 fileLock[i]++; 345 } 346 347 //If the operation is a write, set the write lock for each byte 348 else if(info.operation == WRITE) 349 { 350 //Don't write past the end of the file 351 info.length = std::min(info.length, std::max(targetFileSize, fileSize) - info.offset); 352 memset(&fileLock[info.offset], 0xFF, info.length * sizeof(uint32_t)); 353 } 354 } 355 else if(info.operation == REOPEN) 356 info.flushOperations = true; 357 else if(info.operation == TRUNCATE) 358 { 359 info.flushOperations = true; 360 361 //Choose a random length to truncate to 362 if(unbufferedIO) 363 info.offset = (int64_t)(g_random->random01() * (2 * targetFileSize) / _PAGE_SIZE) * _PAGE_SIZE; 364 else 365 info.offset = (int64_t)(g_random->random01() * (2 * targetFileSize)); 366 } 367 368 } while(!allowFlushingOperations && info.flushOperations); 369 370 info.index = index; 371 return info; 372 } 373 374 //Checks if a file is already locked for a given set of bytes. The file is locked if it is being written (fileLock[i] = 0xFFFFFFFF) 375 //or if we are trying to perform a write and the read count is nonzero (fileLock[i] != 0) checkFileLockedAsyncFileCorrectnessWorkload376 bool checkFileLocked(int operation, int offset, int length) 377 { 378 for(int i = offset; i < offset + length && i < fileLock.size(); i++) 379 if(fileLock[i] == 0xFFFFFFFF || (fileLock[i] != 0 && operation == WRITE)) 380 return true; 381 382 return false; 383 } 384 385 //Populates a buffer with a random sequence of bytes generateRandomDataAsyncFileCorrectnessWorkload386 void generateRandomData(unsigned char *buffer, int length) 387 { 388 for(int i = 0; i < length; i+= sizeof(uint32_t)) 389 { 390 uint32_t val = g_random->randomUInt32(); 391 memcpy(&buffer[i], &val, std::min(length - i, (int)sizeof(uint32_t))); 392 } 393 } 394 395 //Performs an operation on a file and the memory representation of that file processOperationAsyncFileCorrectnessWorkload396 ACTOR Future<OperationInfo> processOperation(AsyncFileCorrectnessWorkload *self, OperationInfo info) 397 { 398 if(info.operation == READ) 399 { 400 info.data = self->allocateBuffer(info.length); 401 402 //Perform the read. Don't allow it to be cancelled (because the underlying IO may not be cancellable) and don't allow 403 //objects that the read uses to be deleted 404 int numRead = wait 405 ( 406 uncancellable 407 ( 408 holdWhile 409 ( 410 self->fileHandle, 411 holdWhile(info, self->fileHandle->file->read(info.data->buffer, info.length, info.offset)) 412 ) 413 ) 414 ); 415 416 if(numRead != std::min(info.length, self->fileSize - info.offset)) 417 { 418 printf("Read reported incorrect number of bytes at %llu of length %llu\n", info.offset, info.length); 419 self->success = false; 420 } 421 } 422 else if(info.operation == WRITE) 423 { 424 info.data = self->allocateBuffer(info.length); 425 self->generateRandomData(info.data->buffer, info.length); 426 memcpy(&self->memoryFile->buffer[info.offset], info.data->buffer, info.length); 427 memset(&self->fileValidityMask[info.offset], 0xFF, info.length); 428 429 //Perform the write. Don't allow it to be cancelled (because the underlying IO may not be cancellable) and don't allow 430 //objects that the write uses to be deleted 431 wait 432 ( 433 uncancellable 434 ( 435 holdWhile 436 ( 437 self->fileHandle, 438 holdWhile(info, self->fileHandle->file->write(info.data->buffer, info.length, info.offset)) 439 ) 440 ) 441 ); 442 443 //If we wrote past the end of the file, update the size of the file 444 self->fileSize = std::max((int64_t)(info.offset + info.length), self->fileSize); 445 } 446 else if(info.operation == SYNC) 447 { 448 info.data = Reference<AsyncFileBuffer>(NULL); 449 wait(self->fileHandle->file->sync()); 450 } 451 else if(info.operation == REOPEN) 452 { 453 // Will fail if the file does not exist 454 wait(self->openFile(self, IAsyncFile::OPEN_READWRITE, 0666, 0, false)); 455 int64_t fileSize = wait(self->fileHandle->file->size()); 456 int64_t fileSizeChange = fileSize - self->fileSize; 457 if(fileSizeChange >= _PAGE_SIZE) 458 { 459 printf("Reopened file increased in size by %lld bytes (at most %d allowed)\n", fileSizeChange, _PAGE_SIZE - 1); 460 self->success = false; 461 } 462 else if(fileSizeChange < 0) 463 { 464 printf("Reopened file decreased in size by %lld bytes\n", -fileSizeChange); 465 self->success = false; 466 } 467 468 self->updateMemoryBuffer(fileSize); 469 } 470 else if(info.operation == TRUNCATE) 471 { 472 //Perform the truncate. Don't allow it to be cancelled (because the underlying IO may not be cancellable) and don't allow 473 //file handle to be deleted 474 wait 475 ( 476 uncancellable 477 ( 478 holdWhile(self->fileHandle, self->fileHandle->file->truncate(info.offset)) 479 ) 480 ); 481 482 int64_t fileSize = wait(self->fileHandle->file->size()); 483 if(fileSize != info.offset) 484 { 485 printf("Incorrect file size reported after truncate\n"); 486 self->success = false; 487 } 488 489 self->updateMemoryBuffer(fileSize); 490 } 491 492 ++self->numOperations; 493 return info; 494 } 495 checkAsyncFileCorrectnessWorkload496 virtual Future<bool> check(Database const& cx) 497 { 498 return success; 499 } 500 getMetricsAsyncFileCorrectnessWorkload501 virtual void getMetrics(vector<PerfMetric>& m) 502 { 503 if(enabled) 504 { 505 m.push_back(PerfMetric("Number of Operations Performed", numOperations.getValue(), false)); 506 m.push_back(PerfMetric("Average CPU Utilization (Percentage)", averageCpuUtilization * 100, false)); 507 } 508 } 509 }; 510 511 WorkloadFactory<AsyncFileCorrectnessWorkload> AsyncFileCorrectnessWorkloadFactory("AsyncFileCorrectness"); 512