1 /*
2  * simulator.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #ifndef FLOW_SIMULATOR_H
22 #define FLOW_SIMULATOR_H
23 #pragma once
24 
25 #include "flow/flow.h"
26 #include "fdbrpc/FailureMonitor.h"
27 #include "fdbrpc/Locality.h"
28 #include "fdbrpc/IAsyncFile.h"
29 #include "flow/TDMetric.actor.h"
30 #include <random>
31 #include "fdbrpc/ReplicationPolicy.h"
32 
// Mode argument for ISimulator::clogInterface() (which defaults to ClogDefault).
// NOTE(review): the names suggest which traffic direction(s) a clog affects; the exact
// semantics live in the simulator implementation and are not visible here.
enum ClogMode {
	ClogDefault = 0,
	ClogAll = 1,
	ClogSend = 2,
	ClogReceive = 3
};
34 
35 class ISimulator : public INetwork {
36 public:
ISimulator()37 	ISimulator() : desiredCoordinators(1), physicalDatacenters(1), processesPerMachine(0), listenersPerProcess(1), isStopped(false), lastConnectionFailure(0), connectionFailuresDisableDuration(0), speedUpSimulation(false), allSwapsDisabled(false), backupAgents(WaitForType), drAgents(WaitForType), extraDB(NULL), allowLogSetKills(true), usableRegions(1) {}
38 
39 	// Order matters!
40 	enum KillType { KillInstantly, InjectFaults, RebootAndDelete, RebootProcessAndDelete, Reboot, RebootProcess, None };
41 
42 	enum BackupAgentType { NoBackupAgents, WaitForType, BackupToFile, BackupToDB };
43 
44 	// Subclasses may subclass ProcessInfo as well
45 	struct MachineInfo;
46 
47 	struct ProcessInfo : NonCopyable {
48 		const char* name;
49 		const char* coordinationFolder;
50 		const char* dataFolder;
51 		MachineInfo* machine;
52 		NetworkAddressList addresses;
53 		NetworkAddress address;
54 		LocalityData	locality;
55 		ProcessClass startingClass;
56 		TDMetricCollection tdmetrics;
57 		std::map<NetworkAddress, Reference<IListener>> listenerMap;
58 		bool failed;
59 		bool excluded;
60 		bool cleared;
61 		int64_t cpuTicks;
62 		bool rebooting;
63 		std::vector<flowGlobalType> globals;
64 
65 		INetworkConnections *network;
66 
67 		uint64_t fault_injection_r;
68 		double fault_injection_p1, fault_injection_p2;
69 
ProcessInfoProcessInfo70 		ProcessInfo(const char* name, LocalityData locality, ProcessClass startingClass, NetworkAddressList addresses,
71 					INetworkConnections *net, const char* dataFolder, const char* coordinationFolder )
72 			: name(name), locality(locality), startingClass(startingClass), addresses(addresses), address(addresses.address), dataFolder(dataFolder),
73 				network(net), coordinationFolder(coordinationFolder), failed(false), excluded(false), cpuTicks(0),
74 				rebooting(false), fault_injection_p1(0), fault_injection_p2(0),
75 				fault_injection_r(0), machine(0), cleared(false) {}
76 
onShutdownProcessInfo77 		Future<KillType> onShutdown() { return shutdownSignal.getFuture(); }
78 
isReliableProcessInfo79 		bool isReliable() const { return !failed && fault_injection_p1 == 0 && fault_injection_p2 == 0; }
isAvailableProcessInfo80 		bool isAvailable() const { return !isExcluded() && isReliable(); }
isExcludedProcessInfo81 		bool isExcluded() const { return excluded; }
isClearedProcessInfo82 		bool isCleared() const { return cleared; }
83 
84 		// Returns true if the class represents an acceptable worker
isAvailableClassProcessInfo85 		bool isAvailableClass() const {
86 			switch (startingClass._class) {
87 				case ProcessClass::UnsetClass: return true;
88 				case ProcessClass::StorageClass: return true;
89 				case ProcessClass::TransactionClass: return true;
90 				case ProcessClass::ResolutionClass: return false;
91 				case ProcessClass::ProxyClass: return false;
92 				case ProcessClass::MasterClass: return false;
93 				case ProcessClass::TesterClass: return false;
94 				case ProcessClass::StatelessClass: return false;
95 				case ProcessClass::LogClass: return true;
96 				case ProcessClass::LogRouterClass: return false;
97 				case ProcessClass::ClusterControllerClass: return false;
98 				case ProcessClass::DataDistributorClass: return false;
99 				case ProcessClass::RatekeeperClass: return false;
100 				default: return false;
101 			}
102 		}
103 
getListenerProcessInfo104 		const Reference<IListener> getListener(const NetworkAddress& addr) {
105 			auto listener = listenerMap.find(addr);
106 			ASSERT( listener != listenerMap.end());
107 			return listener->second;
108 		}
109 
globalProcessInfo110 		inline flowGlobalType global(int id) { return (globals.size() > id) ? globals[id] : NULL; };
setGlobalProcessInfo111 		inline void setGlobal(size_t id, flowGlobalType v) { globals.resize(std::max(globals.size(),id+1)); globals[id] = v; };
112 
toStringProcessInfo113 		std::string toString() const {
114 			return format(
115 			    "name: %s address: %s zone: %s datahall: %s class: %s excluded: %d cleared: %d", name,
116 			    formatIpPort(addresses.address.ip, addresses.address.port).c_str(),
117 			    (locality.zoneId().present() ? locality.zoneId().get().printable().c_str() : "[unset]"),
118 			    (locality.dataHallId().present() ? locality.dataHallId().get().printable().c_str() : "[unset]"),
119 			    startingClass.toString().c_str(), excluded, cleared);
120 		}
121 
122 		// Members not for external use
123 		Promise<KillType> shutdownSignal;
124 	};
125 
126 	struct MachineInfo {
127 		ProcessInfo* machineProcess;
128 		std::vector<ProcessInfo*> processes;
129 		std::map<std::string, Future<Reference<IAsyncFile>>> openFiles;
130 		std::set<std::string> deletingFiles;
131 		std::set<std::string> closingFiles;
132 		Optional<Standalone<StringRef>>	machineId;
133 
MachineInfoMachineInfo134 		MachineInfo() : machineProcess(0) {}
135 	};
136 
getProcess(Endpoint const & endpoint)137 	ProcessInfo* getProcess( Endpoint const& endpoint ) { return getProcessByAddress(endpoint.getPrimaryAddress()); }
getCurrentProcess()138 	ProcessInfo* getCurrentProcess() { return currentProcess; }
139 	virtual Future<Void> onProcess( ISimulator::ProcessInfo *process, int taskID = -1 ) = 0;
140 	virtual Future<Void> onMachine( ISimulator::ProcessInfo *process, int taskID = -1 ) = 0;
141 
142 	virtual ProcessInfo* newProcess(const char* name, IPAddress ip, uint16_t port, uint16_t listenPerProcess,
143 	                                LocalityData locality, ProcessClass startingClass, const char* dataFolder,
144 	                                const char* coordinationFolder) = 0;
145 	virtual void killProcess( ProcessInfo* machine, KillType ) = 0;
146 	virtual void rebootProcess(Optional<Standalone<StringRef>> zoneId, bool allProcesses ) = 0;
147 	virtual void rebootProcess( ProcessInfo* process, KillType kt ) = 0;
148 	virtual void killInterface( NetworkAddress address, KillType ) = 0;
149 	virtual bool killMachine(Optional<Standalone<StringRef>> machineId, KillType kt, bool forceKill = false, KillType* ktFinal = NULL) = 0;
150 	virtual bool killZone(Optional<Standalone<StringRef>> zoneId, KillType kt, bool forceKill = false, KillType* ktFinal = NULL) = 0;
151 	virtual bool killDataCenter(Optional<Standalone<StringRef>> dcId, KillType kt, bool forceKill = false, KillType* ktFinal = NULL) = 0;
152 	//virtual KillType getMachineKillState( UID zoneID ) = 0;
153 	virtual bool canKillProcesses(std::vector<ProcessInfo*> const& availableProcesses, std::vector<ProcessInfo*> const& deadProcesses, KillType kt, KillType* newKillType) const = 0;
154 	virtual bool isAvailable() const = 0;
155 	virtual bool datacenterDead(Optional<Standalone<StringRef>> dcId) const = 0;
156 	virtual void displayWorkers() const;
157 
addRole(NetworkAddress const & address,std::string const & role)158 	virtual void addRole(NetworkAddress const& address, std::string const& role) {
159 		roleAddresses[address][role] ++;
160 		TraceEvent("RoleAdd").detail("Address", address).detail("Role", role).detail("NumRoles", roleAddresses[address].size()).detail("Value", roleAddresses[address][role]);
161 	}
162 
removeRole(NetworkAddress const & address,std::string const & role)163 	virtual void removeRole(NetworkAddress const& address, std::string const& role) {
164 		auto addressIt = roleAddresses.find(address);
165 		if (addressIt != roleAddresses.end()) {
166 			auto rolesIt = addressIt->second.find(role);
167 			if (rolesIt != addressIt->second.end()) {
168 				if (rolesIt->second > 1) {
169 					rolesIt->second --;
170 					TraceEvent("RoleRemove").detail("Address", address).detail("Role", role).detail("NumRoles", addressIt->second.size()).detail("Value", rolesIt->second).detail("Result", "Decremented Role");
171 				}
172 				else {
173 					addressIt->second.erase(rolesIt);
174 					if (addressIt->second.size()) {
175 						TraceEvent("RoleRemove").detail("Address", address).detail("Role", role).detail("NumRoles", addressIt->second.size()).detail("Value", 0).detail("Result", "Removed Role");
176 					}
177 					else {
178 						roleAddresses.erase(addressIt);
179 						TraceEvent("RoleRemove").detail("Address", address).detail("Role", role).detail("NumRoles", 0).detail("Value", 0).detail("Result", "Removed Address");
180 					}
181 				}
182 			}
183 			else {
184 				TraceEvent(SevWarn,"RoleRemove").detail("Address", address).detail("Role", role).detail("Result", "Role Missing");
185 			}
186 		}
187 		else {
188 			TraceEvent(SevWarn,"RoleRemove").detail("Address", address).detail("Role", role).detail("Result", "Address Missing");
189 		}
190 	}
191 
192 	virtual std::string getRoles(NetworkAddress const& address, bool skipWorkers = true) const {
193 		auto addressIt = roleAddresses.find(address);
194 		std::string roleText;
195 		if (addressIt != roleAddresses.end()) {
196 			for (auto& roleIt : addressIt->second) {
197 				if ((!skipWorkers) || (roleIt.first != "Worker"))
198 					roleText += roleIt.first + ((roleIt.second > 1) ? format("-%d ", roleIt.second) : " ");
199 			}
200 		}
201 		if (roleText.empty())
202 				roleText = "[unset]";
203 		return roleText;
204 	}
205 
clearAddress(NetworkAddress const & address)206 	virtual void clearAddress(NetworkAddress const& address) {
207 		clearedAddresses[address]++;
208 		TraceEvent("ClearAddress").detail("Address", address).detail("Value", clearedAddresses[address]);
209 	}
isCleared(NetworkAddress const & address)210 	virtual bool isCleared(NetworkAddress const& address) const {
211 		return clearedAddresses.find(address) != clearedAddresses.end();
212 	}
213 
excludeAddress(NetworkAddress const & address)214 	virtual void excludeAddress(NetworkAddress const& address) {
215 		excludedAddresses[address]++;
216 		TraceEvent("ExcludeAddress").detail("Address", address).detail("Value", excludedAddresses[address]);
217 	}
218 
includeAddress(NetworkAddress const & address)219 	virtual void includeAddress(NetworkAddress const& address) {
220 		auto addressIt = excludedAddresses.find(address);
221 		if (addressIt != excludedAddresses.end()) {
222 			if (addressIt->second > 1) {
223 				addressIt->second --;
224 				TraceEvent("IncludeAddress").detail("Address", address).detail("Value", addressIt->second).detail("Result", "Decremented");
225 			}
226 			else {
227 				excludedAddresses.erase(addressIt);
228 				TraceEvent("IncludeAddress").detail("Address", address).detail("Value", 0).detail("Result", "Removed");
229 			}
230 		}
231 		else {
232 			TraceEvent(SevWarn,"IncludeAddress").detail("Address", address).detail("Result", "Missing");
233 		}
234 	}
includeAllAddresses()235 	virtual void includeAllAddresses() {
236 		TraceEvent("IncludeAddressAll").detail("AddressTotal", excludedAddresses.size());
237 		excludedAddresses.clear();
238 	}
isExcluded(NetworkAddress const & address)239 	virtual bool isExcluded(NetworkAddress const& address) const {
240 		return excludedAddresses.find(address) != excludedAddresses.end();
241 	}
242 
disableSwapToMachine(Optional<Standalone<StringRef>> zoneId)243 	virtual void disableSwapToMachine(Optional<Standalone<StringRef>> zoneId ) {
244 		swapsDisabled.insert(zoneId);
245 	}
enableSwapToMachine(Optional<Standalone<StringRef>> zoneId)246 	virtual void enableSwapToMachine(Optional<Standalone<StringRef>> zoneId ) {
247 		swapsDisabled.erase(zoneId);
248 		allSwapsDisabled = false;
249 	}
canSwapToMachine(Optional<Standalone<StringRef>> zoneId)250 	virtual bool canSwapToMachine(Optional<Standalone<StringRef>> zoneId ) {
251 		return swapsDisabled.count( zoneId ) == 0 && !allSwapsDisabled && !extraDB;
252 	}
enableSwapsToAll()253 	virtual void enableSwapsToAll() {
254 		swapsDisabled.clear();
255 		allSwapsDisabled = false;
256 	}
disableSwapsToAll()257 	virtual void disableSwapsToAll() {
258 		swapsDisabled.clear();
259 		allSwapsDisabled = true;
260 	}
261 
262 	virtual void clogInterface(const IPAddress& ip, double seconds, ClogMode mode = ClogDefault) = 0;
263 	virtual void clogPair(const IPAddress& from, const IPAddress& to, double seconds) = 0;
264 	virtual std::vector<ProcessInfo*> getAllProcesses() const = 0;
265 	virtual ProcessInfo* getProcessByAddress( NetworkAddress const& address ) = 0;
266 	virtual MachineInfo* getMachineByNetworkAddress(NetworkAddress const& address) = 0;
267 	virtual MachineInfo* getMachineById(Optional<Standalone<StringRef>> const& machineId) = 0;
run()268 	virtual void run() {}
269 	virtual void destroyProcess( ProcessInfo *p ) = 0;
270 	virtual void destroyMachine(Optional<Standalone<StringRef>> const& machineId ) = 0;
271 
272 	int desiredCoordinators;
273 	int physicalDatacenters;
274 	int processesPerMachine;
275 	int listenersPerProcess;
276 	std::set<NetworkAddress> protectedAddresses;
277 	std::map<NetworkAddress, ProcessInfo*> currentlyRebootingProcesses;
278 	class ClusterConnectionString* extraDB;
279 	Reference<IReplicationPolicy> storagePolicy;
280 	Reference<IReplicationPolicy> tLogPolicy;
281 	int32_t tLogWriteAntiQuorum;
282 	Optional<Standalone<StringRef>> primaryDcId;
283 	Reference<IReplicationPolicy> remoteTLogPolicy;
284 	int32_t usableRegions;
285 	std::string disablePrimary;
286 	std::string disableRemote;
287 	std::string originalRegions;
288 	bool allowLogSetKills;
289 	Optional<Standalone<StringRef>> remoteDcId;
290 	bool hasSatelliteReplication;
291 	Reference<IReplicationPolicy> satelliteTLogPolicy;
292 	Reference<IReplicationPolicy> satelliteTLogPolicyFallback;
293 	int32_t satelliteTLogWriteAntiQuorum;
294 	int32_t satelliteTLogWriteAntiQuorumFallback;
295 	std::vector<Optional<Standalone<StringRef>>> primarySatelliteDcIds;
296 	std::vector<Optional<Standalone<StringRef>>> remoteSatelliteDcIds;
297 
298 	//Used by workloads that perform reconfigurations
299 	int testerCount;
300 	std::string connectionString;
301 
302 	bool isStopped;
303 	double lastConnectionFailure;
304 	double connectionFailuresDisableDuration;
305 	bool speedUpSimulation;
306 	BackupAgentType backupAgents;
307 	BackupAgentType drAgents;
308 
global(int id)309 	virtual flowGlobalType global(int id) { return getCurrentProcess()->global(id); };
setGlobal(size_t id,flowGlobalType v)310 	virtual void setGlobal(size_t id, flowGlobalType v) { getCurrentProcess()->setGlobal(id,v); };
311 
312 	static thread_local ProcessInfo* currentProcess;
313 protected:
314 	Mutex mutex;
315 
316 private:
317 	std::set<Optional<Standalone<StringRef>>> swapsDisabled;
318 	std::map<NetworkAddress, int> excludedAddresses;
319 	std::map<NetworkAddress, int> clearedAddresses;
320 	std::map<NetworkAddress, std::map<std::string, int>> roleAddresses;
321 	bool allSwapsDisabled;
322 };
323 
// Quickly make existing code work that expects g_simulator to be of class type (not a pointer)
// The global simulator instance; declared here, defined in another translation unit.
extern ISimulator* g_pSimulator;
// Expands to a dereference of g_pSimulator, so g_pSimulator must be non-null wherever this is used.
#define g_simulator (*g_pSimulator)

// Declared here, defined elsewhere.
// NOTE(review): presumably creates the simulator instance and assigns g_pSimulator — confirm.
void startNewSimulator();
329 
330 //Parameters used to simulate disk performance
331 struct DiskParameters : ReferenceCounted<DiskParameters> {
332 	double nextOperation;
333 	int64_t iops;
334 	int64_t bandwidth;
335 
DiskParametersDiskParameters336 	DiskParameters(int64_t iops, int64_t bandwidth) : nextOperation(0), iops(iops), bandwidth(bandwidth) { }
337 };
338 
339 //Simulates delays for performing operations on disk
340 extern Future<Void> waitUntilDiskReady(Reference<DiskParameters> parameters, int64_t size, bool sync = false);
341 
342 
343 class Sim2FileSystem : public IAsyncFileSystem {
344 public:
345 	// Opens a file for asynchronous I/O
346 	virtual Future< Reference<class IAsyncFile> > open( std::string filename, int64_t flags, int64_t mode );
347 
348 	// Deletes the given file.  If mustBeDurable, returns only when the file is guaranteed to be deleted even after a power failure.
349 	virtual Future< Void > deleteFile( std::string filename, bool mustBeDurable );
350 
351 	virtual Future< std::time_t > lastWriteTime( std::string filename );
352 
Sim2FileSystem()353 	Sim2FileSystem() {}
354 
~Sim2FileSystem()355 	virtual ~Sim2FileSystem() {}
356 
357 	static void newFileSystem();
358 };
359 
360 #endif
361