1 /*
2  * sim2.actor.cpp
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #include "fdbrpc/simulator.h"
22 #include "flow/IThreadPool.h"
23 #include "flow/Util.h"
24 #include "fdbrpc/IAsyncFile.h"
25 #include "fdbrpc/AsyncFileCached.actor.h"
26 #include "fdbrpc/AsyncFileNonDurable.actor.h"
27 #include "flow/Hash3.h"
28 #include "fdbrpc/TraceFileIO.h"
29 #include "flow/FaultInjection.h"
30 #include "flow/network.h"
31 #include "fdbrpc/Net2FileSystem.h"
32 #include "fdbrpc/Replication.h"
33 #include "fdbrpc/ReplicationUtils.h"
34 #include "fdbrpc/AsyncFileWriteChecker.h"
35 #include "flow/actorcompiler.h"  // This must be the last #include.
36 
simulator_should_inject_fault(const char * context,const char * file,int line,int error_code)37 bool simulator_should_inject_fault( const char* context, const char* file, int line, int error_code ) {
38 	if (!g_network->isSimulated()) return false;
39 
40 	auto p = g_simulator.getCurrentProcess();
41 
42 	if (p->fault_injection_p2 && g_random->random01() < p->fault_injection_p2 && !g_simulator.speedUpSimulation) {
43 		uint32_t
44 			h1 = line + (p->fault_injection_r>>32),
45 			h2 = p->fault_injection_r;
46 
47 		if (h1 < p->fault_injection_p1*std::numeric_limits<uint32_t>::max()) {
48 			TEST(true);                                     // A fault was injected
49 			TEST(error_code == error_code_io_timeout);      // An io timeout was injected
50 			TEST(error_code == error_code_io_error);        // An io error was injected
51 			TEST(error_code == error_code_platform_error);  // A platform error was injected.
52 			TraceEvent(SevWarn, "FaultInjected").detail("Context", context).detail("File", file).detail("Line", line).detail("ErrorCode", error_code);
53 			if(error_code == error_code_io_timeout) {
54 				g_network->setGlobal(INetwork::enASIOTimedOut, (flowGlobalType)true);
55 			}
56 			return true;
57 		}
58 	}
59 
60 	return false;
61 }
62 
displayWorkers() const63 void ISimulator::displayWorkers() const
64 {
65 	std::map<std::string, std::vector<ISimulator::ProcessInfo*>> machineMap;
66 
67 	// Create a map of machine Id
68 	for (auto processInfo : getAllProcesses()) {
69 		std::string dataHall = processInfo->locality.dataHallId().present() ? processInfo->locality.dataHallId().get().printable() : "[unset]";
70 		std::string machineId = processInfo->locality.machineId().present() ? processInfo->locality.machineId().get().printable() : "[unset]";
71 		machineMap[format("%-8s  %s", dataHall.c_str(), machineId.c_str())].push_back(processInfo);
72 	}
73 
74 	printf("DataHall  MachineId\n");
75 	printf("                  Address   Name      Class        Excluded Failed Rebooting Cleared Role                                              DataFolder\n");
76 	for (auto& machineRecord : machineMap) {
77 		printf("\n%s\n", machineRecord.first.c_str());
78 		for (auto& processInfo : machineRecord.second) {
79 			printf("                  %9s %-10s%-13s%-8s %-6s %-9s %-8s %-48s %-40s\n",
80 			processInfo->address.toString().c_str(), processInfo->name, processInfo->startingClass.toString().c_str(), (processInfo->isExcluded() ? "True" : "False"), (processInfo->failed ? "True" : "False"), (processInfo->rebooting ? "True" : "False"), (processInfo->isCleared() ? "True" : "False"), getRoles(processInfo->address).c_str(), processInfo->dataFolder);
81 		}
82 	}
83 
84 	return;
85 }
86 
87 namespace std {
88 template<>
89 class hash<Endpoint> {
90 public:
operator ()(const Endpoint & s) const91 	size_t operator()(const Endpoint &s) const
92 	{
93 		return hashlittle(&s, sizeof(s), 0);
94 	}
95 };
96 }
97 
onlyBeforeSimulatorInit()98 bool onlyBeforeSimulatorInit() {
99 	return g_network->isSimulated() && g_simulator.getAllProcesses().empty();
100 }
101 
// Sentinel UID built from (-1, -1).
// NOTE(review): by its name this marks an endpoint that could not be found -- confirm at the use sites.
const UID TOKEN_ENDPOINT_NOT_FOUND(-1, -1);
// NOTE(review): presumably a bit flag carried in tokens that belong to streams -- confirm at the use sites.
const uint64_t TOKEN_STREAM_FLAG = 1;

// NOTE(review): presumably points at the active simulator instance; it is not assigned in this part of the file.
ISimulator* g_pSimulator = 0;
// Definition of ISimulator's static, per-thread "current process" pointer; starts out null.
thread_local ISimulator::ProcessInfo* ISimulator::currentProcess = 0;
// Incremented by SimpleFile::open on every call; open() warns at 2000 and asserts at 3000.
int openCount = 0;
108 
109 struct SimClogging {
getSendDelaySimClogging110 	double getSendDelay( NetworkAddress from, NetworkAddress to ) {
111 		return halfLatency();
112 		double tnow = now();
113 		double t = tnow + halfLatency();
114 
115 		if (!g_simulator.speedUpSimulation && clogSendUntil.count( to.ip ))
116 			t = std::max( t, clogSendUntil[ to.ip ] );
117 
118 		return t - tnow;
119 	}
120 
getRecvDelaySimClogging121 	double getRecvDelay( NetworkAddress from, NetworkAddress to ) {
122 		auto pair = std::make_pair( from.ip, to.ip );
123 
124 		double tnow = now();
125 		double t = tnow + halfLatency();
126 		if(!g_simulator.speedUpSimulation)
127 			t += clogPairLatency[ pair ];
128 
129 		if (!g_simulator.speedUpSimulation && clogPairUntil.count( pair ))
130 			t = std::max( t, clogPairUntil[ pair ] );
131 
132 		if (!g_simulator.speedUpSimulation && clogRecvUntil.count( to.ip ))
133 			t = std::max( t, clogRecvUntil[ to.ip ] );
134 
135 		return t - tnow;
136 	}
137 
clogPairForSimClogging138 	void clogPairFor(const IPAddress& from, const IPAddress& to, double t) {
139 		auto& u = clogPairUntil[ std::make_pair( from, to ) ];
140 		u = std::max(u, now() + t);
141 	}
clogSendForSimClogging142 	void clogSendFor(const IPAddress& from, double t) {
143 		auto& u = clogSendUntil[from];
144 		u = std::max(u, now() + t);
145 	}
clogRecvForSimClogging146 	void clogRecvFor(const IPAddress& from, double t) {
147 		auto& u = clogRecvUntil[from];
148 		u = std::max(u, now() + t);
149 	}
setPairLatencyIfNotSetSimClogging150 	double setPairLatencyIfNotSet(const IPAddress& from, const IPAddress& to, double t) {
151 		auto i = clogPairLatency.find( std::make_pair(from,to) );
152 		if (i == clogPairLatency.end())
153 			i = clogPairLatency.insert( std::make_pair( std::make_pair(from,to), t ) ).first;
154 		return i->second;
155 	}
156 
157 private:
158 	std::map<IPAddress, double> clogSendUntil, clogRecvUntil;
159 	std::map<std::pair<IPAddress, IPAddress>, double> clogPairUntil;
160 	std::map<std::pair<IPAddress, IPAddress>, double> clogPairLatency;
halfLatencySimClogging161 	double halfLatency() {
162 		double a = g_random->random01();
163 		const double pFast = 0.999;
164 		if (a <= pFast) {
165 			a = a / pFast;
166 			return 0.5 * (FLOW_KNOBS->MIN_NETWORK_LATENCY * (1-a) + FLOW_KNOBS->FAST_NETWORK_LATENCY/pFast * a); // 0.5ms average
167 		} else {
168 			a = (a-pFast) / (1-pFast); // uniform 0-1 again
169 			return 0.5 * (FLOW_KNOBS->MIN_NETWORK_LATENCY * (1-a) + FLOW_KNOBS->SLOW_NETWORK_LATENCY*a); // long tail up to X ms
170 		}
171 	}
172 };
173 
// Single file-level SimClogging instance; Sim2Conn uses it for pair latency and send/receive delays.
SimClogging g_clogging;
175 
// One end of a simulated connection.
//
// Each Sim2Conn owns the receive side of its direction: the peer's write() appends bytes to this
// object's recvBuf and advances this object's writtenBytes. The sender() and receiver() actors then
// advance sentBytes and receivedBytes after simulated delays, and read() hands out the bytes between
// readBytes and receivedBytes.
struct Sim2Conn : IConnection, ReferenceCounted<Sim2Conn> {
	// Creates an unconnected endpoint owned by `process` and starts its sender/receiver actors.
	Sim2Conn( ISimulator::ProcessInfo* process )
		: process(process), dbgid( g_random->randomUniqueID() ), opened(false), closedByCaller(false)
	{
		pipes = sender(this) && receiver(this);
	}

	// connect() is called on a pair of connections immediately after creation; logically it is part of the constructor and no other method may be called previously!
	void connect( Reference<Sim2Conn> peer, NetworkAddress peerEndpoint ) {
		this->peer = peer;
		this->peerProcess = peer->process;
		this->peerId = peer->dbgid;
		this->peerEndpoint = peerEndpoint;

		// Every one-way connection gets a random permanent latency and a random send buffer for the duration of the connection
		auto latency = g_clogging.setPairLatencyIfNotSet( peerProcess->address.ip, process->address.ip, FLOW_KNOBS->MAX_CLOGGING_LATENCY*g_random->random01() );
		sendBufSize = std::max<double>( g_random->randomInt(0, 5000000), 25e6 * (latency + .002) );
		TraceEvent("Sim2Connection").detail("SendBufSize", sendBufSize).detail("Latency", latency);
	}

	// A connection that was opened must have been closed through close() before it is destroyed.
	~Sim2Conn() {
		ASSERT_ABORT( !opened || closedByCaller );
	}

	virtual void addref() { ReferenceCounted<Sim2Conn>::addref(); }
	virtual void delref() { ReferenceCounted<Sim2Conn>::delref(); }
	// Caller-initiated close: records that fact (checked by the destructor) and tears down the link.
	virtual void close() { closedByCaller = true; closeInternal(); }

	virtual Future<Void> onWritable() { return whenWritable(this); }
	virtual Future<Void> onReadable() { return whenReadable(this); }

	// True if the peer reference has been dropped or the peer's process is marked failed.
	bool isPeerGone() {
		return !peer || peerProcess->failed;
	}

	// Called by the peer's closeInternal(); starts the leak tracker for this end.
	void peerClosed() {
		leakedConnectionTracker = trackLeakedConnection(this);
	}

	// Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read (might be 0)
	// (or may throw an error if the connection dies)
	virtual int read( uint8_t* begin, uint8_t* end ) {
		rollRandomClose();

		// Only bytes that the receiver actor has marked as received are readable.
		int64_t avail = receivedBytes.get() - readBytes.get();  // SOMEDAY: random?
		int toRead = std::min<int64_t>( end-begin, avail );
		ASSERT( toRead >= 0 && toRead <= recvBuf.size() && toRead <= end-begin );
		for(int i=0; i<toRead; i++)
			begin[i] = recvBuf[i];
		recvBuf.erase( recvBuf.begin(), recvBuf.begin() + toRead );
		readBytes.set( readBytes.get() + toRead );
		return toRead;
	}

	// Writes as many bytes as possible from the given SendBuffer chain into the write buffer and returns the number of bytes written (might be 0)
	// (or may throw an error if the connection dies)
	virtual int write( SendBuffer const* buffer, int limit) {
		rollRandomClose();
		ASSERT(limit > 0);

		int toSend = 0;
		if (BUGGIFY) {
			// Buggified: consider only the first buffer of the chain.
			toSend = std::min(limit, buffer->bytes_written - buffer->bytes_sent);
		} else {
			// Sum the unsent bytes along the chain, capped at limit.
			for(auto p = buffer; p; p=p->next) {
				toSend += p->bytes_written - p->bytes_sent;
				if(toSend >= limit) {
					if(toSend > limit)
						toSend = limit;
					break;
				}
			}
		}
		ASSERT(toSend);
		if (BUGGIFY) toSend = std::min(toSend, g_random->randomInt(0, 1000));

		// With no peer the bytes are reported as written but are not stored anywhere.
		if (!peer) return toSend;
		toSend = std::min( toSend, peer->availableSendBufferForPeer() );
		ASSERT( toSend >= 0 );

		// Append the bytes to the peer's receive buffer, walking the chain as needed.
		int leftToSend = toSend;
		for(auto p = buffer; p && leftToSend>0; p=p->next) {
			int ts = std::min(leftToSend, p->bytes_written - p->bytes_sent);
			peer->recvBuf.insert( peer->recvBuf.end(), p->data + p->bytes_sent, p->data + p->bytes_sent + ts );
			leftToSend -= ts;
		}
		ASSERT( leftToSend == 0 );
		peer->writtenBytes.set( peer->writtenBytes.get() + toSend );
		return toSend;
	}

	// Returns the network address and port of the other end of the connection.  In the case of an incoming connection, this may not
	// be an address we can connect to!
	virtual NetworkAddress getPeerAddress() { return peerEndpoint; }
	virtual UID getDebugID() { return dbgid; }

	// opened: set to true by Sim2Listener::popOne when the connection is accepted.
	// closedByCaller: set by close().
	bool opened, closedByCaller;

private:
	ISimulator::ProcessInfo* process, *peerProcess;  // peerProcess is assigned in connect()
	UID dbgid, peerId;
	NetworkAddress peerEndpoint;
	std::deque< uint8_t > recvBuf;  // Includes bytes written but not yet received!
	AsyncVar<int64_t> readBytes, // bytes already pulled from recvBuf (location of the beginning of recvBuf)
		              receivedBytes,
					  sentBytes,
					  writtenBytes; // location of the end of recvBuf ( == recvBuf.size() + readBytes.get() )
	Reference<Sim2Conn> peer;
	int sendBufSize;  // assigned in connect()

	Future<Void> leakedConnectionTracker;

	// Keeps the sender() and receiver() actors alive for the lifetime of this object.
	Future<Void> pipes;

	// Remaining room in this connection's buffer, from the point of view of the peer that writes into it.
	int availableSendBufferForPeer() const { return sendBufSize - (writtenBytes.get() - receivedBytes.get()); }  // SOMEDAY: acknowledgedBytes instead of receivedBytes

	// Notifies the peer (if any) that this end is closing, cancels this end's own leak tracker
	// and drops the reference to the peer.
	void closeInternal() {
		if(peer) {
			peer->peerClosed();
		}
		leakedConnectionTracker.cancel();
		peer.clear();
	}

	// Actor: after each change of writtenBytes, waits a short random delay and then publishes the
	// new position through sentBytes.
	ACTOR static Future<Void> sender( Sim2Conn* self ) {
		loop {
			wait( self->writtenBytes.onChange() );  // takes place on peer!
			ASSERT( g_simulator.getCurrentProcess() == self->peerProcess );
			wait( delay( .002 * g_random->random01() ) );
			self->sentBytes.set( self->writtenBytes.get() );  // or possibly just some sometimes...
		}
	}
	// Actor: moves receivedBytes towards sentBytes after the simulated send and receive delays,
	// hopping between the peer's process and this connection's own process as it goes.
	ACTOR static Future<Void> receiver( Sim2Conn* self ) {
		loop {
			if (self->sentBytes.get() != self->receivedBytes.get())
				wait( g_simulator.onProcess( self->peerProcess ) );
			while ( self->sentBytes.get() == self->receivedBytes.get() )
				wait( self->sentBytes.onChange() );
			ASSERT( g_simulator.getCurrentProcess() == self->peerProcess );
			// Half of the time deliver everything sent so far, otherwise deliver up to a random position.
			state int64_t pos = g_random->random01() < .5 ? self->sentBytes.get() : g_random->randomInt64( self->receivedBytes.get(), self->sentBytes.get()+1 );
			wait( delay( g_clogging.getSendDelay( self->process->address, self->peerProcess->address ) ) );
			wait( g_simulator.onProcess( self->process ) );
			ASSERT( g_simulator.getCurrentProcess() == self->process );
			wait( delay( g_clogging.getRecvDelay( self->process->address, self->peerProcess->address ) ) );
			ASSERT( g_simulator.getCurrentProcess() == self->process );
			self->receivedBytes.set( pos );
			wait( Future<Void>(Void()) );  // Prior notification can delete self and cancel this actor
			ASSERT( g_simulator.getCurrentProcess() == self->process );
		}
	}
	// Actor: completes once there are received bytes that have not been read yet.
	// rollRandomClose() runs after every change of receivedBytes and may throw.
	ACTOR static Future<Void> whenReadable( Sim2Conn* self ) {
		try {
			loop {
				if (self->readBytes.get() != self->receivedBytes.get()) {
					ASSERT( g_simulator.getCurrentProcess() == self->process );
					return Void();
				}
				wait( self->receivedBytes.onChange() );
				self->rollRandomClose();
			}
		} catch (Error& e) {
			ASSERT( g_simulator.getCurrentProcess() == self->process );
			throw;
		}
	}
	// Actor: completes once the peer is gone or the peer has room in its buffer.
	ACTOR static Future<Void> whenWritable( Sim2Conn* self ) {
		try {
			loop {
				if (!self->peer) return Void();
				if (self->peer->availableSendBufferForPeer() > 0) {
					ASSERT( g_simulator.getCurrentProcess() == self->process );
					return Void();
				}
				try {
					wait( self->peer->receivedBytes.onChange() );
					ASSERT( g_simulator.getCurrentProcess() == self->peerProcess );
				} catch (Error& e) {
					// A broken promise here is tolerated; the loop re-checks the peer after hopping back.
					if (e.code() != error_code_broken_promise) throw;
				}
				wait( g_simulator.onProcess( self->process ) );
			}
		} catch (Error& e) {
			ASSERT( g_simulator.getCurrentProcess() == self->process );
			throw;
		}
	}

	// Rarely (probability .00001 per call, and only once connectionFailuresDisableDuration has passed
	// since the last simulated failure) simulates a connection failure: closes the peer's side, this
	// side, or both, and in some cases throws connection_failed immediately.
	void rollRandomClose() {
		if (now() - g_simulator.lastConnectionFailure > g_simulator.connectionFailuresDisableDuration && g_random->random01() < .00001) {
			g_simulator.lastConnectionFailure = now();
			double a = g_random->random01(), b = g_random->random01();
			TEST(true);  // Simulated connection failure
			TraceEvent("ConnectionFailure", dbgid).detail("MyAddr", process->address).detail("PeerAddr", peerProcess->address).detail("SendClosed", a > .33).detail("RecvClosed", a < .66).detail("Explicit", b < .3);
			if (a < .66 && peer) peer->closeInternal();
			if (a > .33) closeInternal();
			// At the moment, we occasionally notice the connection failed immediately.  In principle, this could happen but only after a delay.
			if (b < .3)
				throw connection_failed();
		}
	}

	// Actor started by peerClosed(): unless cancelled first (closeInternal() cancels it), logs a
	// SevError "LeakedConnection" event 20 seconds later on this connection's process.
	ACTOR static Future<Void> trackLeakedConnection( Sim2Conn* self ) {
		wait( g_simulator.onProcess( self->process ) );
		// SOMEDAY: Make this value variable? Dependent on buggification status?
		wait( delay( 20.0 ) );
		TraceEvent(SevError, "LeakedConnection", self->dbgid).error(connection_leaked()).detail("MyAddr", self->process->address).detail("PeerAddr", self->peerEndpoint).detail("PeerId", self->peerId).detail("Opened", self->opened);
		return Void();
	}
};
385 
386 #include <fcntl.h>
387 #include <sys/stat.h>
388 
// Opens `filename` and returns a file handle; SimpleFile::open treats -1 as failure.
// SimpleFile::open passes its IAsyncFile OPEN_* bits as `flags` and the result of
// SimpleFile::flagConversion(flags) as `convFlags`.
int sf_open( const char* filename, int flags, int convFlags, int mode );

#if defined(_WIN32)
#include <io.h>
// NOTE(review): no Windows definition of sf_open is visible in this part of the file.

#elif defined(__unixish__)
// Map the underscore-prefixed CRT-style names used by SimpleFile onto their POSIX counterparts.
#define _open ::open
#define _read ::read
#define _write ::write
#define _close ::close
#define _lseeki64 ::lseek
#define _commit ::fsync
#define _chsize ::ftruncate
#define O_BINARY 0

// Unix implementation: only the converted flags are used; `flags` is ignored.
int sf_open( const char* filename, int flags, int convFlags, int mode ) {
	return _open( filename, convFlags, mode );
}

#else
#error How do i open a file on a new platform?
#endif
411 
// IAsyncFile implementation used by the simulator. It performs real, blocking file I/O on a file
// handle, preceded by simulated disk delays (waitUntilDiskReady) and followed by optional fault
// injection. When randLog is set, every operation writes begin/end records to it.
class SimpleFile : public IAsyncFile, public ReferenceCounted<SimpleFile> {
public:
	static void init() {}

	static bool should_poll() { return false; }

	// Opens `filename` and returns it as an IAsyncFile.
	//
	// flags:          IAsyncFile OPEN_* bits. With OPEN_ATOMIC_WRITE_AND_CREATE the file is actually
	//                 opened as "<filename>.part"; sync_impl renames it to its final name.
	// mode:           passed through to sf_open.
	// diskParameters: simulated disk performance parameters used for later operations.
	// delayOnWrite:   see the member of the same name.
	//
	// Throws file_not_found if the open fails with ENOENT, io_error for any other open failure.
	// The actor switches to the machine for the open itself and switches back to the calling
	// process and task before returning or rethrowing.
	ACTOR static Future<Reference<IAsyncFile>> open( std::string filename, int flags, int mode,
													Reference<DiskParameters> diskParameters = Reference<DiskParameters>(new DiskParameters(25000, 150000000)), bool delayOnWrite = true ) {
		state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
		state int currentTaskID = g_network->getCurrentTask();

		// Hard limit on the number of open() calls counted in openCount.
		if(++openCount >= 3000) {
			TraceEvent(SevError, "TooManyFiles");
			ASSERT(false);
		}

		// On exactly the 2000th open, speed up the simulation and effectively disable connection failures.
		if(openCount == 2000) {
			TraceEvent(SevWarnAlways, "DisableConnectionFailures_TooManyFiles");
			g_simulator.speedUpSimulation = true;
			g_simulator.connectionFailuresDisableDuration = 1e6;
		}

		// Filesystems on average these days seem to start to have limits of around 255 characters for a
		// filename.  We add ".part" below, so we need to stay under 250.
		ASSERT( basename(filename).size() < 250 );

		wait( g_simulator.onMachine( currentProcess ) );
		try {
			// Simulated open latency, uniformly distributed between the MIN_OPEN_TIME and MAX_OPEN_TIME knobs.
			wait( delay(FLOW_KNOBS->MIN_OPEN_TIME + g_random->random01() * (FLOW_KNOBS->MAX_OPEN_TIME - FLOW_KNOBS->MIN_OPEN_TIME) ) );

			std::string open_filename = filename;
			if (flags & OPEN_ATOMIC_WRITE_AND_CREATE) {
				ASSERT( (flags & OPEN_CREATE) && (flags & OPEN_READWRITE) && !(flags & OPEN_EXCLUSIVE) );
				open_filename = filename + ".part";
			}

			int h = sf_open( open_filename.c_str(), flags, flagConversion(flags), mode );
			if( h == -1 ) {
				bool notFound = errno == ENOENT;
				Error e = notFound ? file_not_found() : io_error();
				TraceEvent(notFound ? SevWarn : SevWarnAlways, "FileOpenError").error(e).GetLastError().detail("File", filename).detail("Flags", flags);
				throw e;
			}

			platform::makeTemporary(open_filename.c_str());
			SimpleFile *simpleFile = new SimpleFile( h, diskParameters, delayOnWrite, filename, open_filename, flags );
			state Reference<IAsyncFile> file = Reference<IAsyncFile>( simpleFile );
			wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
			return file;
		} catch( Error &e ) {
			// Switch back to the caller's process and task before propagating the error.
			state Error err = e;
			wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
			throw err;
		}
	}

	virtual void addref() { ReferenceCounted<SimpleFile>::addref(); }
	virtual void delref() { ReferenceCounted<SimpleFile>::delref(); }

	// Returns the underlying file handle widened to 64 bits.
	virtual int64_t debugFD() { return (int64_t)h; }

	virtual Future<int> read( void* data, int length, int64_t offset ) {
		return read_impl( this, data, length, offset );
	}

	virtual Future<Void> write( void const* data, int length, int64_t offset ) {
		return write_impl( this, StringRef((const uint8_t*)data, length), offset );
	}

	virtual Future<Void> truncate( int64_t size ) {
		return truncate_impl( this, size );
	}

	virtual Future<Void> sync() {
		return sync_impl( this );
	}

	virtual Future<int64_t> size() {
		return size_impl( this );
	}

	// Name the file currently has on disk ("<filename>.part" until an atomic-create file has been renamed by sync).
	virtual std::string getFilename() {
		return actualFilename;
	}

	// Closes the underlying handle.
	~SimpleFile() {
		_close( h );
	}

private:
	// Handle returned by sf_open.
	int h;

	//Performance parameters of simulated disk
	Reference<DiskParameters> diskParameters;

	// filename: the name requested by the caller; actualFilename: the name that was actually opened.
	std::string filename, actualFilename;
	// IAsyncFile OPEN_* bits; sync_impl clears OPEN_ATOMIC_WRITE_AND_CREATE on the first sync.
	int flags;
	// Random id used to tag this file's records in randLog.
	UID dbgId;

	//If true, then writes/truncates will be preceded by a delay (like other operations).  If false, then they will not
	//This is to support AsyncFileNonDurable, which issues its own delays for writes and truncates
	bool delayOnWrite;

	SimpleFile(int h, Reference<DiskParameters> diskParameters, bool delayOnWrite, const std::string& filename, const std::string& actualFilename, int flags)
		: h(h), diskParameters(diskParameters), delayOnWrite(delayOnWrite), filename(filename), actualFilename(actualFilename), dbgId(g_random->randomUniqueID()), flags(flags) {}

	// Translates IAsyncFile OPEN_* bits into the platform O_* flags passed to sf_open.
	// OPEN_ATOMIC_WRITE_AND_CREATE maps to O_TRUNC.
	static int flagConversion( int flags ) {
		int outFlags = O_BINARY;
		if( flags&OPEN_READWRITE ) outFlags |= O_RDWR;
		if( flags&OPEN_CREATE ) outFlags |= O_CREAT;
		if( flags&OPEN_READONLY ) outFlags |= O_RDONLY;
		if( flags&OPEN_EXCLUSIVE ) outFlags |= O_EXCL;
		if( flags&OPEN_ATOMIC_WRITE_AND_CREATE ) outFlags |= O_TRUNC;

		return outFlags;
	}

	// Reads up to `length` bytes at `offset` into `data` and returns the number of bytes read.
	// Unless the file was opened with OPEN_NO_AIO, data, length and offset must all be 4096-aligned.
	// Throws io_error if the seek or read fails; may also throw an injected io_timeout or io_error.
	ACTOR static Future<int> read_impl( SimpleFile* self, void* data, int length, int64_t offset ) {
		ASSERT( ( self->flags & IAsyncFile::OPEN_NO_AIO ) != 0 ||
		        ( (uintptr_t)data % 4096 == 0 && length % 4096 == 0 && offset % 4096 == 0 ) );  // Required by KAIO.
		state UID opId = g_random->randomUniqueID();
		if (randLog)
			fprintf( randLog, "SFR1 %s %s %s %d %lld\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str(), length, offset );

		wait( waitUntilDiskReady( self->diskParameters, length ) );

		if( _lseeki64( self->h, offset, SEEK_SET ) == -1 ) {
			TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 1);
			throw io_error();
		}

		// read_bytes is unsigned, so the -1 it is compared with is converted to the same all-ones value.
		unsigned int read_bytes = 0;
		if( ( read_bytes = _read( self->h, data, (unsigned int) length ) ) == -1 ) {
			TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 2);
			throw io_error();
		}

		if (randLog) {
			uint32_t a=0, b=0;
			hashlittle2( data, read_bytes, &a, &b );
			fprintf( randLog, "SFR2 %s %s %s %d %d\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str(), read_bytes, a );
		}

		debugFileCheck("SimpleFileRead", self->filename, data, offset, length);

		INJECT_FAULT(io_timeout, "SimpleFile::read");
		INJECT_FAULT(io_error, "SimpleFile::read");

		return read_bytes;
	}

	// Writes all of `data` at `offset`.
	// Throws io_error if the seek or write fails or the write is short; may also throw an injected
	// io_timeout or io_error.
	ACTOR static Future<Void> write_impl( SimpleFile* self, StringRef data, int64_t offset ) {
		state UID opId = g_random->randomUniqueID();
		if (randLog) {
			uint32_t a=0, b=0;
			hashlittle2( data.begin(), data.size(), &a, &b );
			fprintf( randLog, "SFW1 %s %s %s %d %d %lld\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str(), a, data.size(), offset );
		}

		if(self->delayOnWrite)
			wait( waitUntilDiskReady( self->diskParameters, data.size() ) );

		if( _lseeki64( self->h, offset, SEEK_SET ) == -1 ) {
			TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 3);
			throw io_error();
		}

		unsigned int write_bytes = 0;
		if ( ( write_bytes = _write( self->h, (void*)data.begin(), data.size() ) ) == -1 ) {
			TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 4);
			throw io_error();
		}

		// A short write is treated as an error rather than retried.
		if ( write_bytes != data.size() ) {
			TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 5);
			throw io_error();
		}

		if (randLog) {
			fprintf( randLog, "SFW2 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str());
		}

		debugFileCheck("SimpleFileWrite", self->filename, (void*)data.begin(), offset, data.size());

		INJECT_FAULT(io_timeout, "SimpleFile::write");
		INJECT_FAULT(io_error, "SimpleFile::write");

		return Void();
	}

	// Resizes the file to `size` bytes.
	// Throws io_error if the resize fails; may also throw an injected io_timeout or io_error.
	ACTOR static Future<Void> truncate_impl( SimpleFile* self, int64_t size ) {
		state UID opId = g_random->randomUniqueID();
		if (randLog)
			fprintf( randLog, "SFT1 %s %s %s %lld\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str(), size );

		if(self->delayOnWrite)
			wait( waitUntilDiskReady( self->diskParameters, 0 ) );

		// NOTE(review): the cast to long would truncate sizes beyond the range of long on platforms where long is 32 bits -- confirm.
		if( _chsize( self->h, (long) size ) == -1 ) {
			TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 6);
			throw io_error();
		}

		if (randLog)
			fprintf( randLog, "SFT2 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str());

		INJECT_FAULT( io_timeout, "SimpleFile::truncate" );
		INJECT_FAULT( io_error, "SimpleFile::truncate" );

		return Void();
	}

	// Simulated sync. No _commit/fsync call is made here. The first sync of a file opened with
	// OPEN_ATOMIC_WRITE_AND_CREATE renames "<filename>.part" to its final name, provided the
	// machine's openFiles cache contains the ".part" entry, and moves that cache entry along.
	// May throw an injected io_timeout or io_error.
	ACTOR static Future<Void> sync_impl( SimpleFile* self ) {
		state UID opId = g_random->randomUniqueID();
		if (randLog)
			fprintf( randLog, "SFC1 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str());

		if(self->delayOnWrite)
			wait( waitUntilDiskReady( self->diskParameters, 0, true ) );

		if (self->flags & OPEN_ATOMIC_WRITE_AND_CREATE) {
			// Clear the flag so the rename is attempted only once.
			self->flags &= ~OPEN_ATOMIC_WRITE_AND_CREATE;
			auto& machineCache = g_simulator.getCurrentProcess()->machine->openFiles;
			std::string sourceFilename = self->filename + ".part";

			if(machineCache.count(sourceFilename)) {
				TraceEvent("SimpleFileRename").detail("From", sourceFilename).detail("To", self->filename).detail("SourceCount", machineCache.count(sourceFilename)).detail("FileCount", machineCache.count(self->filename));
				renameFile( sourceFilename.c_str(), self->filename.c_str() );

				ASSERT(!machineCache.count(self->filename));
				machineCache[self->filename] = machineCache[sourceFilename];
				machineCache.erase(sourceFilename);
				self->actualFilename = self->filename;
			}
		}

		if (randLog)
			fprintf( randLog, "SFC2 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str());

		INJECT_FAULT( io_timeout, "SimpleFile::sync" );
		INJECT_FAULT( io_error, "SimpleFile::sync" );

		return Void();
	}

	// Returns the file size, obtained by seeking to the end of the file.
	// Throws io_error if the seek fails; may also throw an injected io_error.
	ACTOR static Future<int64_t> size_impl( SimpleFile* self ) {
		state UID opId = g_random->randomUniqueID();
		if (randLog)
			fprintf(randLog, "SFS1 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str());

		wait( waitUntilDiskReady( self->diskParameters, 0 ) );

		int64_t pos = _lseeki64( self->h, 0L, SEEK_END );
		if( pos == -1 ) {
			TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 8);
			throw io_error();
		}

		if (randLog)
			fprintf(randLog, "SFS2 %s %s %s %lld\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str(), pos);
		INJECT_FAULT( io_error, "SimpleFile::size" );

		return pos;
	}
};
677 
// Space bookkeeping for one simulated disk.
// NOTE(review): sizes are presumably in bytes and lastUpdate presumably a simulation timestamp -- confirm at the use sites.
struct SimDiskSpace {
	int64_t totalSpace;
	int64_t baseFreeSpace; //The original free space of the disk + deltas from simulated external modifications
	double lastUpdate;
};
683 
// Forward declaration; the definition is not in this part of the file.
void doReboot( ISimulator::ProcessInfo* const& p, ISimulator::KillType const& kt );
685 
686 struct Sim2Listener : IListener, ReferenceCounted<Sim2Listener> {
Sim2ListenerSim2Listener687 	explicit Sim2Listener( ISimulator::ProcessInfo* process, const NetworkAddress& listenAddr )
688 		: process(process),
689 	      address(listenAddr) {}
690 
incomingConnectionSim2Listener691 	void incomingConnection( double seconds, Reference<IConnection> conn ) {  // Called by another process!
692 		incoming( Reference<Sim2Listener>::addRef( this ), seconds, conn );
693 	}
694 
addrefSim2Listener695 	virtual void addref() { ReferenceCounted<Sim2Listener>::addref(); }
delrefSim2Listener696 	virtual void delref() { ReferenceCounted<Sim2Listener>::delref(); }
697 
acceptSim2Listener698 	virtual Future<Reference<IConnection>> accept() {
699 		return popOne( nextConnection.getFuture() );
700 	}
701 
getListenAddressSim2Listener702 	virtual NetworkAddress getListenAddress() { return address; }
703 
704 private:
705 	ISimulator::ProcessInfo* process;
706 	PromiseStream< Reference<IConnection> > nextConnection;
707 
incomingSim2Listener708 	ACTOR static void incoming( Reference<Sim2Listener> self, double seconds, Reference<IConnection> conn ) {
709 		wait( g_simulator.onProcess(self->process) );
710 		wait( delay( seconds ) );
711 		if (((Sim2Conn*)conn.getPtr())->isPeerGone() && g_random->random01()<0.5)
712 			return;
713 		TraceEvent("Sim2IncomingConn", conn->getDebugID())
714 			.detail("ListenAddress", self->getListenAddress())
715 			.detail("PeerAddress", conn->getPeerAddress());
716 		self->nextConnection.send( conn );
717 	}
popOneSim2Listener718 	ACTOR static Future<Reference<IConnection>> popOne( FutureStream< Reference<IConnection> > conns ) {
719 		Reference<IConnection> c = waitNext( conns );
720 		((Sim2Conn*)c.getPtr())->opened = true;
721 		return c;
722 	}
723 
724 	NetworkAddress address;
725 };
726 
// Views the global simulator object as a Sim2 through an unchecked cast.
#define g_sim2 ((Sim2&)g_simulator)
728 
729 class Sim2 : public ISimulator, public INetworkConnections {
730 public:
731 	// Implement INetwork interface
732 	// Everything actually network related is delegated to the Sim2Net class; Sim2 is only concerned with simulating machines and time
now()733 	virtual double now() { return time; }
734 
delay(double seconds,int taskID)735 	virtual Future<class Void> delay( double seconds, int taskID ) {
736 		ASSERT(taskID >= TaskMinPriority && taskID <= TaskMaxPriority);
737 		return delay( seconds, taskID, currentProcess );
738 	}
delay(double seconds,int taskID,ProcessInfo * machine)739 	Future<class Void> delay( double seconds, int taskID, ProcessInfo* machine ) {
740 		ASSERT( seconds >= -0.0001 );
741 		seconds = std::max(0.0, seconds);
742 		Future<Void> f;
743 
744 		if(!currentProcess->rebooting && machine == currentProcess && !currentProcess->shutdownSignal.isSet() && FLOW_KNOBS->MAX_BUGGIFIED_DELAY > 0 && g_random->random01() < 0.25) { //FIXME: why doesnt this work when we are changing machines?
745 			seconds += FLOW_KNOBS->MAX_BUGGIFIED_DELAY*pow(g_random->random01(),1000.0);
746 		}
747 
748 		mutex.enter();
749 		tasks.push( Task( time + seconds, taskID, taskCount++, machine, f ) );
750 		mutex.leave();
751 
752 		return f;
753 	}
checkShutdown(Sim2 * self,int taskID)754 	ACTOR static Future<Void> checkShutdown(Sim2 *self, int taskID) {
755 		ISimulator::KillType kt = wait( self->getCurrentProcess()->shutdownSignal.getFuture() );
756 		self->setCurrentTask(taskID);
757 		return Void();
758 	}
yield(int taskID)759 	virtual Future<class Void> yield( int taskID ) {
760 		if (taskID == TaskDefaultYield) taskID = currentTaskID;
761 		if (check_yield(taskID)) {
762 			// We want to check that yielders can handle actual time elapsing (it sometimes will outside simulation), but
763 			// don't want to prevent instantaneous shutdown of "rebooted" machines.
764 			return delay(getCurrentProcess()->rebooting ? 0 : .001,taskID) || checkShutdown(this, taskID);
765 		}
766 		setCurrentTask(taskID);
767 		return Void();
768 	}
check_yield(int taskID)769 	virtual bool check_yield( int taskID ) {
770 		if (yielded) return true;
771 		if (--yield_limit <= 0) {
772 			yield_limit = g_random->randomInt(1, 150);  // If yield returns false *too* many times in a row, there could be a stack overflow, since we can't deterministically check stack size as the real network does
773 			return yielded = true;
774 		}
775 		return yielded = BUGGIFY_WITH_PROB(0.01);
776 	}
getCurrentTask()777 	virtual int getCurrentTask() {
778 		return currentTaskID;
779 	}
setCurrentTask(int taskID)780 	virtual void setCurrentTask(int taskID ) {
781 		currentTaskID = taskID;
782 	}
783 	// Sets the taskID/priority of the current task, without yielding
connect(NetworkAddress toAddr,std::string host)784 	virtual Future<Reference<IConnection>> connect( NetworkAddress toAddr, std::string host ) {
785 		ASSERT( !toAddr.isTLS() && host.empty());
786 		if (!addressMap.count( toAddr )) {
787 			return waitForProcessAndConnect( toAddr, this );
788 		}
789 		auto peerp = getProcessByAddress(toAddr);
790 		Reference<Sim2Conn> myc( new Sim2Conn( getCurrentProcess() ) );
791 		Reference<Sim2Conn> peerc( new Sim2Conn( peerp ) );
792 
793 		myc->connect(peerc, toAddr);
794 		IPAddress localIp;
795 		if (getCurrentProcess()->address.ip.isV6()) {
796 			IPAddress::IPAddressStore store = getCurrentProcess()->address.ip.toV6();
797 			uint16_t* ipParts = (uint16_t*)store.data();
798 			ipParts[7] += g_random->randomInt(0, 256);
799 			localIp = IPAddress(store);
800 		} else {
801 			localIp = IPAddress(getCurrentProcess()->address.ip.toV4() + g_random->randomInt(0, 256));
802 		}
803 		peerc->connect(myc, NetworkAddress(localIp, g_random->randomInt(40000, 60000)));
804 
805 		((Sim2Listener*)peerp->getListener(toAddr).getPtr())->incomingConnection( 0.5*g_random->random01(), Reference<IConnection>(peerc) );
806 		return onConnect( ::delay(0.5*g_random->random01()), myc );
807 	}
resolveTCPEndpoint(std::string host,std::string service)808 	virtual Future<std::vector<NetworkAddress>> resolveTCPEndpoint( std::string host, std::string service) {
809 		throw lookup_failed();
810 	}
onConnect(Future<Void> ready,Reference<Sim2Conn> conn)811 	ACTOR static Future<Reference<IConnection>> onConnect( Future<Void> ready, Reference<Sim2Conn> conn ) {
812 		wait(ready);
813 		if (conn->isPeerGone() && g_random->random01()<0.5) {
814 			conn.clear();
815 			wait(Never());
816 		}
817 		conn->opened = true;
818 		return conn;
819 	}
listen(NetworkAddress localAddr)820 	virtual Reference<IListener> listen( NetworkAddress localAddr ) {
821 		ASSERT( !localAddr.isTLS() );
822 		Reference<IListener> listener( getCurrentProcess()->getListener(localAddr) );
823 		ASSERT(listener);
824 		return listener;
825 	}
waitForProcessAndConnect(NetworkAddress toAddr,INetworkConnections * self)826 	ACTOR static Future<Reference<IConnection>> waitForProcessAndConnect(
827 			NetworkAddress toAddr, INetworkConnections *self ) {
828 		// We have to be able to connect to processes that don't yet exist, so we do some silly polling
829 		loop {
830 			wait( ::delay( 0.1 * g_random->random01() ) );
831 			if (g_sim2.addressMap.count(toAddr)) {
832 				Reference<IConnection> c = wait( self->connect( toAddr ) );
833 				return c;
834 			}
835 		}
836 	}
837 
stop()838 	virtual void stop() { isStopped = true; }
isSimulated() const839 	virtual bool isSimulated() const { return true; }
840 
841 	struct SimThreadArgs {
842 		THREAD_FUNC_RETURN (*func) (void*);
843 		void *arg;
844 
845 		ISimulator::ProcessInfo *currentProcess;
846 
SimThreadArgsSim2::SimThreadArgs847 		SimThreadArgs(THREAD_FUNC_RETURN (*func) (void*), void *arg) : func(func), arg(arg) {
848 			ASSERT(g_network->isSimulated());
849 			currentProcess = g_simulator.getCurrentProcess();
850 		}
851 	};
852 
853 	//Starts a new thread, making sure to set any thread local state
simStartThread(void * arg)854 	THREAD_FUNC simStartThread(void *arg) {
855 		SimThreadArgs *simArgs = (SimThreadArgs*)arg;
856 		ISimulator::currentProcess = simArgs->currentProcess;
857 		simArgs->func(simArgs->arg);
858 
859 		delete simArgs;
860 		THREAD_RETURN;
861 	}
862 
startThread(THREAD_FUNC_RETURN (* func)(void *),void * arg)863 	virtual THREAD_HANDLE startThread( THREAD_FUNC_RETURN (*func) (void*), void *arg ) {
864 		SimThreadArgs *simArgs = new SimThreadArgs(func, arg);
865 		return ::startThread(simStartThread, simArgs);
866 	}
867 
getDiskBytes(std::string const & directory,int64_t & free,int64_t & total)868 	virtual void getDiskBytes( std::string const& directory, int64_t& free, int64_t& total) {
869 		ProcessInfo *proc = getCurrentProcess();
870 		SimDiskSpace &diskSpace = diskSpaceMap[proc->address.ip];
871 
872 		int64_t totalFileSize = 0;
873 		int numFiles = 0;
874 
875 		//Get the size of all files we've created on the server and subtract them from the free space
876 		for(auto file = proc->machine->openFiles.begin(); file != proc->machine->openFiles.end(); ++file) {
877 			if( file->second.isReady() ) {
878 				totalFileSize += ((AsyncFileNonDurable*)file->second.get().getPtr())->approximateSize;
879 			}
880 			numFiles++;
881 		}
882 
883 		bool ok = false;
884 
885 		if(diskSpace.totalSpace == 0) {
886 			diskSpace.totalSpace = 5e9 + g_random->random01() * 100e9; //Total space between 5GB and 105GB
887 			diskSpace.baseFreeSpace = std::min<int64_t>(diskSpace.totalSpace, std::max(5e9, (g_random->random01() * (1 - .075) + .075) * diskSpace.totalSpace) + totalFileSize); //Minimum 5GB or 7.5% total disk space, whichever is higher
888 
889 			TraceEvent("Sim2DiskSpaceInitialization").detail("TotalSpace", diskSpace.totalSpace).detail("BaseFreeSpace", diskSpace.baseFreeSpace).detail("TotalFileSize", totalFileSize).detail("NumFiles", numFiles);
890 		}
891 		else {
892 			int64_t maxDelta = std::min(5.0, (now() - diskSpace.lastUpdate)) * (BUGGIFY ? 10e6 : 1e6); //External processes modifying the disk
893 			int64_t delta = -maxDelta + g_random->random01() * maxDelta * 2;
894 			diskSpace.baseFreeSpace = std::min<int64_t>(diskSpace.totalSpace, std::max<int64_t>(diskSpace.baseFreeSpace + delta, totalFileSize));
895 		}
896 
897 		diskSpace.lastUpdate = now();
898 
899 		total = diskSpace.totalSpace;
900 		free = std::max<int64_t>(0, diskSpace.baseFreeSpace - totalFileSize);
901 
902 		if(free == 0)
903 			TraceEvent(SevWarnAlways, "Sim2NoFreeSpace").detail("TotalSpace", diskSpace.totalSpace).detail("BaseFreeSpace", diskSpace.baseFreeSpace).detail("TotalFileSize", totalFileSize).detail("NumFiles", numFiles);
904 	}
isAddressOnThisHost(NetworkAddress const & addr)905 	virtual bool isAddressOnThisHost( NetworkAddress const& addr ) {
906 		return addr.ip == getCurrentProcess()->address.ip;
907 	}
908 
	// Simulates deleting `filename`.
	//   self          - the simulator; its net2 filesystem performs the real delete.
	//   filename      - file to delete.
	//   mustBeDurable - when true the real delete is always attempted; when false it is attempted only
	//                   half of the time.
	// If the file is currently open on this machine it is first moved from openFiles to deletingFiles.
	// Errors raised while on the machine are rethrown after switching back to the original process.
	ACTOR static Future<Void> deleteFileImpl( Sim2* self, std::string filename, bool mustBeDurable ) {
		// This is a _rudimentary_ simulation of the untrustworthiness of non-durable deletes and the possibility of
		// rebooting during a durable one.  It isn't perfect: for example, on real filesystems testing
		// for the existence of a non-durably deleted file BEFORE a reboot will show that it apparently doesn't exist.
		if(g_simulator.getCurrentProcess()->machine->openFiles.count(filename)) {
			g_simulator.getCurrentProcess()->machine->openFiles.erase(filename);
			g_simulator.getCurrentProcess()->machine->deletingFiles.insert(filename);
		}
		if ( mustBeDurable || g_random->random01() < 0.5 ) {
			// Remember where we came from so we can return to the same process and task priority.
			state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
			state int currentTaskID = g_network->getCurrentTask();
			// NOTE(review): presumably moves execution onto the machine-level process — confirm against onMachine.
			wait( g_simulator.onMachine( currentProcess ) );
			try {
				wait( ::delay(0.05 * g_random->random01()) );
				// Skip the real delete if the process started rebooting during the delay.
				if (!currentProcess->rebooting) {
					auto f = IAsyncFileSystem::filesystem(self->net2)->deleteFile(filename, false);
					ASSERT( f.isReady() );
					wait( ::delay(0.05 * g_random->random01()) );
					TEST( true );  // Simulated durable delete
				}
				wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
				return Void();
			} catch( Error &e ) {
				// Copy the error into a state variable so it survives the wait below.
				state Error err = e;
				wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
				throw err;
			}
		} else {
			// Nothing is deleted on disk in this branch.
			TEST( true );  // Simulated non-durable delete
			return Void();
		}
	}
941 
	// Main simulation loop: until stop() is called, yields to the underlying net2 network, then pops
	// the top task from the queue and executes it. The task queue must never be empty while running
	// (asserted). On exit the process that was current when the loop started is restored and net2 is
	// stopped.
	ACTOR static Future<Void> runLoop(Sim2 *self) {
		state ISimulator::ProcessInfo *callingMachine = self->currentProcess;
		while ( !self->isStopped ) {
			wait( self->net2->yield(TaskDefaultYield) );

			// The task queue is accessed only while holding the mutex.
			self->mutex.enter();
			if( self->tasks.size() == 0 ) {
				// Release the mutex before failing the assertion.
				self->mutex.leave();
				ASSERT(false);
			}
			//if (!randLog/* && now() >= 32.0*/)
			//	randLog = fopen("randLog.txt", "wt");
			Task t = std::move( self->tasks.top() ); // Unfortunately still a copy under gcc where .top() returns const&
			self->currentTaskID = t.taskID;
			self->tasks.pop();
			self->mutex.leave();

			self->execTask(t);
			// Clear the flag so check_yield() starts fresh for the next task.
			self->yielded = false;
		}
		self->currentProcess = callingMachine;
		self->net2->stop();
		return Void();
	}
966 
_run(Sim2 * self)967 	ACTOR Future<Void> _run(Sim2 *self) {
968 		Future<Void> loopFuture = self->runLoop(self);
969 		self->net2->run();
970 		wait( loopFuture );
971 		return Void();
972 	}
973 
974 	// Implement ISimulator interface
run()975 	virtual void run() {
976 		_run(this);
977 	}
newProcess(const char * name,IPAddress ip,uint16_t port,uint16_t listenPerProcess,LocalityData locality,ProcessClass startingClass,const char * dataFolder,const char * coordinationFolder)978 	virtual ProcessInfo* newProcess(const char* name, IPAddress ip, uint16_t port, uint16_t listenPerProcess,
979 	                                LocalityData locality, ProcessClass startingClass, const char* dataFolder,
980 	                                const char* coordinationFolder) {
981 		ASSERT( locality.machineId().present() );
982 		MachineInfo& machine = machines[ locality.machineId().get() ];
983 		if (!machine.machineId.present())
984 			machine.machineId = locality.machineId();
985 		for( int i = 0; i < machine.processes.size(); i++ ) {
986 			if( machine.processes[i]->locality.machineId() != locality.machineId() ) { // SOMEDAY: compute ip from locality to avoid this check
987 				TraceEvent("Sim2Mismatch")
988 				    .detail("IP", format("%s", ip.toString().c_str()))
989 				    .detail("MachineId", locality.machineId())
990 				    .detail("NewName", name)
991 				    .detail("ExistingMachineId", machine.processes[i]->locality.machineId())
992 				    .detail("ExistingName", machine.processes[i]->name);
993 				ASSERT( false );
994 			}
995 			ASSERT( machine.processes[i]->address.port != port );
996 		}
997 
998 		// This is for async operations on non-durable files.
999 		// These files must live on after process kills for sim purposes.
1000 		if( machine.machineProcess == 0 ) {
1001 			NetworkAddress machineAddress(ip, 0, false, false);
1002 			machine.machineProcess = new ProcessInfo("Machine", locality, startingClass, {machineAddress}, this, "", "");
1003 			machine.machineProcess->machine = &machine;
1004 		}
1005 
1006 		NetworkAddressList addresses;
1007 		addresses.address = NetworkAddress(ip, port, true, false);
1008 		if(listenPerProcess == 2) {
1009 			addresses.secondaryAddress = NetworkAddress(ip, port+1, true, false);
1010 		}
1011 
1012 		ProcessInfo* m = new ProcessInfo(name, locality, startingClass, addresses, this, dataFolder, coordinationFolder);
1013 		for (int processPort = port; processPort < port + listenPerProcess; ++processPort) {
1014 			NetworkAddress address(ip, processPort, true, false); // SOMEDAY see above about becoming SSL!
1015 			m->listenerMap[address] = Reference<IListener>( new Sim2Listener(m, address) );
1016 			addressMap[address] = m;
1017 		}
1018 		m->machine = &machine;
1019 		machine.processes.push_back(m);
1020 		currentlyRebootingProcesses.erase(addresses.address);
1021 		m->excluded = g_simulator.isExcluded(addresses.address);
1022 		m->cleared = g_simulator.isCleared(addresses.address);
1023 
1024 		m->setGlobal(enTDMetrics, (flowGlobalType) &m->tdmetrics);
1025 		m->setGlobal(enNetworkConnections, (flowGlobalType) m->network);
1026 		m->setGlobal(enASIOTimedOut, (flowGlobalType) false);
1027 
1028 		TraceEvent("NewMachine").detail("Name", name).detail("Address", m->address).detail("MachineId", m->locality.machineId()).detail("Excluded", m->excluded).detail("Cleared", m->cleared);
1029 
1030 		// FIXME: Sometimes, connections to/from this process will explicitly close
1031 
1032 		return m;
1033 	}
isAvailable() const1034 	virtual bool isAvailable() const
1035 	{
1036 		std::vector<ProcessInfo*> processesLeft, processesDead;
1037 		for (auto processInfo : getAllProcesses()) {
1038 			if (processInfo->isAvailableClass()) {
1039 				if (processInfo->isExcluded() || processInfo->isCleared() || !processInfo->isAvailable()) {
1040 					processesDead.push_back(processInfo);
1041 				} else {
1042 					processesLeft.push_back(processInfo);
1043 				}
1044 			}
1045 		}
1046 		return canKillProcesses(processesLeft, processesDead, KillInstantly, NULL);
1047 	}
1048 
datacenterDead(Optional<Standalone<StringRef>> dcId) const1049 	virtual bool datacenterDead(Optional<Standalone<StringRef>> dcId) const
1050 	{
1051 		if(!dcId.present()) {
1052 			return false;
1053 		}
1054 
1055 		LocalityGroup primaryProcessesLeft, primaryProcessesDead;
1056 		std::vector<LocalityData> primaryLocalitiesDead, primaryLocalitiesLeft;
1057 
1058 		for (auto processInfo : getAllProcesses()) {
1059 			if (processInfo->isAvailableClass() && processInfo->locality.dcId() == dcId) {
1060 				if (processInfo->isExcluded() || processInfo->isCleared() || !processInfo->isAvailable()) {
1061 					primaryProcessesDead.add(processInfo->locality);
1062 					primaryLocalitiesDead.push_back(processInfo->locality);
1063 				} else {
1064 					primaryProcessesLeft.add(processInfo->locality);
1065 					primaryLocalitiesLeft.push_back(processInfo->locality);
1066 				}
1067 			}
1068 		}
1069 
1070 		std::vector<LocalityData> badCombo;
1071 		bool primaryTLogsDead = tLogWriteAntiQuorum ? !validateAllCombinations(badCombo, primaryProcessesDead, tLogPolicy, primaryLocalitiesLeft, tLogWriteAntiQuorum, false) : primaryProcessesDead.validate(tLogPolicy);
1072 		if(usableRegions > 1 && remoteTLogPolicy && !primaryTLogsDead) {
1073 			primaryTLogsDead = primaryProcessesDead.validate(remoteTLogPolicy);
1074 		}
1075 
1076 		return primaryTLogsDead || primaryProcessesDead.validate(storagePolicy);
1077 	}
1078 
	// Determines whether the cluster can survive with `availableProcesses` still running and
	// `deadProcesses` gone.
	//   availableProcesses - processes that would remain.
	//   deadProcesses      - processes that would be (or already are) lost.
	//   kt                 - requested kill type. Only KillInstantly, InjectFaults, RebootAndDelete and
	//                        RebootProcessAndDelete are evaluated; for any other value the function
	//                        returns true and leaves the kill type unchanged.
	//   newKillType        - optional out: receives the (possibly changed) kill type.
	// Returns false, and changes the kill type, in these cases:
	//   - the dead set validates the relevant policies ("too many dead"): changed to Reboot;
	//   - kt < RebootAndDelete and the remaining set fails a policy: changed to RebootAndDelete;
	//   - kt < RebootAndDelete and fewer unique zones remain than the coordinator quorum size:
	//     changed to RebootAndDelete.
	virtual bool canKillProcesses(std::vector<ProcessInfo*> const& availableProcesses, std::vector<ProcessInfo*> const& deadProcesses, KillType kt, KillType* newKillType) const
	{
		bool canSurvive = true;
		// desiredCoordinators rounded to an odd count (an even value is reduced by one).
		int nQuorum = ((desiredCoordinators+1)/2)*2-1;

		KillType newKt = kt;
		if ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete))
		{
			// Localities of remaining ("Left") and lost ("Dead") processes, bucketed by region role.
			LocalityGroup primaryProcessesLeft, primaryProcessesDead;
			LocalityGroup primarySatelliteProcessesLeft, primarySatelliteProcessesDead;
			LocalityGroup remoteProcessesLeft, remoteProcessesDead;
			LocalityGroup remoteSatelliteProcessesLeft, remoteSatelliteProcessesDead;

			std::vector<LocalityData> primaryLocalitiesDead, primaryLocalitiesLeft;
			std::vector<LocalityData> primarySatelliteLocalitiesDead, primarySatelliteLocalitiesLeft;
			std::vector<LocalityData> remoteLocalitiesDead, remoteLocalitiesLeft;
			std::vector<LocalityData> remoteSatelliteLocalitiesDead, remoteSatelliteLocalitiesLeft;

			std::vector<LocalityData> badCombo;
			// Distinct zone ids among the remaining processes.
			std::set<Optional<Standalone<StringRef>>> uniqueMachines;

			if(!primaryDcId.present()) {
				// No primary datacenter configured: everything is treated as primary.
				for (auto processInfo : availableProcesses) {
					primaryProcessesLeft.add(processInfo->locality);
					primaryLocalitiesLeft.push_back(processInfo->locality);
					uniqueMachines.insert(processInfo->locality.zoneId());
				}
				for (auto processInfo : deadProcesses) {
					primaryProcessesDead.add(processInfo->locality);
					primaryLocalitiesDead.push_back(processInfo->locality);
				}
			} else {
				// Bucket by datacenter id; processes in none of the known datacenters only count
				// toward uniqueMachines.
				for (auto processInfo : availableProcesses) {
					uniqueMachines.insert(processInfo->locality.zoneId());
					if(processInfo->locality.dcId() == primaryDcId) {
						primaryProcessesLeft.add(processInfo->locality);
						primaryLocalitiesLeft.push_back(processInfo->locality);
					} else if(processInfo->locality.dcId() == remoteDcId) {
						remoteProcessesLeft.add(processInfo->locality);
						remoteLocalitiesLeft.push_back(processInfo->locality);
					} else if(std::find(primarySatelliteDcIds.begin(), primarySatelliteDcIds.end(), processInfo->locality.dcId()) != primarySatelliteDcIds.end()) {
						primarySatelliteProcessesLeft.add(processInfo->locality);
						primarySatelliteLocalitiesLeft.push_back(processInfo->locality);
					} else if(std::find(remoteSatelliteDcIds.begin(), remoteSatelliteDcIds.end(), processInfo->locality.dcId()) != remoteSatelliteDcIds.end()) {
						remoteSatelliteProcessesLeft.add(processInfo->locality);
						remoteSatelliteLocalitiesLeft.push_back(processInfo->locality);
					}
				}
				for (auto processInfo : deadProcesses) {
					if(processInfo->locality.dcId() == primaryDcId) {
						primaryProcessesDead.add(processInfo->locality);
						primaryLocalitiesDead.push_back(processInfo->locality);
					} else if(processInfo->locality.dcId() == remoteDcId) {
						remoteProcessesDead.add(processInfo->locality);
						remoteLocalitiesDead.push_back(processInfo->locality);
					} else if(std::find(primarySatelliteDcIds.begin(), primarySatelliteDcIds.end(), processInfo->locality.dcId()) != primarySatelliteDcIds.end()) {
						primarySatelliteProcessesDead.add(processInfo->locality);
						primarySatelliteLocalitiesDead.push_back(processInfo->locality);
					} else if(std::find(remoteSatelliteDcIds.begin(), remoteSatelliteDcIds.end(), processInfo->locality.dcId()) != remoteSatelliteDcIds.end()) {
						remoteSatelliteProcessesDead.add(processInfo->locality);
						remoteSatelliteLocalitiesDead.push_back(processInfo->locality);
					}
				}
			}

			bool tooManyDead = false;
			bool notEnoughLeft = false;
			// A "Dead" group that validates a policy is treated as having lost too much for that policy.
			bool primaryTLogsDead = tLogWriteAntiQuorum ? !validateAllCombinations(badCombo, primaryProcessesDead, tLogPolicy, primaryLocalitiesLeft, tLogWriteAntiQuorum, false) : primaryProcessesDead.validate(tLogPolicy);
			if(usableRegions > 1 && remoteTLogPolicy && !primaryTLogsDead) {
				primaryTLogsDead = primaryProcessesDead.validate(remoteTLogPolicy);
			}

			if(!primaryDcId.present()) {
				tooManyDead = primaryTLogsDead || primaryProcessesDead.validate(storagePolicy);
				notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy);
			} else {
				bool remoteTLogsDead = tLogWriteAntiQuorum ? !validateAllCombinations(badCombo, remoteProcessesDead, tLogPolicy, remoteLocalitiesLeft, tLogWriteAntiQuorum, false) : remoteProcessesDead.validate(tLogPolicy);
				if(usableRegions > 1 && remoteTLogPolicy && !remoteTLogsDead) {
					remoteTLogsDead = remoteProcessesDead.validate(remoteTLogPolicy);
				}

				if(!hasSatelliteReplication) {
					if(usableRegions > 1) {
						// With two usable regions, storage counts as lost only when both regions lost it.
						tooManyDead = primaryTLogsDead || remoteTLogsDead || ( primaryProcessesDead.validate(storagePolicy) && remoteProcessesDead.validate(storagePolicy) );
						notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(remoteTLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(remoteTLogPolicy) || !remoteProcessesLeft.validate(storagePolicy);
					} else {
						tooManyDead = primaryTLogsDead || remoteTLogsDead || primaryProcessesDead.validate(storagePolicy) || remoteProcessesDead.validate(storagePolicy);
						notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(storagePolicy);
					}
				} else {
					bool primarySatelliteTLogsDead = satelliteTLogWriteAntiQuorumFallback ? !validateAllCombinations(badCombo, primarySatelliteProcessesDead, satelliteTLogPolicyFallback, primarySatelliteLocalitiesLeft, satelliteTLogWriteAntiQuorumFallback, false) : primarySatelliteProcessesDead.validate(satelliteTLogPolicyFallback);
					bool remoteSatelliteTLogsDead = satelliteTLogWriteAntiQuorumFallback ? !validateAllCombinations(badCombo, remoteSatelliteProcessesDead, satelliteTLogPolicyFallback, remoteSatelliteLocalitiesLeft, satelliteTLogWriteAntiQuorumFallback, false) : remoteSatelliteProcessesDead.validate(satelliteTLogPolicyFallback);

					if(usableRegions > 1) {
						notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(remoteTLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !primarySatelliteProcessesLeft.validate(satelliteTLogPolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(remoteTLogPolicy) || !remoteProcessesLeft.validate(storagePolicy) || !remoteSatelliteProcessesLeft.validate(satelliteTLogPolicy);
					} else {
						notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !primarySatelliteProcessesLeft.validate(satelliteTLogPolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(storagePolicy) || !remoteSatelliteProcessesLeft.validate(satelliteTLogPolicy);
					}

					if(usableRegions > 1 && allowLogSetKills) {
						// A whole log set may be lost as long as a second one is not lost with it.
						tooManyDead = ( primaryTLogsDead && primarySatelliteTLogsDead ) || ( remoteTLogsDead && remoteSatelliteTLogsDead ) || ( primaryTLogsDead && remoteTLogsDead ) || ( primaryProcessesDead.validate(storagePolicy) && remoteProcessesDead.validate(storagePolicy) );
					} else {
						tooManyDead = primaryTLogsDead || remoteTLogsDead || primaryProcessesDead.validate(storagePolicy) || remoteProcessesDead.validate(storagePolicy);
					}
				}
			}

			// Reboot if dead machines do fulfill policies
			if (tooManyDead) {
				newKt = Reboot;
				canSurvive = false;
				TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("TLogPolicy", tLogPolicy->info()).detail("Reason", "tLogPolicy validates against dead processes.");
			}
			// Reboot and Delete if remaining machines do NOT fulfill policies
			else if ((kt < RebootAndDelete) && notEnoughLeft) {
				newKt = RebootAndDelete;
				canSurvive = false;
				TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("TLogPolicy", tLogPolicy->info()).detail("Reason", "tLogPolicy does not validates against remaining processes.");
			}
			// NOTE(review): nQuorum is a signed int compared against an unsigned size(); if
			// desiredCoordinators could be 0, nQuorum would be -1 and compare as a huge value — confirm
			// desiredCoordinators is always >= 1.
			else if ((kt < RebootAndDelete) && (nQuorum > uniqueMachines.size())) {
				newKt = RebootAndDelete;
				canSurvive = false;
				TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("StoragePolicy", storagePolicy->info()).detail("Quorum", nQuorum).detail("Machines", uniqueMachines.size()).detail("Reason", "Not enough unique machines to perform auto configuration of coordinators.");
			}
			else {
				TraceEvent("CanSurviveKills").detail("KillType", kt).detail("TLogPolicy", tLogPolicy->info()).detail("StoragePolicy", storagePolicy->info()).detail("Quorum", nQuorum).detail("Machines", uniqueMachines.size());
			}
		}
		if (newKillType) *newKillType = newKt;
		return canSurvive;
	}
1211 
destroyProcess(ISimulator::ProcessInfo * p)1212 	virtual void destroyProcess( ISimulator::ProcessInfo *p ) {
1213 		TraceEvent("ProcessDestroyed").detail("Name", p->name).detail("Address", p->address).detail("MachineId", p->locality.machineId());
1214 		currentlyRebootingProcesses.insert(std::pair<NetworkAddress, ProcessInfo*>(p->address, p));
1215 		std::vector<ProcessInfo*>& processes = machines[ p->locality.machineId().get() ].processes;
1216 		if( p != processes.back() ) {
1217 			auto it = std::find( processes.begin(), processes.end(), p );
1218 			std::swap( *it, processes.back() );
1219 		}
1220 		processes.pop_back();
1221 		killProcess_internal( p, KillInstantly );
1222 	}
killProcess_internal(ProcessInfo * machine,KillType kt)1223 	void killProcess_internal( ProcessInfo* machine, KillType kt ) {
1224 		TEST( true ); // Simulated machine was killed with any kill type
1225 		TEST( kt == KillInstantly ); // Simulated machine was killed instantly
1226 		TEST( kt == InjectFaults ); // Simulated machine was killed with faults
1227 
1228 		if (kt == KillInstantly) {
1229 			TraceEvent(SevWarn, "FailMachine").detail("Name", machine->name).detail("Address", machine->address).detail("ZoneId", machine->locality.zoneId()).detail("Process", machine->toString()).detail("Rebooting", machine->rebooting).detail("Protected", protectedAddresses.count(machine->address)).backtrace();
1230 			// This will remove all the "tracked" messages that came from the machine being killed
1231 			latestEventCache.clear();
1232 			machine->failed = true;
1233 		} else if (kt == InjectFaults) {
1234 			TraceEvent(SevWarn, "FaultMachine").detail("Name", machine->name).detail("Address", machine->address).detail("ZoneId", machine->locality.zoneId()).detail("Process", machine->toString()).detail("Rebooting", machine->rebooting).detail("Protected", protectedAddresses.count(machine->address)).backtrace();
1235 			should_inject_fault = simulator_should_inject_fault;
1236 			machine->fault_injection_r = g_random->randomUniqueID().first();
1237 			machine->fault_injection_p1 = 0.1;
1238 			machine->fault_injection_p2 = g_random->random01();
1239 		} else {
1240 			ASSERT( false );
1241 		}
1242 		ASSERT(!protectedAddresses.count(machine->address) || machine->rebooting);
1243 	}
rebootProcess(ProcessInfo * process,KillType kt)1244 	virtual void rebootProcess( ProcessInfo* process, KillType kt ) {
1245 		if( kt == RebootProcessAndDelete && protectedAddresses.count(process->address) ) {
1246 			TraceEvent("RebootChanged").detail("ZoneId", process->locality.describeZone()).detail("KillType", RebootProcess).detail("OrigKillType", kt).detail("Reason", "Protected process");
1247 			kt = RebootProcess;
1248 		}
1249 		doReboot( process, kt );
1250 	}
rebootProcess(Optional<Standalone<StringRef>> zoneId,bool allProcesses)1251 	virtual void rebootProcess(Optional<Standalone<StringRef>> zoneId, bool allProcesses ) {
1252 		if( allProcesses ) {
1253 			auto processes = getAllProcesses();
1254 			for( int i = 0; i < processes.size(); i++ )
1255 				if( processes[i]->locality.zoneId() == zoneId && !processes[i]->rebooting )
1256 					doReboot( processes[i], RebootProcess );
1257 		} else {
1258 			auto processes = getAllProcesses();
1259 			for( int i = 0; i < processes.size(); i++ ) {
1260 				if( processes[i]->locality.zoneId() != zoneId || processes[i]->rebooting ) {
1261 					swapAndPop(&processes, i--);
1262 				}
1263 			}
1264 			if( processes.size() )
1265 				doReboot( g_random->randomChoice( processes ), RebootProcess );
1266 		}
1267 	}
killProcess(ProcessInfo * machine,KillType kt)1268 	virtual void killProcess( ProcessInfo* machine, KillType kt ) {
1269 		TraceEvent("AttemptingKillProcess");
1270 		if (kt < RebootAndDelete ) {
1271 			killProcess_internal( machine, kt );
1272 		}
1273 	}
killInterface(NetworkAddress address,KillType kt)1274 	virtual void killInterface( NetworkAddress address, KillType kt  ) {
1275 		if (kt < RebootAndDelete ) {
1276 			std::vector<ProcessInfo*>& processes = machines[ addressMap[address]->locality.machineId() ].processes;
1277 			for( int i = 0; i < processes.size(); i++ )
1278 				killProcess_internal( processes[i], kt );
1279 		}
1280 	}
killZone(Optional<Standalone<StringRef>> zoneId,KillType kt,bool forceKill,KillType * ktFinal)1281 	virtual bool killZone(Optional<Standalone<StringRef>> zoneId, KillType kt, bool forceKill, KillType* ktFinal) {
1282 		auto processes = getAllProcesses();
1283 		std::set<Optional<Standalone<StringRef>>> zoneMachines;
1284 		for (auto& process : processes) {
1285 			if(process->locality.zoneId() == zoneId) {
1286 				zoneMachines.insert(process->locality.machineId());
1287 			}
1288 		}
1289 		bool result = false;
1290 		for(auto& machineId : zoneMachines) {
1291 			if(killMachine(machineId, kt, forceKill, ktFinal)) {
1292 				result = true;
1293 			}
1294 		}
1295 		return result;
1296 	}
	// Attempts to kill (or reboot) all processes on machine `machineId` with kill type `kt`.
	// The requested kill type may be changed before it is applied:
	//   - it becomes Reboot if any process on the machine has a protected address;
	//   - when !forceKill and the type is KillInstantly, InjectFaults, RebootAndDelete or
	//     RebootProcessAndDelete, canKillProcesses() receives &kt and may rewrite it.
	// Returns false, and reports None through ktFinal, when the kill is aborted: an unforced kill
	// while speedUpSimulation is set, no non-rebooting process on the machine, or a number of
	// non-rebooting processes that differs from processesPerMachine.
	// Otherwise returns true and reports the kill type finally used through ktFinal.
	// ktFinal may be null, in which case nothing is reported.
	virtual bool killMachine(Optional<Standalone<StringRef>> machineId, KillType kt, bool forceKill, KillType* ktFinal) {
		auto ktOrig = kt;

		TEST(true); // Trying to killing a machine
		TEST(kt == KillInstantly); // Trying to kill instantly
		TEST(kt == InjectFaults);  // Trying to kill by injecting faults

		if(speedUpSimulation && !forceKill) {
			TraceEvent(SevWarn, "AbortedKill").detail("MachineId", machineId).detail("Reason", "Unforced kill within speedy simulation.").backtrace();
			if (ktFinal) *ktFinal = None;
			return false;
		}

		int processesOnMachine = 0;

		// Same value as ktOrig above; used only by the "kill type was changed" coverage probe below
		KillType originalKt = kt;
		// Reboot if any of the processes are protected and count the number of processes not rebooting
		for (auto& process : machines[machineId].processes) {
			if (protectedAddresses.count(process->address))
				kt = Reboot;
			if (!process->rebooting)
				processesOnMachine++;
		}

		// Do nothing, if no processes to kill
		if (processesOnMachine == 0) {
			TraceEvent(SevWarn, "AbortedKill").detail("MachineId", machineId).detail("Reason", "The target had no processes running.").detail("Processes", processesOnMachine).detail("ProcessesPerMachine", processesPerMachine).backtrace();
			if (ktFinal) *ktFinal = None;
			return false;
		}

		// Check if machine can be removed, if requested
		if (!forceKill && ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete)))
		{
			// Partition every available-class process in the simulation into those that would remain
			// (processesLeft) and those that would be gone (processesDead) if this machine were killed.
			// The counters are only used for the trace event below.
			std::vector<ProcessInfo*> processesLeft, processesDead;
			int	protectedWorker = 0, unavailable = 0, excluded = 0, cleared = 0;

			for (auto processInfo : getAllProcesses()) {
				if (processInfo->isAvailableClass()) {
					if (processInfo->isExcluded()) {
						processesDead.push_back(processInfo);
						excluded++;
					}
					else if (processInfo->isCleared()) {
						processesDead.push_back(processInfo);
						cleared++;
					}
					else if (!processInfo->isAvailable()) {
						processesDead.push_back(processInfo);
						unavailable++;
					}
					else if (protectedAddresses.count(processInfo->address)) {
						processesLeft.push_back(processInfo);
						protectedWorker++;
					}
					else if (processInfo->locality.machineId() != machineId) {
						processesLeft.push_back(processInfo);
					} else {
						processesDead.push_back(processInfo);
					}
				}
			}
			// canKillProcesses() is handed &kt, so kt may hold a different kill type after this call
			if (!canKillProcesses(processesLeft, processesDead, kt, &kt)) {
				TraceEvent("ChangedKillMachine").detail("MachineId", machineId).detail("KillType", kt).detail("OrigKillType", ktOrig).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalProcesses", machines.size()).detail("ProcessesPerMachine", processesPerMachine).detail("Protected", protectedWorker).detail("Unavailable", unavailable).detail("Excluded", excluded).detail("Cleared", cleared).detail("ProtectedTotal", protectedAddresses.size()).detail("TLogPolicy", tLogPolicy->info()).detail("StoragePolicy", storagePolicy->info());
			}
			else if ((kt == KillInstantly) || (kt == InjectFaults)) {
				TraceEvent("DeadMachine").detail("MachineId", machineId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalProcesses", machines.size()).detail("ProcessesPerMachine", processesPerMachine).detail("TLogPolicy", tLogPolicy->info()).detail("StoragePolicy", storagePolicy->info());
				for (auto process : processesLeft) {
					TraceEvent("DeadMachineSurvivors").detail("MachineId", machineId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("SurvivingProcess", process->toString());
				}
				for (auto process : processesDead) {
					TraceEvent("DeadMachineVictims").detail("MachineId", machineId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("VictimProcess", process->toString());
				}
			}
			else {
				TraceEvent("ClearMachine").detail("MachineId", machineId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalProcesses", machines.size()).detail("ProcessesPerMachine", processesPerMachine).detail("TLogPolicy", tLogPolicy->info()).detail("StoragePolicy", storagePolicy->info());
				for (auto process : processesLeft) {
					TraceEvent("ClearMachineSurvivors").detail("MachineId", machineId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("SurvivingProcess", process->toString());
				}
				for (auto process : processesDead) {
					TraceEvent("ClearMachineVictims").detail("MachineId", machineId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("VictimProcess", process->toString());
				}
			}
		}

		TEST(originalKt != kt);  // Kill type was changed from requested to reboot.

		// Check if any processes on machine are rebooting
		if( processesOnMachine != processesPerMachine && kt >= RebootAndDelete ) {
			TEST(true); //Attempted reboot, but the target did not have all of its processes running
			TraceEvent(SevWarn, "AbortedKill").detail("KillType", kt).detail("MachineId", machineId).detail("Reason", "Machine processes does not match number of processes per machine").detail("Processes", processesOnMachine).detail("ProcessesPerMachine", processesPerMachine).backtrace();
			if (ktFinal) *ktFinal = None;
			return false;
		}

		// Check if any processes on machine are rebooting
		// NOTE(review): this condition is a superset of the one just above and the body is the same,
		// so the `kt >= RebootAndDelete` guard above appears to have no independent effect apart from
		// which TEST probe fires - confirm whether the two checks were meant to differ.
		if ( processesOnMachine != processesPerMachine ) {
			TEST(true); //Attempted reboot, but the target did not have all of its processes running
			TraceEvent(SevWarn, "AbortedKill").detail("KillType", kt).detail("MachineId", machineId).detail("Reason", "Machine processes does not match number of processes per machine").detail("Processes", processesOnMachine).detail("ProcessesPerMachine", processesPerMachine).backtrace();
			if (ktFinal) *ktFinal = None;
			return false;
		}

		TraceEvent("KillMachine").detail("MachineId", machineId).detail("Kt", kt).detail("KtOrig", ktOrig).detail("KillableMachines", processesOnMachine).detail("ProcessPerMachine", processesPerMachine).detail("KillChanged", kt!=ktOrig);
		// Kill types ordered before RebootAndDelete are applied with killProcess_internal();
		// InjectFaults additionally targets the machine-level process when there is one.
		// Processes whose starting class is TesterClass are never killed or rebooted here.
		if ( kt < RebootAndDelete ) {
			if(kt == InjectFaults && machines[machineId].machineProcess != nullptr)
				killProcess_internal( machines[machineId].machineProcess, kt );
			for (auto& process : machines[machineId].processes) {
				TraceEvent("KillMachineProcess").detail("KillType", kt).detail("Process", process->toString()).detail("StartingClass", process->startingClass.toString()).detail("Failed", process->failed).detail("Excluded", process->excluded).detail("Cleared", process->cleared).detail("Rebooting", process->rebooting);
				if (process->startingClass != ProcessClass::TesterClass)
					killProcess_internal( process, kt );
			}
		}
		// NOTE(review): any other kill type reaching this point (neither below RebootAndDelete nor
		// Reboot/RebootAndDelete) acts on no process, yet the function still returns true - confirm
		// that is intended.
		else if ( kt == Reboot || kt == RebootAndDelete ) {
			for (auto& process : machines[machineId].processes) {
				TraceEvent("KillMachineProcess").detail("KillType", kt).detail("Process", process->toString()).detail("StartingClass", process->startingClass.toString()).detail("Failed", process->failed).detail("Excluded", process->excluded).detail("Cleared", process->cleared).detail("Rebooting", process->rebooting);
				if (process->startingClass != ProcessClass::TesterClass)
					doReboot(process, kt );
			}
		}

		TEST(kt == RebootAndDelete); // Resulted in a reboot and delete
		TEST(kt == Reboot); // Resulted in a reboot
		TEST(kt == KillInstantly); // Resulted in an instant kill
		TEST(kt == InjectFaults);  // Resulted in a kill by injecting faults

		if (ktFinal) *ktFinal = kt;
		return true;
	}
1426 
	// Attempts to kill (or reboot) the machines that host processes of datacenter `dcId`.
	// The requested kill type is switched to Reboot if any process in the datacenter has a protected
	// address, and - when !forceKill and the type is KillInstantly, InjectFaults, RebootAndDelete or
	// RebootProcessAndDelete - canKillProcesses() receives &kt and may rewrite it.
	// Each machine of the datacenter is then passed to killMachine() with forceKill = true, unless the
	// random draw for that machine is not below 0.99, in which case the machine is skipped.
	// Reports ktMin through ktFinal (when non-null) and returns whether ktMin equals the kill type
	// that was applied.
	virtual bool killDataCenter(Optional<Standalone<StringRef>> dcId, KillType kt, bool forceKill, KillType* ktFinal) {
		auto ktOrig = kt;
		auto processes = getAllProcesses();
		// machine id -> number of processes of this datacenter found on that machine
		std::map<Optional<Standalone<StringRef>>, int> datacenterMachines;
		int	dcProcesses = 0;

		// Switch to a reboot, if anything protected on machine
		for (auto& procRecord : processes) {
			auto processDcId = procRecord->locality.dcId();
			auto processMachineId = procRecord->locality.machineId();
			ASSERT(processMachineId.present());
			if (processDcId.present() && (processDcId == dcId)) {
				if ((kt != Reboot) && (protectedAddresses.count(procRecord->address))) {
					kt = Reboot;
					TraceEvent(SevWarn, "DcKillChanged").detail("DataCenter", dcId).detail("KillType", kt).detail("OrigKillType", ktOrig)
						.detail("Reason", "Datacenter has protected process").detail("ProcessAddress", procRecord->address).detail("Failed", procRecord->failed).detail("Rebooting", procRecord->rebooting).detail("Excluded", procRecord->excluded).detail("Cleared", procRecord->cleared).detail("Process", procRecord->toString());
				}
				datacenterMachines[processMachineId.get()] ++;
				dcProcesses ++;
			}
		}

		// Check if machine can be removed, if requested
		if (!forceKill && ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete)))
		{
			// Partition available-class processes: protected processes and processes on machines outside
			// this datacenter remain (processesLeft); everything else is treated as gone (processesDead)
			std::vector<ProcessInfo*>	processesLeft, processesDead;
			for (auto processInfo : getAllProcesses()) {
				if (processInfo->isAvailableClass()) {
					if (processInfo->isExcluded() || processInfo->isCleared() || !processInfo->isAvailable()) {
						processesDead.push_back(processInfo);
					} else if (protectedAddresses.count(processInfo->address) || datacenterMachines.find(processInfo->locality.machineId()) == datacenterMachines.end()) {
						processesLeft.push_back(processInfo);
					} else {
						processesDead.push_back(processInfo);
					}
				}
			}

			// canKillProcesses() is handed &kt, so kt may hold a different kill type after this call
			if (!canKillProcesses(processesLeft, processesDead, kt, &kt)) {
				TraceEvent(SevWarn, "DcKillChanged").detail("DataCenter", dcId).detail("KillType", kt).detail("OrigKillType", ktOrig);
			}
			else {
				TraceEvent("DeadDataCenter").detail("DataCenter", dcId).detail("KillType", kt).detail("DcZones", datacenterMachines.size()).detail("DcProcesses", dcProcesses).detail("ProcessesDead", processesDead.size()).detail("ProcessesLeft", processesLeft.size()).detail("TLogPolicy", tLogPolicy->info()).detail("StoragePolicy", storagePolicy->info());
				for (auto process : processesLeft) {
					TraceEvent("DeadDcSurvivors").detail("MachineId", process->locality.machineId()).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("SurvivingProcess", process->toString());
				}
				for (auto process : processesDead) {
					TraceEvent("DeadDcVictims").detail("MachineId", process->locality.machineId()).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("VictimProcess", process->toString());
				}
			}
		}

		// ktResult is written by killMachine() through its ktFinal argument on every call below
		KillType	ktResult, ktMin = kt;
		for (auto& datacenterMachine : datacenterMachines) {
			if(g_random->random01() < 0.99) {
				killMachine(datacenterMachine.first, kt, true, &ktResult);
				// A machine-level result other than kt is only tolerated when it is None
				if (ktResult != kt) {
					TraceEvent(SevWarn, "KillDCFail")
						.detail("Zone", datacenterMachine.first)
						.detail("KillType", kt)
						.detail("KillTypeResult", ktResult)
						.detail("KillTypeOrig", ktOrig);
					ASSERT(ktResult == None);
				}
				// NOTE(review): whether a None result can lower ktMin depends on where None sits in the
				// KillType enum ordering, which is not visible here - confirm that a rejected machine
				// kill actually makes this function return false.
				ktMin = std::min<KillType>( ktResult, ktMin );
			}
		}

		TraceEvent("KillDataCenter")
			.detail("DcZones", datacenterMachines.size())
			.detail("DcProcesses", dcProcesses)
			.detail("DCID", dcId)
			.detail("KillType", kt)
			.detail("KillTypeOrig", ktOrig)
			.detail("KillTypeMin", ktMin)
			.detail("KilledDC", kt==ktMin);

		TEST(kt != ktMin); // DataCenter kill was rejected by killMachine
		TEST((kt==ktMin) && (kt == RebootAndDelete)); // Resulted in a reboot and delete
		TEST((kt==ktMin) && (kt == Reboot)); // Resulted in a reboot
		TEST((kt==ktMin) && (kt == KillInstantly)); // Resulted in an instant kill
		TEST((kt==ktMin) && (kt == InjectFaults));  // Resulted in a kill by injecting faults
		TEST((kt==ktMin) && (kt != ktOrig)); // Kill request was downgraded
		TEST((kt==ktMin) && (kt == ktOrig)); // Requested kill was done

		if (ktFinal) *ktFinal = ktMin;

		return (kt == ktMin);
	}
clogInterface(const IPAddress & ip,double seconds,ClogMode mode=ClogDefault)1516 	virtual void clogInterface(const IPAddress& ip, double seconds, ClogMode mode = ClogDefault) {
1517 		if (mode == ClogDefault) {
1518 			double a = g_random->random01();
1519 			if ( a < 0.3 ) mode = ClogSend;
1520 			else if (a < 0.6 ) mode = ClogReceive;
1521 			else mode = ClogAll;
1522 		}
1523 		TraceEvent("ClogInterface")
1524 		    .detail("IP", ip.toString())
1525 		    .detail("Delay", seconds)
1526 		    .detail("Queue", mode == ClogSend ? "Send" : mode == ClogReceive ? "Receive" : "All");
1527 
1528 		if (mode == ClogSend || mode==ClogAll)
1529 			g_clogging.clogSendFor( ip, seconds );
1530 		if (mode == ClogReceive || mode==ClogAll)
1531 			g_clogging.clogRecvFor( ip, seconds );
1532 	}
clogPair(const IPAddress & from,const IPAddress & to,double seconds)1533 	virtual void clogPair(const IPAddress& from, const IPAddress& to, double seconds) {
1534 		g_clogging.clogPairFor( from, to, seconds );
1535 	}
getAllProcesses() const1536 	virtual std::vector<ProcessInfo*> getAllProcesses() const {
1537 		std::vector<ProcessInfo*> processes;
1538 		for( auto& c : machines ) {
1539 			processes.insert( processes.end(), c.second.processes.begin(), c.second.processes.end() );
1540 		}
1541 		for( auto& c : currentlyRebootingProcesses ) {
1542 			processes.push_back( c.second );
1543 		}
1544 		return processes;
1545 	}
getProcessByAddress(NetworkAddress const & address)1546 	virtual ProcessInfo* getProcessByAddress( NetworkAddress const& address ) {
1547 		NetworkAddress normalizedAddress(address.ip, address.port, true, false);
1548 		ASSERT( addressMap.count( normalizedAddress ) );
1549 		return addressMap[ normalizedAddress ];
1550 	}
1551 
getMachineByNetworkAddress(NetworkAddress const & address)1552 	virtual MachineInfo* getMachineByNetworkAddress(NetworkAddress const& address) {
1553 		return &machines[addressMap[address]->locality.machineId()];
1554 	}
1555 
getMachineById(Optional<Standalone<StringRef>> const & machineId)1556 	virtual MachineInfo* getMachineById(Optional<Standalone<StringRef>> const& machineId) {
1557 		return &machines[machineId];
1558 	}
1559 
destroyMachine(Optional<Standalone<StringRef>> const & machineId)1560 	virtual void destroyMachine(Optional<Standalone<StringRef>> const& machineId ) {
1561 		auto& machine = machines[machineId];
1562 		for( auto process : machine.processes ) {
1563 			ASSERT( process->failed );
1564 		}
1565 		if( machine.machineProcess ) {
1566 			 killProcess_internal( machine.machineProcess, KillInstantly );
1567 		}
1568 		machines.erase(machineId);
1569 	}
1570 
Sim2()1571 	Sim2() : time(0.0), taskCount(0), yielded(false), yield_limit(0), currentTaskID(-1) {
1572 		// Not letting currentProcess be NULL eliminates some annoying special cases
1573 		currentProcess = new ProcessInfo( "NoMachine", LocalityData(Optional<Standalone<StringRef>>(), StringRef(), StringRef(), StringRef()), ProcessClass(), {NetworkAddress()}, this, "", "" );
1574 		g_network = net2 = newNet2(false, true);
1575 		Net2FileSystem::newFileSystem();
1576 		check_yield(0);
1577 	}
1578 
1579 	// Implementation
1580 	struct Task {
1581 		int taskID;
1582 		double time;
1583 		uint64_t stable;
1584 		ProcessInfo* machine;
1585 		Promise<Void> action;
TaskSim2::Task1586 		Task( double time, int taskID, uint64_t stable, ProcessInfo* machine, Promise<Void>&& action ) : time(time), taskID(taskID), stable(stable), machine(machine), action(std::move(action)) {}
TaskSim2::Task1587 		Task( double time, int taskID, uint64_t stable, ProcessInfo* machine, Future<Void>& future ) : time(time), taskID(taskID), stable(stable), machine(machine) { future = action.getFuture(); }
TaskSim2::Task1588 		Task(Task&& rhs) BOOST_NOEXCEPT : time(rhs.time), taskID(rhs.taskID), stable(rhs.stable), machine(rhs.machine), action(std::move(rhs.action)) {}
operator =Sim2::Task1589 		void operator= ( Task const& rhs ) { taskID = rhs.taskID; time = rhs.time; stable = rhs.stable; machine = rhs.machine; action = rhs.action; }
TaskSim2::Task1590 		Task( Task const& rhs ) : taskID(rhs.taskID), time(rhs.time), stable(rhs.stable), machine(rhs.machine), action(rhs.action) {}
operator =Sim2::Task1591 		void operator= (Task&& rhs) BOOST_NOEXCEPT { time = rhs.time; taskID = rhs.taskID; stable = rhs.stable; machine = rhs.machine; action = std::move(rhs.action); }
1592 
operator <Sim2::Task1593 		bool operator < (Task const& rhs) const {
1594 			// Ordering is reversed for priority_queue
1595 			if (time != rhs.time) return time > rhs.time;
1596 			return stable > rhs.stable;
1597 		}
1598 	};
1599 
execTask(struct Task & t)1600 	void execTask(struct Task& t) {
1601 		if (t.machine->failed) {
1602 			t.action.send(Never());
1603 		}
1604 		else {
1605 			mutex.enter();
1606 			this->time = t.time;
1607 			mutex.leave();
1608 
1609 			this->currentProcess = t.machine;
1610 			try {
1611 				//auto before = getCPUTicks();
1612 				t.action.send(Void());
1613 				ASSERT( this->currentProcess == t.machine );
1614 				/*auto elapsed = getCPUTicks() - before;
1615 				currentProcess->cpuTicks += elapsed;
1616 				if (g_random->random01() < 0.01){
1617 					TraceEvent("TaskDuration").detail("CpuTicks", currentProcess->cpuTicks);
1618 					currentProcess->cpuTicks = 0;
1619 				}*/
1620 			} catch (Error& e) {
1621 				TraceEvent(SevError, "UnhandledSimulationEventError").error(e, true);
1622 				killProcess(t.machine, KillInstantly);
1623 			}
1624 
1625 			//if( this->time > 45.522817 ) {
1626 			//	printf("foo\n");
1627 			//}
1628 
1629 			if (randLog)
1630 				fprintf( randLog, "T %f %d %s %lld\n", this->time, int(g_random->peek() % 10000), t.machine ? t.machine->name : "none", t.stable);
1631 		}
1632 	}
1633 
onMainThread(Promise<Void> && signal,int taskID)1634 	virtual void onMainThread( Promise<Void>&& signal, int taskID ) {
1635 		// This is presumably coming from either a "fake" thread pool thread, i.e. it is actually on this thread
1636 		// or a thread created with g_network->startThread
1637 		ASSERT(getCurrentProcess());
1638 
1639 		mutex.enter();
1640 		ASSERT(taskID >= TaskMinPriority && taskID <= TaskMaxPriority);
1641 		tasks.push( Task( time, taskID, taskCount++, getCurrentProcess(), std::move(signal) ) );
1642 		mutex.leave();
1643 	}
onProcess(ISimulator::ProcessInfo * process,int taskID)1644 	virtual Future<Void> onProcess( ISimulator::ProcessInfo *process, int taskID ) {
1645 		return delay( 0, taskID, process );
1646 	}
onMachine(ISimulator::ProcessInfo * process,int taskID)1647 	virtual Future<Void> onMachine( ISimulator::ProcessInfo *process, int taskID ) {
1648 		if( process->machine == 0 )
1649 			return Void();
1650 		return delay( 0, taskID, process->machine->machineProcess );
1651 	}
1652 
	//time is guarded by ISimulator::mutex. It is not necessary to guard reads on the main thread because
	//time should only be modified from the main thread.
	double time;
	// Initialised to -1 by the constructor
	int currentTaskID;

	//taskCount is guarded by ISimulator::mutex
	// It supplies the `stable` tie-breaker of each Task pushed by onMainThread()
	uint64_t taskCount;

	// Machine id -> machine record (its processes and optional machine-level process)
	std::map<Optional<Standalone<StringRef>>, MachineInfo > machines;
	// Network address -> process registered at that address
	std::map<NetworkAddress, ProcessInfo*> addressMap;
	// NOTE(review): not used in the visible part of this class - presumably signalled once a dead
	// process's files are released; confirm at the use sites
	std::map<ProcessInfo*, Promise<Void>> filesDeadMap;

	//tasks is guarded by ISimulator::mutex
	// Ordered by Task::operator<
	std::priority_queue<Task, std::vector<Task>> tasks;

	//Sim2Net network;
	// Network created with newNet2() in the constructor
	INetwork *net2;

	//Map from machine IP -> machine disk space info
	std::map<IPAddress, SimDiskSpace> diskSpaceMap;

	//Whether or not yield has returned true during the current iteration of the run loop
	bool yielded;
	int yield_limit;  // how many more times yield may return false before next returning true
1677 };
1678 
startNewSimulator()1679 void startNewSimulator() {
1680 	ASSERT( !g_network );
1681 	g_network = g_pSimulator = new Sim2();
1682 	g_simulator.connectionFailuresDisableDuration = g_random->random01() < 0.5 ? 0 : 1e6;
1683 }
1684 
// Reboots the simulated process `p` with one of the reboot kill types.
// After switching onto `p` (zero-length delay at TaskDefaultDelay), the process is marked as
// rebooting and `kt` is sent on its shutdownSignal. For RebootAndDelete / RebootProcessAndDelete the
// process is additionally marked cleared and g_simulator.clearAddress() is called for its address.
// Nothing is done if the process is already rebooting.
// kt must be RebootProcess, Reboot, RebootAndDelete or RebootProcessAndDelete (asserted).
ACTOR void doReboot( ISimulator::ProcessInfo *p, ISimulator::KillType kt ) {
	TraceEvent("RebootingProcessAttempt").detail("ZoneId", p->locality.zoneId()).detail("KillType", kt).detail("Process", p->toString()).detail("StartingClass", p->startingClass.toString()).detail("Failed", p->failed).detail("Excluded", p->excluded).detail("Cleared", p->cleared).detail("Rebooting", p->rebooting).detail("TaskDefaultDelay", TaskDefaultDelay);

	wait( g_sim2.delay( 0, TaskDefaultDelay, p ) ); // Switch to the machine in question

	try {
		ASSERT( kt == ISimulator::RebootProcess || kt == ISimulator::Reboot || kt == ISimulator::RebootAndDelete || kt == ISimulator::RebootProcessAndDelete );

		TEST( kt == ISimulator::RebootProcess ); // Simulated process rebooted
		TEST( kt == ISimulator::Reboot ); // Simulated machine rebooted
		TEST( kt == ISimulator::RebootAndDelete ); // Simulated machine rebooted with data and coordination state deletion
		TEST( kt == ISimulator::RebootProcessAndDelete ); // Simulated process rebooted with data and coordination state deletion

		// A reboot already in progress is left alone
		if( p->rebooting )
			return;
		TraceEvent("RebootingProcess").detail("KillType", kt).detail("Address", p->address).detail("ZoneId", p->locality.zoneId()).detail("DataHall", p->locality.dataHallId()).detail("Locality", p->locality.toString()).detail("Failed", p->failed).detail("Excluded", p->excluded).detail("Cleared", p->cleared).backtrace();
		p->rebooting = true;
		// The "AndDelete" variants also mark the process cleared and clear its address
		if ((kt == ISimulator::RebootAndDelete) || (kt == ISimulator::RebootProcessAndDelete)) {
			p->cleared = true;
			g_simulator.clearAddress(p->address);
		}
		// Hand the kill type to whoever is listening on the process's shutdown signal
		p->shutdownSignal.send( kt );
	} catch (Error& e) {
		// Trace the failure and forward the error on the shutdown signal
		TraceEvent(SevError, "RebootError").error(e);
		p->shutdownSignal.sendError(e);  // ?
		throw; // goes nowhere!
	}
}
1713 
1714 //Simulates delays for performing operations on disk
waitUntilDiskReady(Reference<DiskParameters> diskParameters,int64_t size,bool sync)1715 Future<Void> waitUntilDiskReady( Reference<DiskParameters> diskParameters, int64_t size, bool sync ) {
1716 	if(g_simulator.connectionFailuresDisableDuration > 1e4)
1717 		return delay(0.0001);
1718 
1719 	if( diskParameters->nextOperation < now() ) diskParameters->nextOperation = now();
1720 	diskParameters->nextOperation += ( 1.0 / diskParameters->iops ) + ( size / diskParameters->bandwidth );
1721 
1722 	double randomLatency;
1723 	if(sync) {
1724 		randomLatency = .005 + g_random->random01() * (BUGGIFY ? 1.0 : .010);
1725 	} else
1726 		randomLatency = 10 * g_random->random01() / diskParameters->iops;
1727 
1728 	return delayUntil( diskParameters->nextOperation + randomLatency );
1729 }
1730 
1731 #if defined(_WIN32)
1732 
1733 /* Opening with FILE_SHARE_DELETE lets simulation actually work on windows - previously renames were always failing.
1734    FIXME: Use an actual platform abstraction for this stuff!  Is there any reason we can't use underlying net2 for example? */
1735 
1736 #include <Windows.h>
1737 
sf_open(const char * filename,int flags,int convFlags,int mode)1738 int sf_open( const char* filename, int flags, int convFlags, int mode ) {
1739 	HANDLE wh = CreateFile( filename, GENERIC_READ | ((flags&IAsyncFile::OPEN_READWRITE) ? GENERIC_WRITE : 0),
1740 		FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE, NULL,
1741 		(flags&IAsyncFile::OPEN_EXCLUSIVE) ? CREATE_NEW :
1742 			(flags&IAsyncFile::OPEN_CREATE) ? OPEN_ALWAYS :
1743 			OPEN_EXISTING,
1744 		FILE_ATTRIBUTE_NORMAL,
1745 		NULL );
1746 	int h = -1;
1747 	if (wh != INVALID_HANDLE_VALUE) h = _open_osfhandle( (intptr_t)wh, convFlags );
1748 	else errno = GetLastError() == ERROR_FILE_NOT_FOUND ? ENOENT : EFAULT;
1749 	return h;
1750 }
1751 
1752 #endif
1753 
1754 // Opens a file for asynchronous I/O
open(std::string filename,int64_t flags,int64_t mode)1755 Future< Reference<class IAsyncFile> > Sim2FileSystem::open( std::string filename, int64_t flags, int64_t mode )
1756 {
1757 	ASSERT( (flags & IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE) ||
1758 			!(flags & IAsyncFile::OPEN_CREATE) ||
1759 			StringRef(filename).endsWith(LiteralStringRef(".fdb-lock")) );  // We don't use "ordinary" non-atomic file creation right now except for folder locking, and we don't have code to simulate its unsafeness.
1760 
1761 	if ( (flags & IAsyncFile::OPEN_EXCLUSIVE) ) ASSERT( flags & IAsyncFile::OPEN_CREATE );
1762 
1763 	if (flags & IAsyncFile::OPEN_UNCACHED) {
1764 		auto& machineCache = g_simulator.getCurrentProcess()->machine->openFiles;
1765 		std::string actualFilename = filename;
1766 		if ( machineCache.find(filename) == machineCache.end() ) {
1767 			if(flags & IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE) {
1768 				actualFilename = filename + ".part";
1769 				auto partFile = machineCache.find(actualFilename);
1770 				if(partFile != machineCache.end()) {
1771 					Future<Reference<IAsyncFile>> f = AsyncFileDetachable::open(partFile->second);
1772 					if(FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
1773 						f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileWriteChecker(r)); });
1774 					return f;
1775 				}
1776 			}
1777 			//Simulated disk parameters are shared by the AsyncFileNonDurable and the underlying SimpleFile.  This way, they can both keep up with the time to start the next operation
1778 			Reference<DiskParameters> diskParameters(new DiskParameters(FLOW_KNOBS->SIM_DISK_IOPS, FLOW_KNOBS->SIM_DISK_BANDWIDTH));
1779 			machineCache[actualFilename] = AsyncFileNonDurable::open(filename, actualFilename, SimpleFile::open(filename, flags, mode, diskParameters, false), diskParameters);
1780 		}
1781 		Future<Reference<IAsyncFile>> f = AsyncFileDetachable::open( machineCache[actualFilename] );
1782 		if(FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
1783 			f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileWriteChecker(r)); });
1784 		return f;
1785 	}
1786 	else
1787 		return AsyncFileCached::open(filename, flags, mode);
1788 }
1789 
1790 // Deletes the given file.  If mustBeDurable, returns only when the file is guaranteed to be deleted even after a power failure.
deleteFile(std::string filename,bool mustBeDurable)1791 Future< Void > Sim2FileSystem::deleteFile( std::string filename, bool mustBeDurable )
1792 {
1793 	return Sim2::deleteFileImpl(&g_sim2, filename, mustBeDurable);
1794 }
1795 
lastWriteTime(std::string filename)1796 Future< std::time_t > Sim2FileSystem::lastWriteTime( std::string filename ) {
1797 	// TODO: update this map upon file writes.
1798 	static std::map<std::string, double> fileWrites;
1799 	if (BUGGIFY && g_random->random01() < 0.01) {
1800 		fileWrites[filename] = now();
1801 	}
1802 	return fileWrites[filename];
1803 }
1804 
newFileSystem()1805 void Sim2FileSystem::newFileSystem()
1806 {
1807 	g_network->setGlobal(INetwork::enFileSystem, (flowGlobalType) new Sim2FileSystem());
1808 }
1809