1 /*
2  * AsyncFileCached.actor.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #pragma once
22 
23 // When actually compiled (NO_INTELLISENSE), include the generated version of this file.  In intellisense use the source version.
24 #if defined(NO_INTELLISENSE) && !defined(FLOW_ASYNCFILECACHED_ACTOR_G_H)
25 	#define FLOW_ASYNCFILECACHED_ACTOR_G_H
26 	#include "fdbrpc/AsyncFileCached.actor.g.h"
27 #elif !defined(FLOW_ASYNCFILECACHED_ACTOR_H)
28 	#define FLOW_ASYNCFILECACHED_ACTOR_H
29 
30 #include "flow/flow.h"
31 #include "fdbrpc/IAsyncFile.h"
32 #include "flow/Knobs.h"
33 #include "flow/TDMetric.actor.h"
34 #include "flow/network.h"
35 #include "flow/actorcompiler.h"  // This must be the last #include.
36 
37 struct EvictablePage {
38 	void* data;
39 	int index;
40 	class Reference<struct EvictablePageCache> pageCache;
41 
42 	virtual bool evict() = 0; // true if page was evicted, false if it isn't immediately evictable (but will be evicted regardless if possible)
43 
EvictablePageEvictablePage44 	EvictablePage(Reference<EvictablePageCache> pageCache) : data(0), index(-1), pageCache(pageCache) {}
45 	virtual ~EvictablePage();
46 };
47 
48 struct EvictablePageCache : ReferenceCounted<EvictablePageCache> {
EvictablePageCacheEvictablePageCache49 	EvictablePageCache() : pageSize(0), maxPages(0) {}
EvictablePageCacheEvictablePageCache50 	explicit EvictablePageCache(int pageSize, int64_t maxSize) : pageSize(pageSize), maxPages(maxSize / pageSize) {}
51 
allocateEvictablePageCache52 	void allocate(EvictablePage* page) {
53 		try_evict();
54 		try_evict();
55 		page->data = pageSize == 4096 ? FastAllocator<4096>::allocate() : aligned_alloc(4096,pageSize);
56 		page->index = pages.size();
57 		pages.push_back(page);
58 	}
59 
try_evictEvictablePageCache60 	void try_evict() {
61 		if (pages.size() >= (uint64_t)maxPages && !pages.empty()) {
62 			for (int i = 0; i < FLOW_KNOBS->MAX_EVICT_ATTEMPTS; i++) { // If we don't manage to evict anything, just go ahead and exceed the cache limit
63 				int toEvict = g_random->randomInt(0, pages.size());
64 				if (pages[toEvict]->evict())
65 					break;
66 			}
67 		}
68 	}
69 
70 	std::vector<EvictablePage*> pages;
71 	int pageSize;
72 	int64_t maxPages;
73 };
74 
75 struct OpenFileInfo : NonCopyable {
76 	IAsyncFile* f;
77 	Future<Reference<IAsyncFile>> opened; // Only valid until the file is fully opened
78 
OpenFileInfoOpenFileInfo79 	OpenFileInfo() : f(0) {}
OpenFileInfoOpenFileInfo80 	OpenFileInfo(OpenFileInfo && r) BOOST_NOEXCEPT : f(r.f), opened(std::move(r.opened)) { r.f = 0; }
81 
getOpenFileInfo82 	Future<Reference<IAsyncFile>> get() {
83 		if (f) return Reference<IAsyncFile>::addRef(f);
84 		else return opened;
85 	}
86 };
87 
88 struct AFCPage;
89 
// An IAsyncFile implementation that wraps another IAsyncFile (`uncached`) and keeps
// AFCPage objects for it, keyed in `pages` and allocated from a shared
// EvictablePageCache.  Instances are created through the static open() function,
// which tracks files by filename in the static `openFiles` map.
class AsyncFileCached : public IAsyncFile, public ReferenceCounted<AsyncFileCached> {
	friend struct AFCPage;

public:
	// Returns the cached file for `filename`, opening it if it has no entry in openFiles.
	// While an open is still pending, the pending future is stored in the openFiles
	// entry so that later callers receive the same future via OpenFileInfo::get().
	static Future<Reference<IAsyncFile>> open( std::string filename, int flags, int mode ) {
		//TraceEvent("AsyncFileCachedOpen").detail("Filename", filename);
		if ( openFiles.find(filename) == openFiles.end() ) {
			auto f = open_impl( filename, flags, mode );
			// The open already failed: hand the error straight back to the caller.
			if ( f.isReady() && f.isError() )
				return f;
			// Still pending: remember the future in the map entry; otherwise return the ready result.
			if( !f.isReady() )
				openFiles[filename].opened = f;
			else
				return f.get();
		}
		return openFiles[filename].get();
	}

	// Reads up to `length` bytes at `offset` into `data` through read_write_impl.
	// A request that extends past the current file length is shortened to end at it.
	// The returned future carries the (possibly shortened) length.
	virtual Future<int> read( void* data, int length, int64_t offset ) {
		++countFileCacheReads;
		++countCacheReads;
		if (offset + length > this->length) {
			length = int(this->length - offset);
			ASSERT(length >= 0);
		}
		auto f = read_write_impl(this, data, length, offset, false);
		// Completed without error already: answer immediately and skip the "blocked" counters.
		if( f.isReady() && !f.isError() ) return length;
		++countFileCacheReadsBlocked;
		++countCacheReadsBlocked;
		// NOTE(review): tag() is defined elsewhere; presumably it yields `length` once f completes - confirm.
		return tag(f,length);
	}

	// Actor behind write(): counts the write, forwards it to read_write_impl with
	// writing=true, and waits for the result.
	ACTOR static Future<Void> write_impl( AsyncFileCached *self, void const* data, int length, int64_t offset ) {
		// If this write extends past the size recorded for the most recently started
		// truncate, wait for that truncate to complete before writing.
		if(length + offset > self->currentTruncateSize)
			wait(self->currentTruncate);
		++self->countFileCacheWrites;
		++self->countCacheWrites;
		Future<Void> f = read_write_impl(self, const_cast<void*>(data), length, offset, true);
		// The "blocked" counters are only bumped when the write did not complete immediately.
		if (!f.isReady()) {
			++self->countFileCacheWritesBlocked;
			++self->countCacheWritesBlocked;
		}
		wait(f);
		return Void();
	}

	// Writes `length` bytes from `data` at `offset`; see write_impl.
	virtual Future<Void> write( void const* data, int length, int64_t offset ) {
		return write_impl(this, data, length, offset);
	}

	// Declared here, defined elsewhere.
	virtual Future<Void> readZeroCopy( void** data, int* length, int64_t offset );
	virtual void releaseZeroCopy( void* data, int length, int64_t offset );

	// This waits for previously started truncates to finish and then truncates
	virtual Future<Void> truncate( int64_t size ) {
		return truncate_impl(this, size);
	}

	// This is the 'real' truncate that does the actual removal of cache blocks and then shortens the file
	Future<Void> changeFileSize( int64_t size );

	// This wrapper for the actual truncation operation enforces ordering of truncates.
	// It maintains currentTruncate and currentTruncateSize so writers can wait behind truncates that would affect them.
	ACTOR static Future<Void> truncate_impl(AsyncFileCached *self, int64_t size) {
		// Wait for any earlier truncate before recording and starting this one.
		wait(self->currentTruncate);
		self->currentTruncateSize = size;
		self->currentTruncate = self->changeFileSize(size);
		wait(self->currentTruncate);
		return Void();
	}

	// Starts flush() and, once it completes, syncs the underlying file (see waitAndSync).
	virtual Future<Void> sync() {
		return waitAndSync( this, flush() );
	}

	// Returns the `length` member; no call is made to the underlying file.
	virtual Future<int64_t> size() {
		return length;
	}

	// Forwards to the underlying file.
	virtual int64_t debugFD() {
		return uncached->debugFD();
	}

	virtual std::string getFilename() {
		return filename;
	}

	virtual void addref() {
		ReferenceCounted<AsyncFileCached>::addref();
		//TraceEvent("AsyncFileCachedAddRef").detail("Filename", filename).detail("Refcount", debugGetReferenceCount()).backtrace();
	}
	// Drops a reference.  When delref_no_destroy() reports true, the object is deleted
	// only after quiesce() has completed: immediately if it is already ready, otherwise
	// a fresh reference to this object is held while the quiesce future is outstanding.
	virtual void delref() {
		if (delref_no_destroy()) {
			// If this is ever ThreadSafeReferenceCounted...
			// setrefCountUnsafe(0);

			auto f = quiesce();
			//TraceEvent("AsyncFileCachedDel").detail("Filename", filename)
			//	.detail("Refcount", debugGetReferenceCount()).detail("CanDie", f.isReady()).backtrace();
			if (f.isReady())
				delete this;
			else
				uncancellable( holdWhile( Reference<AsyncFileCached>::addRef( this ), f ) );
		}
	}

	~AsyncFileCached();

private:
	// Files known to open(), keyed by filename.  Entries are written by open() and
	// open_impl(), and erased by open_impl() when an open fails.
	static std::map< std::string, OpenFileInfo > openFiles;
	std::string filename;
	Reference<IAsyncFile> uncached; // the wrapped file that actual I/O is issued against
	int64_t length; // current file length as tracked by this object
	int64_t prevLength; // initialized to the same value as length; AFCPage::readThrough only reads from disk below this offset
	std::unordered_map<int64_t, AFCPage*> pages;
	std::vector<AFCPage*> flushable; // pages that are dirty or have a writeThrough in progress (maintained by AFCPage::updateFlushableIndex)
	Reference<EvictablePageCache> pageCache;
	Future<Void> currentTruncate; // most recently started truncate; starts out ready
	int64_t currentTruncateSize; // size passed to the most recently started truncate; starts at 0

	// Map of pointers which hold page buffers for pages which have been overwritten
	// but at the time of write there were still readZeroCopy holders.
	std::unordered_map<void *, int> orphanedPages;

	// Per-file metrics (initialized with the filename in the constructor).
	Int64MetricHandle countFileCacheFinds;
	Int64MetricHandle countFileCacheReads;
	Int64MetricHandle countFileCacheWrites;
	Int64MetricHandle countFileCacheReadsBlocked;
	Int64MetricHandle countFileCacheWritesBlocked;
	Int64MetricHandle countFileCachePageReadsMerged;
	Int64MetricHandle countFileCacheReadBytes;

	// Metrics initialized without a filename in the constructor.
	Int64MetricHandle countCacheFinds;
	Int64MetricHandle countCacheReads;
	Int64MetricHandle countCacheWrites;
	Int64MetricHandle countCacheReadsBlocked;
	Int64MetricHandle countCacheWritesBlocked;
	Int64MetricHandle countCachePageReadsMerged;
	Int64MetricHandle countCacheReadBytes;

	// Private: instances are created by open_impl().  The metric handles are only
	// initialized when the network is not simulated.
	AsyncFileCached( Reference<IAsyncFile> uncached, const std::string& filename, int64_t length, Reference<EvictablePageCache> pageCache )
		: uncached(uncached), filename(filename), length(length), prevLength(length), pageCache(pageCache), currentTruncate(Void()), currentTruncateSize(0) {
		if( !g_network->isSimulated() ) {
			countFileCacheWrites.init(         LiteralStringRef("AsyncFile.CountFileCacheWrites"), filename);
			countFileCacheReads.init(          LiteralStringRef("AsyncFile.CountFileCacheReads"), filename);
			countFileCacheWritesBlocked.init(  LiteralStringRef("AsyncFile.CountFileCacheWritesBlocked"), filename);
			countFileCacheReadsBlocked.init(   LiteralStringRef("AsyncFile.CountFileCacheReadsBlocked"), filename);
			countFileCachePageReadsMerged.init(LiteralStringRef("AsyncFile.CountFileCachePageReadsMerged"), filename);
			countFileCacheFinds.init(          LiteralStringRef("AsyncFile.CountFileCacheFinds"), filename);
			countFileCacheReadBytes.init(      LiteralStringRef("AsyncFile.CountFileCacheReadBytes"), filename);

			countCacheWrites.init(         LiteralStringRef("AsyncFile.CountCacheWrites"));
			countCacheReads.init(          LiteralStringRef("AsyncFile.CountCacheReads"));
			countCacheWritesBlocked.init(  LiteralStringRef("AsyncFile.CountCacheWritesBlocked"));
			countCacheReadsBlocked.init(   LiteralStringRef("AsyncFile.CountCacheReadsBlocked"));
			countCachePageReadsMerged.init(LiteralStringRef("AsyncFile.CountCachePageReadsMerged"));
			countCacheFinds.init(          LiteralStringRef("AsyncFile.CountCacheFinds"));
			countCacheReadBytes.init(      LiteralStringRef("AsyncFile.CountCacheReadBytes"));

		}
	}

	// Three-argument overload; declared here, defined elsewhere.
	static Future<Reference<IAsyncFile>> open_impl( std::string filename, int flags, int mode );

	// Opens the underlying file uncached and unbuffered, queries its size, then creates
	// the AsyncFileCached and records it in openFiles[filename].  On any error other
	// than actor cancellation the openFiles entry for `filename` is erased before rethrowing.
	ACTOR static Future<Reference<IAsyncFile>> open_impl( std::string filename, int flags, int mode, Reference<EvictablePageCache> pageCache ) {
		try {
			TraceEvent("AFCUnderlyingOpenBegin").detail("Filename", filename);
			// OPEN_CACHED_READ_ONLY selects a read-only underlying open; anything else is forced to read-write.
			if(flags & IAsyncFile::OPEN_CACHED_READ_ONLY)
				flags = (flags & ~IAsyncFile::OPEN_READWRITE) | IAsyncFile::OPEN_READONLY;
			else
				flags = (flags & ~IAsyncFile::OPEN_READONLY) | IAsyncFile::OPEN_READWRITE;
			state Reference<IAsyncFile> f = wait( IAsyncFileSystem::filesystem()->open(filename, flags | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_UNBUFFERED, mode) );
			TraceEvent("AFCUnderlyingOpenEnd").detail("Filename", filename);
			int64_t l = wait( f->size() );
			TraceEvent("AFCUnderlyingSize").detail("Filename", filename).detail("Size", l);
			auto& of = openFiles[filename];
			of.f = new AsyncFileCached(f, filename, l, pageCache);
			// The file is now fully open, so the pending-open future is replaced with a default-constructed one.
			of.opened = Future<Reference<IAsyncFile>>();
			return Reference<IAsyncFile>( of.f );
		} catch (Error& e) {
			if( e.code() != error_code_actor_cancelled )
				openFiles.erase( filename );
			throw e;
		}
	}

	// Declared here, defined elsewhere.
	virtual Future<Void> flush();

	// Declared here, defined elsewhere.  delref() waits on its result before deleting the object.
	Future<Void> quiesce();

	// Waits for `flush`, then for a sync of the underlying file.
	ACTOR static Future<Void> waitAndSync( AsyncFileCached* self, Future<Void> flush ) {
		wait( flush );
		wait( self->uncached->sync() );
		return Void();
	}

	// Shared implementation behind read() (writing=false) and write_impl() (writing=true); defined elsewhere.
	static Future<Void> read_write_impl( AsyncFileCached* self, void* data, int length, int64_t offset, bool writing );

	// Declared here, defined elsewhere.  Called by AFCPage::evict() just before the page deletes itself.
	void remove_page( AFCPage* page );
};
292 
// One page of an AsyncFileCached.  Tracks whether its buffer holds valid file
// contents, whether it has unflushed writes, and the futures of any read-through or
// write-through currently in progress.  Pages delete themselves in evict() and
// truncate_impl().
struct AFCPage : public EvictablePage, public FastAllocated<AFCPage> {
	// Evicts (removes from the owner and deletes) this page only if nothing is reading
	// or flushing it, it is not dirty, no zero-copy references are held, and it has not
	// been truncated.  Otherwise a dirty page has a flush started and false is returned.
	virtual bool evict() {
		if ( notReading.isReady() && notFlushing.isReady() && !dirty && !zeroCopyRefCount && !truncated ) {
			owner->remove_page( this );
			delete this;
			return true;
		}

		if (dirty)
			flush();

		return false;
	}

	// Move this page's data into the orphanedPages set of the owner
	// The current buffer is recorded there together with its zero-copy reference count,
	// the page's own count and notReading are reset, and a fresh buffer is allocated.
	void orphan() {
		owner->orphanedPages[data] = zeroCopyRefCount;
		zeroCopyRefCount = 0;
		notReading = Void();
		data = pageCache->pageSize == 4096 ? FastAllocator<4096>::allocate() : aligned_alloc(4096, pageCache->pageSize);
	}

	// Copies `length` bytes from `data` into the page buffer at `offset` and marks the
	// page dirty.  If the page is not yet valid and the write is partial, the copy is
	// deferred until a read-through of the page has completed.
	Future<Void> write( void const* data, int length, int offset ) {
		// If zero-copy reads are in progress, allow whole page writes to a new page buffer so the effects
		// are not seen by the prior readers who still hold zeroCopyRead pointers
		bool fullPage = offset == 0 && length == pageCache->pageSize;
		ASSERT(zeroCopyRefCount == 0 || fullPage);

		if(zeroCopyRefCount != 0) {
			ASSERT(fullPage);
			orphan();
		}

		setDirty();

		// If there are no active readers then if data is valid or we're replacing all of it we can write directly
		if (valid || fullPage) {
			valid = true;
			memcpy( static_cast<uint8_t*>(this->data) + offset, data, length );
			return yield();
		}

		// If data is not valid but no read is in progress, start reading
		if (notReading.isReady()) {
			notReading = readThrough( this );
		}

		// waitAndWrite waits on the current notReading; its own future then becomes the new notReading.
		notReading = waitAndWrite( this, data, length, offset );

		return notReading;
	}

	// Waits for the page's notReading future, then copies the caller's bytes into the page buffer.
	ACTOR static Future<Void> waitAndWrite( AFCPage* self, void const* data, int length, int offset ) {
		wait( self->notReading );
		memcpy( static_cast<uint8_t*>(self->data) + offset, data, length );
		return Void();
	}

	// Takes a zero-copy reference on the page and returns a future for when the page
	// contents are available, starting a read-through if none is in progress.
	Future<Void> readZeroCopy() {
		++zeroCopyRefCount;
		if (valid) return yield();

		if (notReading.isReady()) {
			notReading = readThrough( this );
		} else {
			// A read is already in flight for this page; count this request as merged with it.
			++owner->countFileCachePageReadsMerged;
			++owner->countCachePageReadsMerged;
		}

		return notReading;
	}
	// Drops one zero-copy reference taken by readZeroCopy().
	void releaseZeroCopy() {
		--zeroCopyRefCount;
		ASSERT( zeroCopyRefCount >= 0 );
	}

	// Copies `length` bytes at `offset` within the page into `data`.  A valid page is
	// copied immediately; otherwise the copy happens after the pending (or newly
	// started) read-through completes.
	Future<Void> read( void* data, int length, int offset ) {
		if (valid) {
			owner->countFileCacheReadBytes += length;
			owner->countCacheReadBytes += length;
			memcpy( data, static_cast<uint8_t const*>(this->data) + offset, length );
			return yield();
		}

		if (notReading.isReady()) {
			notReading = readThrough( this );
		} else {
			// A read is already in flight for this page; count this request as merged with it.
			++owner->countFileCachePageReadsMerged;
			++owner->countCachePageReadsMerged;
		}

		// waitAndRead waits on the current notReading; its own future then becomes the new notReading.
		notReading = waitAndRead( this, data, length, offset );

		return notReading;
	}

	// Waits for the page's notReading future, then copies bytes from the page buffer to the caller.
	ACTOR static Future<Void> waitAndRead( AFCPage* self, void* data, int length, int offset ) {
		wait( self->notReading );
		memcpy( data, static_cast<uint8_t const*>(self->data) + offset, length );
		return Void();
	}

	// Fills the page buffer from the underlying file and marks the page valid.
	// Pages at or beyond owner->prevLength are not read from disk at all.
	ACTOR static Future<Void> readThrough( AFCPage* self ) {
		ASSERT(!self->valid);
		// Remember the buffer being read into so that an orphan() during the wait can be detected below.
		state void *dst = self->data;
		if ( self->pageOffset < self->owner->prevLength ) {
			try {
				int _ = wait( self->owner->uncached->read( dst, self->pageCache->pageSize, self->pageOffset ) );
				// A short read is only logged; it is not treated as an error.
				if (_ != self->pageCache->pageSize)
					TraceEvent("ReadThroughShortRead").detail("ReadAmount", _).detail("PageSize", self->pageCache->pageSize).detail("PageOffset", self->pageOffset);
			} catch (Error& e) {
				// On failure the zero-copy reference count is reset before the error is rethrown.
				self->zeroCopyRefCount = 0;
				TraceEvent("ReadThroughFailed").error(e);
				throw;
			}
		}
		// If the memory we read into wasn't orphaned while we were waiting on the read then set valid to true
		if(dst == self->data)
			self->valid = true;
		return Void();
	}

	// Writes the page to the underlying file if it was dirty when this actor started,
	// after waiting for any read or earlier flush in progress.  `writing` is fulfilled
	// (or sent the error) when the write is done.
	ACTOR static Future<Void> writeThrough( AFCPage* self, Promise<Void> writing ) {
		// writeThrough can be called on a page that is not dirty, just to wait for a previous writeThrough to finish.  In that
		// case we don't want to do any disk I/O
		try {
			// Captured up front: flush() clears self->dirty right after starting this actor.
			state bool dirty = self->dirty;
			++self->writeThroughCount;
			self->updateFlushableIndex();

			wait( self->notReading && self->notFlushing );

			if (dirty) {
				// If the page extends past the end of the file, zero the part of the buffer beyond the file length.
				if ( self->pageOffset + self->pageCache->pageSize > self->owner->length ) {
					ASSERT(self->pageOffset < self->owner->length);
					memset( static_cast<uint8_t *>(self->data) + self->owner->length - self->pageOffset, 0, self->pageCache->pageSize - (self->owner->length - self->pageOffset) );
				}

				auto f = self->owner->uncached->write( self->data, self->pageCache->pageSize, self->pageOffset );

				wait( f );
			}
		}
		catch(Error& e) {
			// Undo the in-progress count, mark the page dirty again, and report the error to the flush() caller.
			--self->writeThroughCount;
			self->setDirty();
			writing.sendError(e);
			throw;
		}
		--self->writeThroughCount;
		self->updateFlushableIndex();

		writing.send(Void()); // FIXME: This could happen before the wait if AsyncFileKAIO dealt properly with overlapping write and sync operations

		self->pageCache->try_evict();

		return Void();
	}

	// Starts a writeThrough for this page unless it is clean and no flush is in
	// progress.  The returned future is the one fulfilled through writeThrough's
	// `writing` promise.
	Future<Void> flush() {
		if (!dirty && notFlushing.isReady()) return Void();

		ASSERT(valid || !notReading.isReady() || notReading.isError());

		Promise<Void> writing;

		notFlushing = writeThrough( this, writing );

		clearDirty(); // Do this last so that if writeThrough immediately calls try_evict, we can't be evicted before assigning notFlushing
		return writing.getFuture();
	}

	// Returns a future for when the page has no flush or read outstanding, flushing
	// first if the page is dirty.
	Future<Void> quiesce() {
		if (dirty) flush();

		// If we are flushing, we will be quiescent when all flushes are finished
		// Returning flush() isn't right, because flush can return before notFlushing.isReady()
		if (!notFlushing.isReady()) {
			return notFlushing;
		}

		// else if we are reading, we will be quiescent when the read is finished
		if ( !notReading.isReady() )
			return notReading;

		return Void();
	}

	// Marks the page truncated and schedules its deletion (see truncate_impl).
	Future<Void> truncate() {
		// Allow truncation during zero copy reads but orphan the previous buffer
		if( zeroCopyRefCount != 0)
			orphan();
		truncated = true;
		return truncate_impl( this );
	}

	// Deletes the page once any read and flush in progress have finished and a yield() has completed.
	ACTOR static Future<Void> truncate_impl( AFCPage* self ) {
		wait( self->notReading && self->notFlushing && yield() );
		delete self;
		return Void();
	}

	// Creates a clean, not-yet-valid page for `owner` at file offset `offset` and
	// obtains its buffer from the owner's page cache.
	AFCPage( AsyncFileCached* owner, int64_t offset ) : EvictablePage(owner->pageCache), owner(owner), pageOffset(offset), dirty(false), valid(false), truncated(false), notReading(Void()), notFlushing(Void()), zeroCopyRefCount(0), flushableIndex(-1), writeThroughCount(0) {
		pageCache->allocate(this);
	}

	// Clears the dirty flag (which updates the owner's flushable list) and aborts if
	// the page is still listed there afterwards.
	virtual ~AFCPage() {
		clearDirty();
		ASSERT_ABORT( flushableIndex == -1 );
	}

	void setDirty() {
		dirty = true;
		updateFlushableIndex();
	}

	void clearDirty() {
		dirty = false;
		updateFlushableIndex();
	}

	// Keeps owner->flushable in sync with this page's state: the page is listed there
	// exactly when it is dirty or has a writeThrough in progress.
	void updateFlushableIndex() {
		bool flushable = dirty || writeThroughCount;
		// Already in the right state (listed iff flushable): nothing to do.
		if (flushable == (flushableIndex != -1)) return;

		if (flushable) {
			flushableIndex = owner->flushable.size();
			owner->flushable.push_back(this);
		} else {
			// Remove by moving the last entry into this page's slot and shrinking the vector.
			ASSERT( owner->flushable[flushableIndex] == this );
			owner->flushable[flushableIndex] = owner->flushable.back();
			owner->flushable[flushableIndex]->flushableIndex = flushableIndex;
			owner->flushable.pop_back();
			flushableIndex = -1;
		}
	}

	AsyncFileCached* owner; // file this page belongs to
	int64_t pageOffset; // offset of this page within the file

	Future<Void> notReading; // .isReady when a readThrough (or waitAndWrite) is not in progress
	Future<Void> notFlushing; // .isReady when a writeThrough is not in progress

	bool dirty; // write has been called more recently than flush
	bool valid; // data contains the file contents
	bool truncated; // true if this page has been truncated
	int writeThroughCount;  // number of writeThrough actors that are in progress (potentially writing or waiting to write)
	int flushableIndex;  // index in owner->flushable[]
	int zeroCopyRefCount;  // references held by "zero-copy" reads
};
543 
544 #include "flow/unactorcompiler.h"
545 #endif
546