1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * This source code is licensed under the MIT license found in the
5  * LICENSE file in the root directory of this source tree.
6  *
7  */
8 
9 #pragma once
10 
11 #include <condition_variable>
12 #include <memory>
13 #include <vector>
14 
15 #include <folly/ThreadLocal.h>
16 #include <folly/container/F14Map.h>
17 #include <folly/io/SocketOptionMap.h>
18 #include <folly/io/async/ScopedEventBaseThread.h>
19 
20 #include <quic/QuicConstants.h>
21 #include <quic/codec/ConnectionIdAlgo.h>
22 #include <quic/congestion_control/ServerCongestionControllerFactory.h>
23 #include <quic/server/QuicServerTransportFactory.h>
24 #include <quic/server/QuicServerWorker.h>
25 #include <quic/server/QuicUDPSocketFactory.h>
26 #include <quic/state/QuicConnectionStats.h>
27 #include <quic/state/QuicTransportStatsCallback.h>
28 
29 namespace quic {
30 
31 class QuicServer : public QuicServerWorker::WorkerCallback,
32                    public std::enable_shared_from_this<QuicServer> {
33  public:
34   using TransportSettingsOverrideFn =
35       std::function<folly::Optional<quic::TransportSettings>(
36           const quic::TransportSettings&,
37           const folly::IPAddress&)>;
38 
createQuicServer()39   static std::shared_ptr<QuicServer> createQuicServer() {
40     return std::shared_ptr<QuicServer>(new QuicServer());
41   }
42 
43   virtual ~QuicServer();
44 
45   // Initialize and start the quic server where the quic server manages
46   // the eventbases for workers
47   void start(const folly::SocketAddress& address, size_t maxWorkers);
48 
49   // Initialize quic server worker per evb.
50   void initialize(
51       const folly::SocketAddress& address,
52       const std::vector<folly::EventBase*>& evbs,
53       bool useDefaultTransport = false);
54 
55   /**
56    * start reading from sockets
57    */
58   void start();
59 
60   /*
61    * Pause reading from the listening socket the server workers are bound to
62    */
63   void pauseRead();
64 
65   /*
66    * Take in a function to supply overrides for transport parameters, given
67    * the client address as input. This can be useful if we are running
68    * experiments.
69    */
70   void setTransportSettingsOverrideFn(TransportSettingsOverrideFn fn);
71 
72   /*
73    * Transport factory to create server-transport.
74    * QuicServer calls 'make()' on the supplied transport factory for *each* new
75    * connection.
76    * This is useful to do proper set-up on the callers side for each new
77    * established connection, such as transport settings and setup sessions.
78    */
79   void setQuicServerTransportFactory(
80       std::unique_ptr<QuicServerTransportFactory> factory);
81 
82   /*
83    * The socket factory used to create sockets for client connections.  These
84    * will end up backing QuicServerTransports and managing per connection state.
85    */
86   void setQuicUDPSocketFactory(std::unique_ptr<QuicUDPSocketFactory> factory);
87 
88   /*
89    * The socket factory used to create acceptor sockets.  The sockets created
90    * from this factory will listen for udp packets and create new connections
91    * via the factory specified in setQuicUDPSocketFactory.
92    */
93   void setListenerSocketFactory(std::unique_ptr<QuicUDPSocketFactory> factory);
94 
95   /**
96    * Set factory to create specific congestion controller instances
97    * for a given connection
98    * This must be set before the server is started.
99    */
100   void setCongestionControllerFactory(
101       std::shared_ptr<CongestionControllerFactory> ccFactory);
102 
103   void setRateLimit(
104       std::function<uint64_t()> count,
105       std::chrono::seconds window);
106 
107   /**
108    * Set list of supported QUICVersion for this server. These versions will be
109    * used during the 'Version-Negotiation' phase with the client.
110    */
111   void setSupportedVersion(const std::vector<QuicVersion>& versions);
112 
113   /**
114    * A token to use for health checking VIPs. When a UDP packet is sent to the
115    * server with the exact contents of the health check token, the server will
116    * respond with an "OK".
117    */
118   void setHealthCheckToken(const std::string& healthCheckToken);
119 
120   /**
121    * Set server TLS context.
122    */
123   void setFizzContext(
124       std::shared_ptr<const fizz::server::FizzServerContext> ctx);
125 
126   /**
127    * Set server TLS context for a worker associated with the given eventbase.
128    */
129   void setFizzContext(
130       folly::EventBase* evb,
131       std::shared_ptr<const fizz::server::FizzServerContext> ctx);
132 
133   /**
134    * Set socket options for the underlying socket.
135    * Options are being set before and after bind, and not at the time of
136    * invoking this function.
137    */
setSocketOptions(const folly::SocketOptionMap & options)138   void setSocketOptions(const folly::SocketOptionMap& options) noexcept {
139     socketOptions_ = options;
140   }
141 
142   /**
143    * Sets whether the underlying socket should set the IPV6_ONLY socket option
144    * or not. If set to false, IPv4-mapped IPv6 addresses will be enabled on the
145    * socket.
146    */
setBindV6Only(bool bindV6Only)147   void setBindV6Only(bool bindV6Only) {
148     bindOptions_.bindV6Only = bindV6Only;
149   }
150 
151   /**
152    * Set the server id of the quic server.
153    * Note that this function must be called before initialize(..)
154    */
155   void setProcessId(ProcessId id) noexcept;
156 
157   ProcessId getProcessId() const noexcept;
158 
159   /**
160    * Set the id of the host where this server is running.
161    * It is used to make routing decision by setting this id in the ConnectionId
162    */
163   void setHostId(uint32_t hostId) noexcept;
164 
165   /**
166    * Set version of connection ID used by quic server.
167    * Note that this function must be called before initialize(..)
168    */
169   void setConnectionIdVersion(ConnectionIdVersion cidVersion) noexcept;
170 
171   /**
172    * Get transport settings.
173    */
174   const TransportSettings& getTransportSettings() const noexcept;
175 
176   /**
177    * Set initial flow control settings for the connection.
178    */
179   void setTransportSettings(TransportSettings transportSettings);
180 
181   /**
182    * If the calling application wants to use CCP for CC, it's the
183    * app's responsibility to start an instance of CCP -- this ID
184    * refers to that unique instance of CCP so we (QuicServer) know
185    * how to connect to it.
186    */
187   void setCcpId(uint64_t ccpId);
188 
189   /**
190    * Tells the server to start rejecting any new connection. The parameter
191    * function is stored and evaluated on each new connection before being
192    * accepted.
193    */
194   void rejectNewConnections(std::function<bool()> rejectFn);
195 
196   /**
197    * Tells the server to begin rejecting any new connections with block listed
198    * source ports. Like rejectNewConnections above, the parameter function is
199    * stored and evaluated on each new connection before being accepted.
200    */
201   void blockListedSrcPort(std::function<bool(uint16_t)> isBlockListedSrcPort);
202 
203   /**
204    * Returns listening address of this server
205    */
206   const folly::SocketAddress& getAddress() const;
207 
208   /**
209    * Returns true iff the server is fully initialized
210    */
211   bool isInitialized() const noexcept;
212 
213   /**
214    * Shutdown the sever (and all the workers)
215    */
216   void shutdown(LocalErrorCode error = LocalErrorCode::SHUTTING_DOWN);
217 
218   /**
219    * Returns true if the server has begun the termination process or if it has
220    * not been initialized
221    */
222   bool hasShutdown() const noexcept;
223 
224   /**
225    * Blocks the calling thread until isInitialized() is true
226    */
227   void waitUntilInitialized();
228 
229   void handleWorkerError(LocalErrorCode error) override;
230 
231   /**
232    * Routes the given data for the given client to the correct worker that may
233    * have the state for the connection associated with the given data and client
234    */
235   void routeDataToWorker(
236       const folly::SocketAddress& client,
237       RoutingData&& routingData,
238       NetworkData&& networkData,
239       folly::Optional<QuicVersion> quicVersion,
240       bool isForwardedData = false) override;
241 
242   /**
243    * Set the transport factory for the worker associated with the given
244    * eventbase.
245    * This is relevant if the QuicServer is initialized with the vector of
246    * event-bases supplied by the caller.
247    * Typically, this is useful when the server is already running fixed pool of
248    * thread ('workers'), and want to run QuicServer within those workers.
249    * In such scenario, the supplied factory's make() will be called (lock-free)
250    * upon each new connection establishment within each worker.
251    */
252   void addTransportFactory(
253       folly::EventBase*,
254       QuicServerTransportFactory* acceptor);
255 
256   /**
257    * Initialize necessary steps to enable being taken over of this server by
258    * another server, such as binding to a local port so that once another
259    * process starts to takeover the port this server is listening to, the other
260    * server can forward packets belonging to this server
261    * Note that this method cannot be called on a worker's thread.
262    * Note that this should also be called after initialize(..),
263    * calling this before initialize is undefined.
264    */
265   void allowBeingTakenOver(const folly::SocketAddress& addr);
266 
267   folly::SocketAddress overrideTakeoverHandlerAddress(
268       const folly::SocketAddress& addr);
269 
270   /*
271    * Setup and initialize the listening socket of the old server from the given
272    * address to forward misrouted packets belonging to that server during
273    * the takeover process
274    */
275   void startPacketForwarding(const folly::SocketAddress& destAddr);
276 
277   /*
278    * Disable packet forwarding, even if the packet has no connection id
279    * associated with it after the 'delayMS' milliseconds
280    */
281   void stopPacketForwarding(std::chrono::milliseconds delay);
282 
283   /**
284    * Set takenover socket fds for the quic server from another process.
285    * Quic server calls ::dup for each fd and will not bind to the address for
286    * all the valid fds (i.e. not -1) in the given vector
287    * NOTE: it must be called before calling 'start()'
288    */
289   void setListeningFDs(const std::vector<int>& fds);
290 
291   /*
292    * Returns the File Descriptor of the listening socket for this server.
293    */
294   int getListeningSocketFD() const;
295 
296   /*
297    * Returns all the File Descriptor of the listening sockets for each
298    * worker for this server.
299    */
300   std::vector<int> getAllListeningSocketFDs() const noexcept;
301 
302   /*
303    * Once this server is notified that another server has initiated the takeover
304    * it opens a new communication channel so that new server can forward
305    * misrouted packets to this server.
306    * This method returns the File Descriptor of a local port that this server
307    * is listening to.
308    */
309   int getTakeoverHandlerSocketFD() const;
310 
311   TakeoverProtocolVersion getTakeoverProtocolVersion() const noexcept;
312 
313   /**
314    * Factory to create per worker callback for various transport stats (such as
315    * packet received, dropped etc). QuicServer calls 'make' during the
316    * initialization _for each worker_.
317    * Also, 'make' is called from the worker's eventbase.
318    *
319    * NOTE: Since the callback is invoked very frequently and per thread,
320    * it is important that the implementation of QuicTransportStatsCallback is
321    * efficient.
322    * NOTE: Quic does not synchronize across threads before calling
323    * callbacks for various stats.
324    */
325   void setTransportStatsCallbackFactory(
326       std::unique_ptr<QuicTransportStatsCallbackFactory> statsFactory);
327 
328   /**
329    * Factory to create per worker ConnectionIdAlgo instance
330    * NOTE: it must be set before calling 'start()' or 'initialize(..)'
331    */
332   void setConnectionIdAlgoFactory(
333       std::unique_ptr<ConnectionIdAlgoFactory> connIdAlgoFactory);
334 
335   /**
336    * Returns vector of running eventbases.
337    * This is useful if QuicServer is initialized with a 'default' mode by just
338    * specifying number of workers.
339    */
340   std::vector<folly::EventBase*> getWorkerEvbs() const noexcept;
341 
342   /**
343    * Adds observer for accept events.
344    *
345    * Adds for the worker associated with the given EventBase. This is relevant
346    * if the QuicServer is initialized with a vector of EventBase supplied by
347    * the caller. With this approach, each worker thread can (but is not
348    * required to) have its own observer, removing the need for the observer
349    * implementation to be thread safe.
350    *
351    * Can be used to install socket observers and instrumentation without
352    * changing / interfering with application-specific acceptor logic.
353    *
354    * See AcceptObserver class for details.
355    *
356    * @param evb           Worker EventBase for which we want to add observer.
357    * @param observer      Observer to add (implements AcceptObserver).
358    * @return              Whether worker found and observer added.
359    */
360   bool addAcceptObserver(folly::EventBase* evb, AcceptObserver* observer);
361 
362   /**
363    * Remove observer for accept events.
364    *
365    * Removes for the worker associated with the given EventBase.
366    *
367    * @param evb           Worker EventBase for which we want to remove observer.
368    * @param observer      Observer to remove.
369    * @return              Whether worker + observer found and observer removed.
370    */
371   bool removeAcceptObserver(folly::EventBase* evb, AcceptObserver* observer);
372 
373   void getAllConnectionsStats(std::vector<QuicConnectionStats>& stats);
374 
375  private:
376   QuicServer();
377 
378   static std::unique_ptr<folly::EventBaseBackendBase> getEventBaseBackend();
379 
380   // helper function to initialize workers
381   void initializeWorkers(
382       const std::vector<folly::EventBase*>& evbs,
383       bool useDefaultTransport);
384 
385   std::unique_ptr<QuicServerWorker> newWorkerWithoutSocket();
386 
387   // helper method to run the given function in all worker asynchronously
388   void runOnAllWorkers(const std::function<void(QuicServerWorker*)>& func);
389 
390   // helper method to run the given function in all worker synchronously
391   void runOnAllWorkersSync(const std::function<void(QuicServerWorker*)>& func);
392 
393   void bindWorkersToSocket(
394       const folly::SocketAddress& address,
395       const std::vector<folly::EventBase*>& evbs);
396 
397   std::vector<QuicVersion> supportedVersions_{
398       {QuicVersion::MVFST,
399        QuicVersion::MVFST_EXPERIMENTAL,
400        QuicVersion::MVFST_ALIAS,
401        QuicVersion::QUIC_V1,
402        QuicVersion::QUIC_DRAFT,
403        QuicVersion::QUIC_DRAFT_LEGACY}};
404 
405   bool isUsingCCP();
406 
407   std::atomic<bool> shutdown_{true};
408   std::shared_ptr<const fizz::server::FizzServerContext> ctx_;
409   TransportSettings transportSettings_;
410   std::mutex startMutex_;
411   std::atomic<bool> initialized_{false};
412   std::atomic<bool> workersInitialized_{false};
413   std::condition_variable startCv_;
414   std::atomic<bool> takeoverHandlerInitialized_{false};
415   std::vector<std::unique_ptr<folly::ScopedEventBaseThread>> workerEvbs_;
416 
417   std::vector<std::unique_ptr<QuicServerWorker>> workers_;
418   // Thread local pointer to QuicServerWorker. This is useful to avoid
419   // looking up the worker to route to.
420   // NOTE: QuicServer still maintains ownership of all the workers and manages
421   // their destruction
422   folly::ThreadLocalPtr<QuicServerWorker> workerPtr_;
423   folly::F14FastMap<folly::EventBase*, QuicServerWorker*> evbToWorkers_;
424   std::unique_ptr<QuicServerTransportFactory> transportFactory_;
425   folly::F14FastMap<folly::EventBase*, QuicServerTransportFactory*>
426       evbToAcceptors_;
427   // factory used for workers to create their listening / bound sockets
428   std::unique_ptr<QuicUDPSocketFactory> listenerSocketFactory_;
429   // factory used by workers to create sockets for connection transports
430   std::unique_ptr<QuicUDPSocketFactory> socketFactory_;
431   // factory used to create specific instance of Congestion control algorithm
432   std::shared_ptr<CongestionControllerFactory> ccFactory_;
433 
434   std::shared_ptr<folly::EventBaseObserver> evbObserver_;
435   folly::Optional<std::string> healthCheckToken_;
436   // vector of all the listening fds on each quic server worker
437   std::vector<int> listeningFDs_;
438   ProcessId processId_{ProcessId::ZERO};
439   uint32_t hostId_{0};
440   ConnectionIdVersion cidVersion_{ConnectionIdVersion::V1};
441   std::function<bool()> rejectNewConnections_{[]() { return false; }};
442   std::function<bool(uint16_t)> isBlockListedSrcPort_{
443       [](uint16_t) { return false; }};
444   // factory to create per worker QuicTransportStatsCallback
445   std::unique_ptr<QuicTransportStatsCallbackFactory> transportStatsFactory_;
446   // factory to create per worker ConnectionIdAlgo
447   std::unique_ptr<ConnectionIdAlgoFactory> connIdAlgoFactory_;
448   // Impl of ConnectionIdAlgo to make routing decisions from ConnectionId
449   std::unique_ptr<ConnectionIdAlgo> connIdAlgo_;
450   // Used to override certain transport parameters, given the client address
451   TransportSettingsOverrideFn transportSettingsOverrideFn_;
452   // address that the server is bound to
453   folly::SocketAddress boundAddress_;
454   folly::SocketOptionMap socketOptions_;
455   // Rate limits
456   struct RateLimit {
RateLimitRateLimit457     RateLimit(std::function<uint64_t()> c, std::chrono::seconds w)
458         : count(std::move(c)), window(w) {}
459     std::function<uint64_t()> count;
460     std::chrono::seconds window;
461   };
462   folly::Optional<RateLimit> rateLimit_;
463 
464   // Options to AsyncUDPSocket::bind, only controls IPV6_ONLY currently.
465   folly::AsyncUDPSocket::BindOptions bindOptions_;
466 
467 #ifdef CCP_ENABLED
468   std::unique_ptr<folly::ScopedEventBaseThread> ccpEvb_;
469 #endif
470   // Random number to uniquely identify this instance of quic to ccp
471   // in case there are multiple concurrent instances (e.g. when proxygen is
472   // migrating connections and there are two concurrent instances of proxygen)
473   uint64_t ccpId_{0};
474 };
475 
476 } // namespace quic
477