1 /*
2  * network.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #ifndef FLOW_OPENNETWORK_H
22 #define FLOW_OPENNETWORK_H
23 #pragma once
24 
#include <array>
#include <cstring>
#include <string>
#include <stdint.h>
#include "boost/asio.hpp"
#include "flow/serialize.h"
#include "flow/IRandom.h"
#include "fdbrpc/crc32c.h"
32 
// Task priority constants. INetwork::getCurrentTask() documents its result as the "taskID/priority",
// and delay()/yield()/check_yield()/onMainThread() take an int taskID; these values presumably are
// what callers pass there (TODO confirm at call sites). Judging by TaskMaxPriority/TaskMinPriority,
// numerically larger values presumably denote higher priority.
// Several entries intentionally share a value (e.g. 9000 and 8700 appear twice).
enum {
	TaskMaxPriority = 1000000,
	TaskRunCycleFunction = 20000,
	TaskFlushTrace = 10500,
	TaskWriteSocket = 10000,
	TaskPollEIO = 9900,
	TaskDiskIOComplete = 9150,
	TaskLoadBalancedEndpoint = 9000, // same value as TaskReadSocket
	TaskReadSocket = 9000,
	TaskCoordinationReply = 8810,
	TaskCoordination = 8800,
	TaskFailureMonitor = 8700, // same value as TaskResolutionMetrics
	TaskResolutionMetrics = 8700,
	TaskClusterController = 8650,
	TaskProxyCommitDispatcher = 8640,
	TaskTLogQueuingMetrics = 8620,
	TaskTLogPop = 8610,
	TaskTLogPeekReply = 8600,
	TaskTLogPeek = 8590,
	TaskTLogCommitReply = 8580,
	TaskTLogCommit = 8570,
	TaskTLogSpilledPeekReply = 8567,
	TaskProxyGetRawCommittedVersion = 8565,
	TaskProxyResolverReply = 8560,
	TaskProxyCommitBatcher = 8550,
	TaskProxyCommit = 8540,
	TaskTLogConfirmRunningReply = 8530,
	TaskTLogConfirmRunning = 8520,
	TaskProxyGRVTimer = 8510,
	TaskProxyGetConsistentReadVersion = 8500,
	TaskDefaultPromiseEndpoint = 8000,
	TaskDefaultOnMainThread = 7500,
	TaskDefaultDelay = 7010,
	TaskDefaultYield = 7000,
	TaskDiskRead = 5010,
	TaskDefaultEndpoint = 5000,
	TaskUnknownEndpoint = 4000,
	TaskMoveKeys = 3550,
	TaskDataDistributionLaunch = 3530,
	TaskRatekeeper = 3510,
	TaskDataDistribution = 3500,
	TaskDiskWrite = 3010,
	TaskUpdateStorage = 3000,
	TaskLowPriority = 2000,

	// Lower bound of the priority range above.
	TaskMinPriority = 1000
};
80 
// Forward declarations of flow types referenced below; their definitions live elsewhere.
class Void;

template<class T> class Optional;
84 
85 struct IPAddress {
86 	// Represents both IPv4 and IPv6 address. For IPv4 addresses,
87 	// only the first 32bits are relevant and rest are initialized to
88 	// 0.
89 	typedef boost::asio::ip::address_v6::bytes_type IPAddressStore;
90 	static_assert(std::is_same<IPAddressStore, std::array<uint8_t, 16>>::value,
91 	              "IPAddressStore must be std::array<uint8_t, 16>");
92 
93 	IPAddress();
94 	explicit IPAddress(const IPAddressStore& v6addr);
95 	explicit IPAddress(uint32_t v4addr);
96 
isV6IPAddress97 	bool isV6() const { return isV6addr; }
isV4IPAddress98 	bool isV4() const { return !isV6addr; }
99 	bool isValid() const;
100 
IPAddressIPAddress101 	IPAddress(const IPAddress& rhs) : isV6addr(rhs.isV6addr) {
102 		if(isV6addr) {
103 			addr.v6 = rhs.addr.v6;
104 		} else {
105 			addr.v4 = rhs.addr.v4;
106 		}
107 	}
108 
109 	IPAddress& operator=(const IPAddress& rhs) {
110 		isV6addr = rhs.isV6addr;
111 		if(isV6addr) {
112 			addr.v6 = rhs.addr.v6;
113 		} else {
114 			addr.v4 = rhs.addr.v4;
115 		}
116 		return *this;
117 	}
118 
119 	// Returns raw v4/v6 representation of address. Caller is responsible
120 	// to call these functions safely.
toV4IPAddress121 	uint32_t toV4() const { return addr.v4; }
toV6IPAddress122 	const IPAddressStore& toV6() const { return addr.v6; }
123 
124 	std::string toString() const;
125 	static Optional<IPAddress> parse(std::string str);
126 
127 	bool operator==(const IPAddress& addr) const;
128 	bool operator!=(const IPAddress& addr) const;
129 	bool operator<(const IPAddress& addr) const;
130 
131 	template <class Ar>
serializeIPAddress132 	void serialize(Ar& ar) {
133 		serializer(ar, isV6addr);
134 		if(isV6addr) {
135 			serializer(ar, addr.v6);
136 		} else {
137 			serializer(ar, addr.v4);
138 		}
139 	}
140 
141 private:
142 	bool isV6addr;
143 	union {
144 		uint32_t v4;
145 		IPAddressStore v6;
146 	} addr;
147 };
148 
149 template<>
150 struct Traceable<IPAddress> : std::true_type {
151 	static std::string toString(const IPAddress& value) {
152 		return value.toString();
153 	}
154 };
155 
156 struct NetworkAddress {
157 	// A NetworkAddress identifies a particular running server (i.e. a TCP endpoint).
158 	IPAddress ip;
159 	uint16_t port;
160 	uint16_t flags;
161 
162 	enum { FLAG_PRIVATE = 1, FLAG_TLS = 2 };
163 
164 	NetworkAddress() : ip(IPAddress(0)), port(0), flags(FLAG_PRIVATE) {}
165 	NetworkAddress(const IPAddress& address, uint16_t port, bool isPublic, bool isTLS)
166 	  : ip(address), port(port), flags((isPublic ? 0 : FLAG_PRIVATE) | (isTLS ? FLAG_TLS : 0)) {}
167 	NetworkAddress(uint32_t ip, uint16_t port, bool isPublic, bool isTLS)
168 	  : NetworkAddress(IPAddress(ip), port, isPublic, isTLS) {}
169 
170 	NetworkAddress(uint32_t ip, uint16_t port) : NetworkAddress(ip, port, false, false) {}
171 	NetworkAddress(const IPAddress& ip, uint16_t port) : NetworkAddress(ip, port, false, false) {}
172 
173 	bool operator==(NetworkAddress const& r) const { return ip == r.ip && port == r.port && flags == r.flags; }
174 	bool operator!=(NetworkAddress const& r) const { return ip != r.ip || port != r.port || flags != r.flags; }
175 	bool operator<(NetworkAddress const& r) const {
176 		if (flags != r.flags)
177 			return flags < r.flags;
178 		else if (ip != r.ip)
179 			return ip < r.ip;
180 		return port < r.port;
181 	}
182 
183 	bool isValid() const { return ip.isValid() || port != 0; }
184 	bool isPublic() const { return !(flags & FLAG_PRIVATE); }
185 	bool isTLS() const { return (flags & FLAG_TLS) != 0; }
186 	bool isV6() const { return ip.isV6(); }
187 
188 	static NetworkAddress parse( std::string const& );
189 	static std::vector<NetworkAddress> parseList( std::string const& );
190 	std::string toString() const;
191 
192 	template <class Ar>
193 	void serialize(Ar& ar) {
194 		if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL) {
195 			uint32_t ipV4;
196 			serializer(ar, ipV4, port, flags);
197 			ip = IPAddress(ipV4);
198 		} else {
199 			serializer(ar, ip, port, flags);
200 		}
201 	}
202 };
203 
204 template<>
205 struct Traceable<NetworkAddress> : std::true_type {
206 	static std::string toString(const NetworkAddress& value) {
207 		return value.toString();
208 	}
209 };
210 
211 namespace std
212 {
213 	template <>
214 	struct hash<NetworkAddress>
215 	{
216 		size_t operator()(const NetworkAddress& na) const
217 		{
218 			size_t result = 0;
219 			if (na.ip.isV6()) {
220 				uint16_t* ptr = (uint16_t*)na.ip.toV6().data();
221 				result = ((size_t)ptr[5] << 32) | ((size_t)ptr[6] << 16) | ptr[7];
222 			} else {
223 				result = na.ip.toV4();
224 			}
225 			return (result << 16) + na.port;
226 		}
227 	};
228 }
229 
230 struct NetworkAddressList {
231 	NetworkAddress address;
232 	Optional<NetworkAddress> secondaryAddress;
233 
234 	bool operator==(NetworkAddressList const& r) const { return address == r.address && secondaryAddress == r.secondaryAddress; }
235 	bool operator!=(NetworkAddressList const& r) const { return address != r.address || secondaryAddress != r.secondaryAddress; }
236 	bool operator<(NetworkAddressList const& r) const {
237 		if (address != r.address)
238 			return address < r.address;
239 		return secondaryAddress < r.secondaryAddress;
240 	}
241 
242 	std::string toString() const {
243 		if(!secondaryAddress.present()) {
244 			return address.toString();
245 		}
246 		return address.toString() + ", " + secondaryAddress.get().toString();
247 	}
248 
249 	template <class Ar>
250 	void serialize(Ar& ar) {
251 		serializer(ar, address, secondaryAddress);
252 	}
253 };
254 
// String-formatting helpers for IP addresses; declared here, defined out of line.
// NOTE(review): the exact output formats are set by the definitions, which are not visible in this header.
std::string toIPVectorString(std::vector<uint32_t> ips);
std::string toIPVectorString(const std::vector<IPAddress>& ips);
std::string formatIpPort(const IPAddress& ip, uint16_t port);

// Forward declarations of flow's future/promise templates used by the interfaces below.
template <class T> class Future;
template <class T> class Promise;
261 
struct NetworkMetrics {
	// Counters and accumulators kept by an INetwork; every field starts at zero.
	enum { SLOW_EVENT_BINS = 16 };
	uint64_t countSlowEvents[SLOW_EVENT_BINS] = {};

	enum { PRIORITY_BINS = 9 };
	int priorityBins[ PRIORITY_BINS ] = {};
	double secSquaredPriorityBlocked[PRIORITY_BINS] = {};

	double oldestAlternativesFailure = 0;
	double newestAlternativesFailure = 0;
	double lastSync = 0;

	double secSquaredSubmit = 0;
	double secSquaredDiskStall = 0;

	// Zeroing comes from the member initializers above. This replaces memset(this, 0, sizeof(*this)),
	// which GCC flags with -Wclass-memaccess and which would silently corrupt any non-trivial
	// member added to this struct later.
	NetworkMetrics() = default;
};
279 
// Reference-counted wrapper around an event file descriptor.
// NOTE(review): presumably a Linux eventfd (cf. INetwork::enEventFD) -- confirm against the implementation.
class IEventFD : public ReferenceCounted<IEventFD> {
public:
	virtual ~IEventFD() {}
	// Returns the file descriptor (an int) backing this object.
	virtual int getFD() = 0;
	// Asynchronously yields a 64-bit value read from the descriptor; presumably the eventfd counter,
	// delivered once the descriptor becomes readable -- confirm in implementations.
	virtual Future<int64_t> read() = 0;
};
286 
// Abstract byte-stream connection to a peer (see getPeerAddress()). No virtual destructor is
// declared: lifetime is managed through addref()/delref(), never by deleting through this type.
class IConnection {
public:
	// IConnection is reference-counted (use Reference<IConnection>), but the caller must explicitly call close()
	virtual void addref() = 0;
	virtual void delref() = 0;

	// Closes the underlying connection eventually if it is not already closed.
	virtual void close() = 0;

	// returns when write() can write at least one byte (or may throw an error if the connection dies)
	virtual Future<Void> onWritable() = 0;

	// returns when read() can read at least one byte (or may throw an error if the connection dies)
	virtual Future<Void> onReadable() = 0;

	// Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read (might be 0)
	// (or may throw an error if the connection dies)
	virtual int read( uint8_t* begin, uint8_t* end ) = 0;

	// Writes as many bytes as possible from the given SendBuffer chain into the write buffer and returns the number of bytes written (might be 0)
	// (or may throw an error if the connection dies)
	// The SendBuffer chain cannot be empty, and the limit must be positive.
	// Important non-obvious behavior:  The caller is committing to write the contents of the buffer chain up to the limit.  If all of those bytes could
	// not be sent in this call to write() then further calls must be made to write the remainder.  An IConnection implementation can make decisions
	// based on the entire byte set that the caller was attempting to write even if it is unable to write all of it immediately.
	// Due to limitations of TLSConnection, callers must also avoid reallocations that reduce the amount of written data in the first buffer in the chain.
	// By default `limit` is INT_MAX, i.e. effectively the whole chain.
	virtual int write( SendBuffer const* buffer, int limit = std::numeric_limits<int>::max()) = 0;

	// Returns the network address and port of the other end of the connection.  In the case of an incoming connection, this may not
	// be an address we can connect to!
	virtual NetworkAddress getPeerAddress() = 0;

	// Returns a UID for this connection; presumably used to correlate debug/trace output -- confirm in implementations.
	virtual UID getDebugID() = 0;
};
321 
// Abstract source of incoming IConnections. Reference-counted through addref()/delref();
// no virtual destructor is declared.
class IListener {
public:
	virtual void addref() = 0;
	virtual void delref() = 0;

	// Returns one incoming connection when it is available.  Do not cancel unless you are done with the listener!
	virtual Future<Reference<IConnection>> accept() = 0;

	// Returns the local address this listener accepts on (presumably the address given to
	// INetworkConnections::listen() -- confirm in implementations).
	virtual NetworkAddress getListenAddress() = 0;
};
332 
333 typedef void*	flowGlobalType;
334 typedef NetworkAddress (*NetworkAddressFuncPtr)();
335 typedef NetworkAddressList (*NetworkAddressesFuncPtr)();
336 
337 class INetwork;
338 extern INetwork* g_network;
339 extern INetwork* newNet2(bool useThreadPool = false, bool useMetrics = false);
340 
// Abstract network / event-loop / hardware interface (see the description inside the class).
// The destructor is protected and non-virtual, so instances must never be deleted through an INetwork*.
class INetwork {
public:
	// This interface abstracts the physical or simulated network, event loop and hardware that FoundationDB is running on.
	// Note that there are tools for disk access, scheduling, etc as well as networking, and that almost all access
	//   to the network should be through FlowTransport, not directly through these low level interfaces!

	// Slot indices for global()/setGlobal(). Some slots hold typed pointers stored as flowGlobalType:
	// enNetworkConnections holds an INetworkConnections* (see INetworkConnections::net()), and
	// enNetworkAddressFunc / enNetworkAddressesFunc hold a NetworkAddressFuncPtr / NetworkAddressesFuncPtr
	// (see getLocalAddress() / getLocalAddresses()).
	enum enumGlobal {
		enFailureMonitor = 0, enFlowTransport = 1, enTDMetrics = 2, enNetworkConnections = 3,
		enNetworkAddressFunc = 4, enFileSystem = 5, enASIOService = 6, enEventFD = 7, enRunCycleFunc = 8, enASIOTimedOut = 9, enBlobCredentialFiles = 10,
		enNetworkAddressesFunc = 11
	};

	// Hook taking a task name; the base implementation does nothing.
	// NOTE(review): presumably overridden to report tasks that ran too long -- confirm in implementations.
	virtual void longTaskCheck( const char* name ) {}

	virtual double now() = 0;
	// Provides a clock that advances at a similar rate on all connected endpoints
	// FIXME: Return a fixed point Time class

	virtual Future<class Void> delay( double seconds, int taskID ) = 0;
	// The given future will be set after seconds have elapsed

	virtual Future<class Void> yield( int taskID ) = 0;
	// The given future will be set immediately or after higher-priority tasks have executed

	virtual bool check_yield( int taskID ) = 0;
	// Returns true if a call to yield would result in a delay

	virtual int getCurrentTask() = 0;
	// Gets the taskID/priority of the current task

	virtual void setCurrentTask(int taskID ) = 0;
	// Sets the taskID/priority of the current task, without yielding

	// Read / write the untyped global slot `id` (callers in this header pass enumGlobal values).
	// NOTE(review): global() takes int while setGlobal() takes size_t.
	virtual flowGlobalType global(int id) = 0;
	virtual void setGlobal(size_t id, flowGlobalType v) = 0;

	virtual void stop() = 0;
	// Terminate the program

	virtual bool isSimulated() const = 0;
	// Returns true if this network is a local simulation

	virtual void onMainThread( Promise<Void>&& signal, int taskID ) = 0;
	// Executes signal.send(Void()) on a/the thread belonging to this network

	virtual THREAD_HANDLE startThread( THREAD_FUNC_RETURN (*func) (void *), void *arg) = 0;
	// Starts a thread and returns a handle to it

	virtual void run() = 0;
	// Devotes this thread to running the network (generally until stop())

	virtual void initMetrics() {}
	// Metrics must be initialized after FlowTransport::createInstance has been called

	virtual void getDiskBytes( std::string const& directory, int64_t& free, int64_t& total) = 0;
	//Gets the number of free and total bytes available on the disk which contains directory

	virtual bool isAddressOnThisHost( NetworkAddress const& addr ) = 0;
	// Returns true if it is reasonably certain that a connection to the given address would be a fast loopback connection

	// Shorthand for transport().getLocalAddress()
	// Calls the NetworkAddressFuncPtr stored in g_network's enNetworkAddressFunc slot, or returns a
	// default-constructed NetworkAddress when that slot is null. (The first reinterpret_cast is a
	// no-op: global() already returns flowGlobalType.)
	static NetworkAddress getLocalAddress()
	{
		flowGlobalType netAddressFuncPtr = reinterpret_cast<flowGlobalType>(g_network->global(INetwork::enNetworkAddressFunc));
		return (netAddressFuncPtr) ? reinterpret_cast<NetworkAddressFuncPtr>(netAddressFuncPtr)() : NetworkAddress();
	}

	// Shorthand for transport().getLocalAddresses()
	// Same pattern as getLocalAddress(), using the enNetworkAddressesFunc slot and falling back to
	// a default-constructed NetworkAddressList.
	static NetworkAddressList getLocalAddresses()
	{
		flowGlobalType netAddressesFuncPtr = reinterpret_cast<flowGlobalType>(g_network->global(INetwork::enNetworkAddressesFunc));
		return (netAddressesFuncPtr) ? reinterpret_cast<NetworkAddressesFuncPtr>(netAddressesFuncPtr)() : NetworkAddressList();
	}

	// Publicly accessible metrics storage; all fields start at zero (see NetworkMetrics).
	NetworkMetrics networkMetrics;
protected:
	INetwork() {}

	~INetwork() {}	// Please don't try to delete through this interface!
};
421 
// Connection factory interface. The active instance is reached through net(), which reads g_network's
// enNetworkConnections global slot. No virtual destructor is declared.
class INetworkConnections {
public:
	// Methods for making and accepting network connections.  Logically this is part of the INetwork abstraction
	// that abstracts all interaction with the physical world; it is separated out to make it easy for e.g. transport
	// security to override only these operations without having to delegate everything in INetwork.

	// NOTE(review): default arguments on virtual functions (here and on the connect() overload below) are
	// taken from the static type at the call site, so overrides should repeat the same defaults.

	// Make an outgoing connection to the given address.  May return an error or block indefinitely in case of connection problems!
	virtual Future<Reference<IConnection>> connect( NetworkAddress toAddr, std::string host = "") = 0;

	// Resolve host name and service name (such as "http" or can be a plain number like "80") to a list of 1 or more NetworkAddresses
	virtual Future<std::vector<NetworkAddress>> resolveTCPEndpoint( std::string host, std::string service ) = 0;

	// Convenience function to resolve host/service and connect to one of its NetworkAddresses randomly
	// useTLS has to be a parameter here because it is passed to connect() as part of the toAddr object.
	// Not pure virtual: its default implementation is defined outside this header.
	virtual Future<Reference<IConnection>> connect( std::string host, std::string service, bool useTLS = false);

	// Listen for connections on the given local address
	virtual Reference<IListener> listen( NetworkAddress localAddr ) = 0;

	static INetworkConnections* net() { return static_cast<INetworkConnections*>((void*) g_network->global(INetwork::enNetworkConnections)); }
	// Returns the interface that should be used to make and accept socket connections
};
444 
445 #endif
446