1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
#include <limits.h>
#include <stddef.h>

#include <chrono>
#include <exception>
#include <memory>
#include <utility>
#include <vector>
#include <boost/variant.hpp>

#include <folly/ExceptionWrapper.h>
#include <folly/SocketAddress.h>
#include <folly/String.h>
#include <folly/experimental/observer/Observer.h>
#include <folly/io/ShutdownSocketSet.h>
#include <folly/io/async/AsyncSocketBase.h>
#include <folly/io/async/AsyncTimeout.h>
#include <folly/io/async/DelayedDestruction.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventBaseAtomicNotificationQueue.h>
#include <folly/io/async/EventHandler.h>
#include <folly/net/NetOps.h>
#include <folly/net/NetworkSocket.h>
#include <folly/portability/Sockets.h>
42 
43 // Due to the way kernel headers are included, this may or may not be defined.
44 // Number pulled from 3.10 kernel headers.
45 #ifndef SO_REUSEPORT
46 #define SO_REUSEPORT 15
47 #endif
48 
49 #if defined __linux__ && !defined SO_NO_TRANSPARENT_TLS
50 #define SO_NO_TRANSPARENT_TLS 200
51 #endif
52 
53 // TODO(yichengfb): remove this
54 // For temporarily marking the new interface of
55 // folly::AsyncServerSocket::AcceptCallback::connectionAccepted()
56 #define FOLLY_ASYNCSERVERSOCKET_ACCEPTINFO_DEFINED 1
57 
58 namespace folly {
59 
60 /**
61  * A listening socket that asynchronously informs a callback whenever a new
62  * connection has been accepted.
63  *
64  * Unlike most async interfaces that always invoke their callback in the same
65  * EventBase thread, AsyncServerSocket is unusual in that it can distribute
66  * the callbacks across multiple EventBase threads.
67  *
68  * This supports a common use case for network servers to distribute incoming
69  * connections across a number of EventBase threads.  (Servers typically run
70  * with one EventBase thread per CPU.)
71  *
72  * Despite being able to invoke callbacks in multiple EventBase threads,
73  * AsyncServerSocket still has one "primary" EventBase.  Operations that
74  * modify the AsyncServerSocket state may only be performed from the primary
75  * EventBase thread.
76  */
77 class AsyncServerSocket : public DelayedDestruction, public AsyncSocketBase {
78  public:
  // Owning handle that uses Destructor as its deleter, because the class
  // destructor is protected and cannot be invoked directly.
  typedef std::unique_ptr<AsyncServerSocket, Destructor> UniquePtr;
  // Not copyable or movable: declaring the move constructor as deleted also
  // suppresses the implicitly-declared copy operations.
  AsyncServerSocket(AsyncServerSocket&&) = delete;
82 
  /**
   * A callback interface to get notified of client socket events.
   *
   * The ConnectionEventCallback implementations need to be thread-safe as the
   * callbacks may be called from different threads.
   *
   * Every hook is pure virtual and declared noexcept, so an implementation
   * must override all of them and must not let exceptions escape.
   */
  class ConnectionEventCallback {
   public:
    virtual ~ConnectionEventCallback() = default;

    /**
     * onConnectionAccepted() is called right after a client connection
     * is accepted using the system accept()/accept4() APIs.
     *
     * @param socket  The newly accepted client socket.
     * @param addr    The address associated with that connection.
     */
    virtual void onConnectionAccepted(
        const NetworkSocket socket, const SocketAddress& addr) noexcept = 0;

    /**
     * onConnectionAcceptError() is called when an error occurred accepting
     * a connection.
     *
     * @param err  Error code for the failure (presumably an errno value --
     *             confirm against the code that reports it).
     */
    virtual void onConnectionAcceptError(const int err) noexcept = 0;

    /**
     * onConnectionDropped() is called when a connection is dropped,
     * probably because of some error encountered.
     *
     * @param socket  The socket of the dropped connection.
     * @param addr    The address associated with that connection.
     */
    virtual void onConnectionDropped(
        const NetworkSocket socket, const SocketAddress& addr) noexcept = 0;

    /**
     * onConnectionEnqueuedForAcceptorCallback() is called when the
     * connection is successfully enqueued for an AcceptCallback to pick up.
     *
     * @param socket  The socket of the enqueued connection.
     * @param addr    The address associated with that connection.
     */
    virtual void onConnectionEnqueuedForAcceptorCallback(
        const NetworkSocket socket, const SocketAddress& addr) noexcept = 0;

    /**
     * onConnectionDequeuedByAcceptorCallback() is called when the
     * connection is successfully dequeued by an AcceptCallback.
     *
     * @param socket  The socket of the dequeued connection.
     * @param addr    The address associated with that connection.
     */
    virtual void onConnectionDequeuedByAcceptorCallback(
        const NetworkSocket socket, const SocketAddress& addr) noexcept = 0;

    /**
     * onBackoffStarted() is called when the socket has successfully started
     * backing off accepting new client sockets.
     */
    virtual void onBackoffStarted() noexcept = 0;

    /**
     * onBackoffEnded() is called when the backoff period has ended and the
     * socket has successfully resumed accepting new connections, provided
     * there is an AcceptCallback registered.
     */
    virtual void onBackoffEnded() noexcept = 0;

    /**
     * onBackoffError() is called when there is an error entering backoff.
     */
    virtual void onBackoffError() noexcept = 0;
  };
145 
  /**
   * Callback interface through which accepted connections and accept errors
   * are delivered.
   *
   * Only connectionAccepted() is pure virtual; the remaining hooks have
   * default implementations and may be overridden as needed.
   */
  class AcceptCallback {
   public:
    /**
     * Auxiliary information passed along with each accepted socket.
     */
    struct AcceptInfo {
      // Time point associated with the socket being pushed into the waiting
      // queue (see the info parameter of connectionAccepted()).
      std::chrono::steady_clock::time_point timeBeforeEnqueue;
    };

    virtual ~AcceptCallback() = default;

    /**
     * connectionAccepted() is called whenever a new client connection is
     * received.
     *
     * The AcceptCallback will remain installed after connectionAccepted()
     * returns.
     *
     * @param fd          The newly accepted client socket.  The AcceptCallback
     *                    assumes ownership of this socket, and is responsible
     *                    for closing it when done.  The newly accepted file
     *                    descriptor will have already been put into
     *                    non-blocking mode.
     * @param clientAddr  A reference to a SocketAddress struct containing the
     *                    client's address.  This struct is only guaranteed to
     *                    remain valid until connectionAccepted() returns.
     * @param info        A simple structure that contains auxiliary information
     *                    about this accepted socket, for example, when it's
     *                    getting pushed into the waiting queue.
     */
    virtual void connectionAccepted(
        NetworkSocket fd,
        const SocketAddress& clientAddr,
        AcceptInfo info) noexcept = 0;

    // TODO(T81599451): Remove the acceptError(const std::exception&)
    // after migration and remove compile warning suppression.
    FOLLY_PUSH_WARNING
    FOLLY_GNU_DISABLE_WARNING("-Woverloaded-virtual")
    /**
     * acceptError() is called if an error occurs while accepting.
     *
     * The AcceptCallback will remain installed even after an accept error,
     * as the errors are typically somewhat transient, such as being out of
     * file descriptors.  The server socket must be explicitly stopped if you
     * wish to stop accepting after an error.
     *
     * The default implementation extracts the std::exception held by the
     * wrapper, checks that one is present, and forwards it to the legacy
     * acceptError(const std::exception&) overload.
     *
     * @param ew  An exception wrapper representing the error.
     */
    virtual void acceptError(exception_wrapper ew) noexcept {
      auto ex = ew.get_exception<std::exception>();
      // The wrapper is expected to hold a std::exception; fail loudly if not.
      FOLLY_SAFE_CHECK(ex, "no exception");
      acceptError(*ex);
    }

    /**
     * Legacy overload of acceptError(); the default implementation does
     * nothing.
     */
    virtual void acceptError(const std::exception& /* unused */) noexcept {}
    FOLLY_POP_WARNING

    /**
     * acceptStarted() will be called in the callback's EventBase thread
     * after this callback has been added to the AsyncServerSocket.
     *
     * acceptStarted() will be called before any calls to connectionAccepted()
     * or acceptError() are made on this callback.
     *
     * acceptStarted() makes it easier for callbacks to perform initialization
     * inside the callback thread.  (The call to addAcceptCallback() must
     * always be made from the AsyncServerSocket's primary EventBase thread.
     * acceptStarted() provides a hook that will always be invoked in the
     * callback's thread.)
     *
     * Note that the call to acceptStarted() is made once the callback is
     * added, regardless of whether or not the AsyncServerSocket is actually
     * accepting at the moment.  acceptStarted() will be called even if the
     * AsyncServerSocket is paused when the callback is added (including if
     * the initial call to startAccepting() on the AsyncServerSocket has not
     * been made yet).
     *
     * The default implementation does nothing.
     */
    virtual void acceptStarted() noexcept {}

    /**
     * acceptStopped() will be called when this AcceptCallback is removed from
     * the AsyncServerSocket, or when the AsyncServerSocket is destroyed,
     * whichever occurs first.
     *
     * No more calls to connectionAccepted() or acceptError() will be made
     * after acceptStopped() is invoked.
     *
     * The default implementation does nothing.
     */
    virtual void acceptStopped() noexcept {}
  };
234 
  // NOTE(review): presumably the initial value behind getMaxAcceptAtOnce();
  // the initialization is not visible in this part of the header.
  static const uint32_t kDefaultMaxAcceptAtOnce = 30;
  // Default for the maxAtOnce parameter of addAcceptCallback().
  static const uint32_t kDefaultCallbackAcceptAtOnce = 5;
  // NOTE(review): presumably the initial value behind
  // getMaxNumMessagesInQueue(); confirm where the member is initialized.
  static const uint32_t kDefaultMaxMessagesInQueue = 1024;
238 
239   /**
240    * Create a new AsyncServerSocket with the specified EventBase.
241    *
242    * @param eventBase  The EventBase to use for driving the asynchronous I/O.
243    *                   If this parameter is nullptr, attachEventBase() must be
244    *                   called before this socket can begin accepting
245    *                   connections.
246    */
247   explicit AsyncServerSocket(EventBase* eventBase = nullptr);
248 
249   /**
250    * Helper function to create a shared_ptr<AsyncServerSocket>.
251    *
252    * This passes in the correct destructor object, since AsyncServerSocket's
253    * destructor is protected and cannot be invoked directly.
254    */
255   static std::shared_ptr<AsyncServerSocket> newSocket(
256       EventBase* evb = nullptr) {
257     return std::shared_ptr<AsyncServerSocket>(
258         new AsyncServerSocket(evb), Destructor());
259   }
260 
  /**
   * Set the ShutdownSocketSet associated with this server socket.
   *
   * @param wNewSS  Weak reference to the new ShutdownSocketSet.
   *
   * NOTE(review): how the listening sockets are registered with the set is
   * implemented out of line and is not visible in this header -- confirm
   * there before relying on specifics.
   */
  void setShutdownSocketSet(const std::weak_ptr<ShutdownSocketSet>& wNewSS);
262 
263   /**
264    * Destroy the socket.
265    *
266    * AsyncServerSocket::destroy() must be called to destroy the socket.
   * The normal destructor is protected, and should not be invoked directly.
   * This prevents callers from deleting an AsyncServerSocket while it is
269    * invoking a callback.
270    *
271    * destroy() must be invoked from the socket's primary EventBase thread.
272    *
273    * If there are AcceptCallbacks still installed when destroy() is called,
274    * acceptStopped() will be called on these callbacks to notify them that
275    * accepting has stopped.  Accept callbacks being driven by other EventBase
276    * threads may continue to receive new accept callbacks for a brief period of
277    * time after destroy() returns.  They will not receive any more callback
278    * invocations once acceptStopped() is invoked.
279    */
280   void destroy() override;
281 
282   /**
283    * Attach this AsyncServerSocket to its primary EventBase.
284    *
285    * This may only be called if the AsyncServerSocket is not already attached
   * to an EventBase.  The AsyncServerSocket must be attached to an EventBase
287    * before it can begin accepting connections.
288    */
289   void attachEventBase(EventBase* eventBase);
290 
291   /**
292    * Detach the AsyncServerSocket from its primary EventBase.
293    *
294    * detachEventBase() may only be called if the AsyncServerSocket is not
295    * currently accepting connections.
296    */
297   void detachEventBase();
298 
299   /**
300    * Get the EventBase used by this socket.
301    */
getEventBase()302   EventBase* getEventBase() const override { return eventBase_; }
303 
  /**
   * Create an AsyncServerSocket from an existing socket file descriptor.
   *
   * useExistingSocket() will cause the AsyncServerSocket to take ownership of
   * the specified file descriptor, and use it to listen for new connections.
   * The AsyncServerSocket will close the file descriptor when it is
   * destroyed.
   *
   * useExistingSocket() must be called before bind() or listen().
   *
   * The supplied file descriptor will automatically be put into non-blocking
   * mode.  The caller may have already directly called bind() and possibly
   * listen() on the file descriptor.  If so the caller should skip calling
   * the corresponding AsyncServerSocket::bind() and listen() methods.
   *
   * On error an AsyncSocketException will be thrown and the caller will
   * retain ownership of the file descriptor.
   *
   * @param fd  The existing socket to adopt.
   */
  void useExistingSocket(NetworkSocket fd);

  /**
   * Multi-socket variant of useExistingSocket().
   *
   * NOTE(review): presumably the same ownership, ordering and error rules
   * apply to every socket in fds; confirm against the implementation.
   *
   * @param fds  The existing sockets to adopt.
   */
  void useExistingSockets(const std::vector<NetworkSocket>& fds);
324 
325   /**
326    * Return the underlying file descriptor
327    */
getNetworkSockets()328   std::vector<NetworkSocket> getNetworkSockets() const {
329     std::vector<NetworkSocket> sockets;
330     for (auto& handler : sockets_) {
331       sockets.push_back(handler.socket_);
332     }
333     return sockets;
334   }
335 
336   /**
337    * Backwards compatible getSocket, warns if > 1 socket
338    */
getNetworkSocket()339   NetworkSocket getNetworkSocket() const {
340     if (sockets_.size() > 1) {
341       VLOG(2) << "Warning: getSocket can return multiple fds, "
342               << "but getSockets was not called, so only returning the first";
343     }
344     if (sockets_.size() == 0) {
345       return NetworkSocket();
346     } else {
347       return sockets_[0].socket_;
348     }
349   }
350 
  /**
   * Enable or disable zerocopy support for the server sockets; sockets
   * returned by accept inherit the setting.
   *
   * @param enable  Whether zerocopy should be turned on.
   * @return NOTE(review): presumably true on success -- the meaning of the
   *         result is defined out of line; confirm there.
   */
  bool setZeroCopy(bool enable);
355 
  // An IP address paired with an interface name, as accepted by the
  // bind() overload that takes addresses/if names.
  using IPAddressIfNamePair = std::pair<IPAddress, std::string>;
357 
358   /**
359    * Bind to the specified address.
360    *
361    * This must be called from the primary EventBase thread.
362    *
363    * Throws AsyncSocketException on error.
364    */
365   virtual void bind(const SocketAddress& address);
366 
367   /**
368    * Bind to the specified address/if name
369    *
370    * This must be called from the primary EventBase thread.
371    *
372    * Throws AsyncSocketException on error.
373    */
374   virtual void bind(const SocketAddress& address, const std::string& ifName);
375 
376   /**
377    * Bind to the specified port for the specified addresses.
378    *
379    * This must be called from the primary EventBase thread.
380    *
381    * Throws AsyncSocketException on error.
382    */
383   virtual void bind(const std::vector<IPAddress>& ipAddresses, uint16_t port);
384 
385   /**
386    * Bind to the specified port for the specified addresses/if names.
387    *
388    * This must be called from the primary EventBase thread.
389    *
390    * Throws AsyncSocketException on error.
391    */
392   virtual void bind(
393       const std::vector<IPAddressIfNamePair>& addresses, uint16_t port);
394 
395   /**
396    * Bind to the specified port.
397    *
398    * This must be called from the primary EventBase thread.
399    *
400    * Throws AsyncSocketException on error.
401    */
402   virtual void bind(uint16_t port);
403 
404   /**
405    * Get the local address to which the socket is bound.
406    *
407    * Throws AsyncSocketException on error.
408    */
409   void getAddress(SocketAddress* addressReturn) const override;
410 
411   /**
412    * Get the local address to which the socket is bound.
413    *
414    * Throws AsyncSocketException on error.
415    */
getAddress()416   SocketAddress getAddress() const {
417     SocketAddress ret;
418     getAddress(&ret);
419     return ret;
420   }
421 
422   /**
423    * Get all the local addresses to which the socket is bound.
424    *
425    * Throws AsyncSocketException on error.
426    */
427   std::vector<SocketAddress> getAddresses() const;
428 
429   /**
430    * Begin listening for connections.
431    *
432    * This calls ::listen() with the specified backlog.
433    *
434    * Once listen() is invoked the socket will actually be open so that remote
435    * clients may establish connections.  (Clients that attempt to connect
436    * before listen() is called will receive a connection refused error.)
437    *
438    * At least one callback must be set and startAccepting() must be called to
439    * actually begin notifying the accept callbacks of newly accepted
440    * connections.  The backlog parameter controls how many connections the
441    * kernel will accept and buffer internally while the accept callbacks are
442    * paused (or if accepting is enabled but the callbacks cannot keep up).
443    *
444    * bind() must be called before calling listen().
445    * listen() must be called from the primary EventBase thread.
446    *
447    * Throws AsyncSocketException on error.
448    */
449   virtual void listen(int backlog);
450 
451   /**
452    * Add an AcceptCallback.
453    *
454    * When a new socket is accepted, one of the AcceptCallbacks will be invoked
455    * with the new socket.  The AcceptCallbacks are invoked in a round-robin
456    * fashion.  This allows the accepted sockets to be distributed among a pool
457    * of threads, each running its own EventBase object.  This is a common model,
458    * since most asynchronous-style servers typically run one EventBase thread
459    * per CPU.
460    *
461    * The EventBase object associated with each AcceptCallback must be running
462    * its loop.  If the EventBase loop is not running, sockets will still be
463    * scheduled for the callback, but the callback cannot actually get invoked
464    * until the loop runs.
465    *
466    * This method must be invoked from the AsyncServerSocket's primary
467    * EventBase thread.
468    *
469    * Note that startAccepting() must be called on the AsyncServerSocket to
470    * cause it to actually start accepting sockets once callbacks have been
471    * installed.
472    *
473    * @param callback   The callback to invoke.
474    * @param eventBase  The EventBase to use to invoke the callback.  This
475    *     parameter may be nullptr, in which case the callback will be invoked in
476    *     the AsyncServerSocket's primary EventBase.
477    * @param maxAtOnce  The maximum number of connections to accept in this
478    *                   callback on a single iteration of the event base loop.
479    *                   This only takes effect when eventBase is non-nullptr.
480    *                   When using a nullptr eventBase for the callback, the
481    *                   setMaxAcceptAtOnce() method controls how many
482    *                   connections the main event base will accept at once.
483    */
484   virtual void addAcceptCallback(
485       AcceptCallback* callback,
486       EventBase* eventBase,
487       uint32_t maxAtOnce = kDefaultCallbackAcceptAtOnce);
488 
489   /**
490    * Remove an AcceptCallback.
491    *
492    * This allows a single AcceptCallback to be removed from the round-robin
493    * pool.
494    *
495    * This method must be invoked from the AsyncServerSocket's primary
496    * EventBase thread.  Use EventBase::runInEventBaseThread() to schedule the
497    * operation in the correct EventBase if your code is not in the server
498    * socket's primary EventBase.
499    *
500    * Given that the accept callback is being driven by a different EventBase,
501    * the AcceptCallback may continue to be invoked for a short period of time
502    * after removeAcceptCallback() returns in this thread.  Once the other
503    * EventBase thread receives the notification to stop, it will call
504    * acceptStopped() on the callback to inform it that it is fully stopped and
505    * will not receive any new sockets.
506    *
507    * If the last accept callback is removed while the socket is accepting,
508    * the socket will implicitly pause accepting.  If a callback is later added,
509    * it will resume accepting immediately, without requiring startAccepting()
510    * to be invoked.
511    *
512    * @param callback   The callback to uninstall.
513    * @param eventBase  The EventBase associated with this callback.  This must
514    *     be the same EventBase that was used when the callback was installed
515    *     with addAcceptCallback().
516    */
517   void removeAcceptCallback(AcceptCallback* callback, EventBase* eventBase);
518 
519   /**
   * Begin accepting connections on this socket.
521    *
522    * bind() and listen() must be called before calling startAccepting().
523    *
   * When an AsyncServerSocket is initially created, it will not begin
525    * accepting connections until at least one callback has been added and
526    * startAccepting() has been called.  startAccepting() can also be used to
527    * resume accepting connections after a call to pauseAccepting().
528    *
529    * If startAccepting() is called when there are no accept callbacks
530    * installed, the socket will not actually begin accepting until an accept
531    * callback is added.
532    *
533    * This method may only be called from the primary EventBase thread.
534    */
535   virtual void startAccepting();
536 
537   /**
538    * Pause accepting connections.
539    *
540    * startAccepting() may be called to resume accepting.
541    *
542    * This method may only be called from the primary EventBase thread.
543    * If there are AcceptCallbacks being driven by other EventBase threads they
544    * may continue to receive callbacks for a short period of time after
545    * pauseAccepting() returns.
546    *
547    * Unlike removeAcceptCallback() or destroy(), acceptStopped() will not be
548    * called on the AcceptCallback objects simply due to a temporary pause.  If
549    * the server socket is later destroyed while paused, acceptStopped() will be
   * called on all of the installed AcceptCallbacks.
551    */
552   void pauseAccepting();
553 
554   /**
555    * Shutdown the listen socket and notify all callbacks that accept has
556    * stopped, but don't close the socket.  This invokes shutdown(2) with the
557    * supplied argument.  Passing -1 will close the socket now.  Otherwise, the
558    * close will be delayed until this object is destroyed.
559    *
560    * Only use this if you have reason to pass special flags to shutdown.
561    * Otherwise just destroy the socket.
562    *
563    * This method has no effect when a ShutdownSocketSet option is used.
564    *
565    * Returns the result of shutdown on sockets_[n-1]
566    */
567   int stopAccepting(int shutdownFlags = -1);
568 
569   /**
570    * Get the maximum number of connections that will be accepted each time
571    * around the event loop.
572    */
getMaxAcceptAtOnce()573   uint32_t getMaxAcceptAtOnce() const { return maxAcceptAtOnce_; }
574 
575   /**
576    * Set the maximum number of connections that will be accepted each time
577    * around the event loop.
578    *
579    * This provides a very coarse-grained way of controlling how fast the
580    * AsyncServerSocket will accept connections.  If you find that when your
581    * server is overloaded AsyncServerSocket accepts connections more quickly
582    * than your code can process them, you can try lowering this number so that
583    * fewer connections will be accepted each event loop iteration.
584    *
585    * For more explicit control over the accept rate, you can also use
586    * pauseAccepting() to temporarily pause accepting when your server is
587    * overloaded, and then use startAccepting() later to resume accepting.
588    */
setMaxAcceptAtOnce(uint32_t numConns)589   void setMaxAcceptAtOnce(uint32_t numConns) { maxAcceptAtOnce_ = numConns; }
590 
591   /**
592    * Get the duration after which new connection messages will be dropped from
593    * the NotificationQueue if it has not started processing yet.
594    */
595   const folly::observer::AtomicObserver<std::chrono::nanoseconds>&
getQueueTimeout()596   getQueueTimeout() const {
597     return queueTimeout_;
598   }
599 
600   /**
601    * Set the duration after which new connection messages will be dropped from
602    * the NotificationQueue if it has not started processing yet.
603    *
604    * This avoids the NotificationQueue from processing messages where the client
605    * socket has probably timed out already, or will time out before a response
606    * can be sent.
607    *
608    * The default value (of 0) means that messages will never expire.
609    */
setQueueTimeout(folly::observer::Observer<std::chrono::nanoseconds> duration)610   void setQueueTimeout(
611       folly::observer::Observer<std::chrono::nanoseconds> duration) {
612     queueTimeout_ = duration;
613   }
setQueueTimeout(std::chrono::nanoseconds duration)614   void setQueueTimeout(std::chrono::nanoseconds duration) {
615     setQueueTimeout(folly::observer::makeStaticObserver(duration));
616   }
617 
618   /**
619    * Get the maximum number of unprocessed messages which a NotificationQueue
620    * can hold.
621    */
getMaxNumMessagesInQueue()622   uint32_t getMaxNumMessagesInQueue() const { return maxNumMsgsInQueue_; }
623 
624   /**
625    * Set the maximum number of unprocessed messages in NotificationQueue.
626    * No new message will be sent to that NotificationQueue if there are more
627    * than such number of unprocessed messages in that queue.
628    */
setMaxNumMessagesInQueue(uint32_t num)629   void setMaxNumMessagesInQueue(uint32_t num) { maxNumMsgsInQueue_ = num; }
630 
631   /**
632    * Get the speed of adjusting connection accept rate.
633    */
getAcceptRateAdjustSpeed()634   double getAcceptRateAdjustSpeed() const { return acceptRateAdjustSpeed_; }
635 
636   /**
637    * Set the speed of adjusting connection accept rate.
638    */
setAcceptRateAdjustSpeed(double speed)639   void setAcceptRateAdjustSpeed(double speed) {
640     acceptRateAdjustSpeed_ = speed;
641   }
642 
643   /**
644    * Enable/Disable TOS reflection for the server socket
645    */
646   void setTosReflect(bool enable);
647 
getTosReflect()648   bool getTosReflect() { return tosReflect_; }
649 
650   /**
651    * Set/Get default TOS for listener socket
652    */
653   void setListenerTos(uint32_t tos);
654 
getListenerTos()655   uint32_t getListenerTos() const { return listenerTos_; }
656 
657   /**
658    * Get the number of connections dropped by the AsyncServerSocket
659    */
getNumDroppedConnections()660   std::size_t getNumDroppedConnections() const {
661     return numDroppedConnections_;
662   }
663 
664   /**
665    * Get the current number of unprocessed messages in NotificationQueue.
666    *
667    * This method must be invoked from the AsyncServerSocket's primary
668    * EventBase thread.  Use EventBase::runInEventBaseThread() to schedule the
669    * operation in the correct EventBase if your code is not in the server
670    * socket's primary EventBase.
671    */
getNumPendingMessagesInQueue()672   int64_t getNumPendingMessagesInQueue() const {
673     if (eventBase_) {
674       eventBase_->dcheckIsInEventBaseThread();
675     }
676     int64_t numMsgs = 0;
677     for (const auto& callback : callbacks_) {
678       if (callback.consumer) {
679         numMsgs += callback.consumer->getQueue().size();
680       }
681     }
682     return numMsgs;
683   }
684 
685   /**
686    * Set whether or not SO_KEEPALIVE should be enabled on the server socket
687    * (and thus on all subsequently-accepted connections). By default, keepalive
688    * is enabled.
689    *
690    * Note that TCP keepalive usually only kicks in after the connection has
691    * been idle for several hours. Applications should almost always have their
692    * own, shorter idle timeout.
693    */
setKeepAliveEnabled(bool enabled)694   void setKeepAliveEnabled(bool enabled) {
695     keepAliveEnabled_ = enabled;
696 
697     for (auto& handler : sockets_) {
698       if (handler.socket_ == NetworkSocket()) {
699         continue;
700       }
701 
702       int val = (enabled) ? 1 : 0;
703       if (netops::setsockopt(
704               handler.socket_, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) !=
705           0) {
706         LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: %s"
707                    << errnoStr(errno);
708       }
709     }
710   }
711 
712   /**
713    * Get whether or not SO_KEEPALIVE is enabled on the server socket.
714    */
getKeepAliveEnabled()715   bool getKeepAliveEnabled() const { return keepAliveEnabled_; }
716 
717   /**
718    * Set whether or not SO_REUSEPORT should be enabled on the server socket,
719    * allowing multiple binds to the same port
720    */
setReusePortEnabled(bool enabled)721   void setReusePortEnabled(bool enabled) {
722     reusePortEnabled_ = enabled;
723 
724     for (auto& handler : sockets_) {
725       if (handler.socket_ == NetworkSocket()) {
726         continue;
727       }
728 
729       int val = (enabled) ? 1 : 0;
730       if (netops::setsockopt(
731               handler.socket_, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val)) !=
732           0) {
733         auto errnoCopy = errno;
734         LOG(ERROR) << "failed to set SO_REUSEPORT on async server socket "
735                    << errnoCopy;
736         folly::throwSystemErrorExplicit(
737             errnoCopy, "failed to set SO_REUSEPORT on async server socket");
738       }
739     }
740   }
741 
742   /**
743    * Get whether or not SO_REUSEPORT is enabled on the server socket.
744    */
getReusePortEnabled_()745   bool getReusePortEnabled_() const { return reusePortEnabled_; }
746 
747   /**
748    * Set whether or not the socket should close during exec() (FD_CLOEXEC). By
749    * default, this is enabled
750    */
setCloseOnExec(bool closeOnExec)751   void setCloseOnExec(bool closeOnExec) { closeOnExec_ = closeOnExec; }
752 
753   /**
754    * Get whether or not FD_CLOEXEC is enabled on the server socket.
755    */
getCloseOnExec()756   bool getCloseOnExec() const { return closeOnExec_; }
757 
758   /**
759    * Tries to enable TFO if the machine supports it.
760    */
setTFOEnabled(bool enabled,uint32_t maxTFOQueueSize)761   void setTFOEnabled(bool enabled, uint32_t maxTFOQueueSize) {
762     tfo_ = enabled;
763     tfoMaxQueueSize_ = maxTFOQueueSize;
764   }
765 
766   /**
767    * Do not attempt the transparent TLS handshake
768    */
disableTransparentTls()769   void disableTransparentTls() { noTransparentTls_ = true; }
770 
771   /**
772    * Get whether or not the socket is accepting new connections
773    */
getAccepting()774   bool getAccepting() const { return accepting_; }
775 
776   /**
777    * Set the ConnectionEventCallback
778    */
setConnectionEventCallback(ConnectionEventCallback * const connectionEventCallback)779   void setConnectionEventCallback(
780       ConnectionEventCallback* const connectionEventCallback) {
781     connectionEventCallback_ = connectionEventCallback;
782   }
783 
784   /**
785    * Get the ConnectionEventCallback
786    */
getConnectionEventCallback()787   ConnectionEventCallback* getConnectionEventCallback() const {
788     return connectionEventCallback_;
789   }
790 
791  protected:
792   /**
793    * Protected destructor.
794    *
795    * Invoke destroy() instead to destroy the AsyncServerSocket.
796    */
797   ~AsyncServerSocket() override;
798 
799  private:
800   class RemoteAcceptor; // named early: NewConnMessage/ErrorMessage take a RemoteAcceptor& below
801 
802   struct NewConnMessage {
803     NetworkSocket fd;
804     SocketAddress clientAddr;
805     std::chrono::steady_clock::time_point deadline;
806     std::chrono::steady_clock::time_point timeBeforeEnqueue;
807 
isExpiredNewConnMessage808     bool isExpired() const {
809       return deadline.time_since_epoch().count() != 0 &&
810           std::chrono::steady_clock::now() > deadline;
811     }
812 
813     AtomicNotificationQueueTaskStatus operator()(
814         RemoteAcceptor& acceptor) noexcept;
815   };
816 
817   struct ErrorMessage {
818     int err;
819     std::string msg;
820 
821     AtomicNotificationQueueTaskStatus operator()(
822         RemoteAcceptor& acceptor) noexcept;
823   };
824 
825   using QueueMessage = boost::variant<NewConnMessage, ErrorMessage>; // element type of RemoteAcceptor::Queue
826 
827   /**
828    * A class to receive notifications to invoke AcceptCallback objects
829    * in other EventBase threads.
830    *
831    * A RemoteAcceptor object is created for each AcceptCallback that
832    * is installed in a separate EventBase thread.  The RemoteAcceptor
833    * receives notification of new sockets via a NotificationQueue,
834    * and then invokes the AcceptCallback.
835    */
836   class RemoteAcceptor {
837     struct Consumer {
operatorConsumer838       AtomicNotificationQueueTaskStatus operator()(
839           QueueMessage&& msg) noexcept {
840         return boost::apply_visitor(
841             [this](auto&& visitMsg) { return visitMsg(acceptor_); }, msg);
842       }
843 
ConsumerConsumer844       explicit Consumer(RemoteAcceptor& acceptor) : acceptor_(acceptor) {}
845       RemoteAcceptor& acceptor_;
846     };
847 
848     friend NewConnMessage;
849     friend ErrorMessage;
850 
851    public:
852     using Queue = EventBaseAtomicNotificationQueue<QueueMessage, Consumer>;
853 
RemoteAcceptor(AcceptCallback * callback,ConnectionEventCallback * connectionEventCallback)854     explicit RemoteAcceptor(
855         AcceptCallback* callback,
856         ConnectionEventCallback* connectionEventCallback)
857         : callback_(callback),
858           connectionEventCallback_(connectionEventCallback),
859           queue_(Consumer(*this)) {}
860 
861     void start(EventBase* eventBase, uint32_t maxAtOnce);
862     void stop(EventBase* eventBase, AcceptCallback* callback);
863 
getQueue()864     Queue& getQueue() { return queue_; }
865 
866    private:
867     AcceptCallback* callback_;
868     ConnectionEventCallback* connectionEventCallback_;
869     Queue queue_;
870   };
871 
872   /**
873    * A struct to keep track of the callbacks associated with this server
874    * socket.
875    */
876   struct CallbackInfo {
CallbackInfoCallbackInfo877     CallbackInfo(AcceptCallback* cb, EventBase* evb)
878         : callback(cb), eventBase(evb), consumer(nullptr) {}
879 
880     AcceptCallback* callback;
881     EventBase* eventBase;
882 
883     RemoteAcceptor* consumer;
884   };
885 
886   class BackoffTimeout; // forward declaration; backoffTimeout_ below is a pointer to it
887 
  // ServerEventHandler::handlerReady() forwards its events here together
  // with the handler's socket_ and addressFamily_.
888   virtual void handlerReady(
889       uint16_t events, NetworkSocket fd, sa_family_t family) noexcept;
890 
  // The helpers below are only declared here; their definitions are not
  // part of this header chunk.
891   NetworkSocket createSocket(int family);
892   void setupSocket(NetworkSocket fd, int family);
893   void bindInternal(const SocketAddress& address, const std::string& ifName);
894   void bindSocket(
895       NetworkSocket fd,
896       const SocketAddress& address,
897       bool isExistingSocket,
898       const std::string& ifName);
899   void dispatchSocket(NetworkSocket socket, SocketAddress&& address);
900   void dispatchError(const char* msg, int errnoValue);
901   void enterBackoff();
902   void backoffTimeoutExpired();
903 
nextCallback()904   CallbackInfo* nextCallback() {
905     CallbackInfo* info = &callbacks_[callbackIndex_];
906 
907     ++callbackIndex_;
908     if (callbackIndex_ >= callbacks_.size()) {
909       callbackIndex_ = 0;
910     }
911 
912     return info;
913   }
914 
915   struct ServerEventHandler : public EventHandler {
ServerEventHandlerServerEventHandler916     ServerEventHandler(
917         EventBase* eventBase,
918         NetworkSocket socket,
919         AsyncServerSocket* parent,
920         sa_family_t addressFamily)
921         : EventHandler(eventBase, socket),
922           eventBase_(eventBase),
923           socket_(socket),
924           parent_(parent),
925           addressFamily_(addressFamily) {}
926 
ServerEventHandlerServerEventHandler927     ServerEventHandler(const ServerEventHandler& other)
928         : EventHandler(other.eventBase_, other.socket_),
929           eventBase_(other.eventBase_),
930           socket_(other.socket_),
931           parent_(other.parent_),
932           addressFamily_(other.addressFamily_) {}
933 
934     ServerEventHandler& operator=(const ServerEventHandler& other) {
935       if (this != &other) {
936         eventBase_ = other.eventBase_;
937         socket_ = other.socket_;
938         parent_ = other.parent_;
939         addressFamily_ = other.addressFamily_;
940 
941         detachEventBase();
942         attachEventBase(other.eventBase_);
943         changeHandlerFD(other.socket_);
944       }
945       return *this;
946     }
947 
948     // Inherited from EventHandler
handlerReadyServerEventHandler949     void handlerReady(uint16_t events) noexcept override {
950       parent_->handlerReady(events, socket_, addressFamily_);
951     }
952 
953     EventBase* eventBase_;
954     NetworkSocket socket_;
955     AsyncServerSocket* parent_;
956     sa_family_t addressFamily_;
957   };
958 
959   EventBase* eventBase_;
960   std::vector<ServerEventHandler> sockets_;
961   std::vector<NetworkSocket> pendingCloseSockets_;
962   bool accepting_;
963   uint32_t maxAcceptAtOnce_;
964   uint32_t maxNumMsgsInQueue_;
965   double acceptRateAdjustSpeed_; // 0 to disable auto adjust
966   double acceptRate_;
967   std::chrono::time_point<std::chrono::steady_clock> lastAccepTimestamp_;
968   std::size_t numDroppedConnections_;
969   uint32_t callbackIndex_;
970   BackoffTimeout* backoffTimeout_;
971   std::vector<CallbackInfo> callbacks_;
972   bool keepAliveEnabled_;
973   bool reusePortEnabled_{false};
974   bool closeOnExec_;
975   bool tfo_{false};
976   bool noTransparentTls_{false};
977   uint32_t tfoMaxQueueSize_{0};
978   std::weak_ptr<ShutdownSocketSet> wShutdownSocketSet_;
979   ConnectionEventCallback* connectionEventCallback_{nullptr};
980   bool tosReflect_{false};
981   uint32_t listenerTos_{0};
982   bool zeroCopyVal_{false};
983   folly::observer::AtomicObserver<std::chrono::nanoseconds> queueTimeout_{
984       folly::observer::makeStaticObserver(std::chrono::nanoseconds::zero())};
985 };
986 
987 } // namespace folly
988