1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <limits.h> 20 #include <stddef.h> 21 22 #include <chrono> 23 #include <exception> 24 #include <memory> 25 #include <vector> 26 #include <boost/variant.hpp> 27 28 #include <folly/ExceptionWrapper.h> 29 #include <folly/SocketAddress.h> 30 #include <folly/String.h> 31 #include <folly/experimental/observer/Observer.h> 32 #include <folly/io/ShutdownSocketSet.h> 33 #include <folly/io/async/AsyncSocketBase.h> 34 #include <folly/io/async/AsyncTimeout.h> 35 #include <folly/io/async/DelayedDestruction.h> 36 #include <folly/io/async/EventBase.h> 37 #include <folly/io/async/EventBaseAtomicNotificationQueue.h> 38 #include <folly/io/async/EventHandler.h> 39 #include <folly/net/NetOps.h> 40 #include <folly/net/NetworkSocket.h> 41 #include <folly/portability/Sockets.h> 42 43 // Due to the way kernel headers are included, this may or may not be defined. 44 // Number pulled from 3.10 kernel headers. 45 #ifndef SO_REUSEPORT 46 #define SO_REUSEPORT 15 47 #endif 48 49 #if defined __linux__ && !defined SO_NO_TRANSPARENT_TLS 50 #define SO_NO_TRANSPARENT_TLS 200 51 #endif 52 53 // TODO(yichengfb): remove this 54 // For temporarily marking the new interface of 55 // folly::AsyncServerSocket::AcceptCallback::connectionAccepted() 56 #define FOLLY_ASYNCSERVERSOCKET_ACCEPTINFO_DEFINED 1 57 58 namespace folly { 59 60 /** 61 * A listening socket that asynchronously informs a callback whenever a new 62 * connection has been accepted. 63 * 64 * Unlike most async interfaces that always invoke their callback in the same 65 * EventBase thread, AsyncServerSocket is unusual in that it can distribute 66 * the callbacks across multiple EventBase threads. 67 * 68 * This supports a common use case for network servers to distribute incoming 69 * connections across a number of EventBase threads. (Servers typically run 70 * with one EventBase thread per CPU.) 71 * 72 * Despite being able to invoke callbacks in multiple EventBase threads, 73 * AsyncServerSocket still has one "primary" EventBase. Operations that 74 * modify the AsyncServerSocket state may only be performed from the primary 75 * EventBase thread. 76 */ 77 class AsyncServerSocket : public DelayedDestruction, public AsyncSocketBase { 78 public: 79 typedef std::unique_ptr<AsyncServerSocket, Destructor> UniquePtr; 80 // Disallow copy, move, and default construction. 81 AsyncServerSocket(AsyncServerSocket&&) = delete; 82 83 /** 84 * A callback interface to get notified of client socket events. 85 * 86 * The ConnectionEventCallback implementations need to be thread-safe as the 87 * callbacks may be called from different threads. 88 */ 89 class ConnectionEventCallback { 90 public: 91 virtual ~ConnectionEventCallback() = default; 92 93 /** 94 * onConnectionAccepted() is called right after a client connection 95 * is accepted using the system accept()/accept4() APIs. 96 */ 97 virtual void onConnectionAccepted( 98 const NetworkSocket socket, const SocketAddress& addr) noexcept = 0; 99 100 /** 101 * onConnectionAcceptError() is called when an error occurred accepting 102 * a connection. 103 */ 104 virtual void onConnectionAcceptError(const int err) noexcept = 0; 105 106 /** 107 * onConnectionDropped() is called when a connection is dropped, 108 * probably because of some error encountered. 109 */ 110 virtual void onConnectionDropped( 111 const NetworkSocket socket, const SocketAddress& addr) noexcept = 0; 112 113 /** 114 * onConnectionEnqueuedForAcceptorCallback() is called when the 115 * connection is successfully enqueued for an AcceptCallback to pick up. 116 */ 117 virtual void onConnectionEnqueuedForAcceptorCallback( 118 const NetworkSocket socket, const SocketAddress& addr) noexcept = 0; 119 120 /** 121 * onConnectionDequeuedByAcceptorCallback() is called when the 122 * connection is successfully dequeued by an AcceptCallback. 123 */ 124 virtual void onConnectionDequeuedByAcceptorCallback( 125 const NetworkSocket socket, const SocketAddress& addr) noexcept = 0; 126 127 /** 128 * onBackoffStarted is called when the socket has successfully started 129 * backing off accepting new client sockets. 130 */ 131 virtual void onBackoffStarted() noexcept = 0; 132 133 /** 134 * onBackoffEnded is called when the backoff period has ended and the socket 135 * has successfully resumed accepting new connections if there is any 136 * AcceptCallback registered. 137 */ 138 virtual void onBackoffEnded() noexcept = 0; 139 140 /** 141 * onBackoffError is called when there is an error entering backoff 142 */ 143 virtual void onBackoffError() noexcept = 0; 144 }; 145 146 class AcceptCallback { 147 public: 148 struct AcceptInfo { 149 std::chrono::steady_clock::time_point timeBeforeEnqueue; 150 }; 151 152 virtual ~AcceptCallback() = default; 153 154 /** 155 * connectionAccepted() is called whenever a new client connection is 156 * received. 157 * 158 * The AcceptCallback will remain installed after connectionAccepted() 159 * returns. 160 * 161 * @param fd The newly accepted client socket. The AcceptCallback 162 * assumes ownership of this socket, and is responsible 163 * for closing it when done. The newly accepted file 164 * descriptor will have already been put into 165 * non-blocking mode. 166 * @param clientAddr A reference to a SocketAddress struct containing the 167 * client's address. This struct is only guaranteed to 168 * remain valid until connectionAccepted() returns. 169 * @param info A simple structure that contains auxiliary information 170 * about this accepted socket, for example, when it's 171 * getting pushed into the waiting queue. 172 */ 173 virtual void connectionAccepted( 174 NetworkSocket fd, 175 const SocketAddress& clientAddr, 176 AcceptInfo info) noexcept = 0; 177 178 /** 179 * acceptError() is called if an error occurs while accepting. 180 * 181 * The AcceptCallback will remain installed even after an accept error, 182 * as the errors are typically somewhat transient, such as being out of 183 * file descriptors. The server socket must be explicitly stopped if you 184 * wish to stop accepting after an error. 185 * 186 * @param ex An exception representing the error. 187 */ 188 189 // TODO(T81599451): Remove the acceptError(const std::exception&) 190 // after migration and remove compile warning supression. 191 FOLLY_PUSH_WARNING 192 FOLLY_GNU_DISABLE_WARNING("-Woverloaded-virtual") acceptError(exception_wrapper ew)193 virtual void acceptError(exception_wrapper ew) noexcept { 194 auto ex = ew.get_exception<std::exception>(); 195 FOLLY_SAFE_CHECK(ex, "no exception"); 196 acceptError(*ex); 197 } 198 acceptError(const std::exception &)199 virtual void acceptError(const std::exception& /* unused */) noexcept {} 200 FOLLY_POP_WARNING 201 202 /** 203 * acceptStarted() will be called in the callback's EventBase thread 204 * after this callback has been added to the AsyncServerSocket. 205 * 206 * acceptStarted() will be called before any calls to connectionAccepted() 207 * or acceptError() are made on this callback. 208 * 209 * acceptStarted() makes it easier for callbacks to perform initialization 210 * inside the callback thread. (The call to addAcceptCallback() must 211 * always be made from the AsyncServerSocket's primary EventBase thread. 212 * acceptStarted() provides a hook that will always be invoked in the 213 * callback's thread.) 214 * 215 * Note that the call to acceptStarted() is made once the callback is 216 * added, regardless of whether or not the AsyncServerSocket is actually 217 * accepting at the moment. acceptStarted() will be called even if the 218 * AsyncServerSocket is paused when the callback is added (including if 219 * the initial call to startAccepting() on the AsyncServerSocket has not 220 * been made yet). 221 */ acceptStarted()222 virtual void acceptStarted() noexcept {} 223 224 /** 225 * acceptStopped() will be called when this AcceptCallback is removed from 226 * the AsyncServerSocket, or when the AsyncServerSocket is destroyed, 227 * whichever occurs first. 228 * 229 * No more calls to connectionAccepted() or acceptError() will be made 230 * after acceptStopped() is invoked. 231 */ acceptStopped()232 virtual void acceptStopped() noexcept {} 233 }; 234 235 static const uint32_t kDefaultMaxAcceptAtOnce = 30; 236 static const uint32_t kDefaultCallbackAcceptAtOnce = 5; 237 static const uint32_t kDefaultMaxMessagesInQueue = 1024; 238 239 /** 240 * Create a new AsyncServerSocket with the specified EventBase. 241 * 242 * @param eventBase The EventBase to use for driving the asynchronous I/O. 243 * If this parameter is nullptr, attachEventBase() must be 244 * called before this socket can begin accepting 245 * connections. 246 */ 247 explicit AsyncServerSocket(EventBase* eventBase = nullptr); 248 249 /** 250 * Helper function to create a shared_ptr<AsyncServerSocket>. 251 * 252 * This passes in the correct destructor object, since AsyncServerSocket's 253 * destructor is protected and cannot be invoked directly. 254 */ 255 static std::shared_ptr<AsyncServerSocket> newSocket( 256 EventBase* evb = nullptr) { 257 return std::shared_ptr<AsyncServerSocket>( 258 new AsyncServerSocket(evb), Destructor()); 259 } 260 261 void setShutdownSocketSet(const std::weak_ptr<ShutdownSocketSet>& wNewSS); 262 263 /** 264 * Destroy the socket. 265 * 266 * AsyncServerSocket::destroy() must be called to destroy the socket. 267 * The normal destructor is private, and should not be invoked directly. 268 * This prevents callers from deleting a AsyncServerSocket while it is 269 * invoking a callback. 270 * 271 * destroy() must be invoked from the socket's primary EventBase thread. 272 * 273 * If there are AcceptCallbacks still installed when destroy() is called, 274 * acceptStopped() will be called on these callbacks to notify them that 275 * accepting has stopped. Accept callbacks being driven by other EventBase 276 * threads may continue to receive new accept callbacks for a brief period of 277 * time after destroy() returns. They will not receive any more callback 278 * invocations once acceptStopped() is invoked. 279 */ 280 void destroy() override; 281 282 /** 283 * Attach this AsyncServerSocket to its primary EventBase. 284 * 285 * This may only be called if the AsyncServerSocket is not already attached 286 * to a EventBase. The AsyncServerSocket must be attached to a EventBase 287 * before it can begin accepting connections. 288 */ 289 void attachEventBase(EventBase* eventBase); 290 291 /** 292 * Detach the AsyncServerSocket from its primary EventBase. 293 * 294 * detachEventBase() may only be called if the AsyncServerSocket is not 295 * currently accepting connections. 296 */ 297 void detachEventBase(); 298 299 /** 300 * Get the EventBase used by this socket. 301 */ getEventBase()302 EventBase* getEventBase() const override { return eventBase_; } 303 304 /** 305 * Create a AsyncServerSocket from an existing socket file descriptor. 306 * 307 * useExistingSocket() will cause the AsyncServerSocket to take ownership of 308 * the specified file descriptor, and use it to listen for new connections. 309 * The AsyncServerSocket will close the file descriptor when it is 310 * destroyed. 311 * 312 * useExistingSocket() must be called before bind() or listen(). 313 * 314 * The supplied file descriptor will automatically be put into non-blocking 315 * mode. The caller may have already directly called bind() and possibly 316 * listen on the file descriptor. If so the caller should skip calling the 317 * corresponding AsyncServerSocket::bind() and listen() methods. 318 * 319 * On error a AsyncSocketException will be thrown and the caller will retain 320 * ownership of the file descriptor. 321 */ 322 void useExistingSocket(NetworkSocket fd); 323 void useExistingSockets(const std::vector<NetworkSocket>& fds); 324 325 /** 326 * Return the underlying file descriptor 327 */ getNetworkSockets()328 std::vector<NetworkSocket> getNetworkSockets() const { 329 std::vector<NetworkSocket> sockets; 330 for (auto& handler : sockets_) { 331 sockets.push_back(handler.socket_); 332 } 333 return sockets; 334 } 335 336 /** 337 * Backwards compatible getSocket, warns if > 1 socket 338 */ getNetworkSocket()339 NetworkSocket getNetworkSocket() const { 340 if (sockets_.size() > 1) { 341 VLOG(2) << "Warning: getSocket can return multiple fds, " 342 << "but getSockets was not called, so only returning the first"; 343 } 344 if (sockets_.size() == 0) { 345 return NetworkSocket(); 346 } else { 347 return sockets_[0].socket_; 348 } 349 } 350 351 /* enable zerocopy support for the server sockets - the s = accept sockets 352 * inherit it 353 */ 354 bool setZeroCopy(bool enable); 355 356 using IPAddressIfNamePair = std::pair<IPAddress, std::string>; 357 358 /** 359 * Bind to the specified address. 360 * 361 * This must be called from the primary EventBase thread. 362 * 363 * Throws AsyncSocketException on error. 364 */ 365 virtual void bind(const SocketAddress& address); 366 367 /** 368 * Bind to the specified address/if name 369 * 370 * This must be called from the primary EventBase thread. 371 * 372 * Throws AsyncSocketException on error. 373 */ 374 virtual void bind(const SocketAddress& address, const std::string& ifName); 375 376 /** 377 * Bind to the specified port for the specified addresses. 378 * 379 * This must be called from the primary EventBase thread. 380 * 381 * Throws AsyncSocketException on error. 382 */ 383 virtual void bind(const std::vector<IPAddress>& ipAddresses, uint16_t port); 384 385 /** 386 * Bind to the specified port for the specified addresses/if names. 387 * 388 * This must be called from the primary EventBase thread. 389 * 390 * Throws AsyncSocketException on error. 391 */ 392 virtual void bind( 393 const std::vector<IPAddressIfNamePair>& addresses, uint16_t port); 394 395 /** 396 * Bind to the specified port. 397 * 398 * This must be called from the primary EventBase thread. 399 * 400 * Throws AsyncSocketException on error. 401 */ 402 virtual void bind(uint16_t port); 403 404 /** 405 * Get the local address to which the socket is bound. 406 * 407 * Throws AsyncSocketException on error. 408 */ 409 void getAddress(SocketAddress* addressReturn) const override; 410 411 /** 412 * Get the local address to which the socket is bound. 413 * 414 * Throws AsyncSocketException on error. 415 */ getAddress()416 SocketAddress getAddress() const { 417 SocketAddress ret; 418 getAddress(&ret); 419 return ret; 420 } 421 422 /** 423 * Get all the local addresses to which the socket is bound. 424 * 425 * Throws AsyncSocketException on error. 426 */ 427 std::vector<SocketAddress> getAddresses() const; 428 429 /** 430 * Begin listening for connections. 431 * 432 * This calls ::listen() with the specified backlog. 433 * 434 * Once listen() is invoked the socket will actually be open so that remote 435 * clients may establish connections. (Clients that attempt to connect 436 * before listen() is called will receive a connection refused error.) 437 * 438 * At least one callback must be set and startAccepting() must be called to 439 * actually begin notifying the accept callbacks of newly accepted 440 * connections. The backlog parameter controls how many connections the 441 * kernel will accept and buffer internally while the accept callbacks are 442 * paused (or if accepting is enabled but the callbacks cannot keep up). 443 * 444 * bind() must be called before calling listen(). 445 * listen() must be called from the primary EventBase thread. 446 * 447 * Throws AsyncSocketException on error. 448 */ 449 virtual void listen(int backlog); 450 451 /** 452 * Add an AcceptCallback. 453 * 454 * When a new socket is accepted, one of the AcceptCallbacks will be invoked 455 * with the new socket. The AcceptCallbacks are invoked in a round-robin 456 * fashion. This allows the accepted sockets to be distributed among a pool 457 * of threads, each running its own EventBase object. This is a common model, 458 * since most asynchronous-style servers typically run one EventBase thread 459 * per CPU. 460 * 461 * The EventBase object associated with each AcceptCallback must be running 462 * its loop. If the EventBase loop is not running, sockets will still be 463 * scheduled for the callback, but the callback cannot actually get invoked 464 * until the loop runs. 465 * 466 * This method must be invoked from the AsyncServerSocket's primary 467 * EventBase thread. 468 * 469 * Note that startAccepting() must be called on the AsyncServerSocket to 470 * cause it to actually start accepting sockets once callbacks have been 471 * installed. 472 * 473 * @param callback The callback to invoke. 474 * @param eventBase The EventBase to use to invoke the callback. This 475 * parameter may be nullptr, in which case the callback will be invoked in 476 * the AsyncServerSocket's primary EventBase. 477 * @param maxAtOnce The maximum number of connections to accept in this 478 * callback on a single iteration of the event base loop. 479 * This only takes effect when eventBase is non-nullptr. 480 * When using a nullptr eventBase for the callback, the 481 * setMaxAcceptAtOnce() method controls how many 482 * connections the main event base will accept at once. 483 */ 484 virtual void addAcceptCallback( 485 AcceptCallback* callback, 486 EventBase* eventBase, 487 uint32_t maxAtOnce = kDefaultCallbackAcceptAtOnce); 488 489 /** 490 * Remove an AcceptCallback. 491 * 492 * This allows a single AcceptCallback to be removed from the round-robin 493 * pool. 494 * 495 * This method must be invoked from the AsyncServerSocket's primary 496 * EventBase thread. Use EventBase::runInEventBaseThread() to schedule the 497 * operation in the correct EventBase if your code is not in the server 498 * socket's primary EventBase. 499 * 500 * Given that the accept callback is being driven by a different EventBase, 501 * the AcceptCallback may continue to be invoked for a short period of time 502 * after removeAcceptCallback() returns in this thread. Once the other 503 * EventBase thread receives the notification to stop, it will call 504 * acceptStopped() on the callback to inform it that it is fully stopped and 505 * will not receive any new sockets. 506 * 507 * If the last accept callback is removed while the socket is accepting, 508 * the socket will implicitly pause accepting. If a callback is later added, 509 * it will resume accepting immediately, without requiring startAccepting() 510 * to be invoked. 511 * 512 * @param callback The callback to uninstall. 513 * @param eventBase The EventBase associated with this callback. This must 514 * be the same EventBase that was used when the callback was installed 515 * with addAcceptCallback(). 516 */ 517 void removeAcceptCallback(AcceptCallback* callback, EventBase* eventBase); 518 519 /** 520 * Begin accepting connctions on this socket. 521 * 522 * bind() and listen() must be called before calling startAccepting(). 523 * 524 * When a AsyncServerSocket is initially created, it will not begin 525 * accepting connections until at least one callback has been added and 526 * startAccepting() has been called. startAccepting() can also be used to 527 * resume accepting connections after a call to pauseAccepting(). 528 * 529 * If startAccepting() is called when there are no accept callbacks 530 * installed, the socket will not actually begin accepting until an accept 531 * callback is added. 532 * 533 * This method may only be called from the primary EventBase thread. 534 */ 535 virtual void startAccepting(); 536 537 /** 538 * Pause accepting connections. 539 * 540 * startAccepting() may be called to resume accepting. 541 * 542 * This method may only be called from the primary EventBase thread. 543 * If there are AcceptCallbacks being driven by other EventBase threads they 544 * may continue to receive callbacks for a short period of time after 545 * pauseAccepting() returns. 546 * 547 * Unlike removeAcceptCallback() or destroy(), acceptStopped() will not be 548 * called on the AcceptCallback objects simply due to a temporary pause. If 549 * the server socket is later destroyed while paused, acceptStopped() will be 550 * called all of the installed AcceptCallbacks. 551 */ 552 void pauseAccepting(); 553 554 /** 555 * Shutdown the listen socket and notify all callbacks that accept has 556 * stopped, but don't close the socket. This invokes shutdown(2) with the 557 * supplied argument. Passing -1 will close the socket now. Otherwise, the 558 * close will be delayed until this object is destroyed. 559 * 560 * Only use this if you have reason to pass special flags to shutdown. 561 * Otherwise just destroy the socket. 562 * 563 * This method has no effect when a ShutdownSocketSet option is used. 564 * 565 * Returns the result of shutdown on sockets_[n-1] 566 */ 567 int stopAccepting(int shutdownFlags = -1); 568 569 /** 570 * Get the maximum number of connections that will be accepted each time 571 * around the event loop. 572 */ getMaxAcceptAtOnce()573 uint32_t getMaxAcceptAtOnce() const { return maxAcceptAtOnce_; } 574 575 /** 576 * Set the maximum number of connections that will be accepted each time 577 * around the event loop. 578 * 579 * This provides a very coarse-grained way of controlling how fast the 580 * AsyncServerSocket will accept connections. If you find that when your 581 * server is overloaded AsyncServerSocket accepts connections more quickly 582 * than your code can process them, you can try lowering this number so that 583 * fewer connections will be accepted each event loop iteration. 584 * 585 * For more explicit control over the accept rate, you can also use 586 * pauseAccepting() to temporarily pause accepting when your server is 587 * overloaded, and then use startAccepting() later to resume accepting. 588 */ setMaxAcceptAtOnce(uint32_t numConns)589 void setMaxAcceptAtOnce(uint32_t numConns) { maxAcceptAtOnce_ = numConns; } 590 591 /** 592 * Get the duration after which new connection messages will be dropped from 593 * the NotificationQueue if it has not started processing yet. 594 */ 595 const folly::observer::AtomicObserver<std::chrono::nanoseconds>& getQueueTimeout()596 getQueueTimeout() const { 597 return queueTimeout_; 598 } 599 600 /** 601 * Set the duration after which new connection messages will be dropped from 602 * the NotificationQueue if it has not started processing yet. 603 * 604 * This avoids the NotificationQueue from processing messages where the client 605 * socket has probably timed out already, or will time out before a response 606 * can be sent. 607 * 608 * The default value (of 0) means that messages will never expire. 609 */ setQueueTimeout(folly::observer::Observer<std::chrono::nanoseconds> duration)610 void setQueueTimeout( 611 folly::observer::Observer<std::chrono::nanoseconds> duration) { 612 queueTimeout_ = duration; 613 } setQueueTimeout(std::chrono::nanoseconds duration)614 void setQueueTimeout(std::chrono::nanoseconds duration) { 615 setQueueTimeout(folly::observer::makeStaticObserver(duration)); 616 } 617 618 /** 619 * Get the maximum number of unprocessed messages which a NotificationQueue 620 * can hold. 621 */ getMaxNumMessagesInQueue()622 uint32_t getMaxNumMessagesInQueue() const { return maxNumMsgsInQueue_; } 623 624 /** 625 * Set the maximum number of unprocessed messages in NotificationQueue. 626 * No new message will be sent to that NotificationQueue if there are more 627 * than such number of unprocessed messages in that queue. 628 */ setMaxNumMessagesInQueue(uint32_t num)629 void setMaxNumMessagesInQueue(uint32_t num) { maxNumMsgsInQueue_ = num; } 630 631 /** 632 * Get the speed of adjusting connection accept rate. 633 */ getAcceptRateAdjustSpeed()634 double getAcceptRateAdjustSpeed() const { return acceptRateAdjustSpeed_; } 635 636 /** 637 * Set the speed of adjusting connection accept rate. 638 */ setAcceptRateAdjustSpeed(double speed)639 void setAcceptRateAdjustSpeed(double speed) { 640 acceptRateAdjustSpeed_ = speed; 641 } 642 643 /** 644 * Enable/Disable TOS reflection for the server socket 645 */ 646 void setTosReflect(bool enable); 647 getTosReflect()648 bool getTosReflect() { return tosReflect_; } 649 650 /** 651 * Set/Get default TOS for listener socket 652 */ 653 void setListenerTos(uint32_t tos); 654 getListenerTos()655 uint32_t getListenerTos() const { return listenerTos_; } 656 657 /** 658 * Get the number of connections dropped by the AsyncServerSocket 659 */ getNumDroppedConnections()660 std::size_t getNumDroppedConnections() const { 661 return numDroppedConnections_; 662 } 663 664 /** 665 * Get the current number of unprocessed messages in NotificationQueue. 666 * 667 * This method must be invoked from the AsyncServerSocket's primary 668 * EventBase thread. Use EventBase::runInEventBaseThread() to schedule the 669 * operation in the correct EventBase if your code is not in the server 670 * socket's primary EventBase. 671 */ getNumPendingMessagesInQueue()672 int64_t getNumPendingMessagesInQueue() const { 673 if (eventBase_) { 674 eventBase_->dcheckIsInEventBaseThread(); 675 } 676 int64_t numMsgs = 0; 677 for (const auto& callback : callbacks_) { 678 if (callback.consumer) { 679 numMsgs += callback.consumer->getQueue().size(); 680 } 681 } 682 return numMsgs; 683 } 684 685 /** 686 * Set whether or not SO_KEEPALIVE should be enabled on the server socket 687 * (and thus on all subsequently-accepted connections). By default, keepalive 688 * is enabled. 689 * 690 * Note that TCP keepalive usually only kicks in after the connection has 691 * been idle for several hours. Applications should almost always have their 692 * own, shorter idle timeout. 693 */ setKeepAliveEnabled(bool enabled)694 void setKeepAliveEnabled(bool enabled) { 695 keepAliveEnabled_ = enabled; 696 697 for (auto& handler : sockets_) { 698 if (handler.socket_ == NetworkSocket()) { 699 continue; 700 } 701 702 int val = (enabled) ? 1 : 0; 703 if (netops::setsockopt( 704 handler.socket_, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 705 0) { 706 LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: %s" 707 << errnoStr(errno); 708 } 709 } 710 } 711 712 /** 713 * Get whether or not SO_KEEPALIVE is enabled on the server socket. 714 */ getKeepAliveEnabled()715 bool getKeepAliveEnabled() const { return keepAliveEnabled_; } 716 717 /** 718 * Set whether or not SO_REUSEPORT should be enabled on the server socket, 719 * allowing multiple binds to the same port 720 */ setReusePortEnabled(bool enabled)721 void setReusePortEnabled(bool enabled) { 722 reusePortEnabled_ = enabled; 723 724 for (auto& handler : sockets_) { 725 if (handler.socket_ == NetworkSocket()) { 726 continue; 727 } 728 729 int val = (enabled) ? 1 : 0; 730 if (netops::setsockopt( 731 handler.socket_, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val)) != 732 0) { 733 auto errnoCopy = errno; 734 LOG(ERROR) << "failed to set SO_REUSEPORT on async server socket " 735 << errnoCopy; 736 folly::throwSystemErrorExplicit( 737 errnoCopy, "failed to set SO_REUSEPORT on async server socket"); 738 } 739 } 740 } 741 742 /** 743 * Get whether or not SO_REUSEPORT is enabled on the server socket. 744 */ getReusePortEnabled_()745 bool getReusePortEnabled_() const { return reusePortEnabled_; } 746 747 /** 748 * Set whether or not the socket should close during exec() (FD_CLOEXEC). By 749 * default, this is enabled 750 */ setCloseOnExec(bool closeOnExec)751 void setCloseOnExec(bool closeOnExec) { closeOnExec_ = closeOnExec; } 752 753 /** 754 * Get whether or not FD_CLOEXEC is enabled on the server socket. 755 */ getCloseOnExec()756 bool getCloseOnExec() const { return closeOnExec_; } 757 758 /** 759 * Tries to enable TFO if the machine supports it. 760 */ setTFOEnabled(bool enabled,uint32_t maxTFOQueueSize)761 void setTFOEnabled(bool enabled, uint32_t maxTFOQueueSize) { 762 tfo_ = enabled; 763 tfoMaxQueueSize_ = maxTFOQueueSize; 764 } 765 766 /** 767 * Do not attempt the transparent TLS handshake 768 */ disableTransparentTls()769 void disableTransparentTls() { noTransparentTls_ = true; } 770 771 /** 772 * Get whether or not the socket is accepting new connections 773 */ getAccepting()774 bool getAccepting() const { return accepting_; } 775 776 /** 777 * Set the ConnectionEventCallback 778 */ setConnectionEventCallback(ConnectionEventCallback * const connectionEventCallback)779 void setConnectionEventCallback( 780 ConnectionEventCallback* const connectionEventCallback) { 781 connectionEventCallback_ = connectionEventCallback; 782 } 783 784 /** 785 * Get the ConnectionEventCallback 786 */ getConnectionEventCallback()787 ConnectionEventCallback* getConnectionEventCallback() const { 788 return connectionEventCallback_; 789 } 790 791 protected: 792 /** 793 * Protected destructor. 794 * 795 * Invoke destroy() instead to destroy the AsyncServerSocket. 796 */ 797 ~AsyncServerSocket() override; 798 799 private: 800 class RemoteAcceptor; 801 802 struct NewConnMessage { 803 NetworkSocket fd; 804 SocketAddress clientAddr; 805 std::chrono::steady_clock::time_point deadline; 806 std::chrono::steady_clock::time_point timeBeforeEnqueue; 807 isExpiredNewConnMessage808 bool isExpired() const { 809 return deadline.time_since_epoch().count() != 0 && 810 std::chrono::steady_clock::now() > deadline; 811 } 812 813 AtomicNotificationQueueTaskStatus operator()( 814 RemoteAcceptor& acceptor) noexcept; 815 }; 816 817 struct ErrorMessage { 818 int err; 819 std::string msg; 820 821 AtomicNotificationQueueTaskStatus operator()( 822 RemoteAcceptor& acceptor) noexcept; 823 }; 824 825 using QueueMessage = boost::variant<NewConnMessage, ErrorMessage>; 826 827 /** 828 * A class to receive notifications to invoke AcceptCallback objects 829 * in other EventBase threads. 830 * 831 * A RemoteAcceptor object is created for each AcceptCallback that 832 * is installed in a separate EventBase thread. The RemoteAcceptor 833 * receives notification of new sockets via a NotificationQueue, 834 * and then invokes the AcceptCallback. 835 */ 836 class RemoteAcceptor { 837 struct Consumer { operatorConsumer838 AtomicNotificationQueueTaskStatus operator()( 839 QueueMessage&& msg) noexcept { 840 return boost::apply_visitor( 841 [this](auto&& visitMsg) { return visitMsg(acceptor_); }, msg); 842 } 843 ConsumerConsumer844 explicit Consumer(RemoteAcceptor& acceptor) : acceptor_(acceptor) {} 845 RemoteAcceptor& acceptor_; 846 }; 847 848 friend NewConnMessage; 849 friend ErrorMessage; 850 851 public: 852 using Queue = EventBaseAtomicNotificationQueue<QueueMessage, Consumer>; 853 RemoteAcceptor(AcceptCallback * callback,ConnectionEventCallback * connectionEventCallback)854 explicit RemoteAcceptor( 855 AcceptCallback* callback, 856 ConnectionEventCallback* connectionEventCallback) 857 : callback_(callback), 858 connectionEventCallback_(connectionEventCallback), 859 queue_(Consumer(*this)) {} 860 861 void start(EventBase* eventBase, uint32_t maxAtOnce); 862 void stop(EventBase* eventBase, AcceptCallback* callback); 863 getQueue()864 Queue& getQueue() { return queue_; } 865 866 private: 867 AcceptCallback* callback_; 868 ConnectionEventCallback* connectionEventCallback_; 869 Queue queue_; 870 }; 871 872 /** 873 * A struct to keep track of the callbacks associated with this server 874 * socket. 875 */ 876 struct CallbackInfo { CallbackInfoCallbackInfo877 CallbackInfo(AcceptCallback* cb, EventBase* evb) 878 : callback(cb), eventBase(evb), consumer(nullptr) {} 879 880 AcceptCallback* callback; 881 EventBase* eventBase; 882 883 RemoteAcceptor* consumer; 884 }; 885 886 class BackoffTimeout; 887 888 virtual void handlerReady( 889 uint16_t events, NetworkSocket fd, sa_family_t family) noexcept; 890 891 NetworkSocket createSocket(int family); 892 void setupSocket(NetworkSocket fd, int family); 893 void bindInternal(const SocketAddress& address, const std::string& ifName); 894 void bindSocket( 895 NetworkSocket fd, 896 const SocketAddress& address, 897 bool isExistingSocket, 898 const std::string& ifName); 899 void dispatchSocket(NetworkSocket socket, SocketAddress&& address); 900 void dispatchError(const char* msg, int errnoValue); 901 void enterBackoff(); 902 void backoffTimeoutExpired(); 903 nextCallback()904 CallbackInfo* nextCallback() { 905 CallbackInfo* info = &callbacks_[callbackIndex_]; 906 907 ++callbackIndex_; 908 if (callbackIndex_ >= callbacks_.size()) { 909 callbackIndex_ = 0; 910 } 911 912 return info; 913 } 914 915 struct ServerEventHandler : public EventHandler { ServerEventHandlerServerEventHandler916 ServerEventHandler( 917 EventBase* eventBase, 918 NetworkSocket socket, 919 AsyncServerSocket* parent, 920 sa_family_t addressFamily) 921 : EventHandler(eventBase, socket), 922 eventBase_(eventBase), 923 socket_(socket), 924 parent_(parent), 925 addressFamily_(addressFamily) {} 926 ServerEventHandlerServerEventHandler927 ServerEventHandler(const ServerEventHandler& other) 928 : EventHandler(other.eventBase_, other.socket_), 929 eventBase_(other.eventBase_), 930 socket_(other.socket_), 931 parent_(other.parent_), 932 addressFamily_(other.addressFamily_) {} 933 934 ServerEventHandler& operator=(const ServerEventHandler& other) { 935 if (this != &other) { 936 eventBase_ = other.eventBase_; 937 socket_ = other.socket_; 938 parent_ = other.parent_; 939 addressFamily_ = other.addressFamily_; 940 941 detachEventBase(); 942 attachEventBase(other.eventBase_); 943 changeHandlerFD(other.socket_); 944 } 945 return *this; 946 } 947 948 // Inherited from EventHandler handlerReadyServerEventHandler949 void handlerReady(uint16_t events) noexcept override { 950 parent_->handlerReady(events, socket_, addressFamily_); 951 } 952 953 EventBase* eventBase_; 954 NetworkSocket socket_; 955 AsyncServerSocket* parent_; 956 sa_family_t addressFamily_; 957 }; 958 959 EventBase* eventBase_; 960 std::vector<ServerEventHandler> sockets_; 961 std::vector<NetworkSocket> pendingCloseSockets_; 962 bool accepting_; 963 uint32_t maxAcceptAtOnce_; 964 uint32_t maxNumMsgsInQueue_; 965 double acceptRateAdjustSpeed_; // 0 to disable auto adjust 966 double acceptRate_; 967 std::chrono::time_point<std::chrono::steady_clock> lastAccepTimestamp_; 968 std::size_t numDroppedConnections_; 969 uint32_t callbackIndex_; 970 BackoffTimeout* backoffTimeout_; 971 std::vector<CallbackInfo> callbacks_; 972 bool keepAliveEnabled_; 973 bool reusePortEnabled_{false}; 974 bool closeOnExec_; 975 bool tfo_{false}; 976 bool noTransparentTls_{false}; 977 uint32_t tfoMaxQueueSize_{0}; 978 std::weak_ptr<ShutdownSocketSet> wShutdownSocketSet_; 979 ConnectionEventCallback* connectionEventCallback_{nullptr}; 980 bool tosReflect_{false}; 981 uint32_t listenerTos_{0}; 982 bool zeroCopyVal_{false}; 983 folly::observer::AtomicObserver<std::chrono::nanoseconds> queueTimeout_{ 984 folly::observer::makeStaticObserver(std::chrono::nanoseconds::zero())}; 985 }; 986 987 } // namespace folly 988