1 /*
2 * sim2.actor.cpp
3 *
4 * This source file is part of the FoundationDB open source project
5 *
6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include "fdbrpc/simulator.h"
22 #include "flow/IThreadPool.h"
23 #include "flow/Util.h"
24 #include "fdbrpc/IAsyncFile.h"
25 #include "fdbrpc/AsyncFileCached.actor.h"
26 #include "fdbrpc/AsyncFileNonDurable.actor.h"
27 #include "flow/Hash3.h"
28 #include "fdbrpc/TraceFileIO.h"
29 #include "flow/FaultInjection.h"
30 #include "flow/network.h"
31 #include "fdbrpc/Net2FileSystem.h"
32 #include "fdbrpc/Replication.h"
33 #include "fdbrpc/ReplicationUtils.h"
34 #include "fdbrpc/AsyncFileWriteChecker.h"
35 #include "flow/actorcompiler.h" // This must be the last #include.
36
simulator_should_inject_fault(const char * context,const char * file,int line,int error_code)37 bool simulator_should_inject_fault( const char* context, const char* file, int line, int error_code ) {
38 if (!g_network->isSimulated()) return false;
39
40 auto p = g_simulator.getCurrentProcess();
41
42 if (p->fault_injection_p2 && g_random->random01() < p->fault_injection_p2 && !g_simulator.speedUpSimulation) {
43 uint32_t
44 h1 = line + (p->fault_injection_r>>32),
45 h2 = p->fault_injection_r;
46
47 if (h1 < p->fault_injection_p1*std::numeric_limits<uint32_t>::max()) {
48 TEST(true); // A fault was injected
49 TEST(error_code == error_code_io_timeout); // An io timeout was injected
50 TEST(error_code == error_code_io_error); // An io error was injected
51 TEST(error_code == error_code_platform_error); // A platform error was injected.
52 TraceEvent(SevWarn, "FaultInjected").detail("Context", context).detail("File", file).detail("Line", line).detail("ErrorCode", error_code);
53 if(error_code == error_code_io_timeout) {
54 g_network->setGlobal(INetwork::enASIOTimedOut, (flowGlobalType)true);
55 }
56 return true;
57 }
58 }
59
60 return false;
61 }
62
displayWorkers() const63 void ISimulator::displayWorkers() const
64 {
65 std::map<std::string, std::vector<ISimulator::ProcessInfo*>> machineMap;
66
67 // Create a map of machine Id
68 for (auto processInfo : getAllProcesses()) {
69 std::string dataHall = processInfo->locality.dataHallId().present() ? processInfo->locality.dataHallId().get().printable() : "[unset]";
70 std::string machineId = processInfo->locality.machineId().present() ? processInfo->locality.machineId().get().printable() : "[unset]";
71 machineMap[format("%-8s %s", dataHall.c_str(), machineId.c_str())].push_back(processInfo);
72 }
73
74 printf("DataHall MachineId\n");
75 printf(" Address Name Class Excluded Failed Rebooting Cleared Role DataFolder\n");
76 for (auto& machineRecord : machineMap) {
77 printf("\n%s\n", machineRecord.first.c_str());
78 for (auto& processInfo : machineRecord.second) {
79 printf(" %9s %-10s%-13s%-8s %-6s %-9s %-8s %-48s %-40s\n",
80 processInfo->address.toString().c_str(), processInfo->name, processInfo->startingClass.toString().c_str(), (processInfo->isExcluded() ? "True" : "False"), (processInfo->failed ? "True" : "False"), (processInfo->rebooting ? "True" : "False"), (processInfo->isCleared() ? "True" : "False"), getRoles(processInfo->address).c_str(), processInfo->dataFolder);
81 }
82 }
83
84 return;
85 }
86
87 namespace std {
88 template<>
89 class hash<Endpoint> {
90 public:
operator ()(const Endpoint & s) const91 size_t operator()(const Endpoint &s) const
92 {
93 return hashlittle(&s, sizeof(s), 0);
94 }
95 };
96 }
97
onlyBeforeSimulatorInit()98 bool onlyBeforeSimulatorInit() {
99 return g_network->isSimulated() && g_simulator.getAllProcesses().empty();
100 }
101
102 const UID TOKEN_ENDPOINT_NOT_FOUND(-1, -1);
103 const uint64_t TOKEN_STREAM_FLAG = 1;
104
105 ISimulator* g_pSimulator = 0;
106 thread_local ISimulator::ProcessInfo* ISimulator::currentProcess = 0;
107 int openCount = 0;
108
109 struct SimClogging {
getSendDelaySimClogging110 double getSendDelay( NetworkAddress from, NetworkAddress to ) {
111 return halfLatency();
112 double tnow = now();
113 double t = tnow + halfLatency();
114
115 if (!g_simulator.speedUpSimulation && clogSendUntil.count( to.ip ))
116 t = std::max( t, clogSendUntil[ to.ip ] );
117
118 return t - tnow;
119 }
120
getRecvDelaySimClogging121 double getRecvDelay( NetworkAddress from, NetworkAddress to ) {
122 auto pair = std::make_pair( from.ip, to.ip );
123
124 double tnow = now();
125 double t = tnow + halfLatency();
126 if(!g_simulator.speedUpSimulation)
127 t += clogPairLatency[ pair ];
128
129 if (!g_simulator.speedUpSimulation && clogPairUntil.count( pair ))
130 t = std::max( t, clogPairUntil[ pair ] );
131
132 if (!g_simulator.speedUpSimulation && clogRecvUntil.count( to.ip ))
133 t = std::max( t, clogRecvUntil[ to.ip ] );
134
135 return t - tnow;
136 }
137
clogPairForSimClogging138 void clogPairFor(const IPAddress& from, const IPAddress& to, double t) {
139 auto& u = clogPairUntil[ std::make_pair( from, to ) ];
140 u = std::max(u, now() + t);
141 }
clogSendForSimClogging142 void clogSendFor(const IPAddress& from, double t) {
143 auto& u = clogSendUntil[from];
144 u = std::max(u, now() + t);
145 }
clogRecvForSimClogging146 void clogRecvFor(const IPAddress& from, double t) {
147 auto& u = clogRecvUntil[from];
148 u = std::max(u, now() + t);
149 }
setPairLatencyIfNotSetSimClogging150 double setPairLatencyIfNotSet(const IPAddress& from, const IPAddress& to, double t) {
151 auto i = clogPairLatency.find( std::make_pair(from,to) );
152 if (i == clogPairLatency.end())
153 i = clogPairLatency.insert( std::make_pair( std::make_pair(from,to), t ) ).first;
154 return i->second;
155 }
156
157 private:
158 std::map<IPAddress, double> clogSendUntil, clogRecvUntil;
159 std::map<std::pair<IPAddress, IPAddress>, double> clogPairUntil;
160 std::map<std::pair<IPAddress, IPAddress>, double> clogPairLatency;
halfLatencySimClogging161 double halfLatency() {
162 double a = g_random->random01();
163 const double pFast = 0.999;
164 if (a <= pFast) {
165 a = a / pFast;
166 return 0.5 * (FLOW_KNOBS->MIN_NETWORK_LATENCY * (1-a) + FLOW_KNOBS->FAST_NETWORK_LATENCY/pFast * a); // 0.5ms average
167 } else {
168 a = (a-pFast) / (1-pFast); // uniform 0-1 again
169 return 0.5 * (FLOW_KNOBS->MIN_NETWORK_LATENCY * (1-a) + FLOW_KNOBS->SLOW_NETWORK_LATENCY*a); // long tail up to X ms
170 }
171 }
172 };
173
174 SimClogging g_clogging;
175
176 struct Sim2Conn : IConnection, ReferenceCounted<Sim2Conn> {
Sim2ConnSim2Conn177 Sim2Conn( ISimulator::ProcessInfo* process )
178 : process(process), dbgid( g_random->randomUniqueID() ), opened(false), closedByCaller(false)
179 {
180 pipes = sender(this) && receiver(this);
181 }
182
183 // connect() is called on a pair of connections immediately after creation; logically it is part of the constructor and no other method may be called previously!
connectSim2Conn184 void connect( Reference<Sim2Conn> peer, NetworkAddress peerEndpoint ) {
185 this->peer = peer;
186 this->peerProcess = peer->process;
187 this->peerId = peer->dbgid;
188 this->peerEndpoint = peerEndpoint;
189
190 // Every one-way connection gets a random permanent latency and a random send buffer for the duration of the connection
191 auto latency = g_clogging.setPairLatencyIfNotSet( peerProcess->address.ip, process->address.ip, FLOW_KNOBS->MAX_CLOGGING_LATENCY*g_random->random01() );
192 sendBufSize = std::max<double>( g_random->randomInt(0, 5000000), 25e6 * (latency + .002) );
193 TraceEvent("Sim2Connection").detail("SendBufSize", sendBufSize).detail("Latency", latency);
194 }
195
~Sim2ConnSim2Conn196 ~Sim2Conn() {
197 ASSERT_ABORT( !opened || closedByCaller );
198 }
199
addrefSim2Conn200 virtual void addref() { ReferenceCounted<Sim2Conn>::addref(); }
delrefSim2Conn201 virtual void delref() { ReferenceCounted<Sim2Conn>::delref(); }
closeSim2Conn202 virtual void close() { closedByCaller = true; closeInternal(); }
203
onWritableSim2Conn204 virtual Future<Void> onWritable() { return whenWritable(this); }
onReadableSim2Conn205 virtual Future<Void> onReadable() { return whenReadable(this); }
206
isPeerGoneSim2Conn207 bool isPeerGone() {
208 return !peer || peerProcess->failed;
209 }
210
peerClosedSim2Conn211 void peerClosed() {
212 leakedConnectionTracker = trackLeakedConnection(this);
213 }
214
215 // Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read (might be 0)
216 // (or may throw an error if the connection dies)
readSim2Conn217 virtual int read( uint8_t* begin, uint8_t* end ) {
218 rollRandomClose();
219
220 int64_t avail = receivedBytes.get() - readBytes.get(); // SOMEDAY: random?
221 int toRead = std::min<int64_t>( end-begin, avail );
222 ASSERT( toRead >= 0 && toRead <= recvBuf.size() && toRead <= end-begin );
223 for(int i=0; i<toRead; i++)
224 begin[i] = recvBuf[i];
225 recvBuf.erase( recvBuf.begin(), recvBuf.begin() + toRead );
226 readBytes.set( readBytes.get() + toRead );
227 return toRead;
228 }
229
230 // Writes as many bytes as possible from the given SendBuffer chain into the write buffer and returns the number of bytes written (might be 0)
231 // (or may throw an error if the connection dies)
writeSim2Conn232 virtual int write( SendBuffer const* buffer, int limit) {
233 rollRandomClose();
234 ASSERT(limit > 0);
235
236 int toSend = 0;
237 if (BUGGIFY) {
238 toSend = std::min(limit, buffer->bytes_written - buffer->bytes_sent);
239 } else {
240 for(auto p = buffer; p; p=p->next) {
241 toSend += p->bytes_written - p->bytes_sent;
242 if(toSend >= limit) {
243 if(toSend > limit)
244 toSend = limit;
245 break;
246 }
247 }
248 }
249 ASSERT(toSend);
250 if (BUGGIFY) toSend = std::min(toSend, g_random->randomInt(0, 1000));
251
252 if (!peer) return toSend;
253 toSend = std::min( toSend, peer->availableSendBufferForPeer() );
254 ASSERT( toSend >= 0 );
255
256 int leftToSend = toSend;
257 for(auto p = buffer; p && leftToSend>0; p=p->next) {
258 int ts = std::min(leftToSend, p->bytes_written - p->bytes_sent);
259 peer->recvBuf.insert( peer->recvBuf.end(), p->data + p->bytes_sent, p->data + p->bytes_sent + ts );
260 leftToSend -= ts;
261 }
262 ASSERT( leftToSend == 0 );
263 peer->writtenBytes.set( peer->writtenBytes.get() + toSend );
264 return toSend;
265 }
266
267 // Returns the network address and port of the other end of the connection. In the case of an incoming connection, this may not
268 // be an address we can connect to!
getPeerAddressSim2Conn269 virtual NetworkAddress getPeerAddress() { return peerEndpoint; }
getDebugIDSim2Conn270 virtual UID getDebugID() { return dbgid; }
271
272 bool opened, closedByCaller;
273
274 private:
275 ISimulator::ProcessInfo* process, *peerProcess;
276 UID dbgid, peerId;
277 NetworkAddress peerEndpoint;
278 std::deque< uint8_t > recvBuf; // Includes bytes written but not yet received!
279 AsyncVar<int64_t> readBytes, // bytes already pulled from recvBuf (location of the beginning of recvBuf)
280 receivedBytes,
281 sentBytes,
282 writtenBytes; // location of the end of recvBuf ( == recvBuf.size() + readBytes.get() )
283 Reference<Sim2Conn> peer;
284 int sendBufSize;
285
286 Future<Void> leakedConnectionTracker;
287
288 Future<Void> pipes;
289
availableSendBufferForPeerSim2Conn290 int availableSendBufferForPeer() const { return sendBufSize - (writtenBytes.get() - receivedBytes.get()); } // SOMEDAY: acknowledgedBytes instead of receivedBytes
291
closeInternalSim2Conn292 void closeInternal() {
293 if(peer) {
294 peer->peerClosed();
295 }
296 leakedConnectionTracker.cancel();
297 peer.clear();
298 }
299
senderSim2Conn300 ACTOR static Future<Void> sender( Sim2Conn* self ) {
301 loop {
302 wait( self->writtenBytes.onChange() ); // takes place on peer!
303 ASSERT( g_simulator.getCurrentProcess() == self->peerProcess );
304 wait( delay( .002 * g_random->random01() ) );
305 self->sentBytes.set( self->writtenBytes.get() ); // or possibly just some sometimes...
306 }
307 }
receiverSim2Conn308 ACTOR static Future<Void> receiver( Sim2Conn* self ) {
309 loop {
310 if (self->sentBytes.get() != self->receivedBytes.get())
311 wait( g_simulator.onProcess( self->peerProcess ) );
312 while ( self->sentBytes.get() == self->receivedBytes.get() )
313 wait( self->sentBytes.onChange() );
314 ASSERT( g_simulator.getCurrentProcess() == self->peerProcess );
315 state int64_t pos = g_random->random01() < .5 ? self->sentBytes.get() : g_random->randomInt64( self->receivedBytes.get(), self->sentBytes.get()+1 );
316 wait( delay( g_clogging.getSendDelay( self->process->address, self->peerProcess->address ) ) );
317 wait( g_simulator.onProcess( self->process ) );
318 ASSERT( g_simulator.getCurrentProcess() == self->process );
319 wait( delay( g_clogging.getRecvDelay( self->process->address, self->peerProcess->address ) ) );
320 ASSERT( g_simulator.getCurrentProcess() == self->process );
321 self->receivedBytes.set( pos );
322 wait( Future<Void>(Void()) ); // Prior notification can delete self and cancel this actor
323 ASSERT( g_simulator.getCurrentProcess() == self->process );
324 }
325 }
whenReadableSim2Conn326 ACTOR static Future<Void> whenReadable( Sim2Conn* self ) {
327 try {
328 loop {
329 if (self->readBytes.get() != self->receivedBytes.get()) {
330 ASSERT( g_simulator.getCurrentProcess() == self->process );
331 return Void();
332 }
333 wait( self->receivedBytes.onChange() );
334 self->rollRandomClose();
335 }
336 } catch (Error& e) {
337 ASSERT( g_simulator.getCurrentProcess() == self->process );
338 throw;
339 }
340 }
whenWritableSim2Conn341 ACTOR static Future<Void> whenWritable( Sim2Conn* self ) {
342 try {
343 loop {
344 if (!self->peer) return Void();
345 if (self->peer->availableSendBufferForPeer() > 0) {
346 ASSERT( g_simulator.getCurrentProcess() == self->process );
347 return Void();
348 }
349 try {
350 wait( self->peer->receivedBytes.onChange() );
351 ASSERT( g_simulator.getCurrentProcess() == self->peerProcess );
352 } catch (Error& e) {
353 if (e.code() != error_code_broken_promise) throw;
354 }
355 wait( g_simulator.onProcess( self->process ) );
356 }
357 } catch (Error& e) {
358 ASSERT( g_simulator.getCurrentProcess() == self->process );
359 throw;
360 }
361 }
362
rollRandomCloseSim2Conn363 void rollRandomClose() {
364 if (now() - g_simulator.lastConnectionFailure > g_simulator.connectionFailuresDisableDuration && g_random->random01() < .00001) {
365 g_simulator.lastConnectionFailure = now();
366 double a = g_random->random01(), b = g_random->random01();
367 TEST(true); // Simulated connection failure
368 TraceEvent("ConnectionFailure", dbgid).detail("MyAddr", process->address).detail("PeerAddr", peerProcess->address).detail("SendClosed", a > .33).detail("RecvClosed", a < .66).detail("Explicit", b < .3);
369 if (a < .66 && peer) peer->closeInternal();
370 if (a > .33) closeInternal();
371 // At the moment, we occasionally notice the connection failed immediately. In principle, this could happen but only after a delay.
372 if (b < .3)
373 throw connection_failed();
374 }
375 }
376
trackLeakedConnectionSim2Conn377 ACTOR static Future<Void> trackLeakedConnection( Sim2Conn* self ) {
378 wait( g_simulator.onProcess( self->process ) );
379 // SOMEDAY: Make this value variable? Dependent on buggification status?
380 wait( delay( 20.0 ) );
381 TraceEvent(SevError, "LeakedConnection", self->dbgid).error(connection_leaked()).detail("MyAddr", self->process->address).detail("PeerAddr", self->peerEndpoint).detail("PeerId", self->peerId).detail("Opened", self->opened);
382 return Void();
383 }
384 };
385
386 #include <fcntl.h>
387 #include <sys/stat.h>
388
389 int sf_open( const char* filename, int flags, int convFlags, int mode );
390
391 #if defined(_WIN32)
392 #include <io.h>
393
394 #elif defined(__unixish__)
395 #define _open ::open
396 #define _read ::read
397 #define _write ::write
398 #define _close ::close
399 #define _lseeki64 ::lseek
400 #define _commit ::fsync
401 #define _chsize ::ftruncate
402 #define O_BINARY 0
403
sf_open(const char * filename,int flags,int convFlags,int mode)404 int sf_open( const char* filename, int flags, int convFlags, int mode ) {
405 return _open( filename, convFlags, mode );
406 }
407
408 #else
409 #error How do i open a file on a new platform?
410 #endif
411
412 class SimpleFile : public IAsyncFile, public ReferenceCounted<SimpleFile> {
413 public:
init()414 static void init() {}
415
should_poll()416 static bool should_poll() { return false; }
417
open(std::string filename,int flags,int mode,Reference<DiskParameters> diskParameters=Reference<DiskParameters> (new DiskParameters (25000,150000000)),bool delayOnWrite=true)418 ACTOR static Future<Reference<IAsyncFile>> open( std::string filename, int flags, int mode,
419 Reference<DiskParameters> diskParameters = Reference<DiskParameters>(new DiskParameters(25000, 150000000)), bool delayOnWrite = true ) {
420 state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
421 state int currentTaskID = g_network->getCurrentTask();
422
423 if(++openCount >= 3000) {
424 TraceEvent(SevError, "TooManyFiles");
425 ASSERT(false);
426 }
427
428 if(openCount == 2000) {
429 TraceEvent(SevWarnAlways, "DisableConnectionFailures_TooManyFiles");
430 g_simulator.speedUpSimulation = true;
431 g_simulator.connectionFailuresDisableDuration = 1e6;
432 }
433
434 // Filesystems on average these days seem to start to have limits of around 255 characters for a
435 // filename. We add ".part" below, so we need to stay under 250.
436 ASSERT( basename(filename).size() < 250 );
437
438 wait( g_simulator.onMachine( currentProcess ) );
439 try {
440 wait( delay(FLOW_KNOBS->MIN_OPEN_TIME + g_random->random01() * (FLOW_KNOBS->MAX_OPEN_TIME - FLOW_KNOBS->MIN_OPEN_TIME) ) );
441
442 std::string open_filename = filename;
443 if (flags & OPEN_ATOMIC_WRITE_AND_CREATE) {
444 ASSERT( (flags & OPEN_CREATE) && (flags & OPEN_READWRITE) && !(flags & OPEN_EXCLUSIVE) );
445 open_filename = filename + ".part";
446 }
447
448 int h = sf_open( open_filename.c_str(), flags, flagConversion(flags), mode );
449 if( h == -1 ) {
450 bool notFound = errno == ENOENT;
451 Error e = notFound ? file_not_found() : io_error();
452 TraceEvent(notFound ? SevWarn : SevWarnAlways, "FileOpenError").error(e).GetLastError().detail("File", filename).detail("Flags", flags);
453 throw e;
454 }
455
456 platform::makeTemporary(open_filename.c_str());
457 SimpleFile *simpleFile = new SimpleFile( h, diskParameters, delayOnWrite, filename, open_filename, flags );
458 state Reference<IAsyncFile> file = Reference<IAsyncFile>( simpleFile );
459 wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
460 return file;
461 } catch( Error &e ) {
462 state Error err = e;
463 wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
464 throw err;
465 }
466 }
467
addref()468 virtual void addref() { ReferenceCounted<SimpleFile>::addref(); }
delref()469 virtual void delref() { ReferenceCounted<SimpleFile>::delref(); }
470
debugFD()471 virtual int64_t debugFD() { return (int64_t)h; }
472
read(void * data,int length,int64_t offset)473 virtual Future<int> read( void* data, int length, int64_t offset ) {
474 return read_impl( this, data, length, offset );
475 }
476
write(void const * data,int length,int64_t offset)477 virtual Future<Void> write( void const* data, int length, int64_t offset ) {
478 return write_impl( this, StringRef((const uint8_t*)data, length), offset );
479 }
480
truncate(int64_t size)481 virtual Future<Void> truncate( int64_t size ) {
482 return truncate_impl( this, size );
483 }
484
sync()485 virtual Future<Void> sync() {
486 return sync_impl( this );
487 }
488
size()489 virtual Future<int64_t> size() {
490 return size_impl( this );
491 }
492
getFilename()493 virtual std::string getFilename() {
494 return actualFilename;
495 }
496
~SimpleFile()497 ~SimpleFile() {
498 _close( h );
499 }
500
501 private:
502 int h;
503
504 //Performance parameters of simulated disk
505 Reference<DiskParameters> diskParameters;
506
507 std::string filename, actualFilename;
508 int flags;
509 UID dbgId;
510
511 //If true, then writes/truncates will be preceded by a delay (like other operations). If false, then they will not
512 //This is to support AsyncFileNonDurable, which issues its own delays for writes and truncates
513 bool delayOnWrite;
514
SimpleFile(int h,Reference<DiskParameters> diskParameters,bool delayOnWrite,const std::string & filename,const std::string & actualFilename,int flags)515 SimpleFile(int h, Reference<DiskParameters> diskParameters, bool delayOnWrite, const std::string& filename, const std::string& actualFilename, int flags)
516 : h(h), diskParameters(diskParameters), delayOnWrite(delayOnWrite), filename(filename), actualFilename(actualFilename), dbgId(g_random->randomUniqueID()), flags(flags) {}
517
flagConversion(int flags)518 static int flagConversion( int flags ) {
519 int outFlags = O_BINARY;
520 if( flags&OPEN_READWRITE ) outFlags |= O_RDWR;
521 if( flags&OPEN_CREATE ) outFlags |= O_CREAT;
522 if( flags&OPEN_READONLY ) outFlags |= O_RDONLY;
523 if( flags&OPEN_EXCLUSIVE ) outFlags |= O_EXCL;
524 if( flags&OPEN_ATOMIC_WRITE_AND_CREATE ) outFlags |= O_TRUNC;
525
526 return outFlags;
527 }
528
read_impl(SimpleFile * self,void * data,int length,int64_t offset)529 ACTOR static Future<int> read_impl( SimpleFile* self, void* data, int length, int64_t offset ) {
530 ASSERT( ( self->flags & IAsyncFile::OPEN_NO_AIO ) != 0 ||
531 ( (uintptr_t)data % 4096 == 0 && length % 4096 == 0 && offset % 4096 == 0 ) ); // Required by KAIO.
532 state UID opId = g_random->randomUniqueID();
533 if (randLog)
534 fprintf( randLog, "SFR1 %s %s %s %d %lld\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str(), length, offset );
535
536 wait( waitUntilDiskReady( self->diskParameters, length ) );
537
538 if( _lseeki64( self->h, offset, SEEK_SET ) == -1 ) {
539 TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 1);
540 throw io_error();
541 }
542
543 unsigned int read_bytes = 0;
544 if( ( read_bytes = _read( self->h, data, (unsigned int) length ) ) == -1 ) {
545 TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 2);
546 throw io_error();
547 }
548
549 if (randLog) {
550 uint32_t a=0, b=0;
551 hashlittle2( data, read_bytes, &a, &b );
552 fprintf( randLog, "SFR2 %s %s %s %d %d\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str(), read_bytes, a );
553 }
554
555 debugFileCheck("SimpleFileRead", self->filename, data, offset, length);
556
557 INJECT_FAULT(io_timeout, "SimpleFile::read");
558 INJECT_FAULT(io_error, "SimpleFile::read");
559
560 return read_bytes;
561 }
562
write_impl(SimpleFile * self,StringRef data,int64_t offset)563 ACTOR static Future<Void> write_impl( SimpleFile* self, StringRef data, int64_t offset ) {
564 state UID opId = g_random->randomUniqueID();
565 if (randLog) {
566 uint32_t a=0, b=0;
567 hashlittle2( data.begin(), data.size(), &a, &b );
568 fprintf( randLog, "SFW1 %s %s %s %d %d %lld\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str(), a, data.size(), offset );
569 }
570
571 if(self->delayOnWrite)
572 wait( waitUntilDiskReady( self->diskParameters, data.size() ) );
573
574 if( _lseeki64( self->h, offset, SEEK_SET ) == -1 ) {
575 TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 3);
576 throw io_error();
577 }
578
579 unsigned int write_bytes = 0;
580 if ( ( write_bytes = _write( self->h, (void*)data.begin(), data.size() ) ) == -1 ) {
581 TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 4);
582 throw io_error();
583 }
584
585 if ( write_bytes != data.size() ) {
586 TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 5);
587 throw io_error();
588 }
589
590 if (randLog) {
591 fprintf( randLog, "SFW2 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str());
592 }
593
594 debugFileCheck("SimpleFileWrite", self->filename, (void*)data.begin(), offset, data.size());
595
596 INJECT_FAULT(io_timeout, "SimpleFile::write");
597 INJECT_FAULT(io_error, "SimpleFile::write");
598
599 return Void();
600 }
601
truncate_impl(SimpleFile * self,int64_t size)602 ACTOR static Future<Void> truncate_impl( SimpleFile* self, int64_t size ) {
603 state UID opId = g_random->randomUniqueID();
604 if (randLog)
605 fprintf( randLog, "SFT1 %s %s %s %lld\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str(), size );
606
607 if(self->delayOnWrite)
608 wait( waitUntilDiskReady( self->diskParameters, 0 ) );
609
610 if( _chsize( self->h, (long) size ) == -1 ) {
611 TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 6);
612 throw io_error();
613 }
614
615 if (randLog)
616 fprintf( randLog, "SFT2 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str());
617
618 INJECT_FAULT( io_timeout, "SimpleFile::truncate" );
619 INJECT_FAULT( io_error, "SimpleFile::truncate" );
620
621 return Void();
622 }
623
sync_impl(SimpleFile * self)624 ACTOR static Future<Void> sync_impl( SimpleFile* self ) {
625 state UID opId = g_random->randomUniqueID();
626 if (randLog)
627 fprintf( randLog, "SFC1 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str());
628
629 if(self->delayOnWrite)
630 wait( waitUntilDiskReady( self->diskParameters, 0, true ) );
631
632 if (self->flags & OPEN_ATOMIC_WRITE_AND_CREATE) {
633 self->flags &= ~OPEN_ATOMIC_WRITE_AND_CREATE;
634 auto& machineCache = g_simulator.getCurrentProcess()->machine->openFiles;
635 std::string sourceFilename = self->filename + ".part";
636
637 if(machineCache.count(sourceFilename)) {
638 TraceEvent("SimpleFileRename").detail("From", sourceFilename).detail("To", self->filename).detail("SourceCount", machineCache.count(sourceFilename)).detail("FileCount", machineCache.count(self->filename));
639 renameFile( sourceFilename.c_str(), self->filename.c_str() );
640
641 ASSERT(!machineCache.count(self->filename));
642 machineCache[self->filename] = machineCache[sourceFilename];
643 machineCache.erase(sourceFilename);
644 self->actualFilename = self->filename;
645 }
646 }
647
648 if (randLog)
649 fprintf( randLog, "SFC2 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str());
650
651 INJECT_FAULT( io_timeout, "SimpleFile::sync" );
652 INJECT_FAULT( io_error, "SimpleFile::sync" );
653
654 return Void();
655 }
656
size_impl(SimpleFile * self)657 ACTOR static Future<int64_t> size_impl( SimpleFile* self ) {
658 state UID opId = g_random->randomUniqueID();
659 if (randLog)
660 fprintf(randLog, "SFS1 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str());
661
662 wait( waitUntilDiskReady( self->diskParameters, 0 ) );
663
664 int64_t pos = _lseeki64( self->h, 0L, SEEK_END );
665 if( pos == -1 ) {
666 TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 8);
667 throw io_error();
668 }
669
670 if (randLog)
671 fprintf(randLog, "SFS2 %s %s %s %lld\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str(), pos);
672 INJECT_FAULT( io_error, "SimpleFile::size" );
673
674 return pos;
675 }
676 };
677
678 struct SimDiskSpace {
679 int64_t totalSpace;
680 int64_t baseFreeSpace; //The original free space of the disk + deltas from simulated external modifications
681 double lastUpdate;
682 };
683
684 void doReboot( ISimulator::ProcessInfo* const& p, ISimulator::KillType const& kt );
685
686 struct Sim2Listener : IListener, ReferenceCounted<Sim2Listener> {
Sim2ListenerSim2Listener687 explicit Sim2Listener( ISimulator::ProcessInfo* process, const NetworkAddress& listenAddr )
688 : process(process),
689 address(listenAddr) {}
690
incomingConnectionSim2Listener691 void incomingConnection( double seconds, Reference<IConnection> conn ) { // Called by another process!
692 incoming( Reference<Sim2Listener>::addRef( this ), seconds, conn );
693 }
694
addrefSim2Listener695 virtual void addref() { ReferenceCounted<Sim2Listener>::addref(); }
delrefSim2Listener696 virtual void delref() { ReferenceCounted<Sim2Listener>::delref(); }
697
acceptSim2Listener698 virtual Future<Reference<IConnection>> accept() {
699 return popOne( nextConnection.getFuture() );
700 }
701
getListenAddressSim2Listener702 virtual NetworkAddress getListenAddress() { return address; }
703
704 private:
705 ISimulator::ProcessInfo* process;
706 PromiseStream< Reference<IConnection> > nextConnection;
707
incomingSim2Listener708 ACTOR static void incoming( Reference<Sim2Listener> self, double seconds, Reference<IConnection> conn ) {
709 wait( g_simulator.onProcess(self->process) );
710 wait( delay( seconds ) );
711 if (((Sim2Conn*)conn.getPtr())->isPeerGone() && g_random->random01()<0.5)
712 return;
713 TraceEvent("Sim2IncomingConn", conn->getDebugID())
714 .detail("ListenAddress", self->getListenAddress())
715 .detail("PeerAddress", conn->getPeerAddress());
716 self->nextConnection.send( conn );
717 }
popOneSim2Listener718 ACTOR static Future<Reference<IConnection>> popOne( FutureStream< Reference<IConnection> > conns ) {
719 Reference<IConnection> c = waitNext( conns );
720 ((Sim2Conn*)c.getPtr())->opened = true;
721 return c;
722 }
723
724 NetworkAddress address;
725 };
726
727 #define g_sim2 ((Sim2&)g_simulator)
728
729 class Sim2 : public ISimulator, public INetworkConnections {
730 public:
731 // Implement INetwork interface
732 // Everything actually network related is delegated to the Sim2Net class; Sim2 is only concerned with simulating machines and time
now()733 virtual double now() { return time; }
734
delay(double seconds,int taskID)735 virtual Future<class Void> delay( double seconds, int taskID ) {
736 ASSERT(taskID >= TaskMinPriority && taskID <= TaskMaxPriority);
737 return delay( seconds, taskID, currentProcess );
738 }
delay(double seconds,int taskID,ProcessInfo * machine)739 Future<class Void> delay( double seconds, int taskID, ProcessInfo* machine ) {
740 ASSERT( seconds >= -0.0001 );
741 seconds = std::max(0.0, seconds);
742 Future<Void> f;
743
744 if(!currentProcess->rebooting && machine == currentProcess && !currentProcess->shutdownSignal.isSet() && FLOW_KNOBS->MAX_BUGGIFIED_DELAY > 0 && g_random->random01() < 0.25) { //FIXME: why doesnt this work when we are changing machines?
745 seconds += FLOW_KNOBS->MAX_BUGGIFIED_DELAY*pow(g_random->random01(),1000.0);
746 }
747
748 mutex.enter();
749 tasks.push( Task( time + seconds, taskID, taskCount++, machine, f ) );
750 mutex.leave();
751
752 return f;
753 }
checkShutdown(Sim2 * self,int taskID)754 ACTOR static Future<Void> checkShutdown(Sim2 *self, int taskID) {
755 ISimulator::KillType kt = wait( self->getCurrentProcess()->shutdownSignal.getFuture() );
756 self->setCurrentTask(taskID);
757 return Void();
758 }
yield(int taskID)759 virtual Future<class Void> yield( int taskID ) {
760 if (taskID == TaskDefaultYield) taskID = currentTaskID;
761 if (check_yield(taskID)) {
762 // We want to check that yielders can handle actual time elapsing (it sometimes will outside simulation), but
763 // don't want to prevent instantaneous shutdown of "rebooted" machines.
764 return delay(getCurrentProcess()->rebooting ? 0 : .001,taskID) || checkShutdown(this, taskID);
765 }
766 setCurrentTask(taskID);
767 return Void();
768 }
check_yield(int taskID)769 virtual bool check_yield( int taskID ) {
770 if (yielded) return true;
771 if (--yield_limit <= 0) {
772 yield_limit = g_random->randomInt(1, 150); // If yield returns false *too* many times in a row, there could be a stack overflow, since we can't deterministically check stack size as the real network does
773 return yielded = true;
774 }
775 return yielded = BUGGIFY_WITH_PROB(0.01);
776 }
getCurrentTask()777 virtual int getCurrentTask() {
778 return currentTaskID;
779 }
setCurrentTask(int taskID)780 virtual void setCurrentTask(int taskID ) {
781 currentTaskID = taskID;
782 }
783 // Sets the taskID/priority of the current task, without yielding
connect(NetworkAddress toAddr,std::string host)784 virtual Future<Reference<IConnection>> connect( NetworkAddress toAddr, std::string host ) {
785 ASSERT( !toAddr.isTLS() && host.empty());
786 if (!addressMap.count( toAddr )) {
787 return waitForProcessAndConnect( toAddr, this );
788 }
789 auto peerp = getProcessByAddress(toAddr);
790 Reference<Sim2Conn> myc( new Sim2Conn( getCurrentProcess() ) );
791 Reference<Sim2Conn> peerc( new Sim2Conn( peerp ) );
792
793 myc->connect(peerc, toAddr);
794 IPAddress localIp;
795 if (getCurrentProcess()->address.ip.isV6()) {
796 IPAddress::IPAddressStore store = getCurrentProcess()->address.ip.toV6();
797 uint16_t* ipParts = (uint16_t*)store.data();
798 ipParts[7] += g_random->randomInt(0, 256);
799 localIp = IPAddress(store);
800 } else {
801 localIp = IPAddress(getCurrentProcess()->address.ip.toV4() + g_random->randomInt(0, 256));
802 }
803 peerc->connect(myc, NetworkAddress(localIp, g_random->randomInt(40000, 60000)));
804
805 ((Sim2Listener*)peerp->getListener(toAddr).getPtr())->incomingConnection( 0.5*g_random->random01(), Reference<IConnection>(peerc) );
806 return onConnect( ::delay(0.5*g_random->random01()), myc );
807 }
resolveTCPEndpoint(std::string host,std::string service)808 virtual Future<std::vector<NetworkAddress>> resolveTCPEndpoint( std::string host, std::string service) {
809 throw lookup_failed();
810 }
onConnect(Future<Void> ready,Reference<Sim2Conn> conn)811 ACTOR static Future<Reference<IConnection>> onConnect( Future<Void> ready, Reference<Sim2Conn> conn ) {
812 wait(ready);
813 if (conn->isPeerGone() && g_random->random01()<0.5) {
814 conn.clear();
815 wait(Never());
816 }
817 conn->opened = true;
818 return conn;
819 }
listen(NetworkAddress localAddr)820 virtual Reference<IListener> listen( NetworkAddress localAddr ) {
821 ASSERT( !localAddr.isTLS() );
822 Reference<IListener> listener( getCurrentProcess()->getListener(localAddr) );
823 ASSERT(listener);
824 return listener;
825 }
waitForProcessAndConnect(NetworkAddress toAddr,INetworkConnections * self)826 ACTOR static Future<Reference<IConnection>> waitForProcessAndConnect(
827 NetworkAddress toAddr, INetworkConnections *self ) {
828 // We have to be able to connect to processes that don't yet exist, so we do some silly polling
829 loop {
830 wait( ::delay( 0.1 * g_random->random01() ) );
831 if (g_sim2.addressMap.count(toAddr)) {
832 Reference<IConnection> c = wait( self->connect( toAddr ) );
833 return c;
834 }
835 }
836 }
837
stop()838 virtual void stop() { isStopped = true; }
isSimulated() const839 virtual bool isSimulated() const { return true; }
840
841 struct SimThreadArgs {
842 THREAD_FUNC_RETURN (*func) (void*);
843 void *arg;
844
845 ISimulator::ProcessInfo *currentProcess;
846
SimThreadArgsSim2::SimThreadArgs847 SimThreadArgs(THREAD_FUNC_RETURN (*func) (void*), void *arg) : func(func), arg(arg) {
848 ASSERT(g_network->isSimulated());
849 currentProcess = g_simulator.getCurrentProcess();
850 }
851 };
852
853 //Starts a new thread, making sure to set any thread local state
simStartThread(void * arg)854 THREAD_FUNC simStartThread(void *arg) {
855 SimThreadArgs *simArgs = (SimThreadArgs*)arg;
856 ISimulator::currentProcess = simArgs->currentProcess;
857 simArgs->func(simArgs->arg);
858
859 delete simArgs;
860 THREAD_RETURN;
861 }
862
startThread(THREAD_FUNC_RETURN (* func)(void *),void * arg)863 virtual THREAD_HANDLE startThread( THREAD_FUNC_RETURN (*func) (void*), void *arg ) {
864 SimThreadArgs *simArgs = new SimThreadArgs(func, arg);
865 return ::startThread(simStartThread, simArgs);
866 }
867
getDiskBytes(std::string const & directory,int64_t & free,int64_t & total)868 virtual void getDiskBytes( std::string const& directory, int64_t& free, int64_t& total) {
869 ProcessInfo *proc = getCurrentProcess();
870 SimDiskSpace &diskSpace = diskSpaceMap[proc->address.ip];
871
872 int64_t totalFileSize = 0;
873 int numFiles = 0;
874
875 //Get the size of all files we've created on the server and subtract them from the free space
876 for(auto file = proc->machine->openFiles.begin(); file != proc->machine->openFiles.end(); ++file) {
877 if( file->second.isReady() ) {
878 totalFileSize += ((AsyncFileNonDurable*)file->second.get().getPtr())->approximateSize;
879 }
880 numFiles++;
881 }
882
883 bool ok = false;
884
885 if(diskSpace.totalSpace == 0) {
886 diskSpace.totalSpace = 5e9 + g_random->random01() * 100e9; //Total space between 5GB and 105GB
887 diskSpace.baseFreeSpace = std::min<int64_t>(diskSpace.totalSpace, std::max(5e9, (g_random->random01() * (1 - .075) + .075) * diskSpace.totalSpace) + totalFileSize); //Minimum 5GB or 7.5% total disk space, whichever is higher
888
889 TraceEvent("Sim2DiskSpaceInitialization").detail("TotalSpace", diskSpace.totalSpace).detail("BaseFreeSpace", diskSpace.baseFreeSpace).detail("TotalFileSize", totalFileSize).detail("NumFiles", numFiles);
890 }
891 else {
892 int64_t maxDelta = std::min(5.0, (now() - diskSpace.lastUpdate)) * (BUGGIFY ? 10e6 : 1e6); //External processes modifying the disk
893 int64_t delta = -maxDelta + g_random->random01() * maxDelta * 2;
894 diskSpace.baseFreeSpace = std::min<int64_t>(diskSpace.totalSpace, std::max<int64_t>(diskSpace.baseFreeSpace + delta, totalFileSize));
895 }
896
897 diskSpace.lastUpdate = now();
898
899 total = diskSpace.totalSpace;
900 free = std::max<int64_t>(0, diskSpace.baseFreeSpace - totalFileSize);
901
902 if(free == 0)
903 TraceEvent(SevWarnAlways, "Sim2NoFreeSpace").detail("TotalSpace", diskSpace.totalSpace).detail("BaseFreeSpace", diskSpace.baseFreeSpace).detail("TotalFileSize", totalFileSize).detail("NumFiles", numFiles);
904 }
isAddressOnThisHost(NetworkAddress const & addr)905 virtual bool isAddressOnThisHost( NetworkAddress const& addr ) {
906 return addr.ip == getCurrentProcess()->address.ip;
907 }
908
deleteFileImpl(Sim2 * self,std::string filename,bool mustBeDurable)909 ACTOR static Future<Void> deleteFileImpl( Sim2* self, std::string filename, bool mustBeDurable ) {
910 // This is a _rudimentary_ simulation of the untrustworthiness of non-durable deletes and the possibility of
911 // rebooting during a durable one. It isn't perfect: for example, on real filesystems testing
912 // for the existence of a non-durably deleted file BEFORE a reboot will show that it apparently doesn't exist.
913 if(g_simulator.getCurrentProcess()->machine->openFiles.count(filename)) {
914 g_simulator.getCurrentProcess()->machine->openFiles.erase(filename);
915 g_simulator.getCurrentProcess()->machine->deletingFiles.insert(filename);
916 }
917 if ( mustBeDurable || g_random->random01() < 0.5 ) {
918 state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
919 state int currentTaskID = g_network->getCurrentTask();
920 wait( g_simulator.onMachine( currentProcess ) );
921 try {
922 wait( ::delay(0.05 * g_random->random01()) );
923 if (!currentProcess->rebooting) {
924 auto f = IAsyncFileSystem::filesystem(self->net2)->deleteFile(filename, false);
925 ASSERT( f.isReady() );
926 wait( ::delay(0.05 * g_random->random01()) );
927 TEST( true ); // Simulated durable delete
928 }
929 wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
930 return Void();
931 } catch( Error &e ) {
932 state Error err = e;
933 wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
934 throw err;
935 }
936 } else {
937 TEST( true ); // Simulated non-durable delete
938 return Void();
939 }
940 }
941
runLoop(Sim2 * self)942 ACTOR static Future<Void> runLoop(Sim2 *self) {
943 state ISimulator::ProcessInfo *callingMachine = self->currentProcess;
944 while ( !self->isStopped ) {
945 wait( self->net2->yield(TaskDefaultYield) );
946
947 self->mutex.enter();
948 if( self->tasks.size() == 0 ) {
949 self->mutex.leave();
950 ASSERT(false);
951 }
952 //if (!randLog/* && now() >= 32.0*/)
953 // randLog = fopen("randLog.txt", "wt");
954 Task t = std::move( self->tasks.top() ); // Unfortunately still a copy under gcc where .top() returns const&
955 self->currentTaskID = t.taskID;
956 self->tasks.pop();
957 self->mutex.leave();
958
959 self->execTask(t);
960 self->yielded = false;
961 }
962 self->currentProcess = callingMachine;
963 self->net2->stop();
964 return Void();
965 }
966
_run(Sim2 * self)967 ACTOR Future<Void> _run(Sim2 *self) {
968 Future<Void> loopFuture = self->runLoop(self);
969 self->net2->run();
970 wait( loopFuture );
971 return Void();
972 }
973
974 // Implement ISimulator interface
run()975 virtual void run() {
976 _run(this);
977 }
newProcess(const char * name,IPAddress ip,uint16_t port,uint16_t listenPerProcess,LocalityData locality,ProcessClass startingClass,const char * dataFolder,const char * coordinationFolder)978 virtual ProcessInfo* newProcess(const char* name, IPAddress ip, uint16_t port, uint16_t listenPerProcess,
979 LocalityData locality, ProcessClass startingClass, const char* dataFolder,
980 const char* coordinationFolder) {
981 ASSERT( locality.machineId().present() );
982 MachineInfo& machine = machines[ locality.machineId().get() ];
983 if (!machine.machineId.present())
984 machine.machineId = locality.machineId();
985 for( int i = 0; i < machine.processes.size(); i++ ) {
986 if( machine.processes[i]->locality.machineId() != locality.machineId() ) { // SOMEDAY: compute ip from locality to avoid this check
987 TraceEvent("Sim2Mismatch")
988 .detail("IP", format("%s", ip.toString().c_str()))
989 .detail("MachineId", locality.machineId())
990 .detail("NewName", name)
991 .detail("ExistingMachineId", machine.processes[i]->locality.machineId())
992 .detail("ExistingName", machine.processes[i]->name);
993 ASSERT( false );
994 }
995 ASSERT( machine.processes[i]->address.port != port );
996 }
997
998 // This is for async operations on non-durable files.
999 // These files must live on after process kills for sim purposes.
1000 if( machine.machineProcess == 0 ) {
1001 NetworkAddress machineAddress(ip, 0, false, false);
1002 machine.machineProcess = new ProcessInfo("Machine", locality, startingClass, {machineAddress}, this, "", "");
1003 machine.machineProcess->machine = &machine;
1004 }
1005
1006 NetworkAddressList addresses;
1007 addresses.address = NetworkAddress(ip, port, true, false);
1008 if(listenPerProcess == 2) {
1009 addresses.secondaryAddress = NetworkAddress(ip, port+1, true, false);
1010 }
1011
1012 ProcessInfo* m = new ProcessInfo(name, locality, startingClass, addresses, this, dataFolder, coordinationFolder);
1013 for (int processPort = port; processPort < port + listenPerProcess; ++processPort) {
1014 NetworkAddress address(ip, processPort, true, false); // SOMEDAY see above about becoming SSL!
1015 m->listenerMap[address] = Reference<IListener>( new Sim2Listener(m, address) );
1016 addressMap[address] = m;
1017 }
1018 m->machine = &machine;
1019 machine.processes.push_back(m);
1020 currentlyRebootingProcesses.erase(addresses.address);
1021 m->excluded = g_simulator.isExcluded(addresses.address);
1022 m->cleared = g_simulator.isCleared(addresses.address);
1023
1024 m->setGlobal(enTDMetrics, (flowGlobalType) &m->tdmetrics);
1025 m->setGlobal(enNetworkConnections, (flowGlobalType) m->network);
1026 m->setGlobal(enASIOTimedOut, (flowGlobalType) false);
1027
1028 TraceEvent("NewMachine").detail("Name", name).detail("Address", m->address).detail("MachineId", m->locality.machineId()).detail("Excluded", m->excluded).detail("Cleared", m->cleared);
1029
1030 // FIXME: Sometimes, connections to/from this process will explicitly close
1031
1032 return m;
1033 }
isAvailable() const1034 virtual bool isAvailable() const
1035 {
1036 std::vector<ProcessInfo*> processesLeft, processesDead;
1037 for (auto processInfo : getAllProcesses()) {
1038 if (processInfo->isAvailableClass()) {
1039 if (processInfo->isExcluded() || processInfo->isCleared() || !processInfo->isAvailable()) {
1040 processesDead.push_back(processInfo);
1041 } else {
1042 processesLeft.push_back(processInfo);
1043 }
1044 }
1045 }
1046 return canKillProcesses(processesLeft, processesDead, KillInstantly, NULL);
1047 }
1048
datacenterDead(Optional<Standalone<StringRef>> dcId) const1049 virtual bool datacenterDead(Optional<Standalone<StringRef>> dcId) const
1050 {
1051 if(!dcId.present()) {
1052 return false;
1053 }
1054
1055 LocalityGroup primaryProcessesLeft, primaryProcessesDead;
1056 std::vector<LocalityData> primaryLocalitiesDead, primaryLocalitiesLeft;
1057
1058 for (auto processInfo : getAllProcesses()) {
1059 if (processInfo->isAvailableClass() && processInfo->locality.dcId() == dcId) {
1060 if (processInfo->isExcluded() || processInfo->isCleared() || !processInfo->isAvailable()) {
1061 primaryProcessesDead.add(processInfo->locality);
1062 primaryLocalitiesDead.push_back(processInfo->locality);
1063 } else {
1064 primaryProcessesLeft.add(processInfo->locality);
1065 primaryLocalitiesLeft.push_back(processInfo->locality);
1066 }
1067 }
1068 }
1069
1070 std::vector<LocalityData> badCombo;
1071 bool primaryTLogsDead = tLogWriteAntiQuorum ? !validateAllCombinations(badCombo, primaryProcessesDead, tLogPolicy, primaryLocalitiesLeft, tLogWriteAntiQuorum, false) : primaryProcessesDead.validate(tLogPolicy);
1072 if(usableRegions > 1 && remoteTLogPolicy && !primaryTLogsDead) {
1073 primaryTLogsDead = primaryProcessesDead.validate(remoteTLogPolicy);
1074 }
1075
1076 return primaryTLogsDead || primaryProcessesDead.validate(storagePolicy);
1077 }
1078
1079 // The following function will determine if the specified configuration of available and dead processes can allow the cluster to survive
canKillProcesses(std::vector<ProcessInfo * > const & availableProcesses,std::vector<ProcessInfo * > const & deadProcesses,KillType kt,KillType * newKillType) const1080 virtual bool canKillProcesses(std::vector<ProcessInfo*> const& availableProcesses, std::vector<ProcessInfo*> const& deadProcesses, KillType kt, KillType* newKillType) const
1081 {
1082 bool canSurvive = true;
1083 int nQuorum = ((desiredCoordinators+1)/2)*2-1;
1084
1085 KillType newKt = kt;
1086 if ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete))
1087 {
1088 LocalityGroup primaryProcessesLeft, primaryProcessesDead;
1089 LocalityGroup primarySatelliteProcessesLeft, primarySatelliteProcessesDead;
1090 LocalityGroup remoteProcessesLeft, remoteProcessesDead;
1091 LocalityGroup remoteSatelliteProcessesLeft, remoteSatelliteProcessesDead;
1092
1093 std::vector<LocalityData> primaryLocalitiesDead, primaryLocalitiesLeft;
1094 std::vector<LocalityData> primarySatelliteLocalitiesDead, primarySatelliteLocalitiesLeft;
1095 std::vector<LocalityData> remoteLocalitiesDead, remoteLocalitiesLeft;
1096 std::vector<LocalityData> remoteSatelliteLocalitiesDead, remoteSatelliteLocalitiesLeft;
1097
1098 std::vector<LocalityData> badCombo;
1099 std::set<Optional<Standalone<StringRef>>> uniqueMachines;
1100
1101 if(!primaryDcId.present()) {
1102 for (auto processInfo : availableProcesses) {
1103 primaryProcessesLeft.add(processInfo->locality);
1104 primaryLocalitiesLeft.push_back(processInfo->locality);
1105 uniqueMachines.insert(processInfo->locality.zoneId());
1106 }
1107 for (auto processInfo : deadProcesses) {
1108 primaryProcessesDead.add(processInfo->locality);
1109 primaryLocalitiesDead.push_back(processInfo->locality);
1110 }
1111 } else {
1112 for (auto processInfo : availableProcesses) {
1113 uniqueMachines.insert(processInfo->locality.zoneId());
1114 if(processInfo->locality.dcId() == primaryDcId) {
1115 primaryProcessesLeft.add(processInfo->locality);
1116 primaryLocalitiesLeft.push_back(processInfo->locality);
1117 } else if(processInfo->locality.dcId() == remoteDcId) {
1118 remoteProcessesLeft.add(processInfo->locality);
1119 remoteLocalitiesLeft.push_back(processInfo->locality);
1120 } else if(std::find(primarySatelliteDcIds.begin(), primarySatelliteDcIds.end(), processInfo->locality.dcId()) != primarySatelliteDcIds.end()) {
1121 primarySatelliteProcessesLeft.add(processInfo->locality);
1122 primarySatelliteLocalitiesLeft.push_back(processInfo->locality);
1123 } else if(std::find(remoteSatelliteDcIds.begin(), remoteSatelliteDcIds.end(), processInfo->locality.dcId()) != remoteSatelliteDcIds.end()) {
1124 remoteSatelliteProcessesLeft.add(processInfo->locality);
1125 remoteSatelliteLocalitiesLeft.push_back(processInfo->locality);
1126 }
1127 }
1128 for (auto processInfo : deadProcesses) {
1129 if(processInfo->locality.dcId() == primaryDcId) {
1130 primaryProcessesDead.add(processInfo->locality);
1131 primaryLocalitiesDead.push_back(processInfo->locality);
1132 } else if(processInfo->locality.dcId() == remoteDcId) {
1133 remoteProcessesDead.add(processInfo->locality);
1134 remoteLocalitiesDead.push_back(processInfo->locality);
1135 } else if(std::find(primarySatelliteDcIds.begin(), primarySatelliteDcIds.end(), processInfo->locality.dcId()) != primarySatelliteDcIds.end()) {
1136 primarySatelliteProcessesDead.add(processInfo->locality);
1137 primarySatelliteLocalitiesDead.push_back(processInfo->locality);
1138 } else if(std::find(remoteSatelliteDcIds.begin(), remoteSatelliteDcIds.end(), processInfo->locality.dcId()) != remoteSatelliteDcIds.end()) {
1139 remoteSatelliteProcessesDead.add(processInfo->locality);
1140 remoteSatelliteLocalitiesDead.push_back(processInfo->locality);
1141 }
1142 }
1143 }
1144
1145 bool tooManyDead = false;
1146 bool notEnoughLeft = false;
1147 bool primaryTLogsDead = tLogWriteAntiQuorum ? !validateAllCombinations(badCombo, primaryProcessesDead, tLogPolicy, primaryLocalitiesLeft, tLogWriteAntiQuorum, false) : primaryProcessesDead.validate(tLogPolicy);
1148 if(usableRegions > 1 && remoteTLogPolicy && !primaryTLogsDead) {
1149 primaryTLogsDead = primaryProcessesDead.validate(remoteTLogPolicy);
1150 }
1151
1152 if(!primaryDcId.present()) {
1153 tooManyDead = primaryTLogsDead || primaryProcessesDead.validate(storagePolicy);
1154 notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy);
1155 } else {
1156 bool remoteTLogsDead = tLogWriteAntiQuorum ? !validateAllCombinations(badCombo, remoteProcessesDead, tLogPolicy, remoteLocalitiesLeft, tLogWriteAntiQuorum, false) : remoteProcessesDead.validate(tLogPolicy);
1157 if(usableRegions > 1 && remoteTLogPolicy && !remoteTLogsDead) {
1158 remoteTLogsDead = remoteProcessesDead.validate(remoteTLogPolicy);
1159 }
1160
1161 if(!hasSatelliteReplication) {
1162 if(usableRegions > 1) {
1163 tooManyDead = primaryTLogsDead || remoteTLogsDead || ( primaryProcessesDead.validate(storagePolicy) && remoteProcessesDead.validate(storagePolicy) );
1164 notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(remoteTLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(remoteTLogPolicy) || !remoteProcessesLeft.validate(storagePolicy);
1165 } else {
1166 tooManyDead = primaryTLogsDead || remoteTLogsDead || primaryProcessesDead.validate(storagePolicy) || remoteProcessesDead.validate(storagePolicy);
1167 notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(storagePolicy);
1168 }
1169 } else {
1170 bool primarySatelliteTLogsDead = satelliteTLogWriteAntiQuorumFallback ? !validateAllCombinations(badCombo, primarySatelliteProcessesDead, satelliteTLogPolicyFallback, primarySatelliteLocalitiesLeft, satelliteTLogWriteAntiQuorumFallback, false) : primarySatelliteProcessesDead.validate(satelliteTLogPolicyFallback);
1171 bool remoteSatelliteTLogsDead = satelliteTLogWriteAntiQuorumFallback ? !validateAllCombinations(badCombo, remoteSatelliteProcessesDead, satelliteTLogPolicyFallback, remoteSatelliteLocalitiesLeft, satelliteTLogWriteAntiQuorumFallback, false) : remoteSatelliteProcessesDead.validate(satelliteTLogPolicyFallback);
1172
1173 if(usableRegions > 1) {
1174 notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(remoteTLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !primarySatelliteProcessesLeft.validate(satelliteTLogPolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(remoteTLogPolicy) || !remoteProcessesLeft.validate(storagePolicy) || !remoteSatelliteProcessesLeft.validate(satelliteTLogPolicy);
1175 } else {
1176 notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !primarySatelliteProcessesLeft.validate(satelliteTLogPolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(storagePolicy) || !remoteSatelliteProcessesLeft.validate(satelliteTLogPolicy);
1177 }
1178
1179 if(usableRegions > 1 && allowLogSetKills) {
1180 tooManyDead = ( primaryTLogsDead && primarySatelliteTLogsDead ) || ( remoteTLogsDead && remoteSatelliteTLogsDead ) || ( primaryTLogsDead && remoteTLogsDead ) || ( primaryProcessesDead.validate(storagePolicy) && remoteProcessesDead.validate(storagePolicy) );
1181 } else {
1182 tooManyDead = primaryTLogsDead || remoteTLogsDead || primaryProcessesDead.validate(storagePolicy) || remoteProcessesDead.validate(storagePolicy);
1183 }
1184 }
1185 }
1186
1187 // Reboot if dead machines do fulfill policies
1188 if (tooManyDead) {
1189 newKt = Reboot;
1190 canSurvive = false;
1191 TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("TLogPolicy", tLogPolicy->info()).detail("Reason", "tLogPolicy validates against dead processes.");
1192 }
1193 // Reboot and Delete if remaining machines do NOT fulfill policies
1194 else if ((kt < RebootAndDelete) && notEnoughLeft) {
1195 newKt = RebootAndDelete;
1196 canSurvive = false;
1197 TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("TLogPolicy", tLogPolicy->info()).detail("Reason", "tLogPolicy does not validates against remaining processes.");
1198 }
1199 else if ((kt < RebootAndDelete) && (nQuorum > uniqueMachines.size())) {
1200 newKt = RebootAndDelete;
1201 canSurvive = false;
1202 TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("StoragePolicy", storagePolicy->info()).detail("Quorum", nQuorum).detail("Machines", uniqueMachines.size()).detail("Reason", "Not enough unique machines to perform auto configuration of coordinators.");
1203 }
1204 else {
1205 TraceEvent("CanSurviveKills").detail("KillType", kt).detail("TLogPolicy", tLogPolicy->info()).detail("StoragePolicy", storagePolicy->info()).detail("Quorum", nQuorum).detail("Machines", uniqueMachines.size());
1206 }
1207 }
1208 if (newKillType) *newKillType = newKt;
1209 return canSurvive;
1210 }
1211
destroyProcess(ISimulator::ProcessInfo * p)1212 virtual void destroyProcess( ISimulator::ProcessInfo *p ) {
1213 TraceEvent("ProcessDestroyed").detail("Name", p->name).detail("Address", p->address).detail("MachineId", p->locality.machineId());
1214 currentlyRebootingProcesses.insert(std::pair<NetworkAddress, ProcessInfo*>(p->address, p));
1215 std::vector<ProcessInfo*>& processes = machines[ p->locality.machineId().get() ].processes;
1216 if( p != processes.back() ) {
1217 auto it = std::find( processes.begin(), processes.end(), p );
1218 std::swap( *it, processes.back() );
1219 }
1220 processes.pop_back();
1221 killProcess_internal( p, KillInstantly );
1222 }
killProcess_internal(ProcessInfo * machine,KillType kt)1223 void killProcess_internal( ProcessInfo* machine, KillType kt ) {
1224 TEST( true ); // Simulated machine was killed with any kill type
1225 TEST( kt == KillInstantly ); // Simulated machine was killed instantly
1226 TEST( kt == InjectFaults ); // Simulated machine was killed with faults
1227
1228 if (kt == KillInstantly) {
1229 TraceEvent(SevWarn, "FailMachine").detail("Name", machine->name).detail("Address", machine->address).detail("ZoneId", machine->locality.zoneId()).detail("Process", machine->toString()).detail("Rebooting", machine->rebooting).detail("Protected", protectedAddresses.count(machine->address)).backtrace();
1230 // This will remove all the "tracked" messages that came from the machine being killed
1231 latestEventCache.clear();
1232 machine->failed = true;
1233 } else if (kt == InjectFaults) {
1234 TraceEvent(SevWarn, "FaultMachine").detail("Name", machine->name).detail("Address", machine->address).detail("ZoneId", machine->locality.zoneId()).detail("Process", machine->toString()).detail("Rebooting", machine->rebooting).detail("Protected", protectedAddresses.count(machine->address)).backtrace();
1235 should_inject_fault = simulator_should_inject_fault;
1236 machine->fault_injection_r = g_random->randomUniqueID().first();
1237 machine->fault_injection_p1 = 0.1;
1238 machine->fault_injection_p2 = g_random->random01();
1239 } else {
1240 ASSERT( false );
1241 }
1242 ASSERT(!protectedAddresses.count(machine->address) || machine->rebooting);
1243 }
rebootProcess(ProcessInfo * process,KillType kt)1244 virtual void rebootProcess( ProcessInfo* process, KillType kt ) {
1245 if( kt == RebootProcessAndDelete && protectedAddresses.count(process->address) ) {
1246 TraceEvent("RebootChanged").detail("ZoneId", process->locality.describeZone()).detail("KillType", RebootProcess).detail("OrigKillType", kt).detail("Reason", "Protected process");
1247 kt = RebootProcess;
1248 }
1249 doReboot( process, kt );
1250 }
rebootProcess(Optional<Standalone<StringRef>> zoneId,bool allProcesses)1251 virtual void rebootProcess(Optional<Standalone<StringRef>> zoneId, bool allProcesses ) {
1252 if( allProcesses ) {
1253 auto processes = getAllProcesses();
1254 for( int i = 0; i < processes.size(); i++ )
1255 if( processes[i]->locality.zoneId() == zoneId && !processes[i]->rebooting )
1256 doReboot( processes[i], RebootProcess );
1257 } else {
1258 auto processes = getAllProcesses();
1259 for( int i = 0; i < processes.size(); i++ ) {
1260 if( processes[i]->locality.zoneId() != zoneId || processes[i]->rebooting ) {
1261 swapAndPop(&processes, i--);
1262 }
1263 }
1264 if( processes.size() )
1265 doReboot( g_random->randomChoice( processes ), RebootProcess );
1266 }
1267 }
killProcess(ProcessInfo * machine,KillType kt)1268 virtual void killProcess( ProcessInfo* machine, KillType kt ) {
1269 TraceEvent("AttemptingKillProcess");
1270 if (kt < RebootAndDelete ) {
1271 killProcess_internal( machine, kt );
1272 }
1273 }
killInterface(NetworkAddress address,KillType kt)1274 virtual void killInterface( NetworkAddress address, KillType kt ) {
1275 if (kt < RebootAndDelete ) {
1276 std::vector<ProcessInfo*>& processes = machines[ addressMap[address]->locality.machineId() ].processes;
1277 for( int i = 0; i < processes.size(); i++ )
1278 killProcess_internal( processes[i], kt );
1279 }
1280 }
killZone(Optional<Standalone<StringRef>> zoneId,KillType kt,bool forceKill,KillType * ktFinal)1281 virtual bool killZone(Optional<Standalone<StringRef>> zoneId, KillType kt, bool forceKill, KillType* ktFinal) {
1282 auto processes = getAllProcesses();
1283 std::set<Optional<Standalone<StringRef>>> zoneMachines;
1284 for (auto& process : processes) {
1285 if(process->locality.zoneId() == zoneId) {
1286 zoneMachines.insert(process->locality.machineId());
1287 }
1288 }
1289 bool result = false;
1290 for(auto& machineId : zoneMachines) {
1291 if(killMachine(machineId, kt, forceKill, ktFinal)) {
1292 result = true;
1293 }
1294 }
1295 return result;
1296 }
killMachine(Optional<Standalone<StringRef>> machineId,KillType kt,bool forceKill,KillType * ktFinal)1297 virtual bool killMachine(Optional<Standalone<StringRef>> machineId, KillType kt, bool forceKill, KillType* ktFinal) {
1298 auto ktOrig = kt;
1299
1300 TEST(true); // Trying to killing a machine
1301 TEST(kt == KillInstantly); // Trying to kill instantly
1302 TEST(kt == InjectFaults); // Trying to kill by injecting faults
1303
1304 if(speedUpSimulation && !forceKill) {
1305 TraceEvent(SevWarn, "AbortedKill").detail("MachineId", machineId).detail("Reason", "Unforced kill within speedy simulation.").backtrace();
1306 if (ktFinal) *ktFinal = None;
1307 return false;
1308 }
1309
1310 int processesOnMachine = 0;
1311
1312 KillType originalKt = kt;
1313 // Reboot if any of the processes are protected and count the number of processes not rebooting
1314 for (auto& process : machines[machineId].processes) {
1315 if (protectedAddresses.count(process->address))
1316 kt = Reboot;
1317 if (!process->rebooting)
1318 processesOnMachine++;
1319 }
1320
1321 // Do nothing, if no processes to kill
1322 if (processesOnMachine == 0) {
1323 TraceEvent(SevWarn, "AbortedKill").detail("MachineId", machineId).detail("Reason", "The target had no processes running.").detail("Processes", processesOnMachine).detail("ProcessesPerMachine", processesPerMachine).backtrace();
1324 if (ktFinal) *ktFinal = None;
1325 return false;
1326 }
1327
1328 // Check if machine can be removed, if requested
1329 if (!forceKill && ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete)))
1330 {
1331 std::vector<ProcessInfo*> processesLeft, processesDead;
1332 int protectedWorker = 0, unavailable = 0, excluded = 0, cleared = 0;
1333
1334 for (auto processInfo : getAllProcesses()) {
1335 if (processInfo->isAvailableClass()) {
1336 if (processInfo->isExcluded()) {
1337 processesDead.push_back(processInfo);
1338 excluded++;
1339 }
1340 else if (processInfo->isCleared()) {
1341 processesDead.push_back(processInfo);
1342 cleared++;
1343 }
1344 else if (!processInfo->isAvailable()) {
1345 processesDead.push_back(processInfo);
1346 unavailable++;
1347 }
1348 else if (protectedAddresses.count(processInfo->address)) {
1349 processesLeft.push_back(processInfo);
1350 protectedWorker++;
1351 }
1352 else if (processInfo->locality.machineId() != machineId) {
1353 processesLeft.push_back(processInfo);
1354 } else {
1355 processesDead.push_back(processInfo);
1356 }
1357 }
1358 }
1359 if (!canKillProcesses(processesLeft, processesDead, kt, &kt)) {
1360 TraceEvent("ChangedKillMachine").detail("MachineId", machineId).detail("KillType", kt).detail("OrigKillType", ktOrig).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalProcesses", machines.size()).detail("ProcessesPerMachine", processesPerMachine).detail("Protected", protectedWorker).detail("Unavailable", unavailable).detail("Excluded", excluded).detail("Cleared", cleared).detail("ProtectedTotal", protectedAddresses.size()).detail("TLogPolicy", tLogPolicy->info()).detail("StoragePolicy", storagePolicy->info());
1361 }
1362 else if ((kt == KillInstantly) || (kt == InjectFaults)) {
1363 TraceEvent("DeadMachine").detail("MachineId", machineId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalProcesses", machines.size()).detail("ProcessesPerMachine", processesPerMachine).detail("TLogPolicy", tLogPolicy->info()).detail("StoragePolicy", storagePolicy->info());
1364 for (auto process : processesLeft) {
1365 TraceEvent("DeadMachineSurvivors").detail("MachineId", machineId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("SurvivingProcess", process->toString());
1366 }
1367 for (auto process : processesDead) {
1368 TraceEvent("DeadMachineVictims").detail("MachineId", machineId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("VictimProcess", process->toString());
1369 }
1370 }
1371 else {
1372 TraceEvent("ClearMachine").detail("MachineId", machineId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalProcesses", machines.size()).detail("ProcessesPerMachine", processesPerMachine).detail("TLogPolicy", tLogPolicy->info()).detail("StoragePolicy", storagePolicy->info());
1373 for (auto process : processesLeft) {
1374 TraceEvent("ClearMachineSurvivors").detail("MachineId", machineId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("SurvivingProcess", process->toString());
1375 }
1376 for (auto process : processesDead) {
1377 TraceEvent("ClearMachineVictims").detail("MachineId", machineId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("VictimProcess", process->toString());
1378 }
1379 }
1380 }
1381
1382 TEST(originalKt != kt); // Kill type was changed from requested to reboot.
1383
1384 // Check if any processes on machine are rebooting
1385 if( processesOnMachine != processesPerMachine && kt >= RebootAndDelete ) {
1386 TEST(true); //Attempted reboot, but the target did not have all of its processes running
1387 TraceEvent(SevWarn, "AbortedKill").detail("KillType", kt).detail("MachineId", machineId).detail("Reason", "Machine processes does not match number of processes per machine").detail("Processes", processesOnMachine).detail("ProcessesPerMachine", processesPerMachine).backtrace();
1388 if (ktFinal) *ktFinal = None;
1389 return false;
1390 }
1391
1392 // Check if any processes on machine are rebooting
1393 if ( processesOnMachine != processesPerMachine ) {
1394 TEST(true); //Attempted reboot, but the target did not have all of its processes running
1395 TraceEvent(SevWarn, "AbortedKill").detail("KillType", kt).detail("MachineId", machineId).detail("Reason", "Machine processes does not match number of processes per machine").detail("Processes", processesOnMachine).detail("ProcessesPerMachine", processesPerMachine).backtrace();
1396 if (ktFinal) *ktFinal = None;
1397 return false;
1398 }
1399
1400 TraceEvent("KillMachine").detail("MachineId", machineId).detail("Kt", kt).detail("KtOrig", ktOrig).detail("KillableMachines", processesOnMachine).detail("ProcessPerMachine", processesPerMachine).detail("KillChanged", kt!=ktOrig);
1401 if ( kt < RebootAndDelete ) {
1402 if(kt == InjectFaults && machines[machineId].machineProcess != nullptr)
1403 killProcess_internal( machines[machineId].machineProcess, kt );
1404 for (auto& process : machines[machineId].processes) {
1405 TraceEvent("KillMachineProcess").detail("KillType", kt).detail("Process", process->toString()).detail("StartingClass", process->startingClass.toString()).detail("Failed", process->failed).detail("Excluded", process->excluded).detail("Cleared", process->cleared).detail("Rebooting", process->rebooting);
1406 if (process->startingClass != ProcessClass::TesterClass)
1407 killProcess_internal( process, kt );
1408 }
1409 }
1410 else if ( kt == Reboot || kt == RebootAndDelete ) {
1411 for (auto& process : machines[machineId].processes) {
1412 TraceEvent("KillMachineProcess").detail("KillType", kt).detail("Process", process->toString()).detail("StartingClass", process->startingClass.toString()).detail("Failed", process->failed).detail("Excluded", process->excluded).detail("Cleared", process->cleared).detail("Rebooting", process->rebooting);
1413 if (process->startingClass != ProcessClass::TesterClass)
1414 doReboot(process, kt );
1415 }
1416 }
1417
1418 TEST(kt == RebootAndDelete); // Resulted in a reboot and delete
1419 TEST(kt == Reboot); // Resulted in a reboot
1420 TEST(kt == KillInstantly); // Resulted in an instant kill
1421 TEST(kt == InjectFaults); // Resulted in a kill by injecting faults
1422
1423 if (ktFinal) *ktFinal = kt;
1424 return true;
1425 }
1426
// Kills or reboots the machines that host processes located in datacenter `dcId`.
//   dcId      - datacenter to target; a process counts only if its locality dcId is present and equal to dcId.
//   kt        - requested kill type. It is downgraded to Reboot when any matching process has a protected
//               address, and it is passed by pointer to canKillProcesses(), which may rewrite it.
//   forceKill - when true, the canKillProcesses() check below is skipped entirely.
//   ktFinal   - optional out-parameter; receives ktMin (see below).
// Returns true only when kt == ktMin, i.e. no killMachine() call reported a kill type lower than kt.
virtual bool killDataCenter(Optional<Standalone<StringRef>> dcId, KillType kt, bool forceKill, KillType* ktFinal) {
	auto ktOrig = kt;
	auto processes = getAllProcesses();
	// Machine id -> number of processes of this datacenter found on that machine
	std::map<Optional<Standalone<StringRef>>, int> datacenterMachines;
	int dcProcesses = 0;

	// Switch to a reboot, if anything protected on machine
	for (auto& procRecord : processes) {
		auto processDcId = procRecord->locality.dcId();
		auto processMachineId = procRecord->locality.machineId();
		ASSERT(processMachineId.present());
		if (processDcId.present() && (processDcId == dcId)) {
			if ((kt != Reboot) && (protectedAddresses.count(procRecord->address))) {
				kt = Reboot;
				TraceEvent(SevWarn, "DcKillChanged").detail("DataCenter", dcId).detail("KillType", kt).detail("OrigKillType", ktOrig)
					.detail("Reason", "Datacenter has protected process").detail("ProcessAddress", procRecord->address).detail("Failed", procRecord->failed).detail("Rebooting", procRecord->rebooting).detail("Excluded", procRecord->excluded).detail("Cleared", procRecord->cleared).detail("Process", procRecord->toString());
			}
			datacenterMachines[processMachineId.get()] ++;
			dcProcesses ++;
		}
	}

	// Check if machine can be removed, if requested
	if (!forceKill && ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete)))
	{
		// Partition every available-class process into those that would survive the kill (processesLeft)
		// and those that would not (processesDead).
		std::vector<ProcessInfo*> processesLeft, processesDead;
		for (auto processInfo : getAllProcesses()) {
			if (processInfo->isAvailableClass()) {
				if (processInfo->isExcluded() || processInfo->isCleared() || !processInfo->isAvailable()) {
					// Already excluded, cleared or unavailable: counted as dead regardless of datacenter
					processesDead.push_back(processInfo);
				} else if (protectedAddresses.count(processInfo->address) || datacenterMachines.find(processInfo->locality.machineId()) == datacenterMachines.end()) {
					// Protected, or on a machine outside the targeted datacenter: survives
					processesLeft.push_back(processInfo);
				} else {
					processesDead.push_back(processInfo);
				}
			}
		}

		// kt is passed both by value and by pointer, so canKillProcesses() may replace it.
		if (!canKillProcesses(processesLeft, processesDead, kt, &kt)) {
			TraceEvent(SevWarn, "DcKillChanged").detail("DataCenter", dcId).detail("KillType", kt).detail("OrigKillType", ktOrig);
		}
		else {
			TraceEvent("DeadDataCenter").detail("DataCenter", dcId).detail("KillType", kt).detail("DcZones", datacenterMachines.size()).detail("DcProcesses", dcProcesses).detail("ProcessesDead", processesDead.size()).detail("ProcessesLeft", processesLeft.size()).detail("TLogPolicy", tLogPolicy->info()).detail("StoragePolicy", storagePolicy->info());
			for (auto process : processesLeft) {
				TraceEvent("DeadDcSurvivors").detail("MachineId", process->locality.machineId()).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("SurvivingProcess", process->toString());
			}
			for (auto process : processesDead) {
				TraceEvent("DeadDcVictims").detail("MachineId", process->locality.machineId()).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("VictimProcess", process->toString());
			}
		}
	}

	// ktMin tracks the lowest kill type reported by any killMachine() call, starting from kt.
	KillType ktResult, ktMin = kt;
	for (auto& datacenterMachine : datacenterMachines) {
		// Each machine is targeted only when the random draw is below 0.99, so a machine is occasionally skipped.
		if(g_random->random01() < 0.99) {
			// NOTE(review): the third argument `true` is presumably killMachine's forceKill flag; confirm against its signature.
			killMachine(datacenterMachine.first, kt, true, &ktResult);
			if (ktResult != kt) {
				TraceEvent(SevWarn, "KillDCFail")
					.detail("Zone", datacenterMachine.first)
					.detail("KillType", kt)
					.detail("KillTypeResult", ktResult)
					.detail("KillTypeOrig", ktOrig);
				// The only accepted deviation from kt is an aborted kill (None).
				ASSERT(ktResult == None);
			}
			ktMin = std::min<KillType>( ktResult, ktMin );
		}
	}

	TraceEvent("KillDataCenter")
		.detail("DcZones", datacenterMachines.size())
		.detail("DcProcesses", dcProcesses)
		.detail("DCID", dcId)
		.detail("KillType", kt)
		.detail("KillTypeOrig", ktOrig)
		.detail("KillTypeMin", ktMin)
		.detail("KilledDC", kt==ktMin);

	TEST(kt != ktMin); // DataCenter kill was rejected by killMachine
	TEST((kt==ktMin) && (kt == RebootAndDelete)); // Resulted in a reboot and delete
	TEST((kt==ktMin) && (kt == Reboot)); // Resulted in a reboot
	TEST((kt==ktMin) && (kt == KillInstantly)); // Resulted in an instant kill
	TEST((kt==ktMin) && (kt == InjectFaults)); // Resulted in a kill by injecting faults
	TEST((kt==ktMin) && (kt != ktOrig)); // Kill request was downgraded
	TEST((kt==ktMin) && (kt == ktOrig)); // Requested kill was done

	if (ktFinal) *ktFinal = ktMin;

	return (kt == ktMin);
}
clogInterface(const IPAddress & ip,double seconds,ClogMode mode=ClogDefault)1516 virtual void clogInterface(const IPAddress& ip, double seconds, ClogMode mode = ClogDefault) {
1517 if (mode == ClogDefault) {
1518 double a = g_random->random01();
1519 if ( a < 0.3 ) mode = ClogSend;
1520 else if (a < 0.6 ) mode = ClogReceive;
1521 else mode = ClogAll;
1522 }
1523 TraceEvent("ClogInterface")
1524 .detail("IP", ip.toString())
1525 .detail("Delay", seconds)
1526 .detail("Queue", mode == ClogSend ? "Send" : mode == ClogReceive ? "Receive" : "All");
1527
1528 if (mode == ClogSend || mode==ClogAll)
1529 g_clogging.clogSendFor( ip, seconds );
1530 if (mode == ClogReceive || mode==ClogAll)
1531 g_clogging.clogRecvFor( ip, seconds );
1532 }
clogPair(const IPAddress & from,const IPAddress & to,double seconds)1533 virtual void clogPair(const IPAddress& from, const IPAddress& to, double seconds) {
1534 g_clogging.clogPairFor( from, to, seconds );
1535 }
getAllProcesses() const1536 virtual std::vector<ProcessInfo*> getAllProcesses() const {
1537 std::vector<ProcessInfo*> processes;
1538 for( auto& c : machines ) {
1539 processes.insert( processes.end(), c.second.processes.begin(), c.second.processes.end() );
1540 }
1541 for( auto& c : currentlyRebootingProcesses ) {
1542 processes.push_back( c.second );
1543 }
1544 return processes;
1545 }
getProcessByAddress(NetworkAddress const & address)1546 virtual ProcessInfo* getProcessByAddress( NetworkAddress const& address ) {
1547 NetworkAddress normalizedAddress(address.ip, address.port, true, false);
1548 ASSERT( addressMap.count( normalizedAddress ) );
1549 return addressMap[ normalizedAddress ];
1550 }
1551
getMachineByNetworkAddress(NetworkAddress const & address)1552 virtual MachineInfo* getMachineByNetworkAddress(NetworkAddress const& address) {
1553 return &machines[addressMap[address]->locality.machineId()];
1554 }
1555
getMachineById(Optional<Standalone<StringRef>> const & machineId)1556 virtual MachineInfo* getMachineById(Optional<Standalone<StringRef>> const& machineId) {
1557 return &machines[machineId];
1558 }
1559
destroyMachine(Optional<Standalone<StringRef>> const & machineId)1560 virtual void destroyMachine(Optional<Standalone<StringRef>> const& machineId ) {
1561 auto& machine = machines[machineId];
1562 for( auto process : machine.processes ) {
1563 ASSERT( process->failed );
1564 }
1565 if( machine.machineProcess ) {
1566 killProcess_internal( machine.machineProcess, KillInstantly );
1567 }
1568 machines.erase(machineId);
1569 }
1570
Sim2()1571 Sim2() : time(0.0), taskCount(0), yielded(false), yield_limit(0), currentTaskID(-1) {
1572 // Not letting currentProcess be NULL eliminates some annoying special cases
1573 currentProcess = new ProcessInfo( "NoMachine", LocalityData(Optional<Standalone<StringRef>>(), StringRef(), StringRef(), StringRef()), ProcessClass(), {NetworkAddress()}, this, "", "" );
1574 g_network = net2 = newNet2(false, true);
1575 Net2FileSystem::newFileSystem();
1576 check_yield(0);
1577 }
1578
1579 // Implementation
1580 struct Task {
1581 int taskID;
1582 double time;
1583 uint64_t stable;
1584 ProcessInfo* machine;
1585 Promise<Void> action;
TaskSim2::Task1586 Task( double time, int taskID, uint64_t stable, ProcessInfo* machine, Promise<Void>&& action ) : time(time), taskID(taskID), stable(stable), machine(machine), action(std::move(action)) {}
TaskSim2::Task1587 Task( double time, int taskID, uint64_t stable, ProcessInfo* machine, Future<Void>& future ) : time(time), taskID(taskID), stable(stable), machine(machine) { future = action.getFuture(); }
TaskSim2::Task1588 Task(Task&& rhs) BOOST_NOEXCEPT : time(rhs.time), taskID(rhs.taskID), stable(rhs.stable), machine(rhs.machine), action(std::move(rhs.action)) {}
operator =Sim2::Task1589 void operator= ( Task const& rhs ) { taskID = rhs.taskID; time = rhs.time; stable = rhs.stable; machine = rhs.machine; action = rhs.action; }
TaskSim2::Task1590 Task( Task const& rhs ) : taskID(rhs.taskID), time(rhs.time), stable(rhs.stable), machine(rhs.machine), action(rhs.action) {}
operator =Sim2::Task1591 void operator= (Task&& rhs) BOOST_NOEXCEPT { time = rhs.time; taskID = rhs.taskID; stable = rhs.stable; machine = rhs.machine; action = std::move(rhs.action); }
1592
operator <Sim2::Task1593 bool operator < (Task const& rhs) const {
1594 // Ordering is reversed for priority_queue
1595 if (time != rhs.time) return time > rhs.time;
1596 return stable > rhs.stable;
1597 }
1598 };
1599
execTask(struct Task & t)1600 void execTask(struct Task& t) {
1601 if (t.machine->failed) {
1602 t.action.send(Never());
1603 }
1604 else {
1605 mutex.enter();
1606 this->time = t.time;
1607 mutex.leave();
1608
1609 this->currentProcess = t.machine;
1610 try {
1611 //auto before = getCPUTicks();
1612 t.action.send(Void());
1613 ASSERT( this->currentProcess == t.machine );
1614 /*auto elapsed = getCPUTicks() - before;
1615 currentProcess->cpuTicks += elapsed;
1616 if (g_random->random01() < 0.01){
1617 TraceEvent("TaskDuration").detail("CpuTicks", currentProcess->cpuTicks);
1618 currentProcess->cpuTicks = 0;
1619 }*/
1620 } catch (Error& e) {
1621 TraceEvent(SevError, "UnhandledSimulationEventError").error(e, true);
1622 killProcess(t.machine, KillInstantly);
1623 }
1624
1625 //if( this->time > 45.522817 ) {
1626 // printf("foo\n");
1627 //}
1628
1629 if (randLog)
1630 fprintf( randLog, "T %f %d %s %lld\n", this->time, int(g_random->peek() % 10000), t.machine ? t.machine->name : "none", t.stable);
1631 }
1632 }
1633
onMainThread(Promise<Void> && signal,int taskID)1634 virtual void onMainThread( Promise<Void>&& signal, int taskID ) {
1635 // This is presumably coming from either a "fake" thread pool thread, i.e. it is actually on this thread
1636 // or a thread created with g_network->startThread
1637 ASSERT(getCurrentProcess());
1638
1639 mutex.enter();
1640 ASSERT(taskID >= TaskMinPriority && taskID <= TaskMaxPriority);
1641 tasks.push( Task( time, taskID, taskCount++, getCurrentProcess(), std::move(signal) ) );
1642 mutex.leave();
1643 }
onProcess(ISimulator::ProcessInfo * process,int taskID)1644 virtual Future<Void> onProcess( ISimulator::ProcessInfo *process, int taskID ) {
1645 return delay( 0, taskID, process );
1646 }
onMachine(ISimulator::ProcessInfo * process,int taskID)1647 virtual Future<Void> onMachine( ISimulator::ProcessInfo *process, int taskID ) {
1648 if( process->machine == 0 )
1649 return Void();
1650 return delay( 0, taskID, process->machine->machineProcess );
1651 }
1652
//time is guarded by ISimulator::mutex. It is not necessary to guard reads on the main thread because
//time should only be modified from the main thread.
double time;
// Initialized to -1 by the constructor.
int currentTaskID;

//taskCount is guarded by ISimulator::mutex
// Incremented for every queued Task and stored as that task's `stable` sequence number.
uint64_t taskCount;

// Machine id -> machine record
std::map<Optional<Standalone<StringRef>>, MachineInfo > machines;
// Network address -> process registered at that address
std::map<NetworkAddress, ProcessInfo*> addressMap;
// NOTE(review): per-process promise, presumably signalled once a dead process's files are released; confirm at the use sites.
std::map<ProcessInfo*, Promise<Void>> filesDeadMap;

//tasks is guarded by ISimulator::mutex
std::priority_queue<Task, std::vector<Task>> tasks;

//Sim2Net network;
// Underlying network object created with newNet2() in the constructor.
INetwork *net2;

//Map from machine IP -> machine disk space info
std::map<IPAddress, SimDiskSpace> diskSpaceMap;

//Whether or not yield has returned true during the current iteration of the run loop
bool yielded;
int yield_limit; // how many more times yield may return false before next returning true
1677 };
1678
startNewSimulator()1679 void startNewSimulator() {
1680 ASSERT( !g_network );
1681 g_network = g_pSimulator = new Sim2();
1682 g_simulator.connectionFailuresDisableDuration = g_random->random01() < 0.5 ? 0 : 1e6;
1683 }
1684
// Actor that reboots the simulated process `p` according to `kt`.
// After switching onto `p`, it does nothing further if `p` is already rebooting. Otherwise it marks
// the process as rebooting, additionally marks it cleared and calls g_simulator.clearAddress() for the
// two "...AndDelete" kill types, and finally sends `kt` on the process's shutdownSignal.
// `kt` must be one of RebootProcess, Reboot, RebootAndDelete or RebootProcessAndDelete.
ACTOR void doReboot( ISimulator::ProcessInfo *p, ISimulator::KillType kt ) {
	TraceEvent("RebootingProcessAttempt").detail("ZoneId", p->locality.zoneId()).detail("KillType", kt).detail("Process", p->toString()).detail("StartingClass", p->startingClass.toString()).detail("Failed", p->failed).detail("Excluded", p->excluded).detail("Cleared", p->cleared).detail("Rebooting", p->rebooting).detail("TaskDefaultDelay", TaskDefaultDelay);

	wait( g_sim2.delay( 0, TaskDefaultDelay, p ) ); // Switch to the machine in question

	try {
		ASSERT( kt == ISimulator::RebootProcess || kt == ISimulator::Reboot || kt == ISimulator::RebootAndDelete || kt == ISimulator::RebootProcessAndDelete );

		TEST( kt == ISimulator::RebootProcess ); // Simulated process rebooted
		TEST( kt == ISimulator::Reboot ); // Simulated machine rebooted
		TEST( kt == ISimulator::RebootAndDelete ); // Simulated machine rebooted with data and coordination state deletion
		TEST( kt == ISimulator::RebootProcessAndDelete ); // Simulated process rebooted with data and coordination state deletion

		// A reboot is already in flight for this process; do not signal it a second time.
		if( p->rebooting )
			return;
		TraceEvent("RebootingProcess").detail("KillType", kt).detail("Address", p->address).detail("ZoneId", p->locality.zoneId()).detail("DataHall", p->locality.dataHallId()).detail("Locality", p->locality.toString()).detail("Failed", p->failed).detail("Excluded", p->excluded).detail("Cleared", p->cleared).backtrace();
		p->rebooting = true;
		// The deleting kill types also flag the process as cleared and clear its address in the simulator.
		if ((kt == ISimulator::RebootAndDelete) || (kt == ISimulator::RebootProcessAndDelete)) {
			p->cleared = true;
			g_simulator.clearAddress(p->address);
		}
		// Deliver the kill type to whoever is waiting on this process's shutdown signal.
		p->shutdownSignal.send( kt );
	} catch (Error& e) {
		TraceEvent(SevError, "RebootError").error(e);
		p->shutdownSignal.sendError(e); // ?
		throw; // goes nowhere!
	}
}
1713
// Simulates delays for performing operations on disk.
//   diskParameters - shared state for the simulated disk; its nextOperation timestamp is advanced by this call.
//   size           - size of the operation, used for the bandwidth term.
//   sync           - selects which random-latency formula is applied (see below).
// Returns a future that becomes ready once the simulated operation may proceed.
Future<Void> waitUntilDiskReady( Reference<DiskParameters> diskParameters, int64_t size, bool sync ) {
	// When connectionFailuresDisableDuration exceeds 1e4, skip the disk model and use a fixed short delay.
	if(g_simulator.connectionFailuresDisableDuration > 1e4)
		return delay(0.0001);

	// The next operation cannot start in the past; then charge this operation's cost to the disk.
	if( diskParameters->nextOperation < now() ) diskParameters->nextOperation = now();
	// NOTE(review): if `bandwidth` is an integer type, `size / bandwidth` is integer division and truncates
	// toward zero for sizes below the bandwidth value - TODO confirm against DiskParameters.
	diskParameters->nextOperation += ( 1.0 / diskParameters->iops ) + ( size / diskParameters->bandwidth );

	double randomLatency;
	if(sync) {
		// 5ms base plus a random part scaled by 1.0 under BUGGIFY, or by .010 otherwise
		randomLatency = .005 + g_random->random01() * (BUGGIFY ? 1.0 : .010);
	} else
		// Random part scaled by 10 / iops
		randomLatency = 10 * g_random->random01() / diskParameters->iops;

	return delayUntil( diskParameters->nextOperation + randomLatency );
}
1730
1731 #if defined(_WIN32)
1732
1733 /* Opening with FILE_SHARE_DELETE lets simulation actually work on windows - previously renames were always failing.
1734 FIXME: Use an actual platform abstraction for this stuff! Is there any reason we can't use underlying net2 for example? */
1735
1736 #include <Windows.h>
1737
sf_open(const char * filename,int flags,int convFlags,int mode)1738 int sf_open( const char* filename, int flags, int convFlags, int mode ) {
1739 HANDLE wh = CreateFile( filename, GENERIC_READ | ((flags&IAsyncFile::OPEN_READWRITE) ? GENERIC_WRITE : 0),
1740 FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE, NULL,
1741 (flags&IAsyncFile::OPEN_EXCLUSIVE) ? CREATE_NEW :
1742 (flags&IAsyncFile::OPEN_CREATE) ? OPEN_ALWAYS :
1743 OPEN_EXISTING,
1744 FILE_ATTRIBUTE_NORMAL,
1745 NULL );
1746 int h = -1;
1747 if (wh != INVALID_HANDLE_VALUE) h = _open_osfhandle( (intptr_t)wh, convFlags );
1748 else errno = GetLastError() == ERROR_FILE_NOT_FOUND ? ENOENT : EFAULT;
1749 return h;
1750 }
1751
1752 #endif
1753
1754 // Opens a file for asynchronous I/O
open(std::string filename,int64_t flags,int64_t mode)1755 Future< Reference<class IAsyncFile> > Sim2FileSystem::open( std::string filename, int64_t flags, int64_t mode )
1756 {
1757 ASSERT( (flags & IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE) ||
1758 !(flags & IAsyncFile::OPEN_CREATE) ||
1759 StringRef(filename).endsWith(LiteralStringRef(".fdb-lock")) ); // We don't use "ordinary" non-atomic file creation right now except for folder locking, and we don't have code to simulate its unsafeness.
1760
1761 if ( (flags & IAsyncFile::OPEN_EXCLUSIVE) ) ASSERT( flags & IAsyncFile::OPEN_CREATE );
1762
1763 if (flags & IAsyncFile::OPEN_UNCACHED) {
1764 auto& machineCache = g_simulator.getCurrentProcess()->machine->openFiles;
1765 std::string actualFilename = filename;
1766 if ( machineCache.find(filename) == machineCache.end() ) {
1767 if(flags & IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE) {
1768 actualFilename = filename + ".part";
1769 auto partFile = machineCache.find(actualFilename);
1770 if(partFile != machineCache.end()) {
1771 Future<Reference<IAsyncFile>> f = AsyncFileDetachable::open(partFile->second);
1772 if(FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
1773 f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileWriteChecker(r)); });
1774 return f;
1775 }
1776 }
1777 //Simulated disk parameters are shared by the AsyncFileNonDurable and the underlying SimpleFile. This way, they can both keep up with the time to start the next operation
1778 Reference<DiskParameters> diskParameters(new DiskParameters(FLOW_KNOBS->SIM_DISK_IOPS, FLOW_KNOBS->SIM_DISK_BANDWIDTH));
1779 machineCache[actualFilename] = AsyncFileNonDurable::open(filename, actualFilename, SimpleFile::open(filename, flags, mode, diskParameters, false), diskParameters);
1780 }
1781 Future<Reference<IAsyncFile>> f = AsyncFileDetachable::open( machineCache[actualFilename] );
1782 if(FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
1783 f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileWriteChecker(r)); });
1784 return f;
1785 }
1786 else
1787 return AsyncFileCached::open(filename, flags, mode);
1788 }
1789
1790 // Deletes the given file. If mustBeDurable, returns only when the file is guaranteed to be deleted even after a power failure.
deleteFile(std::string filename,bool mustBeDurable)1791 Future< Void > Sim2FileSystem::deleteFile( std::string filename, bool mustBeDurable )
1792 {
1793 return Sim2::deleteFileImpl(&g_sim2, filename, mustBeDurable);
1794 }
1795
lastWriteTime(std::string filename)1796 Future< std::time_t > Sim2FileSystem::lastWriteTime( std::string filename ) {
1797 // TODO: update this map upon file writes.
1798 static std::map<std::string, double> fileWrites;
1799 if (BUGGIFY && g_random->random01() < 0.01) {
1800 fileWrites[filename] = now();
1801 }
1802 return fileWrites[filename];
1803 }
1804
newFileSystem()1805 void Sim2FileSystem::newFileSystem()
1806 {
1807 g_network->setGlobal(INetwork::enFileSystem, (flowGlobalType) new Sim2FileSystem());
1808 }
1809