/*
 * AsyncFileCached.actor.h
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source version.
#if defined(NO_INTELLISENSE) && !defined(FLOW_ASYNCFILECACHED_ACTOR_G_H)
	#define FLOW_ASYNCFILECACHED_ACTOR_G_H
	#include "fdbrpc/AsyncFileCached.actor.g.h"
#elif !defined(FLOW_ASYNCFILECACHED_ACTOR_H)
	#define FLOW_ASYNCFILECACHED_ACTOR_H

#include "flow/flow.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/Knobs.h"
#include "flow/TDMetric.actor.h"
#include "flow/network.h"
#include "flow/actorcompiler.h" // This must be the last #include.
36 37 struct EvictablePage { 38 void* data; 39 int index; 40 class Reference<struct EvictablePageCache> pageCache; 41 42 virtual bool evict() = 0; // true if page was evicted, false if it isn't immediately evictable (but will be evicted regardless if possible) 43 EvictablePageEvictablePage44 EvictablePage(Reference<EvictablePageCache> pageCache) : data(0), index(-1), pageCache(pageCache) {} 45 virtual ~EvictablePage(); 46 }; 47 48 struct EvictablePageCache : ReferenceCounted<EvictablePageCache> { EvictablePageCacheEvictablePageCache49 EvictablePageCache() : pageSize(0), maxPages(0) {} EvictablePageCacheEvictablePageCache50 explicit EvictablePageCache(int pageSize, int64_t maxSize) : pageSize(pageSize), maxPages(maxSize / pageSize) {} 51 allocateEvictablePageCache52 void allocate(EvictablePage* page) { 53 try_evict(); 54 try_evict(); 55 page->data = pageSize == 4096 ? FastAllocator<4096>::allocate() : aligned_alloc(4096,pageSize); 56 page->index = pages.size(); 57 pages.push_back(page); 58 } 59 try_evictEvictablePageCache60 void try_evict() { 61 if (pages.size() >= (uint64_t)maxPages && !pages.empty()) { 62 for (int i = 0; i < FLOW_KNOBS->MAX_EVICT_ATTEMPTS; i++) { // If we don't manage to evict anything, just go ahead and exceed the cache limit 63 int toEvict = g_random->randomInt(0, pages.size()); 64 if (pages[toEvict]->evict()) 65 break; 66 } 67 } 68 } 69 70 std::vector<EvictablePage*> pages; 71 int pageSize; 72 int64_t maxPages; 73 }; 74 75 struct OpenFileInfo : NonCopyable { 76 IAsyncFile* f; 77 Future<Reference<IAsyncFile>> opened; // Only valid until the file is fully opened 78 OpenFileInfoOpenFileInfo79 OpenFileInfo() : f(0) {} OpenFileInfoOpenFileInfo80 OpenFileInfo(OpenFileInfo && r) BOOST_NOEXCEPT : f(r.f), opened(std::move(r.opened)) { r.f = 0; } 81 getOpenFileInfo82 Future<Reference<IAsyncFile>> get() { 83 if (f) return Reference<IAsyncFile>::addRef(f); 84 else return opened; 85 } 86 }; 87 88 struct AFCPage; 89 90 class AsyncFileCached : public 
IAsyncFile, public ReferenceCounted<AsyncFileCached> { 91 friend struct AFCPage; 92 93 public: open(std::string filename,int flags,int mode)94 static Future<Reference<IAsyncFile>> open( std::string filename, int flags, int mode ) { 95 //TraceEvent("AsyncFileCachedOpen").detail("Filename", filename); 96 if ( openFiles.find(filename) == openFiles.end() ) { 97 auto f = open_impl( filename, flags, mode ); 98 if ( f.isReady() && f.isError() ) 99 return f; 100 if( !f.isReady() ) 101 openFiles[filename].opened = f; 102 else 103 return f.get(); 104 } 105 return openFiles[filename].get(); 106 } 107 read(void * data,int length,int64_t offset)108 virtual Future<int> read( void* data, int length, int64_t offset ) { 109 ++countFileCacheReads; 110 ++countCacheReads; 111 if (offset + length > this->length) { 112 length = int(this->length - offset); 113 ASSERT(length >= 0); 114 } 115 auto f = read_write_impl(this, data, length, offset, false); 116 if( f.isReady() && !f.isError() ) return length; 117 ++countFileCacheReadsBlocked; 118 ++countCacheReadsBlocked; 119 return tag(f,length); 120 } 121 write_impl(AsyncFileCached * self,void const * data,int length,int64_t offset)122 ACTOR static Future<Void> write_impl( AsyncFileCached *self, void const* data, int length, int64_t offset ) { 123 // If there is a truncate in progress before the the write position then we must 124 // wait for it to complete. 
125 if(length + offset > self->currentTruncateSize) 126 wait(self->currentTruncate); 127 ++self->countFileCacheWrites; 128 ++self->countCacheWrites; 129 Future<Void> f = read_write_impl(self, const_cast<void*>(data), length, offset, true); 130 if (!f.isReady()) { 131 ++self->countFileCacheWritesBlocked; 132 ++self->countCacheWritesBlocked; 133 } 134 wait(f); 135 return Void(); 136 } 137 write(void const * data,int length,int64_t offset)138 virtual Future<Void> write( void const* data, int length, int64_t offset ) { 139 return write_impl(this, data, length, offset); 140 } 141 142 virtual Future<Void> readZeroCopy( void** data, int* length, int64_t offset ); 143 virtual void releaseZeroCopy( void* data, int length, int64_t offset ); 144 145 // This waits for previously started truncates to finish and then truncates truncate(int64_t size)146 virtual Future<Void> truncate( int64_t size ) { 147 return truncate_impl(this, size); 148 } 149 150 // This is the 'real' truncate that does the actual removal of cache blocks and then shortens the file 151 Future<Void> changeFileSize( int64_t size ); 152 153 // This wrapper for the actual truncation operation enforces ordering of truncates. 154 // It maintains currentTruncate and currentTruncateSize so writers can wait behind truncates that would affect them. 
truncate_impl(AsyncFileCached * self,int64_t size)155 ACTOR static Future<Void> truncate_impl(AsyncFileCached *self, int64_t size) { 156 wait(self->currentTruncate); 157 self->currentTruncateSize = size; 158 self->currentTruncate = self->changeFileSize(size); 159 wait(self->currentTruncate); 160 return Void(); 161 } 162 sync()163 virtual Future<Void> sync() { 164 return waitAndSync( this, flush() ); 165 } 166 size()167 virtual Future<int64_t> size() { 168 return length; 169 } 170 debugFD()171 virtual int64_t debugFD() { 172 return uncached->debugFD(); 173 } 174 getFilename()175 virtual std::string getFilename() { 176 return filename; 177 } 178 addref()179 virtual void addref() { 180 ReferenceCounted<AsyncFileCached>::addref(); 181 //TraceEvent("AsyncFileCachedAddRef").detail("Filename", filename).detail("Refcount", debugGetReferenceCount()).backtrace(); 182 } delref()183 virtual void delref() { 184 if (delref_no_destroy()) { 185 // If this is ever ThreadSafeReferenceCounted... 186 // setrefCountUnsafe(0); 187 188 auto f = quiesce(); 189 //TraceEvent("AsyncFileCachedDel").detail("Filename", filename) 190 // .detail("Refcount", debugGetReferenceCount()).detail("CanDie", f.isReady()).backtrace(); 191 if (f.isReady()) 192 delete this; 193 else 194 uncancellable( holdWhile( Reference<AsyncFileCached>::addRef( this ), f ) ); 195 } 196 } 197 198 ~AsyncFileCached(); 199 200 private: 201 static std::map< std::string, OpenFileInfo > openFiles; 202 std::string filename; 203 Reference<IAsyncFile> uncached; 204 int64_t length; 205 int64_t prevLength; 206 std::unordered_map<int64_t, AFCPage*> pages; 207 std::vector<AFCPage*> flushable; 208 Reference<EvictablePageCache> pageCache; 209 Future<Void> currentTruncate; 210 int64_t currentTruncateSize; 211 212 // Map of pointers which hold page buffers for pages which have been overwritten 213 // but at the time of write there were still readZeroCopy holders. 
214 std::unordered_map<void *, int> orphanedPages; 215 216 Int64MetricHandle countFileCacheFinds; 217 Int64MetricHandle countFileCacheReads; 218 Int64MetricHandle countFileCacheWrites; 219 Int64MetricHandle countFileCacheReadsBlocked; 220 Int64MetricHandle countFileCacheWritesBlocked; 221 Int64MetricHandle countFileCachePageReadsMerged; 222 Int64MetricHandle countFileCacheReadBytes; 223 224 Int64MetricHandle countCacheFinds; 225 Int64MetricHandle countCacheReads; 226 Int64MetricHandle countCacheWrites; 227 Int64MetricHandle countCacheReadsBlocked; 228 Int64MetricHandle countCacheWritesBlocked; 229 Int64MetricHandle countCachePageReadsMerged; 230 Int64MetricHandle countCacheReadBytes; 231 AsyncFileCached(Reference<IAsyncFile> uncached,const std::string & filename,int64_t length,Reference<EvictablePageCache> pageCache)232 AsyncFileCached( Reference<IAsyncFile> uncached, const std::string& filename, int64_t length, Reference<EvictablePageCache> pageCache ) 233 : uncached(uncached), filename(filename), length(length), prevLength(length), pageCache(pageCache), currentTruncate(Void()), currentTruncateSize(0) { 234 if( !g_network->isSimulated() ) { 235 countFileCacheWrites.init( LiteralStringRef("AsyncFile.CountFileCacheWrites"), filename); 236 countFileCacheReads.init( LiteralStringRef("AsyncFile.CountFileCacheReads"), filename); 237 countFileCacheWritesBlocked.init( LiteralStringRef("AsyncFile.CountFileCacheWritesBlocked"), filename); 238 countFileCacheReadsBlocked.init( LiteralStringRef("AsyncFile.CountFileCacheReadsBlocked"), filename); 239 countFileCachePageReadsMerged.init(LiteralStringRef("AsyncFile.CountFileCachePageReadsMerged"), filename); 240 countFileCacheFinds.init( LiteralStringRef("AsyncFile.CountFileCacheFinds"), filename); 241 countFileCacheReadBytes.init( LiteralStringRef("AsyncFile.CountFileCacheReadBytes"), filename); 242 243 countCacheWrites.init( LiteralStringRef("AsyncFile.CountCacheWrites")); 244 countCacheReads.init( 
LiteralStringRef("AsyncFile.CountCacheReads")); 245 countCacheWritesBlocked.init( LiteralStringRef("AsyncFile.CountCacheWritesBlocked")); 246 countCacheReadsBlocked.init( LiteralStringRef("AsyncFile.CountCacheReadsBlocked")); 247 countCachePageReadsMerged.init(LiteralStringRef("AsyncFile.CountCachePageReadsMerged")); 248 countCacheFinds.init( LiteralStringRef("AsyncFile.CountCacheFinds")); 249 countCacheReadBytes.init( LiteralStringRef("AsyncFile.CountCacheReadBytes")); 250 251 } 252 } 253 254 static Future<Reference<IAsyncFile>> open_impl( std::string filename, int flags, int mode ); 255 open_impl(std::string filename,int flags,int mode,Reference<EvictablePageCache> pageCache)256 ACTOR static Future<Reference<IAsyncFile>> open_impl( std::string filename, int flags, int mode, Reference<EvictablePageCache> pageCache ) { 257 try { 258 TraceEvent("AFCUnderlyingOpenBegin").detail("Filename", filename); 259 if(flags & IAsyncFile::OPEN_CACHED_READ_ONLY) 260 flags = (flags & ~IAsyncFile::OPEN_READWRITE) | IAsyncFile::OPEN_READONLY; 261 else 262 flags = (flags & ~IAsyncFile::OPEN_READONLY) | IAsyncFile::OPEN_READWRITE; 263 state Reference<IAsyncFile> f = wait( IAsyncFileSystem::filesystem()->open(filename, flags | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_UNBUFFERED, mode) ); 264 TraceEvent("AFCUnderlyingOpenEnd").detail("Filename", filename); 265 int64_t l = wait( f->size() ); 266 TraceEvent("AFCUnderlyingSize").detail("Filename", filename).detail("Size", l); 267 auto& of = openFiles[filename]; 268 of.f = new AsyncFileCached(f, filename, l, pageCache); 269 of.opened = Future<Reference<IAsyncFile>>(); 270 return Reference<IAsyncFile>( of.f ); 271 } catch (Error& e) { 272 if( e.code() != error_code_actor_cancelled ) 273 openFiles.erase( filename ); 274 throw e; 275 } 276 } 277 278 virtual Future<Void> flush(); 279 280 Future<Void> quiesce(); 281 waitAndSync(AsyncFileCached * self,Future<Void> flush)282 ACTOR static Future<Void> waitAndSync( AsyncFileCached* self, 
Future<Void> flush ) { 283 wait( flush ); 284 wait( self->uncached->sync() ); 285 return Void(); 286 } 287 288 static Future<Void> read_write_impl( AsyncFileCached* self, void* data, int length, int64_t offset, bool writing ); 289 290 void remove_page( AFCPage* page ); 291 }; 292 293 struct AFCPage : public EvictablePage, public FastAllocated<AFCPage> { evictAFCPage294 virtual bool evict() { 295 if ( notReading.isReady() && notFlushing.isReady() && !dirty && !zeroCopyRefCount && !truncated ) { 296 owner->remove_page( this ); 297 delete this; 298 return true; 299 } 300 301 if (dirty) 302 flush(); 303 304 return false; 305 } 306 307 // Move this page's data into the orphanedPages set of the owner orphanAFCPage308 void orphan() { 309 owner->orphanedPages[data] = zeroCopyRefCount; 310 zeroCopyRefCount = 0; 311 notReading = Void(); 312 data = pageCache->pageSize == 4096 ? FastAllocator<4096>::allocate() : aligned_alloc(4096, pageCache->pageSize); 313 } 314 writeAFCPage315 Future<Void> write( void const* data, int length, int offset ) { 316 // If zero-copy reads are in progress, allow whole page writes to a new page buffer so the effects 317 // are not seen by the prior readers who still hold zeroCopyRead pointers 318 bool fullPage = offset == 0 && length == pageCache->pageSize; 319 ASSERT(zeroCopyRefCount == 0 || fullPage); 320 321 if(zeroCopyRefCount != 0) { 322 ASSERT(fullPage); 323 orphan(); 324 } 325 326 setDirty(); 327 328 // If there are no active readers then if data is valid or we're replacing all of it we can write directly 329 if (valid || fullPage) { 330 valid = true; 331 memcpy( static_cast<uint8_t*>(this->data) + offset, data, length ); 332 return yield(); 333 } 334 335 // If data is not valid but no read is in progress, start reading 336 if (notReading.isReady()) { 337 notReading = readThrough( this ); 338 } 339 340 notReading = waitAndWrite( this, data, length, offset ); 341 342 return notReading; 343 } 344 waitAndWriteAFCPage345 ACTOR static Future<Void> 
waitAndWrite( AFCPage* self, void const* data, int length, int offset ) { 346 wait( self->notReading ); 347 memcpy( static_cast<uint8_t*>(self->data) + offset, data, length ); 348 return Void(); 349 } 350 readZeroCopyAFCPage351 Future<Void> readZeroCopy() { 352 ++zeroCopyRefCount; 353 if (valid) return yield(); 354 355 if (notReading.isReady()) { 356 notReading = readThrough( this ); 357 } else { 358 ++owner->countFileCachePageReadsMerged; 359 ++owner->countCachePageReadsMerged; 360 } 361 362 return notReading; 363 } releaseZeroCopyAFCPage364 void releaseZeroCopy() { 365 --zeroCopyRefCount; 366 ASSERT( zeroCopyRefCount >= 0 ); 367 } 368 readAFCPage369 Future<Void> read( void* data, int length, int offset ) { 370 if (valid) { 371 owner->countFileCacheReadBytes += length; 372 owner->countCacheReadBytes += length; 373 memcpy( data, static_cast<uint8_t const*>(this->data) + offset, length ); 374 return yield(); 375 } 376 377 if (notReading.isReady()) { 378 notReading = readThrough( this ); 379 } else { 380 ++owner->countFileCachePageReadsMerged; 381 ++owner->countCachePageReadsMerged; 382 } 383 384 notReading = waitAndRead( this, data, length, offset ); 385 386 return notReading; 387 } 388 waitAndReadAFCPage389 ACTOR static Future<Void> waitAndRead( AFCPage* self, void* data, int length, int offset ) { 390 wait( self->notReading ); 391 memcpy( data, static_cast<uint8_t const*>(self->data) + offset, length ); 392 return Void(); 393 } 394 readThroughAFCPage395 ACTOR static Future<Void> readThrough( AFCPage* self ) { 396 ASSERT(!self->valid); 397 state void *dst = self->data; 398 if ( self->pageOffset < self->owner->prevLength ) { 399 try { 400 int _ = wait( self->owner->uncached->read( dst, self->pageCache->pageSize, self->pageOffset ) ); 401 if (_ != self->pageCache->pageSize) 402 TraceEvent("ReadThroughShortRead").detail("ReadAmount", _).detail("PageSize", self->pageCache->pageSize).detail("PageOffset", self->pageOffset); 403 } catch (Error& e) { 404 
self->zeroCopyRefCount = 0; 405 TraceEvent("ReadThroughFailed").error(e); 406 throw; 407 } 408 } 409 // If the memory we read into wasn't orphaned while we were waiting on the read then set valid to true 410 if(dst == self->data) 411 self->valid = true; 412 return Void(); 413 } 414 writeThroughAFCPage415 ACTOR static Future<Void> writeThrough( AFCPage* self, Promise<Void> writing ) { 416 // writeThrough can be called on a page that is not dirty, just to wait for a previous writeThrough to finish. In that 417 // case we don't want to do any disk I/O 418 try { 419 state bool dirty = self->dirty; 420 ++self->writeThroughCount; 421 self->updateFlushableIndex(); 422 423 wait( self->notReading && self->notFlushing ); 424 425 if (dirty) { 426 if ( self->pageOffset + self->pageCache->pageSize > self->owner->length ) { 427 ASSERT(self->pageOffset < self->owner->length); 428 memset( static_cast<uint8_t *>(self->data) + self->owner->length - self->pageOffset, 0, self->pageCache->pageSize - (self->owner->length - self->pageOffset) ); 429 } 430 431 auto f = self->owner->uncached->write( self->data, self->pageCache->pageSize, self->pageOffset ); 432 433 wait( f ); 434 } 435 } 436 catch(Error& e) { 437 --self->writeThroughCount; 438 self->setDirty(); 439 writing.sendError(e); 440 throw; 441 } 442 --self->writeThroughCount; 443 self->updateFlushableIndex(); 444 445 writing.send(Void()); // FIXME: This could happen before the wait if AsyncFileKAIO dealt properly with overlapping write and sync operations 446 447 self->pageCache->try_evict(); 448 449 return Void(); 450 } 451 flushAFCPage452 Future<Void> flush() { 453 if (!dirty && notFlushing.isReady()) return Void(); 454 455 ASSERT(valid || !notReading.isReady() || notReading.isError()); 456 457 Promise<Void> writing; 458 459 notFlushing = writeThrough( this, writing ); 460 461 clearDirty(); // Do this last so that if writeThrough immediately calls try_evict, we can't be evicted before assigning notFlushing 462 return 
writing.getFuture(); 463 } 464 quiesceAFCPage465 Future<Void> quiesce() { 466 if (dirty) flush(); 467 468 // If we are flushing, we will be quiescent when all flushes are finished 469 // Returning flush() isn't right, because flush can return before notFlushing.isReady() 470 if (!notFlushing.isReady()) { 471 return notFlushing; 472 } 473 474 // else if we are reading, we will be quiescent when the read is finished 475 if ( !notReading.isReady() ) 476 return notReading; 477 478 return Void(); 479 } 480 truncateAFCPage481 Future<Void> truncate() { 482 // Allow truncatation during zero copy reads but orphan the previous buffer 483 if( zeroCopyRefCount != 0) 484 orphan(); 485 truncated = true; 486 return truncate_impl( this ); 487 } 488 truncate_implAFCPage489 ACTOR static Future<Void> truncate_impl( AFCPage* self ) { 490 wait( self->notReading && self->notFlushing && yield() ); 491 delete self; 492 return Void(); 493 } 494 AFCPageAFCPage495 AFCPage( AsyncFileCached* owner, int64_t offset ) : EvictablePage(owner->pageCache), owner(owner), pageOffset(offset), dirty(false), valid(false), truncated(false), notReading(Void()), notFlushing(Void()), zeroCopyRefCount(0), flushableIndex(-1), writeThroughCount(0) { 496 pageCache->allocate(this); 497 } 498 ~AFCPageAFCPage499 virtual ~AFCPage() { 500 clearDirty(); 501 ASSERT_ABORT( flushableIndex == -1 ); 502 } 503 setDirtyAFCPage504 void setDirty() { 505 dirty = true; 506 updateFlushableIndex(); 507 } 508 clearDirtyAFCPage509 void clearDirty() { 510 dirty = false; 511 updateFlushableIndex(); 512 } 513 updateFlushableIndexAFCPage514 void updateFlushableIndex() { 515 bool flushable = dirty || writeThroughCount; 516 if (flushable == (flushableIndex != -1)) return; 517 518 if (flushable) { 519 flushableIndex = owner->flushable.size(); 520 owner->flushable.push_back(this); 521 } else { 522 ASSERT( owner->flushable[flushableIndex] == this ); 523 owner->flushable[flushableIndex] = owner->flushable.back(); 524 
owner->flushable[flushableIndex]->flushableIndex = flushableIndex; 525 owner->flushable.pop_back(); 526 flushableIndex = -1; 527 } 528 } 529 530 AsyncFileCached* owner; 531 int64_t pageOffset; 532 533 Future<Void> notReading; // .isReady when a readThrough (or waitAndWrite) is not in progress 534 Future<Void> notFlushing; // .isReady when a writeThrough is not in progress 535 536 bool dirty; // write has been called more recently than flush 537 bool valid; // data contains the file contents 538 bool truncated; // true if this page has been truncated 539 int writeThroughCount; // number of writeThrough actors that are in progress (potentially writing or waiting to write) 540 int flushableIndex; // index in owner->flushable[] 541 int zeroCopyRefCount; // references held by "zero-copy" reads 542 }; 543 544 #include "flow/unactorcompiler.h" 545 #endif 546