1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * This source code is licensed under the MIT license found in the 5 * LICENSE file in the root directory of this source tree. 6 * 7 */ 8 9 #pragma once 10 11 #include <condition_variable> 12 #include <memory> 13 #include <vector> 14 15 #include <folly/ThreadLocal.h> 16 #include <folly/container/F14Map.h> 17 #include <folly/io/SocketOptionMap.h> 18 #include <folly/io/async/ScopedEventBaseThread.h> 19 20 #include <quic/QuicConstants.h> 21 #include <quic/codec/ConnectionIdAlgo.h> 22 #include <quic/congestion_control/ServerCongestionControllerFactory.h> 23 #include <quic/server/QuicServerTransportFactory.h> 24 #include <quic/server/QuicServerWorker.h> 25 #include <quic/server/QuicUDPSocketFactory.h> 26 #include <quic/state/QuicConnectionStats.h> 27 #include <quic/state/QuicTransportStatsCallback.h> 28 29 namespace quic { 30 31 class QuicServer : public QuicServerWorker::WorkerCallback, 32 public std::enable_shared_from_this<QuicServer> { 33 public: 34 using TransportSettingsOverrideFn = 35 std::function<folly::Optional<quic::TransportSettings>( 36 const quic::TransportSettings&, 37 const folly::IPAddress&)>; 38 createQuicServer()39 static std::shared_ptr<QuicServer> createQuicServer() { 40 return std::shared_ptr<QuicServer>(new QuicServer()); 41 } 42 43 virtual ~QuicServer(); 44 45 // Initialize and start the quic server where the quic server manages 46 // the eventbases for workers 47 void start(const folly::SocketAddress& address, size_t maxWorkers); 48 49 // Initialize quic server worker per evb. 50 void initialize( 51 const folly::SocketAddress& address, 52 const std::vector<folly::EventBase*>& evbs, 53 bool useDefaultTransport = false); 54 55 /** 56 * start reading from sockets 57 */ 58 void start(); 59 60 /* 61 * Pause reading from the listening socket the server workers are bound to 62 */ 63 void pauseRead(); 64 65 /* 66 * Take in a function to supply overrides for transport parameters, given 67 * the client address as input. This can be useful if we are running 68 * experiments. 69 */ 70 void setTransportSettingsOverrideFn(TransportSettingsOverrideFn fn); 71 72 /* 73 * Transport factory to create server-transport. 74 * QuicServer calls 'make()' on the supplied transport factory for *each* new 75 * connection. 76 * This is useful to do proper set-up on the callers side for each new 77 * established connection, such as transport settings and setup sessions. 78 */ 79 void setQuicServerTransportFactory( 80 std::unique_ptr<QuicServerTransportFactory> factory); 81 82 /* 83 * The socket factory used to create sockets for client connections. These 84 * will end up backing QuicServerTransports and managing per connection state. 85 */ 86 void setQuicUDPSocketFactory(std::unique_ptr<QuicUDPSocketFactory> factory); 87 88 /* 89 * The socket factory used to create acceptor sockets. The sockets created 90 * from this factory will listen for udp packets and create new connections 91 * via the factory specified in setQuicUDPSocketFactory. 92 */ 93 void setListenerSocketFactory(std::unique_ptr<QuicUDPSocketFactory> factory); 94 95 /** 96 * Set factory to create specific congestion controller instances 97 * for a given connection 98 * This must be set before the server is started. 99 */ 100 void setCongestionControllerFactory( 101 std::shared_ptr<CongestionControllerFactory> ccFactory); 102 103 void setRateLimit( 104 std::function<uint64_t()> count, 105 std::chrono::seconds window); 106 107 /** 108 * Set list of supported QUICVersion for this server. These versions will be 109 * used during the 'Version-Negotiation' phase with the client. 110 */ 111 void setSupportedVersion(const std::vector<QuicVersion>& versions); 112 113 /** 114 * A token to use for health checking VIPs. When a UDP packet is sent to the 115 * server with the exact contents of the health check token, the server will 116 * respond with an "OK". 117 */ 118 void setHealthCheckToken(const std::string& healthCheckToken); 119 120 /** 121 * Set server TLS context. 122 */ 123 void setFizzContext( 124 std::shared_ptr<const fizz::server::FizzServerContext> ctx); 125 126 /** 127 * Set server TLS context for a worker associated with the given eventbase. 128 */ 129 void setFizzContext( 130 folly::EventBase* evb, 131 std::shared_ptr<const fizz::server::FizzServerContext> ctx); 132 133 /** 134 * Set socket options for the underlying socket. 135 * Options are being set before and after bind, and not at the time of 136 * invoking this function. 137 */ setSocketOptions(const folly::SocketOptionMap & options)138 void setSocketOptions(const folly::SocketOptionMap& options) noexcept { 139 socketOptions_ = options; 140 } 141 142 /** 143 * Sets whether the underlying socket should set the IPV6_ONLY socket option 144 * or not. If set to false, IPv4-mapped IPv6 addresses will be enabled on the 145 * socket. 146 */ setBindV6Only(bool bindV6Only)147 void setBindV6Only(bool bindV6Only) { 148 bindOptions_.bindV6Only = bindV6Only; 149 } 150 151 /** 152 * Set the server id of the quic server. 153 * Note that this function must be called before initialize(..) 154 */ 155 void setProcessId(ProcessId id) noexcept; 156 157 ProcessId getProcessId() const noexcept; 158 159 /** 160 * Set the id of the host where this server is running. 161 * It is used to make routing decision by setting this id in the ConnectionId 162 */ 163 void setHostId(uint32_t hostId) noexcept; 164 165 /** 166 * Set version of connection ID used by quic server. 167 * Note that this function must be called before initialize(..) 168 */ 169 void setConnectionIdVersion(ConnectionIdVersion cidVersion) noexcept; 170 171 /** 172 * Get transport settings. 173 */ 174 const TransportSettings& getTransportSettings() const noexcept; 175 176 /** 177 * Set initial flow control settings for the connection. 178 */ 179 void setTransportSettings(TransportSettings transportSettings); 180 181 /** 182 * If the calling application wants to use CCP for CC, it's the 183 * app's responsibility to start an instance of CCP -- this ID 184 * refers to that unique instance of CCP so we (QuicServer) know 185 * how to connect to it. 186 */ 187 void setCcpId(uint64_t ccpId); 188 189 /** 190 * Tells the server to start rejecting any new connection. The parameter 191 * function is stored and evaluated on each new connection before being 192 * accepted. 193 */ 194 void rejectNewConnections(std::function<bool()> rejectFn); 195 196 /** 197 * Tells the server to begin rejecting any new connections with block listed 198 * source ports. Like rejectNewConnections above, the parameter function is 199 * stored and evaluated on each new connection before being accepted. 200 */ 201 void blockListedSrcPort(std::function<bool(uint16_t)> isBlockListedSrcPort); 202 203 /** 204 * Returns listening address of this server 205 */ 206 const folly::SocketAddress& getAddress() const; 207 208 /** 209 * Returns true iff the server is fully initialized 210 */ 211 bool isInitialized() const noexcept; 212 213 /** 214 * Shutdown the sever (and all the workers) 215 */ 216 void shutdown(LocalErrorCode error = LocalErrorCode::SHUTTING_DOWN); 217 218 /** 219 * Returns true if the server has begun the termination process or if it has 220 * not been initialized 221 */ 222 bool hasShutdown() const noexcept; 223 224 /** 225 * Blocks the calling thread until isInitialized() is true 226 */ 227 void waitUntilInitialized(); 228 229 void handleWorkerError(LocalErrorCode error) override; 230 231 /** 232 * Routes the given data for the given client to the correct worker that may 233 * have the state for the connection associated with the given data and client 234 */ 235 void routeDataToWorker( 236 const folly::SocketAddress& client, 237 RoutingData&& routingData, 238 NetworkData&& networkData, 239 folly::Optional<QuicVersion> quicVersion, 240 bool isForwardedData = false) override; 241 242 /** 243 * Set the transport factory for the worker associated with the given 244 * eventbase. 245 * This is relevant if the QuicServer is initialized with the vector of 246 * event-bases supplied by the caller. 247 * Typically, this is useful when the server is already running fixed pool of 248 * thread ('workers'), and want to run QuicServer within those workers. 249 * In such scenario, the supplied factory's make() will be called (lock-free) 250 * upon each new connection establishment within each worker. 251 */ 252 void addTransportFactory( 253 folly::EventBase*, 254 QuicServerTransportFactory* acceptor); 255 256 /** 257 * Initialize necessary steps to enable being taken over of this server by 258 * another server, such as binding to a local port so that once another 259 * process starts to takeover the port this server is listening to, the other 260 * server can forward packets belonging to this server 261 * Note that this method cannot be called on a worker's thread. 262 * Note that this should also be called after initialize(..), 263 * calling this before initialize is undefined. 264 */ 265 void allowBeingTakenOver(const folly::SocketAddress& addr); 266 267 folly::SocketAddress overrideTakeoverHandlerAddress( 268 const folly::SocketAddress& addr); 269 270 /* 271 * Setup and initialize the listening socket of the old server from the given 272 * address to forward misrouted packets belonging to that server during 273 * the takeover process 274 */ 275 void startPacketForwarding(const folly::SocketAddress& destAddr); 276 277 /* 278 * Disable packet forwarding, even if the packet has no connection id 279 * associated with it after the 'delayMS' milliseconds 280 */ 281 void stopPacketForwarding(std::chrono::milliseconds delay); 282 283 /** 284 * Set takenover socket fds for the quic server from another process. 285 * Quic server calls ::dup for each fd and will not bind to the address for 286 * all the valid fds (i.e. not -1) in the given vector 287 * NOTE: it must be called before calling 'start()' 288 */ 289 void setListeningFDs(const std::vector<int>& fds); 290 291 /* 292 * Returns the File Descriptor of the listening socket for this server. 293 */ 294 int getListeningSocketFD() const; 295 296 /* 297 * Returns all the File Descriptor of the listening sockets for each 298 * worker for this server. 299 */ 300 std::vector<int> getAllListeningSocketFDs() const noexcept; 301 302 /* 303 * Once this server is notified that another server has initiated the takeover 304 * it opens a new communication channel so that new server can forward 305 * misrouted packets to this server. 306 * This method returns the File Descriptor of a local port that this server 307 * is listening to. 308 */ 309 int getTakeoverHandlerSocketFD() const; 310 311 TakeoverProtocolVersion getTakeoverProtocolVersion() const noexcept; 312 313 /** 314 * Factory to create per worker callback for various transport stats (such as 315 * packet received, dropped etc). QuicServer calls 'make' during the 316 * initialization _for each worker_. 317 * Also, 'make' is called from the worker's eventbase. 318 * 319 * NOTE: Since the callback is invoked very frequently and per thread, 320 * it is important that the implementation of QuicTransportStatsCallback is 321 * efficient. 322 * NOTE: Quic does not synchronize across threads before calling 323 * callbacks for various stats. 324 */ 325 void setTransportStatsCallbackFactory( 326 std::unique_ptr<QuicTransportStatsCallbackFactory> statsFactory); 327 328 /** 329 * Factory to create per worker ConnectionIdAlgo instance 330 * NOTE: it must be set before calling 'start()' or 'initialize(..)' 331 */ 332 void setConnectionIdAlgoFactory( 333 std::unique_ptr<ConnectionIdAlgoFactory> connIdAlgoFactory); 334 335 /** 336 * Returns vector of running eventbases. 337 * This is useful if QuicServer is initialized with a 'default' mode by just 338 * specifying number of workers. 339 */ 340 std::vector<folly::EventBase*> getWorkerEvbs() const noexcept; 341 342 /** 343 * Adds observer for accept events. 344 * 345 * Adds for the worker associated with the given EventBase. This is relevant 346 * if the QuicServer is initialized with a vector of EventBase supplied by 347 * the caller. With this approach, each worker thread can (but is not 348 * required to) have its own observer, removing the need for the observer 349 * implementation to be thread safe. 350 * 351 * Can be used to install socket observers and instrumentation without 352 * changing / interfering with application-specific acceptor logic. 353 * 354 * See AcceptObserver class for details. 355 * 356 * @param evb Worker EventBase for which we want to add observer. 357 * @param observer Observer to add (implements AcceptObserver). 358 * @return Whether worker found and observer added. 359 */ 360 bool addAcceptObserver(folly::EventBase* evb, AcceptObserver* observer); 361 362 /** 363 * Remove observer for accept events. 364 * 365 * Removes for the worker associated with the given EventBase. 366 * 367 * @param evb Worker EventBase for which we want to remove observer. 368 * @param observer Observer to remove. 369 * @return Whether worker + observer found and observer removed. 370 */ 371 bool removeAcceptObserver(folly::EventBase* evb, AcceptObserver* observer); 372 373 void getAllConnectionsStats(std::vector<QuicConnectionStats>& stats); 374 375 private: 376 QuicServer(); 377 378 static std::unique_ptr<folly::EventBaseBackendBase> getEventBaseBackend(); 379 380 // helper function to initialize workers 381 void initializeWorkers( 382 const std::vector<folly::EventBase*>& evbs, 383 bool useDefaultTransport); 384 385 std::unique_ptr<QuicServerWorker> newWorkerWithoutSocket(); 386 387 // helper method to run the given function in all worker asynchronously 388 void runOnAllWorkers(const std::function<void(QuicServerWorker*)>& func); 389 390 // helper method to run the given function in all worker synchronously 391 void runOnAllWorkersSync(const std::function<void(QuicServerWorker*)>& func); 392 393 void bindWorkersToSocket( 394 const folly::SocketAddress& address, 395 const std::vector<folly::EventBase*>& evbs); 396 397 std::vector<QuicVersion> supportedVersions_{ 398 {QuicVersion::MVFST, 399 QuicVersion::MVFST_EXPERIMENTAL, 400 QuicVersion::MVFST_ALIAS, 401 QuicVersion::QUIC_V1, 402 QuicVersion::QUIC_DRAFT, 403 QuicVersion::QUIC_DRAFT_LEGACY}}; 404 405 bool isUsingCCP(); 406 407 std::atomic<bool> shutdown_{true}; 408 std::shared_ptr<const fizz::server::FizzServerContext> ctx_; 409 TransportSettings transportSettings_; 410 std::mutex startMutex_; 411 std::atomic<bool> initialized_{false}; 412 std::atomic<bool> workersInitialized_{false}; 413 std::condition_variable startCv_; 414 std::atomic<bool> takeoverHandlerInitialized_{false}; 415 std::vector<std::unique_ptr<folly::ScopedEventBaseThread>> workerEvbs_; 416 417 std::vector<std::unique_ptr<QuicServerWorker>> workers_; 418 // Thread local pointer to QuicServerWorker. This is useful to avoid 419 // looking up the worker to route to. 420 // NOTE: QuicServer still maintains ownership of all the workers and manages 421 // their destruction 422 folly::ThreadLocalPtr<QuicServerWorker> workerPtr_; 423 folly::F14FastMap<folly::EventBase*, QuicServerWorker*> evbToWorkers_; 424 std::unique_ptr<QuicServerTransportFactory> transportFactory_; 425 folly::F14FastMap<folly::EventBase*, QuicServerTransportFactory*> 426 evbToAcceptors_; 427 // factory used for workers to create their listening / bound sockets 428 std::unique_ptr<QuicUDPSocketFactory> listenerSocketFactory_; 429 // factory used by workers to create sockets for connection transports 430 std::unique_ptr<QuicUDPSocketFactory> socketFactory_; 431 // factory used to create specific instance of Congestion control algorithm 432 std::shared_ptr<CongestionControllerFactory> ccFactory_; 433 434 std::shared_ptr<folly::EventBaseObserver> evbObserver_; 435 folly::Optional<std::string> healthCheckToken_; 436 // vector of all the listening fds on each quic server worker 437 std::vector<int> listeningFDs_; 438 ProcessId processId_{ProcessId::ZERO}; 439 uint32_t hostId_{0}; 440 ConnectionIdVersion cidVersion_{ConnectionIdVersion::V1}; 441 std::function<bool()> rejectNewConnections_{[]() { return false; }}; 442 std::function<bool(uint16_t)> isBlockListedSrcPort_{ 443 [](uint16_t) { return false; }}; 444 // factory to create per worker QuicTransportStatsCallback 445 std::unique_ptr<QuicTransportStatsCallbackFactory> transportStatsFactory_; 446 // factory to create per worker ConnectionIdAlgo 447 std::unique_ptr<ConnectionIdAlgoFactory> connIdAlgoFactory_; 448 // Impl of ConnectionIdAlgo to make routing decisions from ConnectionId 449 std::unique_ptr<ConnectionIdAlgo> connIdAlgo_; 450 // Used to override certain transport parameters, given the client address 451 TransportSettingsOverrideFn transportSettingsOverrideFn_; 452 // address that the server is bound to 453 folly::SocketAddress boundAddress_; 454 folly::SocketOptionMap socketOptions_; 455 // Rate limits 456 struct RateLimit { RateLimitRateLimit457 RateLimit(std::function<uint64_t()> c, std::chrono::seconds w) 458 : count(std::move(c)), window(w) {} 459 std::function<uint64_t()> count; 460 std::chrono::seconds window; 461 }; 462 folly::Optional<RateLimit> rateLimit_; 463 464 // Options to AsyncUDPSocket::bind, only controls IPV6_ONLY currently. 465 folly::AsyncUDPSocket::BindOptions bindOptions_; 466 467 #ifdef CCP_ENABLED 468 std::unique_ptr<folly::ScopedEventBaseThread> ccpEvb_; 469 #endif 470 // Random number to uniquely identify this instance of quic to ccp 471 // in case there are multiple concurrent instances (e.g. when proxygen is 472 // migrating connections and there are two concurrent instances of proxygen) 473 uint64_t ccpId_{0}; 474 }; 475 476 } // namespace quic 477