1 /*
2  * AsyncFileNonDurable.actor.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #pragma once
22 
23 // When actually compiled (NO_INTELLISENSE), include the generated version of this file.  In intellisense use the source version.
24 #if defined(NO_INTELLISENSE) && !defined(FLOW_ASYNCFILENONDURABLE_ACTOR_G_H)
25 	#define FLOW_ASYNCFILENONDURABLE_ACTOR_G_H
26 	#include "fdbrpc/AsyncFileNonDurable.actor.g.h"
27 #elif !defined(FLOW_ASYNCFILENONDURABLE_ACTOR_H)
28 	#define FLOW_ASYNCFILENONDURABLE_ACTOR_H
29 
30 #include "flow/flow.h"
31 #include "fdbrpc/IAsyncFile.h"
32 #include "flow/ActorCollection.h"
33 #include "fdbrpc/simulator.h"
34 #include "fdbrpc/TraceFileIO.h"
35 #include "fdbrpc/RangeMap.h"
36 #include "flow/actorcompiler.h"  // This must be the last #include.
37 
38 #undef max
39 #undef min
40 
//Declared here and defined elsewhere.  Within this file, sendOnProcess is used by the write and truncate
//actors to fulfill the "started" promise of a modification on behalf of the process that issued it.
//NOTE(review): presumably switches to `process` at priority `taskID` before sending -- confirm against the definition.
Future<Void> sendOnProcess( ISimulator::ProcessInfo* const& process, Promise<Void> const& promise, int const& taskID );
//Error counterpart of sendOnProcess; used here to deliver the error `e` through `promise` when a modification fails before it starts.
//NOTE(review): presumably also switches to `process` first -- confirm against the definition.
Future<Void> sendErrorOnProcess( ISimulator::ProcessInfo* const& process, Promise<Void> const& promise, Error const& e, int const& taskID );
43 
44 ACTOR template <class T>
sendErrorOnShutdown(Future<T> in)45 Future<T> sendErrorOnShutdown( Future<T> in ) {
46 	choose {
47 		when( wait(success( g_simulator.getCurrentProcess()->shutdownSignal.getFuture() )) ) {
48 			throw io_error().asInjectedFault();
49 		}
50 		when( T rep = wait( in ) ) {
51 			return rep;
52 		}
53 	}
54 }
55 
56 class AsyncFileDetachable sealed : public IAsyncFile, public ReferenceCounted<AsyncFileDetachable>{
57 private:
58 	Reference<IAsyncFile> file;
59 	Future<Void> shutdown;
60 
61 public:
AsyncFileDetachable(Reference<IAsyncFile> file)62 	explicit AsyncFileDetachable( Reference<IAsyncFile> file ) : file(file) {
63 		shutdown = doShutdown(this);
64 	}
65 
doShutdown(AsyncFileDetachable * self)66 	ACTOR Future<Void> doShutdown( AsyncFileDetachable* self ) {
67 		wait(success( g_simulator.getCurrentProcess()->shutdownSignal.getFuture() ));
68 		self->file = Reference<IAsyncFile>();
69 		return Void();
70 	}
71 
open(Future<Reference<IAsyncFile>> wrappedFile)72 	ACTOR static Future<Reference<IAsyncFile>> open( Future<Reference<IAsyncFile>> wrappedFile ) {
73 		choose {
74 			when( wait(success( g_simulator.getCurrentProcess()->shutdownSignal.getFuture() )) ) {
75 				throw io_error().asInjectedFault();
76 			}
77 			when( Reference<IAsyncFile> f = wait( wrappedFile ) ) {
78 				return Reference<AsyncFileDetachable>( new AsyncFileDetachable(f) );
79 			}
80 		}
81 	}
82 
addref()83 	virtual void addref() {
84 		ReferenceCounted<AsyncFileDetachable>::addref();
85 	}
delref()86 	virtual void delref() {
87 		ReferenceCounted<AsyncFileDetachable>::delref();
88 	}
89 
read(void * data,int length,int64_t offset)90 	Future<int> read(void *data, int length, int64_t offset) {
91 		if( !file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady() )
92 			return io_error().asInjectedFault();
93 		return sendErrorOnShutdown( file->read( data, length, offset ) );
94 	}
95 
write(void const * data,int length,int64_t offset)96 	Future<Void> write(void const *data, int length, int64_t offset) {
97 		if( !file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady() )
98 			return io_error().asInjectedFault();
99 		return sendErrorOnShutdown( file->write( data, length, offset ) );
100 	}
101 
truncate(int64_t size)102 	Future<Void> truncate(int64_t size) {
103 		if( !file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady() )
104 			return io_error().asInjectedFault();
105 		return sendErrorOnShutdown( file->truncate( size ) );
106 	}
107 
sync()108 	Future<Void> sync() {
109 		if( !file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady() )
110 			return io_error().asInjectedFault();
111 		return sendErrorOnShutdown( file->sync() );
112 	}
113 
size()114 	Future<int64_t> size() {
115 		if( !file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady() )
116 			return io_error().asInjectedFault();
117 		return sendErrorOnShutdown( file->size() );
118 	}
119 
debugFD()120 	int64_t debugFD() {
121 		if( !file.getPtr() )
122 			throw io_error().asInjectedFault();
123 		return file->debugFD();
124 	}
getFilename()125 	std::string getFilename() {
126 		if( !file.getPtr() )
127 			throw io_error().asInjectedFault();
128 		return file->getFilename();
129 	}
130 };
131 
132 //An async file implementation which wraps another async file and will randomly destroy sectors that it is writing when killed
133 //This is used to simulate a power failure which prevents all written data from being persisted to disk
134 class AsyncFileNonDurable sealed : public IAsyncFile, public ReferenceCounted<AsyncFileNonDurable>{
135 public:
136 	UID id;
137 	std::string filename;
138 
139 	//An approximation of the size of the file; .size() should be used instead of this variable in most cases
140 	int64_t approximateSize;
141 
142 	//The address of the machine that opened the file
143 	NetworkAddress openedAddress;
144 
145 private:
146 	//The wrapped IAsyncFile
147 	Reference<IAsyncFile> file;
148 
149 	//The maximum amount of time a write is delayed before being passed along to the underlying file
150 	double maxWriteDelay;
151 
152 	//Modifications which haven't been pushed to file, mapped by the location in the file that is being modified
153 	RangeMap< uint64_t, Future<Void> > pendingModifications;
154 
155 	//Will be blocked whenever kill is running
156 	Promise<Void> killed;
157 	Promise<Void> killComplete;
158 
159 	//Used by sync (and kill) to force writes which have not yet been passed along.
160 	//If true is sent, then writes will be durable.  If false, then they may not be durable.
161 	Promise<bool> startSyncPromise;
162 
163 	//The performance parameters of the simulated disk
164 	Reference<DiskParameters> diskParameters;
165 
166 	//Set to true the first time sync is called on the file
167 	bool hasBeenSynced;
168 
	//Used to describe what corruption is allowed by the file as well as the type of corruption being used on a particular page
	//As applied by the write actor to a non-durable write:
	//  NO_CORRUPTION   - the page is written correctly
	//  DROP_ONLY       - the page is not written at all
	//  FULL_CORRUPTION - each sector of the page is written correctly, written partially/with garbage, or dropped
	//The numeric order matters: the mode of a page is drawn uniformly from [NO_CORRUPTION, killMode of the file]
	enum KillMode { NO_CORRUPTION = 0, DROP_ONLY = 1, FULL_CORRUPTION = 2 };
171 
172 	//Limits what types of corruption are applied to writes from this file
173 	KillMode killMode;
174 
175 	ActorCollection reponses; //cannot call getResult on this actor collection, since the actors will be on different processes
176 
AsyncFileNonDurable(const std::string & filename,Reference<IAsyncFile> file,Reference<DiskParameters> diskParameters,NetworkAddress openedAddress)177 	AsyncFileNonDurable(const std::string& filename, Reference<IAsyncFile> file, Reference<DiskParameters> diskParameters, NetworkAddress openedAddress)
178 		: openedAddress(openedAddress), pendingModifications(uint64_t(-1)), approximateSize(0), reponses(false) {
179 
180 		//This is only designed to work in simulation
181 		ASSERT(g_network->isSimulated());
182 		this->id = g_random->randomUniqueID();
183 
184 		//TraceEvent("AsyncFileNonDurable_Create", id).detail("Filename", filename);
185 		this->file = file;
186 		this->filename = filename;
187 		this->diskParameters = diskParameters;
188 		maxWriteDelay = 5.0;
189 		hasBeenSynced = false;
190 
191 		killMode = (KillMode)g_random->randomInt(1, 3);
192 		//TraceEvent("AsyncFileNonDurable_CreateEnd", id).detail("Filename", filename).backtrace();
193 	}
194 
195 public:
196 	static std::map<std::string, Future<Void>> filesBeingDeleted;
197 
	//Creates a new AsyncFileNonDurable which wraps the provided IAsyncFile
	//  filename       - name given to the new file object; also the key checked in filesBeingDeleted
	//  actualFilename - name erased from the machine's openFiles on failure when the wrapped file is not available
	//  wrappedFile    - future for the underlying file
	//  diskParameters - performance parameters of the simulated disk
	//Throws an injected io_error if the opening process's shutdown signal fires before the open completes.
	ACTOR static Future<Reference<IAsyncFile>> open(std::string filename, std::string actualFilename, Future<Reference<IAsyncFile>> wrappedFile, Reference<DiskParameters> diskParameters) {
		//Remember the caller's process and priority so that the actor can return to them before finishing
		state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
		state int currentTaskID = g_network->getCurrentTask();
		state Future<Void> shutdown = success(currentProcess->shutdownSignal.getFuture());

		//TraceEvent("AsyncFileNonDurableOpenBegin").detail("Filename", filename).detail("Addr", g_simulator.getCurrentProcess()->address);
		wait( g_simulator.onMachine( currentProcess ) );
		try {
			wait(success(wrappedFile) || shutdown);

			//Either side of the || may have completed the wait; shutdown takes precedence
			if(shutdown.isReady())
				throw io_error().asInjectedFault();

			state Reference<IAsyncFile> file = wrappedFile.get();

			//If we are in the process of deleting a file, we can't let someone else modify it at the same time.  We therefore block the creation of new files until deletion is complete
			state std::map<std::string, Future<Void>>::iterator deletedFile = filesBeingDeleted.find(filename);
			if(deletedFile != filesBeingDeleted.end()) {
				//TraceEvent("AsyncFileNonDurableOpenWaitOnDelete1").detail("Filename", filename);
				wait( deletedFile->second || shutdown );
				//TraceEvent("AsyncFileNonDurableOpenWaitOnDelete2").detail("Filename", filename);
				if(shutdown.isReady())
					throw io_error().asInjectedFault();
			}

			state Reference<AsyncFileNonDurable> nonDurableFile( new AsyncFileNonDurable(filename, file, diskParameters, currentProcess->address) );

			//Causes the approximateSize member to be set
			state Future<int64_t> sizeFuture = nonDurableFile->size();
			wait(success(sizeFuture) || shutdown);

			if(shutdown.isReady())
				throw io_error().asInjectedFault();

			//TraceEvent("AsyncFileNonDurableOpenComplete").detail("Filename", filename);

			//Hop back to the caller's process and priority before handing out the file
			wait( g_simulator.onProcess( currentProcess, currentTaskID ) );

			return nonDurableFile;
		} catch( Error &e ) {
			//Keep a copy of the error: `e` cannot be used after the wait below
			state Error err = e;
			//Remove the entry for this file from the machine's open files, using the wrapped file's own name when it is available
			std::string currentFilename = ( wrappedFile.isReady() && !wrappedFile.isError() ) ? wrappedFile.get()->getFilename() : actualFilename;
			currentProcess->machine->openFiles.erase( currentFilename );
			//TraceEvent("AsyncFileNonDurableOpenError").error(e, true).detail("Filename", filename).detail("Address", currentProcess->address).detail("Addr", g_simulator.getCurrentProcess()->address);
			wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
			throw err;
		}
	}
247 
	//Intentionally empty; only a disabled trace event remains.
	//Deletion of the object is started from delref() via deleteFile rather than directly.
	~AsyncFileNonDurable() {
		//TraceEvent("AsyncFileNonDurable_Destroy", id).detail("Filename", filename);
	}
251 
addref()252 	virtual void addref() {
253 		ReferenceCounted<AsyncFileNonDurable>::addref();
254 	}
delref()255 	virtual void delref() {
256 		if(delref_no_destroy()) {
257 			ASSERT(filesBeingDeleted.count(filename) == 0);
258 			//TraceEvent("AsyncFileNonDurable_StartDelete", id).detail("Filename", filename);
259 			Future<Void> deleteFuture = deleteFile(this);
260 			if(!deleteFuture.isReady())
261 				filesBeingDeleted[filename] = deleteFuture;
262 		}
263 	}
264 
265 	//Passes along reads straight to the underlying file, waiting for any outstanding changes that could affect the results
read(void * data,int length,int64_t offset)266 	Future<int> read(void *data, int length, int64_t offset) {
267 		return read(this, data, length, offset);
268 	}
269 
270 	//Writes data to the file.  Writes are delayed a random amount of time before being
271 	//passed to the underlying file
write(void const * data,int length,int64_t offset)272 	Future<Void> write(void const *data, int length, int64_t offset) {
273 		//TraceEvent("AsyncFileNonDurable_Write", id).detail("Filename", filename).detail("Offset", offset).detail("Length", length);
274 		if(length == 0) {
275 			TraceEvent(SevWarnAlways, "AsyncFileNonDurable_EmptyModification", id).detail("Filename", filename);
276 			return Void();
277 		}
278 
279 		debugFileSet("AsyncFileNonDurableWrite", filename, data, offset, length);
280 
281 		Promise<Void> writeStarted;
282 		Promise<Future<Void>> writeEnded;
283 		writeEnded.send(write(this, writeStarted, writeEnded.getFuture(), data, length, offset));
284 		return writeStarted.getFuture();
285 	}
286 
287 	//Truncates the file.  Truncates are delayed a random amount of time before being
288 	//passed to the underlying file
truncate(int64_t size)289 	Future<Void> truncate(int64_t size) {
290 		//TraceEvent("AsyncFileNonDurable_Truncate", id).detail("Filename", filename).detail("Offset", size);
291 		debugFileTruncate("AsyncFileNonDurableTruncate", filename, size);
292 
293 		Promise<Void> truncateStarted;
294 		Promise<Future<Void>> truncateEnded;
295 		truncateEnded.send(truncate(this, truncateStarted, truncateEnded.getFuture(), size));
296 		return truncateStarted.getFuture();
297 	}
298 
299 	//Fsyncs the file.  This allows all delayed modifications to the file to complete before
300 	//syncing the underlying file
sync()301 	Future<Void> sync() {
302 		//TraceEvent("AsyncFileNonDurable_Sync", id).detail("Filename", filename);
303 		Future<Void> syncFuture = sync(this, true);
304 		reponses.add( syncFuture );
305 		return syncFuture;
306 	}
307 
308 	//Passes along size requests to the underlying file, augmenting with any writes past the end of the file
size()309 	Future<int64_t> size() {
310 		return size(this);
311 	}
312 
debugFD()313 	int64_t debugFD() {
314 		return file->debugFD();
315 	}
316 
getFilename()317 	std::string getFilename() {
318 		return file->getFilename();
319 	}
320 
321 	//Forces a non-durable sync (some writes are not made or made incorrectly)
322 	//This is used when the file should 'die' without first completing its operations
323 	//(e.g. to simulate power failure)
kill()324 	Future<Void> kill() {
325 		TraceEvent("AsyncFileNonDurable_Kill", id).detail("Filename", filename);
326 		TEST(true); //AsyncFileNonDurable was killed
327 		return sync(this, false);
328 	}
329 
330 private:
331 
332 	//Returns a future that is used to ensure the waiter ends up on the main thread
returnToMainThread()333 	Future<Void> returnToMainThread() {
334 		Promise<Void> p;
335 		Future<Void> f = p.getFuture();
336 		g_network->onMainThread(std::move(p), g_network->getCurrentTask());
337 		return f;
338 	}
339 
340 	//Gets existing modifications that overlap the specified range.  Optionally inserts a new modification into the map
341 	std::vector<Future<Void>> getModificationsAndInsert(int64_t offset, int64_t length, bool insertModification = false, Future<Void> value = Void()) {
342 		auto modification = RangeMapRange<uint64_t>(offset, length>=0 ? offset+length : uint64_t(-1));
343 		auto priorModifications = pendingModifications.intersectingRanges(modification);
344 
345 		//Aggregate existing modifications in this range
346 		std::vector<Future<Void>> modificationFutures;
347 		for(auto itr = priorModifications.begin(); itr != priorModifications.end(); ++itr) {
348 			if(itr.value().isValid() && (!itr.value().isReady() || itr.value().isError())) {
349 				modificationFutures.push_back(itr.value());
350 			}
351 		}
352 
353 		//Add the modification if we are doing a write or truncate
354 		if(insertModification)
355 			pendingModifications.insert(modification, value);
356 
357 		return modificationFutures;
358 	}
359 
360 	//Checks if the file is killed.  If so, then the current sync is completed if running and then an error is thrown
checkKilled(AsyncFileNonDurable * self,std::string context)361 	ACTOR Future<Void> checkKilled(AsyncFileNonDurable *self, std::string context) {
362 		if(self->killed.isSet()) {
363 			//TraceEvent("AsyncFileNonDurable_KilledInCheck", self->id).detail("In", context).detail("Filename", self->filename);
364 			wait(self->killComplete.getFuture());
365 			TraceEvent("AsyncFileNonDurable_KilledFileOperation", self->id).detail("In", context).detail("Filename", self->filename);
366 			TEST(true); // AsyncFileNonDurable operation killed
367 			throw io_error().asInjectedFault();
368 		}
369 
370 		return Void();
371 	}
372 
373 	//Passes along reads straight to the underlying file, waiting for any outstanding changes that could affect the results
onRead(AsyncFileNonDurable * self,void * data,int length,int64_t offset)374 	ACTOR Future<int> onRead(AsyncFileNonDurable *self, void *data, int length, int64_t offset) {
375 		wait(self->checkKilled(self, "Read"));
376 		vector<Future<Void>> priorModifications = self->getModificationsAndInsert(offset, length);
377 		wait(waitForAll(priorModifications));
378 		state Future<int> readFuture = self->file->read(data, length, offset);
379 		wait( success( readFuture ) || self->killed.getFuture() );
380 
381 		// throws if we were killed
382 		wait(self->checkKilled(self, "ReadEnd"));
383 
384 		debugFileCheck("AsyncFileNonDurableRead", self->filename, data, offset, length);
385 
386 		//if(g_simulator.getCurrentProcess()->rebooting)
387 			//TraceEvent("AsyncFileNonDurable_ReadEnd", self->id).detail("Filename", self->filename);
388 
389 		return readFuture.get();
390 	}
391 
read(AsyncFileNonDurable * self,void * data,int length,int64_t offset)392 	ACTOR Future<int> read(AsyncFileNonDurable *self, void *data, int length, int64_t offset) {
393 		state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
394 		state int currentTaskID = g_network->getCurrentTask();
395 		wait( g_simulator.onMachine( currentProcess ) );
396 
397 		try {
398 			state int rep = wait( self->onRead( self, data, length, offset ) );
399 			wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
400 
401 			return rep;
402 		} catch( Error &e ) {
403 			state Error err = e;
404 			wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
405 			throw err;
406 		}
407 	}
408 
	//Delays writes a random amount of time before passing them through to the underlying file.
	//If a kill interrupts the delay, then the output could be the correct write, part of the write,
	//or none of the write.  It may also corrupt parts of sectors which have not been written correctly
	//  writeStarted - fulfilled (or failed) once all prior overlapping modifications have finished and the disk is ready
	//  ownFuture    - delivers this actor's own result future, which is registered as the pending modification for the range
	//  data         - the bytes to write; copied into dataCopy before the delay
	ACTOR Future<Void> write(AsyncFileNonDurable *self, Promise<Void> writeStarted, Future<Future<Void>> ownFuture, void const* data, int length, int64_t offset) {
		state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
		state int currentTaskID = g_network->getCurrentTask();
		wait( g_simulator.onMachine( currentProcess ) );

		state double delayDuration = g_random->random01() * self->maxWriteDelay;
		state Standalone<StringRef> dataCopy(StringRef((uint8_t*)data, length));

		//Capture the future of the current sync promise now; onSync replaces the promise each time it runs
		state Future<bool> startSyncFuture = self->startSyncPromise.getFuture();

		try {
			//TraceEvent("AsyncFileNonDurable_Write", self->id).detail("Delay", delayDuration).detail("Filename", self->filename).detail("WriteLength", length).detail("Offset", offset);
			wait(self->checkKilled(self, "Write"));

			//Register this write as the pending modification for its range and collect the ones it must wait for
			Future<Void> writeEnded = wait(ownFuture);
			std::vector<Future<Void>> priorModifications = self->getModificationsAndInsert(offset, length, true, writeEnded);

			//Additionally wait for either a rare random delay or for the simulated disk; a kill ends either wait
			if(BUGGIFY_WITH_PROB(0.001))
				priorModifications.push_back(delay(g_random->random01() * FLOW_KNOBS->MAX_PRIOR_MODIFICATION_DELAY) || self->killed.getFuture());
			else
				priorModifications.push_back(waitUntilDiskReady(self->diskParameters, length) || self->killed.getFuture());

			wait(waitForAll(priorModifications));

			self->approximateSize = std::max(self->approximateSize, length + offset);

			//Tell the caller that the write has started
			self->reponses.add( sendOnProcess( currentProcess, writeStarted, currentTaskID ) );
		}
		catch(Error &e) {
			//Deliver the failure to the caller and also fail this actor's own future
			self->reponses.add( sendErrorOnProcess( currentProcess, writeStarted, e, currentTaskID ) );
			throw;
		}

		//TraceEvent("AsyncFileNonDurable_WriteDoneWithPreviousMods", self->id).detail("Delay", delayDuration).detail("Filename", self->filename).detail("WriteLength", length).detail("Offset", offset);

		//Wait a random amount of time or until a sync/kill is issued
		state bool saveDurable = true;
		choose {
			when(wait(delay(delayDuration))) { }
			when(bool durable = wait(startSyncFuture)) {
				saveDurable = durable;
			}
		}

		debugFileCheck("AsyncFileNonDurableWriteAfterWait", self->filename, dataCopy.begin(), offset, length);

		//Only page-aligned writes are supported
		ASSERT(offset % 4096 == 0 && length % 4096 == 0);

		//Non-durable writes should introduce errors at the page level and corrupt at the sector level
		//Otherwise, we can perform the entire write at once
		int pageLength = saveDurable ? length : 4096;
		int sectorLength = saveDurable ? length : 512;

		vector<Future<Void>> writeFutures;
		for(int writeOffset = 0; writeOffset < length; writeOffset += pageLength) {
			//choose a random action to perform on this page write (write correctly, corrupt, or don't write)
			//The draw is limited to the modes up to and including the file's own killMode
			KillMode pageKillMode = (KillMode)g_random->randomInt(0, self->killMode + 1);

			for(int pageOffset = 0; pageOffset < pageLength; pageOffset += sectorLength) {
				//If saving durable, then perform the write correctly.  Otherwise, perform the write correctly with a probability of 1/3.
				//If corrupting the write, then this sector will be written correctly with a 1/4 chance
				if(saveDurable || pageKillMode == NO_CORRUPTION || (pageKillMode == FULL_CORRUPTION && g_random->random01() < 0.25)) {
					//if (!saveDurable) TraceEvent(SevInfo, "AsyncFileNonDurableWrite", self->id).detail("Filename", self->filename).detail("Offset", offset+writeOffset+pageOffset).detail("Length", sectorLength);
					writeFutures.push_back(self->file->write(dataCopy.begin() + writeOffset + pageOffset, sectorLength, offset + writeOffset + pageOffset));
				}

				//If the write is not durable, then the write will either be corrupted or not written at all.  If corrupted, there is 1/4 chance that a given
				//sector will not be written
				else if(pageKillMode == FULL_CORRUPTION && g_random->random01() < 0.66667) {
					//The incorrect part of the write can be the rightmost bytes (side = 0), the leftmost bytes (side = 1), or the entire write (side = 2)
					int side = g_random->randomInt(0, 3);

					//There is a 1/2 chance that a bad write will have garbage written into its bad portion
					//The chance is increased to 1 if the entire write is bad
					bool garbage = side == 2 || g_random->random01() < 0.5;

					//Byte ranges within the sector, as [start, end): the part written correctly and the part that is not
					int64_t goodStart = 0;
					int64_t goodEnd = sectorLength;
					int64_t badStart = 0;
					int64_t badEnd = sectorLength;

					if(side == 0) {
						goodEnd = g_random->randomInt(0, sectorLength);
						badStart = goodEnd;
					}
					else if(side == 1) {
						badEnd = g_random->randomInt(0, sectorLength);
						goodStart = badEnd;
					}
					else
						goodEnd = 0;

					//Write randomly generated bytes, if required
					if(garbage && badStart != badEnd) {
						//Overwrite the bad portion of the private copy in place, four random bytes at a time
						uint8_t *badData = const_cast<uint8_t*>(&dataCopy.begin()[badStart + writeOffset + pageOffset]);
						for(int i = 0; i < badEnd - badStart; i += sizeof(uint32_t)) {
							uint32_t val = g_random->randomUInt32();
							memcpy(&badData[i], &val, std::min(badEnd - badStart - i, (int64_t)sizeof(uint32_t)));
						}

						//The whole sector is written: its good portion intact, its bad portion as garbage
						writeFutures.push_back(self->file->write(dataCopy.begin() + writeOffset + pageOffset, sectorLength, offset + writeOffset + pageOffset));
						debugFileSet("AsyncFileNonDurableBadWrite", self->filename, dataCopy.begin() + writeOffset + pageOffset, offset + writeOffset + pageOffset, sectorLength);
					}
					//Without garbage, only the good portion (if any) is written and the bad portion is left untouched
					else if(goodStart != goodEnd)
						writeFutures.push_back(self->file->write(dataCopy.begin() + goodStart + writeOffset + pageOffset, goodEnd - goodStart, goodStart + offset + writeOffset + pageOffset));

					TraceEvent("AsyncFileNonDurable_BadWrite", self->id).detail("Offset", offset + writeOffset + pageOffset).detail("Length", sectorLength).detail("GoodStart", goodStart).detail("GoodEnd", goodEnd).detail("HasGarbage", garbage).detail("Side", side).detail("Filename", self->filename);
					TEST(true); //AsyncFileNonDurable bad write
				}
				else {
					//The sector is not written at all
					TraceEvent("AsyncFileNonDurable_DroppedWrite", self->id).detail("Offset", offset + writeOffset + pageOffset).detail("Length", sectorLength).detail("Filename", self->filename);
					TEST(true); //AsyncFileNonDurable dropped write
				}
			}
		}

		wait(waitForAll(writeFutures));
		//TraceEvent("AsyncFileNonDurable_WriteDone", self->id).detail("Delay", delayDuration).detail("Filename", self->filename).detail("WriteLength", length).detail("Offset", offset);
		return Void();
	}
533 
	//Delays truncates a random amount of time before passing them through to the underlying file.
	//If a kill interrupts the delay, then the truncate may or may not be performed
	//  truncateStarted - fulfilled (or failed) once all prior overlapping modifications have finished and the disk is ready
	//  ownFuture       - delivers this actor's own result future, which is registered as the pending modification for the range
	//  size            - the new size of the file
	ACTOR Future<Void> truncate(AsyncFileNonDurable *self, Promise<Void> truncateStarted, Future<Future<Void>> ownFuture, int64_t size) {
		state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
		state int currentTaskID = g_network->getCurrentTask();
		wait( g_simulator.onMachine( currentProcess ) );

		state double delayDuration = g_random->random01() * self->maxWriteDelay;
		//Capture the future of the current sync promise now; onSync replaces the promise each time it runs
		state Future<bool> startSyncFuture = self->startSyncPromise.getFuture();

		try {
			//TraceEvent("AsyncFileNonDurable_Truncate", self->id).detail("Delay", delayDuration).detail("Filename", self->filename);
			wait(self->checkKilled(self, "Truncate"));

			//A length of -1 registers the truncate as a modification of everything from `size` onwards
			Future<Void> truncateEnded = wait(ownFuture);
			std::vector<Future<Void>> priorModifications = self->getModificationsAndInsert(size, -1, true, truncateEnded);

			//Additionally wait for either a rare random delay or for the simulated disk; a kill ends either wait
			if(BUGGIFY_WITH_PROB(0.001))
				priorModifications.push_back(delay(g_random->random01() * FLOW_KNOBS->MAX_PRIOR_MODIFICATION_DELAY) || self->killed.getFuture());
			else
				priorModifications.push_back(waitUntilDiskReady(self->diskParameters, 0) || self->killed.getFuture());

			wait(waitForAll(priorModifications));

			self->approximateSize = size;

			//Tell the caller that the truncate has started
			self->reponses.add( sendOnProcess( currentProcess, truncateStarted, currentTaskID ) );
		}
		catch(Error &e) {
			//Deliver the failure to the caller and also fail this actor's own future
			self->reponses.add( sendErrorOnProcess( currentProcess, truncateStarted, e, currentTaskID ) );
			throw;
		}

		//Wait a random amount of time or until a sync/kill is issued
		state bool saveDurable = true;
		choose {
			when(wait(delay(delayDuration))) { }
			when(bool durable = wait(startSyncFuture)) {
				saveDurable = durable;
			}
		}

		//Yield first if the network asks for it
		if(g_network->check_yield(TaskDefaultYield)) {
			wait(delay(0, TaskDefaultYield));
		}

		//If performing a durable truncate, then pass it through to the file.  Otherwise, pass it through with a 1/2 chance
		if(saveDurable || self->killMode == NO_CORRUPTION || g_random->random01() < 0.5)
			wait(self->file->truncate(size));
		else {
			TraceEvent("AsyncFileNonDurable_DroppedTruncate", self->id).detail("Size", size);
			TEST(true); //AsyncFileNonDurable dropped truncate
		}

		return Void();
	}
590 
	//Waits for delayed modifications to the file to complete and then syncs the underlying file
	//If durable is false, then some of the delayed modifications will not be applied or will be
	//applied incorrectly
	//  durable - true for a regular sync(); false when called from kill()
	ACTOR Future<Void> onSync(AsyncFileNonDurable *self, bool durable) {
		//TraceEvent("AsyncFileNonDurable_ImplSync", self->id).detail("Filename", self->filename).detail("Durable", durable);
		ASSERT(durable || !self->killed.isSet()); // this file is kill()ed only once

		if(durable) {
			self->hasBeenSynced = true;
			//A kill ends the wait for the simulated disk early
			wait(waitUntilDiskReady(self->diskParameters, 0, true) || self->killed.getFuture());
		}

		//Fails a sync that was issued on (or overtaken by) a killed file
		wait(self->checkKilled(self, durable ? "Sync" : "Kill"));

		//From here on every other operation sees the file as killed
		if(!durable)
			self->killed.send( Void() );

		//Get all outstanding modifications
		std::vector<Future<Void>> outstandingModifications;
		std::vector<RangeMapRange<uint64_t>> stillPendingModifications;

		auto rangeItr = self->pendingModifications.ranges();
		for(auto itr = rangeItr.begin(); itr != rangeItr.end(); ++itr) {
			//Collect modifications that are still running or that failed
			if(itr.value().isValid() && (!itr->value().isReady() || itr->value().isError())) {
				outstandingModifications.push_back(itr->value());

				//Only ranges whose modification is still running stay in the map
				if(!itr.value().isReady())
					stillPendingModifications.push_back(itr->range());
			}
		}

		Future<Void> allModifications = waitForAll(outstandingModifications);
		//Clear out the pending modifications map of all completed modifications
		self->pendingModifications.insert(RangeMapRange<uint64_t>(0, -1), Void());
		for(auto itr = stillPendingModifications.begin(); itr != stillPendingModifications.end(); ++itr)
			self->pendingModifications.insert(*itr, success(allModifications)); //waitForAll cannot wait on the same future more than once, so wrap the future with success

		//Signal all modifications to end their delay and reset the startSyncPromise
		Promise<bool> startSyncPromise = self->startSyncPromise;
		self->startSyncPromise = Promise<bool>();

		//Writes will be durable in a kill with a 10% probability
		state bool writeDurable = durable || g_random->random01() < 0.1;
		startSyncPromise.send(writeDurable);

		//Wait for outstanding writes to complete
		//A durable sync propagates their errors; a kill ignores them
		if(durable)
			wait(allModifications);
		else
			wait(success(errorOr(allModifications)));

		if(!durable) {
			//Sometimes sync the file if writes were made durably.  Before a file is first synced, it is stored in a temporary file and then renamed to the correct
			//location once sync is called.  By not calling sync, we simulate a failure to fsync the directory storing the file
			if(self->hasBeenSynced && writeDurable && g_random->random01() < 0.5) {
				TEST(true); //AsyncFileNonDurable kill was durable and synced
				wait(success(errorOr(self->file->sync())));
			}

			//Setting this promise could trigger the deletion of the AsyncFileNonDurable; after this none of its members should be used
			//TraceEvent("AsyncFileNonDurable_ImplSyncEnd", self->id).detail("Filename", self->filename).detail("Durable", durable);
			self->killComplete.send(Void());
		}
		//A killed file cannot be allowed to report that it successfully synced
		else {
			wait(self->checkKilled(self, "SyncEnd"));
			wait(self->file->sync());
			//TraceEvent("AsyncFileNonDurable_ImplSyncEnd", self->id).detail("Filename", self->filename).detail("Durable", durable);
		}

		return Void();
	}
663 
sync(AsyncFileNonDurable * self,bool durable)664 	ACTOR Future<Void> sync(AsyncFileNonDurable *self, bool durable) {
665 		state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
666 		state int currentTaskID = g_network->getCurrentTask();
667 		wait( g_simulator.onMachine( currentProcess ) );
668 
669 		try {
670 			wait( self->onSync( self, durable ) );
671 			wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
672 
673 			return Void();
674 		} catch( Error &e ) {
675 			state Error err = e;
676 			wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
677 			throw err;
678 		}
679 	}
680 
681 	//Passes along size requests to the underlying file, augmenting with any writes past the end of the file
onSize(AsyncFileNonDurable * self)682 	ACTOR Future<int64_t> onSize(AsyncFileNonDurable *self) {
683 		//TraceEvent("AsyncFileNonDurable_Size", self->id).detail("Filename", self->filename);
684 		wait(self->checkKilled(self, "Size"));
685 		state Future<int64_t> sizeFuture = self->file->size();
686 		wait( success( sizeFuture ) || self->killed.getFuture() );
687 
688 		wait(self->checkKilled(self, "SizeEnd"));
689 
690 		//Include any modifications which extend past the end of the file
691 		uint64_t maxModification = self->pendingModifications.lastItem().begin();
692 		self->approximateSize = std::max<int64_t>(sizeFuture.get(), maxModification);
693 		return self->approximateSize;
694 	}
695 
size(AsyncFileNonDurable * self)696 	ACTOR Future<int64_t> size(AsyncFileNonDurable *self) {
697 		state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
698 		state int currentTaskID = g_network->getCurrentTask();
699 
700 		wait( g_simulator.onMachine( currentProcess ) );
701 
702 		try {
703 			state int64_t rep = wait( self->onSize( self ) );
704 			wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
705 
706 			return rep;
707 		} catch( Error &e ) {
708 			state Error err = e;
709 			wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
710 			throw err;
711 		}
712 	}
713 
714 	//Finishes all outstanding actors on an AsyncFileNonDurable and then deletes it
deleteFile(AsyncFileNonDurable * self)715 	ACTOR Future<Void> deleteFile(AsyncFileNonDurable *self) {
716 		state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
717 		state int currentTaskID = g_network->getCurrentTask();
718 		state std::string filename = self->filename;
719 
720 		wait( g_simulator.onMachine( currentProcess ) );
721 		try {
722 			//Make sure all writes have gone through.
723 			Promise<bool> startSyncPromise = self->startSyncPromise;
724 			self->startSyncPromise = Promise<bool>();
725 			startSyncPromise.send(true);
726 
727 			std::vector<Future<Void>> outstandingModifications;
728 
729 			for(auto itr = self->pendingModifications.ranges().begin(); itr != self->pendingModifications.ranges().end(); ++itr)
730 				if(itr->value().isValid() && !itr->value().isReady())
731 					outstandingModifications.push_back(itr->value());
732 
733 			//Ignore errors here so that all modifications can finish
734 			wait(waitForAllReady(outstandingModifications));
735 
736 			//Make sure we aren't in the process of killing the file
737 			if(self->killed.isSet())
738 				wait(self->killComplete.getFuture());
739 
740 			//Remove this file from the filesBeingDeleted map so that new files can be created with this filename
741 			g_simulator.getMachineByNetworkAddress( self->openedAddress )->closingFiles.erase(self->getFilename());
742 			g_simulator.getMachineByNetworkAddress( self->openedAddress )->deletingFiles.erase(self->getFilename());
743 			AsyncFileNonDurable::filesBeingDeleted.erase(self->filename);
744 			//TraceEvent("AsyncFileNonDurable_FinishDelete", self->id).detail("Filename", self->filename);
745 
746 			delete self;
747 			wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
748 			return Void();
749 		} catch( Error &e ) {
750 			state Error err = e;
751 			wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
752 			throw err;
753 		}
754 	}
755 };
756 
757 #include "flow/unactorcompiler.h"
758 #endif
759