/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17 #ifndef __STDC_FORMAT_MACROS
18 #define __STDC_FORMAT_MACROS
19 #endif
20
21 #include <folly/io/async/AsyncServerSocket.h>
22
23 #include <sys/types.h>
24
25 #include <cerrno>
26 #include <cstring>
27
28 #include <folly/FileUtil.h>
29 #include <folly/GLog.h>
30 #include <folly/Portability.h>
31 #include <folly/SocketAddress.h>
32 #include <folly/String.h>
33 #include <folly/detail/SocketFastOpen.h>
34 #include <folly/io/async/EventBase.h>
35 #include <folly/io/async/NotificationQueue.h>
36 #include <folly/portability/Fcntl.h>
37 #include <folly/portability/Sockets.h>
38 #include <folly/portability/Unistd.h>
39
40 namespace folly {
41
// Fallback definitions for the TCP SYN-saving socket options when the
// system headers don't provide them; the numeric values presumably match
// the Linux UAPI <linux/tcp.h> values — TODO confirm against the kernel
// headers this is built with.
#ifndef TCP_SAVE_SYN
#define TCP_SAVE_SYN 27
#endif

#ifndef TCP_SAVED_SYN
#define TCP_SAVED_SYN 28
#endif

// Compile-time flag for MSG_ERRQUEUE availability; setZeroCopy() uses it
// to bail out early on platforms without zero-copy completion support.
static constexpr bool msgErrQueueSupported =
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
    true;
#else
    false;
#endif // FOLLY_HAVE_MSG_ERRQUEUE

// Out-of-line definitions for static constexpr members declared in the
// header (required for ODR-use prior to C++17 inline variables).
const uint32_t AsyncServerSocket::kDefaultMaxAcceptAtOnce;
const uint32_t AsyncServerSocket::kDefaultCallbackAcceptAtOnce;
const uint32_t AsyncServerSocket::kDefaultMaxMessagesInQueue;
60
start(EventBase * eventBase,uint32_t maxAtOnce)61 void AsyncServerSocket::RemoteAcceptor::start(
62 EventBase* eventBase, uint32_t maxAtOnce) {
63 queue_.setMaxReadAtOnce(maxAtOnce);
64
65 eventBase->runInEventBaseThread([=]() {
66 callback_->acceptStarted();
67 queue_.startConsuming(eventBase);
68 });
69 }
70
stop(EventBase * eventBase,AcceptCallback * callback)71 void AsyncServerSocket::RemoteAcceptor::stop(
72 EventBase* eventBase, AcceptCallback* callback) {
73 eventBase->runInEventBaseThread([=]() {
74 callback->acceptStopped();
75 delete this;
76 });
77 }
78
/*
 * Deliver one accepted connection to the remote acceptor's callback.
 * Runs when the notification queue is drained on the callback's thread.
 */
AtomicNotificationQueueTaskStatus AsyncServerSocket::NewConnMessage::operator()(
    RemoteAcceptor& acceptor) noexcept {
  if (isExpired()) {
    // The message sat in the queue past its deadline: close the fd and
    // report the drop instead of delivering a stale connection.
    closeNoInt(fd);
    if (acceptor.connectionEventCallback_) {
      acceptor.connectionEventCallback_->onConnectionDropped(fd, clientAddr);
    }
    return AtomicNotificationQueueTaskStatus::DISCARD;
  }
  if (acceptor.connectionEventCallback_) {
    acceptor.connectionEventCallback_->onConnectionDequeuedByAcceptorCallback(
        fd, clientAddr);
  }
  // Hand the fd to the accept callback along with the enqueue timestamp
  // (lets the callback account for time spent queued).
  acceptor.callback_->connectionAccepted(fd, clientAddr, {timeBeforeEnqueue});
  return AtomicNotificationQueueTaskStatus::CONSUMED;
}
95
operator ()(RemoteAcceptor & acceptor)96 AtomicNotificationQueueTaskStatus AsyncServerSocket::ErrorMessage::operator()(
97 RemoteAcceptor& acceptor) noexcept {
98 auto ex = make_exception_wrapper<std::runtime_error>(msg);
99 acceptor.callback_->acceptError(std::move(ex));
100 return AtomicNotificationQueueTaskStatus::CONSUMED;
101 }
102
103 /*
104 * AsyncServerSocket::BackoffTimeout
105 */
/*
 * One-shot timer used while the server socket is in the accept back-off
 * state; on expiry it notifies the owning socket via
 * backoffTimeoutExpired().
 */
class AsyncServerSocket::BackoffTimeout : public AsyncTimeout {
 public:
  // Disallow copy, move, and default constructors.
  BackoffTimeout(BackoffTimeout&&) = delete;
  explicit BackoffTimeout(AsyncServerSocket* socket)
      : AsyncTimeout(socket->getEventBase()), socket_(socket) {}

  void timeoutExpired() noexcept override { socket_->backoffTimeoutExpired(); }

 private:
  AsyncServerSocket* socket_; // non-owning back-pointer to the parent socket
};
118
119 /*
120 * AsyncServerSocket methods
121 */
122
/*
 * Construct a server socket associated with the given EventBase (may be
 * nullptr and attached later).  No fd is created here; sockets are set up
 * by bind()/useExistingSockets().  Transparent TLS is disabled by default
 * via disableTransparentTls().
 */
AsyncServerSocket::AsyncServerSocket(EventBase* eventBase)
    : eventBase_(eventBase),
      accepting_(false),
      maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce),
      maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue),
      acceptRateAdjustSpeed_(0),
      acceptRate_(1), // 1 == accept everything; throttling lowers this
      lastAccepTimestamp_(std::chrono::steady_clock::now()),
      numDroppedConnections_(0),
      callbackIndex_(0),
      backoffTimeout_(nullptr),
      callbacks_(),
      keepAliveEnabled_(true),
      closeOnExec_(true) {
  disableTransparentTls();
}
139
setShutdownSocketSet(const std::weak_ptr<ShutdownSocketSet> & wNewSS)140 void AsyncServerSocket::setShutdownSocketSet(
141 const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
142 const auto newSS = wNewSS.lock();
143 const auto shutdownSocketSet = wShutdownSocketSet_.lock();
144
145 if (shutdownSocketSet == newSS) {
146 return;
147 }
148
149 if (shutdownSocketSet) {
150 for (auto& h : sockets_) {
151 shutdownSocketSet->remove(h.socket_);
152 }
153 }
154
155 if (newSS) {
156 for (auto& h : sockets_) {
157 newSS->add(h.socket_);
158 }
159 }
160
161 wShutdownSocketSet_ = wNewSS;
162 }
163
AsyncServerSocket::~AsyncServerSocket() {
  // callbacks_ is cleared by stopAccepting() (invoked from destroy());
  // reaching the destructor with callbacks still installed indicates a
  // lifecycle bug in the caller.
  assert(callbacks_.empty());
}
167
/*
 * Stop accepting and tear down all listening sockets.
 *
 * Unregisters every handler, closes (or shuts down) each listening fd,
 * cancels the backoff timer, and notifies every accept callback that
 * accepting has stopped.
 *
 * @param shutdownFlags  If >= 0 (and no ShutdownSocketSet is attached),
 *                       passed to shutdown(); the fd close is deferred
 *                       into pendingCloseSockets_ (closed by destroy()).
 *                       If < 0 the fd is closed immediately.
 * @return the result of the last shutdownNoInt() call, or 0.
 */
int AsyncServerSocket::stopAccepting(int shutdownFlags) {
  int result = 0;
  for (auto& handler : sockets_) {
    VLOG(10) << "AsyncServerSocket::stopAccepting " << this << handler.socket_;
  }
  if (eventBase_) {
    eventBase_->dcheckIsInEventBaseThread();
  }

  // When destroy is called, unregister and close the socket immediately.
  accepting_ = false;

  // Close the sockets in reverse order as they were opened to avoid
  // the condition where another process concurrently tries to open
  // the same port, succeed to bind the first socket but fails on the
  // second because it hasn't been closed yet.
  for (; !sockets_.empty(); sockets_.pop_back()) {
    auto& handler = sockets_.back();
    handler.unregisterHandler();
    if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
      // An attached ShutdownSocketSet owns the close.
      shutdownSocketSet->close(handler.socket_);
    } else if (shutdownFlags >= 0) {
      // Shut down now; the fd itself is closed later, in destroy().
      result = shutdownNoInt(handler.socket_, shutdownFlags);
      pendingCloseSockets_.push_back(handler.socket_);
    } else {
      closeNoInt(handler.socket_);
    }
  }

  // Destroy the backoff timout. This will cancel it if it is running.
  delete backoffTimeout_;
  backoffTimeout_ = nullptr;

  // Close all of the callback queues to notify them that they are being
  // destroyed. No one should access the AsyncServerSocket any more once
  // destroy() is called. However, clear out callbacks_ before invoking the
  // accept callbacks just in case. This will potentially help us detect the
  // bug if one of the callbacks calls addAcceptCallback() or
  // removeAcceptCallback().
  std::vector<CallbackInfo> callbacksCopy;
  callbacks_.swap(callbacksCopy);
  for (const auto& callback : callbacksCopy) {
    // consumer may not be set if we are running in primary event base
    if (callback.consumer) {
      DCHECK(callback.eventBase);
      callback.consumer->stop(callback.eventBase, callback.callback);
    } else {
      DCHECK(callback.callback);
      callback.callback->acceptStopped();
    }
  }

  return result;
}
222
destroy()223 void AsyncServerSocket::destroy() {
224 stopAccepting();
225 for (auto s : pendingCloseSockets_) {
226 closeNoInt(s);
227 }
228 // Then call DelayedDestruction::destroy() to take care of
229 // whether or not we need immediate or delayed destruction
230 DelayedDestruction::destroy();
231 }
232
attachEventBase(EventBase * eventBase)233 void AsyncServerSocket::attachEventBase(EventBase* eventBase) {
234 assert(eventBase_ == nullptr);
235 eventBase->dcheckIsInEventBaseThread();
236
237 eventBase_ = eventBase;
238 for (auto& handler : sockets_) {
239 handler.attachEventBase(eventBase);
240 }
241 }
242
detachEventBase()243 void AsyncServerSocket::detachEventBase() {
244 assert(eventBase_ != nullptr);
245 eventBase_->dcheckIsInEventBaseThread();
246 assert(!accepting_);
247
248 eventBase_ = nullptr;
249 for (auto& handler : sockets_) {
250 handler.detachEventBase();
251 }
252 }
253
/*
 * Adopt pre-created (possibly already bound) sockets instead of creating
 * new ones.  Throws std::invalid_argument if sockets have already been
 * attached to this server socket.
 */
void AsyncServerSocket::useExistingSockets(
    const std::vector<NetworkSocket>& fds) {
  if (eventBase_) {
    eventBase_->dcheckIsInEventBaseThread();
  }

  if (!sockets_.empty()) {
    throw std::invalid_argument(
        "cannot call useExistingSocket() on a "
        "AsyncServerSocket that already has a socket");
  }

  for (auto fd : fds) {
    // Set addressFamily_ from this socket.
    // Note that the socket may not have been bound yet, but
    // setFromLocalAddress() will still work and get the correct address family.
    // We will update addressFamily_ again anyway if bind() is called later.
    SocketAddress address;
    address.setFromLocalAddress(fd);

#if defined(__linux__)
    if (noTransparentTls_) {
      // Ignore return value, errors are ok
      netops::setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
    }
#endif

    // Apply the standard server-socket options, then register a handler.
    setupSocket(fd, address.getFamily());
    sockets_.emplace_back(eventBase_, fd, this, address.getFamily());
    sockets_.back().changeHandlerFD(fd);
  }
}
286
// Convenience overload: adopt a single pre-created socket.
void AsyncServerSocket::useExistingSocket(NetworkSocket fd) {
  useExistingSockets({fd});
}
290
/*
 * Bind one fd to the given address (optionally binding it to a network
 * interface first, Linux only).
 *
 * @param fd               the socket to bind
 * @param address          local address to bind to
 * @param isExistingSocket true if the fd was supplied by the caller; on
 *                         failure such fds are NOT closed here (the
 *                         caller still owns them)
 * @param ifName           interface for SO_BINDTODEVICE; ignored off-Linux
 * @throws std::system_error on bind/setsockopt failure
 */
void AsyncServerSocket::bindSocket(
    NetworkSocket fd,
    const SocketAddress& address,
    bool isExistingSocket,
    const std::string& ifName) {
  sockaddr_storage addrStorage;
  address.getAddress(&addrStorage);
  auto saddr = reinterpret_cast<sockaddr*>(&addrStorage);

#if defined(__linux__)
  if (!ifName.empty() &&
      netops::setsockopt(
          fd, SOL_SOCKET, SO_BINDTODEVICE, ifName.c_str(), ifName.length())) {
    // Capture errno before closeNoInt() can clobber it.
    auto errnoCopy = errno;
    if (!isExistingSocket) {
      closeNoInt(fd);
    }
    folly::throwSystemErrorExplicit(
        errnoCopy, "failed to bind to device: " + ifName);
  }
#else
  (void)ifName;
#endif

  if (netops::bind(fd, saddr, address.getActualSize()) != 0) {
    if (errno != EINPROGRESS) {
      // Get a copy of errno so that it is not overwritten by subsequent calls.
      auto errnoCopy = errno;
      if (!isExistingSocket) {
        closeNoInt(fd);
      }
      folly::throwSystemErrorExplicit(
          errnoCopy,
          "failed to bind to async server socket: " + address.describe());
    }
  }

#if defined(__linux__)
  if (noTransparentTls_) {
    // Ignore return value, errors are ok
    netops::setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
  }
#endif

  // If we just created this socket, update the EventHandler and set socket_
  if (!isExistingSocket) {
    sockets_.emplace_back(eventBase_, fd, this, address.getFamily());
  }
}
340
setZeroCopy(bool enable)341 bool AsyncServerSocket::setZeroCopy(bool enable) {
342 if (msgErrQueueSupported) {
343 // save the enable flag here
344 zeroCopyVal_ = enable;
345 int val = enable ? 1 : 0;
346 size_t num = 0;
347 for (auto& s : sockets_) {
348 int ret = netops::setsockopt(
349 s.socket_, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val));
350
351 num += (0 == ret) ? 1 : 0;
352 }
353
354 return num != 0;
355 }
356
357 return false;
358 }
359
/*
 * Shared implementation for the single-address bind() overloads.
 * Reuses a socket supplied via useExistingSocket() (family must match),
 * creates one otherwise, and refuses to bind when multiple fds are
 * already attached.
 */
void AsyncServerSocket::bindInternal(
    const SocketAddress& address, const std::string& ifName) {
  if (eventBase_) {
    eventBase_->dcheckIsInEventBaseThread();
  }

  // useExistingSocket() may have been called to initialize socket_ already.
  // However, in the normal case we need to create a new socket now.
  // Don't set socket_ yet, so that socket_ will remain uninitialized if an
  // error occurs.
  NetworkSocket fd;
  if (sockets_.empty()) {
    fd = createSocket(address.getFamily());
  } else if (sockets_.size() == 1) {
    if (address.getFamily() != sockets_[0].addressFamily_) {
      throw std::invalid_argument(
          "Attempted to bind address to socket with "
          "different address family");
    }
    fd = sockets_[0].socket_;
  } else {
    throw std::invalid_argument("Attempted to bind to multiple fds");
  }

  // isExistingSocket == !sockets_.empty(): a reused fd must not be closed
  // or re-added by bindSocket() on failure.
  bindSocket(fd, address, !sockets_.empty(), ifName);
}
386
// Bind to a single address with no interface restriction.
void AsyncServerSocket::bind(const SocketAddress& address) {
  bindInternal(address, "");
}
390
// Bind to a single address, restricted to the named interface
// (SO_BINDTODEVICE on Linux; ifName is ignored elsewhere).
void AsyncServerSocket::bind(
    const SocketAddress& address, const std::string& ifName) {
  bindInternal(address, ifName);
}
395
bind(const std::vector<IPAddress> & ipAddresses,uint16_t port)396 void AsyncServerSocket::bind(
397 const std::vector<IPAddress>& ipAddresses, uint16_t port) {
398 if (ipAddresses.empty()) {
399 throw std::invalid_argument("No ip addresses were provided");
400 }
401 if (eventBase_) {
402 eventBase_->dcheckIsInEventBaseThread();
403 }
404
405 for (const IPAddress& ipAddress : ipAddresses) {
406 SocketAddress address(ipAddress.toFullyQualified(), port);
407 auto fd = createSocket(address.getFamily());
408
409 bindSocket(fd, address, false, "");
410 }
411 if (sockets_.empty()) {
412 throw std::runtime_error(
413 "did not bind any async server socket for port and addresses");
414 }
415 }
416
bind(const std::vector<IPAddressIfNamePair> & addresses,uint16_t port)417 void AsyncServerSocket::bind(
418 const std::vector<IPAddressIfNamePair>& addresses, uint16_t port) {
419 if (addresses.empty()) {
420 throw std::invalid_argument("No ip addresses were provided");
421 }
422 if (eventBase_) {
423 eventBase_->dcheckIsInEventBaseThread();
424 }
425
426 for (const auto& addr : addresses) {
427 SocketAddress address(addr.first.toFullyQualified(), port);
428 auto fd = createSocket(address.getFamily());
429
430 bindSocket(fd, address, false, addr.second);
431 }
432 if (sockets_.empty()) {
433 throw std::runtime_error(
434 "did not bind any async server socket for port and addresses");
435 }
436 }
437
/*
 * Bind to the given port on all local addresses (the wildcard address),
 * preferring IPv6.
 *
 * Uses getaddrinfo(AI_PASSIVE) to enumerate wildcard addresses, binds the
 * IPv6 ones first (with IPV6_V6ONLY so IPv4 gets its own socket), then
 * the IPv4 ones.  When port == 0 the kernel picks the IPv6 port and we
 * retry up to kNumTries times to grab the same port for IPv4.
 *
 * @throws std::invalid_argument on getaddrinfo failure,
 *         std::system_error on bind failure,
 *         std::runtime_error if nothing was bound.
 */
void AsyncServerSocket::bind(uint16_t port) {
  struct addrinfo hints, *res0;
  char sport[sizeof("65536")]; // large enough for any 16-bit port + NUL

  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV;
  snprintf(sport, sizeof(sport), "%u", port);

  // On Windows the value we need to pass to bind to all available
  // addresses is an empty string. Everywhere else, it's nullptr.
  constexpr const char* kWildcardNode = kIsWindows ? "" : nullptr;
  if (getaddrinfo(kWildcardNode, sport, &hints, &res0)) {
    throw std::invalid_argument(
        "Attempted to bind address to socket with "
        "bad getaddrinfo");
  }

  // Frees whatever res0 points to at scope exit (it may be reassigned by
  // the port-0 retry logic below, which does its own interim frees).
  SCOPE_EXIT { freeaddrinfo(res0); };

  // Create, configure, and bind a socket for a single addrinfo entry.
  auto setupAddress = [&](struct addrinfo* res) {
    auto s = netops::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
    // IPv6/IPv4 may not be supported by the kernel
    if (s == NetworkSocket() && errno == EAFNOSUPPORT) {
      return;
    }
    CHECK_NE(s, NetworkSocket());

    try {
      setupSocket(s, res->ai_family);
    } catch (...) {
      closeNoInt(s);
      throw;
    }

    if (res->ai_family == AF_INET6) {
      // Keep this socket IPv6-only so a separate IPv4 socket can bind
      // the same port.
      int v6only = 1;
      CHECK(
          0 ==
          netops::setsockopt(
              s, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)));
    }

    // Bind to the socket
    if (netops::bind(s, res->ai_addr, socklen_t(res->ai_addrlen)) != 0) {
      folly::throwSystemError(
          errno,
          "failed to bind to async server socket for port ",
          SocketAddress::getPortFrom(res->ai_addr),
          " family ",
          SocketAddress::getFamilyNameFrom(res->ai_addr, "<unknown>"));
    }

#if defined(__linux__)
    if (noTransparentTls_) {
      // Ignore return value, errors are ok
      netops::setsockopt(s, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
    }
#endif

    SocketAddress address;
    address.setFromLocalAddress(s);

    sockets_.emplace_back(eventBase_, s, this, address.getFamily());
  };

  const int kNumTries = 25;
  for (int tries = 1; true; tries++) {
    // Prefer AF_INET6 addresses. RFC 3484 mandates that getaddrinfo
    // should return IPv6 first and then IPv4 addresses, but glibc's
    // getaddrinfo(nullptr) with AI_PASSIVE returns:
    // - 0.0.0.0 (IPv4-only)
    // - :: (IPv6+IPv4) in this order
    // See: https://sourceware.org/bugzilla/show_bug.cgi?id=9981
    for (struct addrinfo* res = res0; res; res = res->ai_next) {
      if (res->ai_family == AF_INET6) {
        setupAddress(res);
      }
    }

    // If port == 0, then we should try to bind to the same port on ipv4 and
    // ipv6. So if we did bind to ipv6, figure out that port and use it.
    if (sockets_.size() == 1 && port == 0) {
      SocketAddress address;
      address.setFromLocalAddress(sockets_.back().socket_);
      snprintf(sport, sizeof(sport), "%u", address.getPort());
      freeaddrinfo(res0);
      CHECK_EQ(0, getaddrinfo(nullptr, sport, &hints, &res0));
    }

    try {
      // Second pass: bind the non-IPv6 (i.e. IPv4) addresses.
      for (struct addrinfo* res = res0; res; res = res->ai_next) {
        if (res->ai_family != AF_INET6) {
          setupAddress(res);
        }
      }
    } catch (const std::system_error&) {
      // If we can't bind to the same port on ipv4 as ipv6 when using
      // port=0 then we will retry again before giving up after
      // kNumTries attempts. We do this by closing the sockets that
      // were opened, then restarting from scratch.
      if (port == 0 && !sockets_.empty() && tries != kNumTries) {
        for (const auto& socket : sockets_) {
          if (socket.socket_ == NetworkSocket()) {
            continue;
          } else if (
              const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
            shutdownSocketSet->close(socket.socket_);
          } else {
            closeNoInt(socket.socket_);
          }
        }
        sockets_.clear();
        // Reset the service string to the original (ephemeral) request
        // and re-enumerate before retrying.
        snprintf(sport, sizeof(sport), "%u", port);
        freeaddrinfo(res0);
        CHECK_EQ(0, getaddrinfo(nullptr, sport, &hints, &res0));
        continue;
      }

      throw;
    }

    break;
  }

  if (sockets_.empty()) {
    throw std::runtime_error("did not bind any async server socket for port");
  }
}
568
listen(int backlog)569 void AsyncServerSocket::listen(int backlog) {
570 if (eventBase_) {
571 eventBase_->dcheckIsInEventBaseThread();
572 }
573
574 // Start listening
575 for (auto& handler : sockets_) {
576 if (netops::listen(handler.socket_, backlog) == -1) {
577 folly::throwSystemError(errno, "failed to listen on async server socket");
578 }
579 }
580 }
581
getAddress(SocketAddress * addressReturn) const582 void AsyncServerSocket::getAddress(SocketAddress* addressReturn) const {
583 CHECK(!sockets_.empty());
584 VLOG_IF(2, sockets_.size() > 1)
585 << "Warning: getAddress() called and multiple addresses available ("
586 << sockets_.size() << "). Returning only the first one.";
587
588 addressReturn->setFromLocalAddress(sockets_[0].socket_);
589 }
590
getAddresses() const591 std::vector<SocketAddress> AsyncServerSocket::getAddresses() const {
592 CHECK(!sockets_.empty());
593 auto tsaVec = std::vector<SocketAddress>(sockets_.size());
594 auto tsaIter = tsaVec.begin();
595 for (const auto& socket : sockets_) {
596 (tsaIter++)->setFromLocalAddress(socket.socket_);
597 };
598 return tsaVec;
599 }
600
/*
 * Install an accept callback.
 *
 * @param callback  receives accepted connections (must outlive removal)
 * @param eventBase EventBase whose thread the callback runs in; nullptr
 *                  means "run in this server socket's own EventBase"
 *                  directly, without a notification queue.
 * @param maxAtOnce max queued connections delivered per queue sweep
 *                  (only meaningful for the remote-acceptor path).
 *
 * If startAccepting() was already called and this is the first callback,
 * accepting begins once the callback is installed.
 */
void AsyncServerSocket::addAcceptCallback(
    AcceptCallback* callback, EventBase* eventBase, uint32_t maxAtOnce) {
  if (eventBase_) {
    eventBase_->dcheckIsInEventBaseThread();
  }

  // If this is the first accept callback and we are supposed to be accepting,
  // start accepting once the callback is installed.
  bool runStartAccepting = accepting_ && callbacks_.empty();

  callbacks_.emplace_back(callback, eventBase);

  SCOPE_SUCCESS {
    // If this is the first accept callback and we are supposed to be accepting,
    // start accepting.
    if (runStartAccepting) {
      startAccepting();
    }
  };

  if (!eventBase) {
    // Run in AsyncServerSocket's eventbase; notify that we are
    // starting to accept connections
    callback->acceptStarted();
    return;
  }

  // Start the remote acceptor.
  //
  // It would be nice if we could avoid starting the remote acceptor if
  // eventBase == eventBase_.  However, that would cause issues if
  // detachEventBase() and attachEventBase() were ever used to change the
  // primary EventBase for the server socket.  Therefore we require the caller
  // to specify a nullptr EventBase if they want to ensure that the callback is
  // always invoked in the primary EventBase, and to be able to invoke that
  // callback more efficiently without having to use a notification queue.
  RemoteAcceptor* acceptor = nullptr;
  try {
    acceptor = new RemoteAcceptor(callback, connectionEventCallback_);
    acceptor->start(eventBase, maxAtOnce);
  } catch (...) {
    // Roll back the callbacks_ entry so the failed install leaves no trace.
    callbacks_.pop_back();
    delete acceptor;
    throw;
  }
  callbacks_.back().consumer = acceptor;
}
648
/*
 * Remove a previously-installed accept callback and notify it via
 * acceptStopped().
 *
 * @param callback  the callback to remove
 * @param eventBase must match the EventBase it was added with; nullptr
 *                  matches any entry with this callback pointer.
 * @throws std::runtime_error if no matching callback is installed.
 *
 * If this was the last callback while accepting, the socket handlers are
 * unregistered until another callback is added.
 */
void AsyncServerSocket::removeAcceptCallback(
    AcceptCallback* callback, EventBase* eventBase) {
  if (eventBase_) {
    eventBase_->dcheckIsInEventBaseThread();
  }

  // Find the matching AcceptCallback.
  // We just do a simple linear search; we don't expect removeAcceptCallback()
  // to be called frequently, and we expect there to only be a small number of
  // callbacks anyway.
  auto it = callbacks_.begin();
  uint32_t n = 0;
  while (true) {
    if (it == callbacks_.end()) {
      throw std::runtime_error(
          "AsyncServerSocket::removeAcceptCallback(): "
          "accept callback not found");
    }
    if (it->callback == callback &&
        (it->eventBase == eventBase || eventBase == nullptr)) {
      break;
    }
    ++it;
    ++n;
  }

  // Remove this callback from callbacks_.
  //
  // Do this before invoking the acceptStopped() callback, in case
  // acceptStopped() invokes one of our methods that examines callbacks_.
  //
  // Save a copy of the CallbackInfo first.
  CallbackInfo info(*it);
  callbacks_.erase(it);
  if (n < callbackIndex_) {
    // We removed an element before callbackIndex_.  Move callbackIndex_ back
    // one step, since things after n have been shifted back by 1.
    --callbackIndex_;
  } else {
    // We removed something at or after callbackIndex_.
    // If we removed the last element and callbackIndex_ was pointing at it,
    // we need to reset callbackIndex_ to 0.
    if (callbackIndex_ >= callbacks_.size()) {
      callbackIndex_ = 0;
    }
  }

  if (info.consumer) {
    // consumer could be nullptr is we run callbacks in primary event
    // base
    DCHECK(info.eventBase);
    info.consumer->stop(info.eventBase, info.callback);
  } else {
    // callback invoked in the primary event base, just call directly
    DCHECK(info.callback);
    callback->acceptStopped();
  }

  // If we are supposed to be accepting but the last accept callback
  // was removed, unregister for events until a callback is added.
  if (accepting_ && callbacks_.empty()) {
    for (auto& handler : sockets_) {
      handler.unregisterHandler();
    }
  }
}
715
startAccepting()716 void AsyncServerSocket::startAccepting() {
717 if (eventBase_) {
718 eventBase_->dcheckIsInEventBaseThread();
719 }
720
721 accepting_ = true;
722 if (callbacks_.empty()) {
723 // We can't actually begin accepting if no callbacks are defined.
724 // Wait until a callback is added to start accepting.
725 return;
726 }
727
728 for (auto& handler : sockets_) {
729 if (!handler.registerHandler(EventHandler::READ | EventHandler::PERSIST)) {
730 throw std::runtime_error("failed to register for accept events");
731 }
732 }
733 }
734
pauseAccepting()735 void AsyncServerSocket::pauseAccepting() {
736 if (eventBase_) {
737 eventBase_->dcheckIsInEventBaseThread();
738 }
739 accepting_ = false;
740 for (auto& handler : sockets_) {
741 handler.unregisterHandler();
742 }
743
744 // If we were in the accept backoff state, disable the backoff timeout
745 if (backoffTimeout_) {
746 backoffTimeout_->cancelTimeout();
747 }
748 }
749
createSocket(int family)750 NetworkSocket AsyncServerSocket::createSocket(int family) {
751 auto fd = netops::socket(family, SOCK_STREAM, 0);
752 if (fd == NetworkSocket()) {
753 folly::throwSystemError(errno, "error creating async server socket");
754 }
755
756 try {
757 setupSocket(fd, family);
758 } catch (...) {
759 closeNoInt(fd);
760 throw;
761 }
762 return fd;
763 }
764
765 /**
766 * Enable/Disable TOS reflection for the server socket
767 * If enabled, the 'accepted' connections will reflect the
768 * TOS derived from the client's connect request
769 */
/*
 * Enable TCP_SAVE_SYN on every open listening socket so handlerReady()
 * can read back each client's SYN (TCP_SAVED_SYN) and copy its TOS /
 * traffic class onto the accepted socket.  Disabling, or running on a
 * non-Linux platform, just clears the flag.
 * @throws std::system_error if setsockopt fails on any socket.
 */
void AsyncServerSocket::setTosReflect(bool enable) {
  if (!kIsLinux || !enable) {
    tosReflect_ = false;
    return;
  }

  for (auto& handler : sockets_) {
    if (handler.socket_ == NetworkSocket()) {
      continue;
    }

    int val = (enable) ? 1 : 0;
    int ret = netops::setsockopt(
        handler.socket_, IPPROTO_TCP, TCP_SAVE_SYN, &val, sizeof(val));

    if (ret == 0) {
      VLOG(10) << "Enabled SYN save for socket " << handler.socket_;
    } else {
      folly::throwSystemError(errno, "failed to enable TOS reflect");
    }
  }
  // Only set once every socket accepted the option.
  tosReflect_ = true;
}
793
setListenerTos(uint32_t tos)794 void AsyncServerSocket::setListenerTos(uint32_t tos) {
795 if (!kIsLinux || tos == 0) {
796 listenerTos_ = 0;
797 return;
798 }
799
800 for (auto& handler : sockets_) {
801 if (handler.socket_ == NetworkSocket()) {
802 continue;
803 }
804
805 const auto proto =
806 (handler.addressFamily_ == AF_INET) ? IPPROTO_IP : IPPROTO_IPV6;
807 const auto optName =
808 (handler.addressFamily_ == AF_INET) ? IP_TOS : IPV6_TCLASS;
809
810 int ret =
811 netops::setsockopt(handler.socket_, proto, optName, &tos, sizeof(tos));
812
813 if (ret == 0) {
814 VLOG(10) << "Set TOS " << tos << " for for socket " << handler.socket_;
815 } else {
816 folly::throwSystemError(errno, "failed to set TOS for socket");
817 }
818 }
819 listenerTos_ = tos;
820 }
821
/*
 * Apply the standard server-socket option set to a freshly created or
 * adopted fd: non-blocking mode, SO_REUSEADDR, optional SO_REUSEPORT,
 * SO_KEEPALIVE, FD_CLOEXEC, TCP_NODELAY (where applicable), optional
 * TCP Fast Open and SO_ZEROCOPY, and ShutdownSocketSet registration.
 *
 * Most option failures are logged and tolerated; only failure to enter
 * non-blocking mode or to set SO_REUSEPORT (when requested) throws.
 *
 * @param fd     the socket to configure
 * @param family address family, used to skip TCP options for AF_UNIX
 * @throws std::system_error (see above)
 */
void AsyncServerSocket::setupSocket(NetworkSocket fd, int family) {
  // Put the socket in non-blocking mode
  if (netops::set_socket_non_blocking(fd) != 0) {
    folly::throwSystemError(errno, "failed to put socket in non-blocking mode");
  }

  // Set reuseaddr to avoid 2MSL delay on server restart
  int one = 1;
  if (netops::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) !=
      0) {
    auto errnoCopy = errno;
    // This isn't a fatal error; just log an error message and continue
    LOG(ERROR) << "failed to set SO_REUSEADDR on async server socket "
               << errnoCopy;
  }

  // Set reuseport to support multiple accept threads
  int zero = 0;
  if (reusePortEnabled_ &&
      netops::setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(int)) !=
          0) {
    auto errnoCopy = errno;
    LOG(ERROR) << "failed to set SO_REUSEPORT on async server socket "
               << errnoStr(errnoCopy);
#ifdef WIN32
    folly::throwSystemErrorExplicit(
        errnoCopy, "failed to set SO_REUSEPORT on async server socket");
#else
    // Include the local address in the error for easier debugging.
    SocketAddress address;
    address.setFromLocalAddress(fd);
    folly::throwSystemErrorExplicit(
        errnoCopy,
        "failed to set SO_REUSEPORT on async server socket: " +
            address.describe());
#endif
  }

  // Set keepalive as desired
  if (netops::setsockopt(
          fd,
          SOL_SOCKET,
          SO_KEEPALIVE,
          (keepAliveEnabled_) ? &one : &zero,
          sizeof(int)) != 0) {
    auto errnoCopy = errno;
    LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: "
               << errnoStr(errnoCopy);
  }

  // Setup FD_CLOEXEC flag
  if (closeOnExec_ && (-1 == netops::set_socket_close_on_exec(fd))) {
    auto errnoCopy = errno;
    LOG(ERROR) << "failed to set FD_CLOEXEC on async server socket: "
               << errnoStr(errnoCopy);
  }

  // Set TCP nodelay if available, MAC OS X Hack
  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
#ifndef TCP_NOPUSH
  // TCP_NODELAY is meaningless (and fails) on AF_UNIX sockets.
  if (family != AF_UNIX) {
    if (netops::setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) !=
        0) {
      auto errnoCopy = errno;
      // This isn't a fatal error; just log an error message and continue
      LOG(ERROR) << "failed to set TCP_NODELAY on async server socket: "
                 << errnoStr(errnoCopy);
    }
  }
#else
  (void)family; // to avoid unused parameter warning
#endif

#if FOLLY_ALLOW_TFO
  if (tfo_ && detail::tfo_enable(fd, tfoMaxQueueSize_) != 0) {
    auto errnoCopy = errno;
    // This isn't a fatal error; just log an error message and continue
    LOG(WARNING) << "failed to set TCP_FASTOPEN on async server socket: "
                 << folly::errnoStr(errnoCopy);
  }
#endif

  // Apply the zero-copy setting recorded by setZeroCopy().
  if (zeroCopyVal_) {
    int val = 1;
    int ret =
        netops::setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val));
    if (ret) {
      auto errnoCopy = errno;
      LOG(WARNING) << "failed to set SO_ZEROCOPY on async server socket: "
                   << folly::errnoStr(errnoCopy);
    }
  }

  if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
    shutdownSocketSet->add(fd);
  }
}
918
handlerReady(uint16_t,NetworkSocket fd,sa_family_t addressFamily)919 void AsyncServerSocket::handlerReady(
920 uint16_t /* events */,
921 NetworkSocket fd,
922 sa_family_t addressFamily) noexcept {
923 assert(!callbacks_.empty());
924 DestructorGuard dg(this);
925
926 // Only accept up to maxAcceptAtOnce_ connections at a time,
927 // to avoid starving other I/O handlers using this EventBase.
928 for (uint32_t n = 0; n < maxAcceptAtOnce_; ++n) {
929 SocketAddress address;
930
931 sockaddr_storage addrStorage = {};
932 socklen_t addrLen = sizeof(addrStorage);
933 auto saddr = reinterpret_cast<sockaddr*>(&addrStorage);
934
935 // In some cases, accept() doesn't seem to update these correctly.
936 saddr->sa_family = addressFamily;
937 if (addressFamily == AF_UNIX) {
938 addrLen = sizeof(struct sockaddr_un);
939 }
940
941 // Accept a new client socket
942 #if FOLLY_HAVE_ACCEPT4
943 auto clientSocket = NetworkSocket::fromFd(
944 accept4(fd.toFd(), saddr, &addrLen, SOCK_NONBLOCK));
945 #else
946 auto clientSocket = netops::accept(fd, saddr, &addrLen);
947 #endif
948
949 address.setFromSockaddr(saddr, addrLen);
950
951 if (clientSocket != NetworkSocket() && connectionEventCallback_) {
952 connectionEventCallback_->onConnectionAccepted(clientSocket, address);
953 }
954
955 // Connection accepted, get the SYN packet from the client if
956 // TOS reflect is enabled
957 if (kIsLinux && clientSocket != NetworkSocket() && tosReflect_) {
958 std::array<uint32_t, 64> buffer;
959 socklen_t len = sizeof(buffer);
960 int ret = netops::getsockopt(
961 clientSocket, IPPROTO_TCP, TCP_SAVED_SYN, &buffer, &len);
962
963 if (ret == 0) {
964 uint32_t tosWord = folly::Endian::big(buffer[0]);
965 if (addressFamily == AF_INET6) {
966 tosWord = (tosWord & 0x0FC00000) >> 20;
967 // Set the TOS on the return socket only if it is non-zero
968 if (tosWord) {
969 ret = netops::setsockopt(
970 clientSocket,
971 IPPROTO_IPV6,
972 IPV6_TCLASS,
973 &tosWord,
974 sizeof(tosWord));
975 }
976 } else if (addressFamily == AF_INET) {
977 tosWord = (tosWord & 0x00FC0000) >> 16;
978 if (tosWord) {
979 ret = netops::setsockopt(
980 clientSocket, IPPROTO_IP, IP_TOS, &tosWord, sizeof(tosWord));
981 }
982 }
983
984 if (ret != 0) {
985 LOG(ERROR) << "Unable to set TOS for accepted socket "
986 << clientSocket;
987 }
988 } else {
989 LOG(ERROR) << "Unable to get SYN packet for accepted socket "
990 << clientSocket;
991 }
992 }
993
994 std::chrono::time_point<std::chrono::steady_clock> nowMs =
995 std::chrono::steady_clock::now();
996 auto timeSinceLastAccept = std::max<int64_t>(
997 0,
998 nowMs.time_since_epoch().count() -
999 lastAccepTimestamp_.time_since_epoch().count());
1000 lastAccepTimestamp_ = nowMs;
1001 if (acceptRate_ < 1) {
1002 acceptRate_ *= 1 + acceptRateAdjustSpeed_ * timeSinceLastAccept;
1003 if (acceptRate_ >= 1) {
1004 acceptRate_ = 1;
1005 } else if (rand() > acceptRate_ * RAND_MAX) {
1006 ++numDroppedConnections_;
1007 if (clientSocket != NetworkSocket()) {
1008 closeNoInt(clientSocket);
1009 if (connectionEventCallback_) {
1010 connectionEventCallback_->onConnectionDropped(
1011 clientSocket, address);
1012 }
1013 }
1014 continue;
1015 }
1016 }
1017
1018 if (clientSocket == NetworkSocket()) {
1019 if (errno == EAGAIN) {
1020 // No more sockets to accept right now.
1021 // Check for this code first, since it's the most common.
1022 return;
1023 } else if (errno == EMFILE || errno == ENFILE) {
1024 // We're out of file descriptors. Perhaps we're accepting connections
1025 // too quickly. Pause accepting briefly to back off and give the server
1026 // a chance to recover.
1027 LOG(ERROR) << "accept failed: out of file descriptors; entering accept "
1028 "back-off state";
1029 enterBackoff();
1030
1031 // Dispatch the error message
1032 dispatchError("accept() failed", errno);
1033 } else {
1034 dispatchError("accept() failed", errno);
1035 }
1036 if (connectionEventCallback_) {
1037 connectionEventCallback_->onConnectionAcceptError(errno);
1038 }
1039 return;
1040 }
1041
1042 #if !FOLLY_HAVE_ACCEPT4
1043 // Explicitly set the new connection to non-blocking mode
1044 if (netops::set_socket_non_blocking(clientSocket) != 0) {
1045 closeNoInt(clientSocket);
1046 dispatchError(
1047 "failed to set accepted socket to non-blocking mode", errno);
1048 if (connectionEventCallback_) {
1049 connectionEventCallback_->onConnectionDropped(clientSocket, address);
1050 }
1051 return;
1052 }
1053 #endif
1054
1055 // Inform the callback about the new connection
1056 dispatchSocket(clientSocket, std::move(address));
1057
1058 // If we aren't accepting any more, break out of the loop
1059 if (!accepting_ || callbacks_.empty()) {
1060 break;
1061 }
1062 }
1063 }
1064
// Hand a freshly accepted socket to the next accept callback (round-robin
// via nextCallback()).  If the chosen callback runs in this socket's own
// EventBase thread it is invoked synchronously; otherwise the connection is
// enqueued onto the callback's NotificationQueue.  On queue-full, retries the
// remaining callbacks once around; if all queues are full the socket is
// closed and counted as dropped.
void AsyncServerSocket::dispatchSocket(
    NetworkSocket socket, SocketAddress&& address) {
  // Remember where the round-robin started so we can detect a full cycle.
  uint32_t startingIndex = callbackIndex_;

  auto timeBeforeEnqueue = std::chrono::steady_clock::now();

  // Short circuit if the callback is in the primary EventBase thread

  CallbackInfo* info = nextCallback();
  if (info->eventBase == nullptr || info->eventBase == this->eventBase_) {
    info->callback->connectionAccepted(socket, address, {timeBeforeEnqueue});
    return;
  }

  // Copy the address now: `address` is moved into the queue message below,
  // but we still need it afterwards for the connection-event callbacks.
  const SocketAddress addr(address);
  // Create a message to send over the notification queue
  auto queueTimeout = *queueTimeout_;
  std::chrono::steady_clock::time_point deadline;
  // A zero queueTimeout means "no deadline"; leave `deadline` default
  // (epoch) in that case.
  if (queueTimeout.count() != 0) {
    deadline = timeBeforeEnqueue + queueTimeout;
  }

  NewConnMessage msg{socket, std::move(address), deadline, timeBeforeEnqueue};

  // Loop until we find a free queue to write to
  while (true) {
    // NOTE(review): this relies on tryPutMessage leaving `msg` intact when
    // it fails (i.e. only consuming the rvalue on success), since we may
    // retry with the same message on another consumer — confirm against the
    // NotificationQueue contract before changing this loop.
    if (info->consumer->getQueue().tryPutMessage(
            std::move(msg), maxNumMsgsInQueue_)) {
      if (connectionEventCallback_) {
        connectionEventCallback_->onConnectionEnqueuedForAcceptorCallback(
            socket, addr);
      }
      // Success! return.
      return;
    }

    // We couldn't add to queue. Fall through to below

    ++numDroppedConnections_;
    if (acceptRateAdjustSpeed_ > 0) {
      // aggressively decrease accept rate when in trouble
      static const double kAcceptRateDecreaseSpeed = 0.1;
      acceptRate_ *= 1 - kAcceptRateDecreaseSpeed;
    }

    if (callbackIndex_ == startingIndex) {
      // The notification queue was full
      // We can't really do anything at this point other than close the socket.
      //
      // This should only happen if a user's service is behaving extremely
      // badly and none of the EventBase threads are looping fast enough to
      // process the incoming connections. If the service is overloaded, it
      // should use pauseAccepting() to temporarily back off accepting new
      // connections, before they reach the point where their threads can't
      // even accept new messages.
      FB_LOG_EVERY_MS(ERROR, 1000)
          << "failed to dispatch newly accepted socket:"
          << " all accept callback queues are full";
      closeNoInt(socket);
      if (connectionEventCallback_) {
        connectionEventCallback_->onConnectionDropped(socket, addr);
      }
      return;
    }

    info = nextCallback();
  }
}
1133
// Report an accept-time error to one accept callback.  Mirrors
// dispatchSocket(): same-thread callbacks get a synchronous acceptError()
// call; remote callbacks receive an ErrorMessage via their notification
// queue, retrying other callbacks once around if a queue is full.  If every
// queue is full the error is logged (rate-limited) and silently dropped.
void AsyncServerSocket::dispatchError(const char* msgstr, int errnoValue) {
  // Remember where the round-robin started so we can detect a full cycle.
  uint32_t startingIndex = callbackIndex_;
  CallbackInfo* info = nextCallback();

  // Create a message to send over the notification queue
  ErrorMessage msg{errnoValue, msgstr};

  while (true) {
    // Short circuit if the callback is in the primary EventBase thread
    if (info->eventBase == nullptr || info->eventBase == this->eventBase_) {
      // The delivered exception text is the message with the raw errno
      // value appended (no strerror translation).
      auto ex = make_exception_wrapper<std::runtime_error>(
          std::string(msgstr) + folly::to<std::string>(errnoValue));
      info->callback->acceptError(std::move(ex));
      return;
    }

    // NOTE(review): as in dispatchSocket(), this assumes tryPutMessage only
    // consumes `msg` on success, since the same message may be retried on
    // the next consumer — confirm against the NotificationQueue contract.
    if (info->consumer->getQueue().tryPutMessage(
            std::move(msg), maxNumMsgsInQueue_)) {
      return;
    }
    // Fall through and try another callback

    if (callbackIndex_ == startingIndex) {
      // The notification queues for all of the callbacks were full.
      // We can't really do anything at this point.
      FB_LOG_EVERY_MS(ERROR, 1000)
          << "failed to dispatch accept error: all accept"
          << " callback queues are full: error msg: " << msg.msg << ": "
          << errnoValue;
      return;
    }
    info = nextCallback();
  }
}
1168
enterBackoff()1169 void AsyncServerSocket::enterBackoff() {
1170 // If this is the first time we have entered the backoff state,
1171 // allocate backoffTimeout_.
1172 if (backoffTimeout_ == nullptr) {
1173 try {
1174 backoffTimeout_ = new BackoffTimeout(this);
1175 } catch (const std::bad_alloc&) {
1176 // Man, we couldn't even allocate the timer to re-enable accepts.
1177 // We must be in pretty bad shape. Don't pause accepting for now,
1178 // since we won't be able to re-enable ourselves later.
1179 LOG(ERROR) << "failed to allocate AsyncServerSocket backoff"
1180 << " timer; unable to temporarly pause accepting";
1181 if (connectionEventCallback_) {
1182 connectionEventCallback_->onBackoffError();
1183 }
1184 return;
1185 }
1186 }
1187
1188 // For now, we simply pause accepting for 1 second.
1189 //
1190 // We could add some smarter backoff calculation here in the future. (e.g.,
1191 // start sleeping for longer if we keep hitting the backoff frequently.)
1192 // Typically the user needs to figure out why the server is overloaded and
1193 // fix it in some other way, though. The backoff timer is just a simple
1194 // mechanism to try and give the connection processing code a little bit of
1195 // breathing room to catch up, and to avoid just spinning and failing to
1196 // accept over and over again.
1197 const uint32_t timeoutMS = 1000;
1198 if (!backoffTimeout_->scheduleTimeout(timeoutMS)) {
1199 LOG(ERROR) << "failed to schedule AsyncServerSocket backoff timer;"
1200 << "unable to temporarly pause accepting";
1201 if (connectionEventCallback_) {
1202 connectionEventCallback_->onBackoffError();
1203 }
1204 return;
1205 }
1206
1207 // The backoff timer is scheduled to re-enable accepts.
1208 // Go ahead and disable accepts for now. We leave accepting_ set to true,
1209 // since that tracks the desired state requested by the user.
1210 for (auto& handler : sockets_) {
1211 handler.unregisterHandler();
1212 }
1213 if (connectionEventCallback_) {
1214 connectionEventCallback_->onBackoffStarted();
1215 }
1216 }
1217
backoffTimeoutExpired()1218 void AsyncServerSocket::backoffTimeoutExpired() {
1219 // accepting_ should still be true.
1220 // If pauseAccepting() was called while in the backoff state it will cancel
1221 // the backoff timeout.
1222 assert(accepting_);
1223 // We can't be detached from the EventBase without being paused
1224 assert(eventBase_ != nullptr);
1225 eventBase_->dcheckIsInEventBaseThread();
1226
1227 // If all of the callbacks were removed, we shouldn't re-enable accepts
1228 if (callbacks_.empty()) {
1229 if (connectionEventCallback_) {
1230 connectionEventCallback_->onBackoffEnded();
1231 }
1232 return;
1233 }
1234
1235 // Register the handler.
1236 for (auto& handler : sockets_) {
1237 if (!handler.registerHandler(EventHandler::READ | EventHandler::PERSIST)) {
1238 // We're hosed. We could just re-schedule backoffTimeout_ to
1239 // re-try again after a little bit. However, we don't want to
1240 // loop retrying forever if we can't re-enable accepts. Just
1241 // abort the entire program in this state; things are really bad
1242 // and restarting the entire server is probably the best remedy.
1243 LOG(ERROR)
1244 << "failed to re-enable AsyncServerSocket accepts after backoff; "
1245 << "crashing now";
1246 abort();
1247 }
1248 }
1249 if (connectionEventCallback_) {
1250 connectionEventCallback_->onBackoffEnded();
1251 }
1252 }
1253
1254 } // namespace folly
1255