1 /* 2 * network.h 3 * 4 * This source file is part of the FoundationDB open source project 5 * 6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors 7 * 8 * Licensed under the Apache License, Version 2.0 (the "License"); 9 * you may not use this file except in compliance with the License. 10 * You may obtain a copy of the License at 11 * 12 * http://www.apache.org/licenses/LICENSE-2.0 13 * 14 * Unless required by applicable law or agreed to in writing, software 15 * distributed under the License is distributed on an "AS IS" BASIS, 16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17 * See the License for the specific language governing permissions and 18 * limitations under the License. 19 */ 20 21 #ifndef FLOW_OPENNETWORK_H 22 #define FLOW_OPENNETWORK_H 23 #pragma once 24 25 #include <array> 26 #include <string> 27 #include <stdint.h> 28 #include "boost/asio.hpp" 29 #include "flow/serialize.h" 30 #include "flow/IRandom.h" 31 #include "fdbrpc/crc32c.h" 32 33 enum { 34 TaskMaxPriority = 1000000, 35 TaskRunCycleFunction = 20000, 36 TaskFlushTrace = 10500, 37 TaskWriteSocket = 10000, 38 TaskPollEIO = 9900, 39 TaskDiskIOComplete = 9150, 40 TaskLoadBalancedEndpoint = 9000, 41 TaskReadSocket = 9000, 42 TaskCoordinationReply = 8810, 43 TaskCoordination = 8800, 44 TaskFailureMonitor = 8700, 45 TaskResolutionMetrics = 8700, 46 TaskClusterController = 8650, 47 TaskProxyCommitDispatcher = 8640, 48 TaskTLogQueuingMetrics = 8620, 49 TaskTLogPop = 8610, 50 TaskTLogPeekReply = 8600, 51 TaskTLogPeek = 8590, 52 TaskTLogCommitReply = 8580, 53 TaskTLogCommit = 8570, 54 TaskTLogSpilledPeekReply = 8567, 55 TaskProxyGetRawCommittedVersion = 8565, 56 TaskProxyResolverReply = 8560, 57 TaskProxyCommitBatcher = 8550, 58 TaskProxyCommit = 8540, 59 TaskTLogConfirmRunningReply = 8530, 60 TaskTLogConfirmRunning = 8520, 61 TaskProxyGRVTimer = 8510, 62 TaskProxyGetConsistentReadVersion = 8500, 63 TaskDefaultPromiseEndpoint = 8000, 64 TaskDefaultOnMainThread = 7500, 65 TaskDefaultDelay = 7010, 66 TaskDefaultYield = 7000, 67 TaskDiskRead = 5010, 68 TaskDefaultEndpoint = 5000, 69 TaskUnknownEndpoint = 4000, 70 TaskMoveKeys = 3550, 71 TaskDataDistributionLaunch = 3530, 72 TaskRatekeeper = 3510, 73 TaskDataDistribution = 3500, 74 TaskDiskWrite = 3010, 75 TaskUpdateStorage = 3000, 76 TaskLowPriority = 2000, 77 78 TaskMinPriority = 1000 79 }; 80 81 class Void; 82 83 template<class T> class Optional; 84 85 struct IPAddress { 86 // Represents both IPv4 and IPv6 address. For IPv4 addresses, 87 // only the first 32bits are relevant and rest are initialized to 88 // 0. 89 typedef boost::asio::ip::address_v6::bytes_type IPAddressStore; 90 static_assert(std::is_same<IPAddressStore, std::array<uint8_t, 16>>::value, 91 "IPAddressStore must be std::array<uint8_t, 16>"); 92 93 IPAddress(); 94 explicit IPAddress(const IPAddressStore& v6addr); 95 explicit IPAddress(uint32_t v4addr); 96 isV6IPAddress97 bool isV6() const { return isV6addr; } isV4IPAddress98 bool isV4() const { return !isV6addr; } 99 bool isValid() const; 100 IPAddressIPAddress101 IPAddress(const IPAddress& rhs) : isV6addr(rhs.isV6addr) { 102 if(isV6addr) { 103 addr.v6 = rhs.addr.v6; 104 } else { 105 addr.v4 = rhs.addr.v4; 106 } 107 } 108 109 IPAddress& operator=(const IPAddress& rhs) { 110 isV6addr = rhs.isV6addr; 111 if(isV6addr) { 112 addr.v6 = rhs.addr.v6; 113 } else { 114 addr.v4 = rhs.addr.v4; 115 } 116 return *this; 117 } 118 119 // Returns raw v4/v6 representation of address. Caller is responsible 120 // to call these functions safely. toV4IPAddress121 uint32_t toV4() const { return addr.v4; } toV6IPAddress122 const IPAddressStore& toV6() const { return addr.v6; } 123 124 std::string toString() const; 125 static Optional<IPAddress> parse(std::string str); 126 127 bool operator==(const IPAddress& addr) const; 128 bool operator!=(const IPAddress& addr) const; 129 bool operator<(const IPAddress& addr) const; 130 131 template <class Ar> serializeIPAddress132 void serialize(Ar& ar) { 133 serializer(ar, isV6addr); 134 if(isV6addr) { 135 serializer(ar, addr.v6); 136 } else { 137 serializer(ar, addr.v4); 138 } 139 } 140 141 private: 142 bool isV6addr; 143 union { 144 uint32_t v4; 145 IPAddressStore v6; 146 } addr; 147 }; 148 149 template<> 150 struct Traceable<IPAddress> : std::true_type { 151 static std::string toString(const IPAddress& value) { 152 return value.toString(); 153 } 154 }; 155 156 struct NetworkAddress { 157 // A NetworkAddress identifies a particular running server (i.e. a TCP endpoint). 158 IPAddress ip; 159 uint16_t port; 160 uint16_t flags; 161 162 enum { FLAG_PRIVATE = 1, FLAG_TLS = 2 }; 163 164 NetworkAddress() : ip(IPAddress(0)), port(0), flags(FLAG_PRIVATE) {} 165 NetworkAddress(const IPAddress& address, uint16_t port, bool isPublic, bool isTLS) 166 : ip(address), port(port), flags((isPublic ? 0 : FLAG_PRIVATE) | (isTLS ? FLAG_TLS : 0)) {} 167 NetworkAddress(uint32_t ip, uint16_t port, bool isPublic, bool isTLS) 168 : NetworkAddress(IPAddress(ip), port, isPublic, isTLS) {} 169 170 NetworkAddress(uint32_t ip, uint16_t port) : NetworkAddress(ip, port, false, false) {} 171 NetworkAddress(const IPAddress& ip, uint16_t port) : NetworkAddress(ip, port, false, false) {} 172 173 bool operator==(NetworkAddress const& r) const { return ip == r.ip && port == r.port && flags == r.flags; } 174 bool operator!=(NetworkAddress const& r) const { return ip != r.ip || port != r.port || flags != r.flags; } 175 bool operator<(NetworkAddress const& r) const { 176 if (flags != r.flags) 177 return flags < r.flags; 178 else if (ip != r.ip) 179 return ip < r.ip; 180 return port < r.port; 181 } 182 183 bool isValid() const { return ip.isValid() || port != 0; } 184 bool isPublic() const { return !(flags & FLAG_PRIVATE); } 185 bool isTLS() const { return (flags & FLAG_TLS) != 0; } 186 bool isV6() const { return ip.isV6(); } 187 188 static NetworkAddress parse( std::string const& ); 189 static std::vector<NetworkAddress> parseList( std::string const& ); 190 std::string toString() const; 191 192 template <class Ar> 193 void serialize(Ar& ar) { 194 if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL) { 195 uint32_t ipV4; 196 serializer(ar, ipV4, port, flags); 197 ip = IPAddress(ipV4); 198 } else { 199 serializer(ar, ip, port, flags); 200 } 201 } 202 }; 203 204 template<> 205 struct Traceable<NetworkAddress> : std::true_type { 206 static std::string toString(const NetworkAddress& value) { 207 return value.toString(); 208 } 209 }; 210 211 namespace std 212 { 213 template <> 214 struct hash<NetworkAddress> 215 { 216 size_t operator()(const NetworkAddress& na) const 217 { 218 size_t result = 0; 219 if (na.ip.isV6()) { 220 uint16_t* ptr = (uint16_t*)na.ip.toV6().data(); 221 result = ((size_t)ptr[5] << 32) | ((size_t)ptr[6] << 16) | ptr[7]; 222 } else { 223 result = na.ip.toV4(); 224 } 225 return (result << 16) + na.port; 226 } 227 }; 228 } 229 230 struct NetworkAddressList { 231 NetworkAddress address; 232 Optional<NetworkAddress> secondaryAddress; 233 234 bool operator==(NetworkAddressList const& r) const { return address == r.address && secondaryAddress == r.secondaryAddress; } 235 bool operator!=(NetworkAddressList const& r) const { return address != r.address || secondaryAddress != r.secondaryAddress; } 236 bool operator<(NetworkAddressList const& r) const { 237 if (address != r.address) 238 return address < r.address; 239 return secondaryAddress < r.secondaryAddress; 240 } 241 242 std::string toString() const { 243 if(!secondaryAddress.present()) { 244 return address.toString(); 245 } 246 return address.toString() + ", " + secondaryAddress.get().toString(); 247 } 248 249 template <class Ar> 250 void serialize(Ar& ar) { 251 serializer(ar, address, secondaryAddress); 252 } 253 }; 254 255 std::string toIPVectorString(std::vector<uint32_t> ips); 256 std::string toIPVectorString(const std::vector<IPAddress>& ips); 257 std::string formatIpPort(const IPAddress& ip, uint16_t port); 258 259 template <class T> class Future; 260 template <class T> class Promise; 261 262 struct NetworkMetrics { 263 enum { SLOW_EVENT_BINS = 16 }; 264 uint64_t countSlowEvents[SLOW_EVENT_BINS]; 265 266 enum { PRIORITY_BINS = 9 }; 267 int priorityBins[ PRIORITY_BINS ]; 268 double secSquaredPriorityBlocked[PRIORITY_BINS]; 269 270 double oldestAlternativesFailure; 271 double newestAlternativesFailure; 272 double lastSync; 273 274 double secSquaredSubmit; 275 double secSquaredDiskStall; 276 277 NetworkMetrics() { memset(this, 0, sizeof(*this)); } 278 }; 279 280 class IEventFD : public ReferenceCounted<IEventFD> { 281 public: 282 virtual ~IEventFD() {} 283 virtual int getFD() = 0; 284 virtual Future<int64_t> read() = 0; 285 }; 286 287 class IConnection { 288 public: 289 // IConnection is reference-counted (use Reference<IConnection>), but the caller must explicitly call close() 290 virtual void addref() = 0; 291 virtual void delref() = 0; 292 293 // Closes the underlying connection eventually if it is not already closed. 294 virtual void close() = 0; 295 296 // returns when write() can write at least one byte (or may throw an error if the connection dies) 297 virtual Future<Void> onWritable() = 0; 298 299 // returns when read() can read at least one byte (or may throw an error if the connection dies) 300 virtual Future<Void> onReadable() = 0; 301 302 // Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read (might be 0) 303 // (or may throw an error if the connection dies) 304 virtual int read( uint8_t* begin, uint8_t* end ) = 0; 305 306 // Writes as many bytes as possible from the given SendBuffer chain into the write buffer and returns the number of bytes written (might be 0) 307 // (or may throw an error if the connection dies) 308 // The SendBuffer chain cannot be empty, and the limit must be positive. 309 // Important non-obvious behavior: The caller is committing to write the contents of the buffer chain up to the limit. If all of those bytes could 310 // not be sent in this call to write() then further calls must be made to write the remainder. An IConnection implementation can make decisions 311 // based on the entire byte set that the caller was attempting to write even if it is unable to write all of it immediately. 312 // Due to limitations of TLSConnection, callers must also avoid reallocations that reduce the amount of written data in the first buffer in the chain. 313 virtual int write( SendBuffer const* buffer, int limit = std::numeric_limits<int>::max()) = 0; 314 315 // Returns the network address and port of the other end of the connection. In the case of an incoming connection, this may not 316 // be an address we can connect to! 317 virtual NetworkAddress getPeerAddress() = 0; 318 319 virtual UID getDebugID() = 0; 320 }; 321 322 class IListener { 323 public: 324 virtual void addref() = 0; 325 virtual void delref() = 0; 326 327 // Returns one incoming connection when it is available. Do not cancel unless you are done with the listener! 328 virtual Future<Reference<IConnection>> accept() = 0; 329 330 virtual NetworkAddress getListenAddress() = 0; 331 }; 332 333 typedef void* flowGlobalType; 334 typedef NetworkAddress (*NetworkAddressFuncPtr)(); 335 typedef NetworkAddressList (*NetworkAddressesFuncPtr)(); 336 337 class INetwork; 338 extern INetwork* g_network; 339 extern INetwork* newNet2(bool useThreadPool = false, bool useMetrics = false); 340 341 class INetwork { 342 public: 343 // This interface abstracts the physical or simulated network, event loop and hardware that FoundationDB is running on. 344 // Note that there are tools for disk access, scheduling, etc as well as networking, and that almost all access 345 // to the network should be through FlowTransport, not directly through these low level interfaces! 346 347 enum enumGlobal { 348 enFailureMonitor = 0, enFlowTransport = 1, enTDMetrics = 2, enNetworkConnections = 3, 349 enNetworkAddressFunc = 4, enFileSystem = 5, enASIOService = 6, enEventFD = 7, enRunCycleFunc = 8, enASIOTimedOut = 9, enBlobCredentialFiles = 10, 350 enNetworkAddressesFunc = 11 351 }; 352 353 virtual void longTaskCheck( const char* name ) {} 354 355 virtual double now() = 0; 356 // Provides a clock that advances at a similar rate on all connected endpoints 357 // FIXME: Return a fixed point Time class 358 359 virtual Future<class Void> delay( double seconds, int taskID ) = 0; 360 // The given future will be set after seconds have elapsed 361 362 virtual Future<class Void> yield( int taskID ) = 0; 363 // The given future will be set immediately or after higher-priority tasks have executed 364 365 virtual bool check_yield( int taskID ) = 0; 366 // Returns true if a call to yield would result in a delay 367 368 virtual int getCurrentTask() = 0; 369 // Gets the taskID/priority of the current task 370 371 virtual void setCurrentTask(int taskID ) = 0; 372 // Sets the taskID/priority of the current task, without yielding 373 374 virtual flowGlobalType global(int id) = 0; 375 virtual void setGlobal(size_t id, flowGlobalType v) = 0; 376 377 virtual void stop() = 0; 378 // Terminate the program 379 380 virtual bool isSimulated() const = 0; 381 // Returns true if this network is a local simulation 382 383 virtual void onMainThread( Promise<Void>&& signal, int taskID ) = 0; 384 // Executes signal.send(Void()) on a/the thread belonging to this network 385 386 virtual THREAD_HANDLE startThread( THREAD_FUNC_RETURN (*func) (void *), void *arg) = 0; 387 // Starts a thread and returns a handle to it 388 389 virtual void run() = 0; 390 // Devotes this thread to running the network (generally until stop()) 391 392 virtual void initMetrics() {} 393 // Metrics must be initialized after FlowTransport::createInstance has been called 394 395 virtual void getDiskBytes( std::string const& directory, int64_t& free, int64_t& total) = 0; 396 //Gets the number of free and total bytes available on the disk which contains directory 397 398 virtual bool isAddressOnThisHost( NetworkAddress const& addr ) = 0; 399 // Returns true if it is reasonably certain that a connection to the given address would be a fast loopback connection 400 401 // Shorthand for transport().getLocalAddress() 402 static NetworkAddress getLocalAddress() 403 { 404 flowGlobalType netAddressFuncPtr = reinterpret_cast<flowGlobalType>(g_network->global(INetwork::enNetworkAddressFunc)); 405 return (netAddressFuncPtr) ? reinterpret_cast<NetworkAddressFuncPtr>(netAddressFuncPtr)() : NetworkAddress(); 406 } 407 408 // Shorthand for transport().getLocalAddresses() 409 static NetworkAddressList getLocalAddresses() 410 { 411 flowGlobalType netAddressesFuncPtr = reinterpret_cast<flowGlobalType>(g_network->global(INetwork::enNetworkAddressesFunc)); 412 return (netAddressesFuncPtr) ? reinterpret_cast<NetworkAddressesFuncPtr>(netAddressesFuncPtr)() : NetworkAddressList(); 413 } 414 415 NetworkMetrics networkMetrics; 416 protected: 417 INetwork() {} 418 419 ~INetwork() {} // Please don't try to delete through this interface! 420 }; 421 422 class INetworkConnections { 423 public: 424 // Methods for making and accepting network connections. Logically this is part of the INetwork abstraction 425 // that abstracts all interaction with the physical world; it is separated out to make it easy for e.g. transport 426 // security to override only these operations without having to delegate everything in INetwork. 427 428 // Make an outgoing connection to the given address. May return an error or block indefinitely in case of connection problems! 429 virtual Future<Reference<IConnection>> connect( NetworkAddress toAddr, std::string host = "") = 0; 430 431 // Resolve host name and service name (such as "http" or can be a plain number like "80") to a list of 1 or more NetworkAddresses 432 virtual Future<std::vector<NetworkAddress>> resolveTCPEndpoint( std::string host, std::string service ) = 0; 433 434 // Convenience function to resolve host/service and connect to one of its NetworkAddresses randomly 435 // useTLS has to be a parameter here because it is passed to connect() as part of the toAddr object. 436 virtual Future<Reference<IConnection>> connect( std::string host, std::string service, bool useTLS = false); 437 438 // Listen for connections on the given local address 439 virtual Reference<IListener> listen( NetworkAddress localAddr ) = 0; 440 441 static INetworkConnections* net() { return static_cast<INetworkConnections*>((void*) g_network->global(INetwork::enNetworkConnections)); } 442 // Returns the interface that should be used to make and accept socket connections 443 }; 444 445 #endif 446