1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <sys/types.h> 20 21 #include <chrono> 22 #include <map> 23 #include <memory> 24 25 #include <folly/ConstructorCallback.h> 26 #include <folly/Optional.h> 27 #include <folly/SocketAddress.h> 28 #include <folly/detail/SocketFastOpen.h> 29 #include <folly/io/IOBuf.h> 30 #include <folly/io/IOBufIovecBuilder.h> 31 #include <folly/io/ShutdownSocketSet.h> 32 #include <folly/io/SocketOptionMap.h> 33 #include <folly/io/async/AsyncSocketException.h> 34 #include <folly/io/async/AsyncTimeout.h> 35 #include <folly/io/async/AsyncTransport.h> 36 #include <folly/io/async/DelayedDestruction.h> 37 #include <folly/io/async/EventHandler.h> 38 #include <folly/net/NetOpsDispatcher.h> 39 #include <folly/portability/Sockets.h> 40 #include <folly/small_vector.h> 41 42 namespace folly { 43 44 /** 45 * A class for performing asynchronous I/O on a socket. 46 * 47 * AsyncSocket allows users to asynchronously wait for data on a socket, and 48 * to asynchronously send data. 49 * 50 * The APIs for reading and writing are intentionally asymmetric. Waiting for 51 * data to read is a persistent API: a callback is installed, and is notified 52 * whenever new data is available. It continues to be notified of new events 53 * until it is uninstalled. 
*
 * AsyncSocket does not provide read timeout functionality, because it
 * typically cannot determine when the timeout should be active.  Generally, a
 * timeout should only be enabled when processing is blocked waiting on data
 * from the remote endpoint.  For server sockets, the timeout should not be
 * active if the server is currently processing one or more outstanding
 * requests for this socket.  For client sockets, the timeout should not be
 * active if there are no requests pending on the socket.  Additionally, if a
 * client has multiple pending requests, it will usually want a separate
 * timeout for each request, rather than a single read timeout.
 *
 * The write API is fairly intuitive: a user can request to send a block of
 * data, and a callback will be informed once the entire block has been
 * transferred to the kernel, or on error.  AsyncSocket does provide a send
 * timeout, since most callers want to give up if the remote end stops
 * responding and no further progress can be made sending the data.
 */

// Fallback values for Linux socket options that the system headers may not
// define; each definition is skipped when the macro already exists.
#if defined __linux__ && !defined SO_NO_TRANSPARENT_TLS
#define SO_NO_TRANSPARENT_TLS 200
#endif

#if defined __linux__ && !defined SO_NO_TSOCKS
#define SO_NO_TSOCKS 201
#endif

class AsyncSocket : public AsyncTransport {
 public:
  // Owning pointer type for AsyncSocket; uses Destructor as its deleter.
  using UniquePtr = std::unique_ptr<AsyncSocket, Destructor>;

  /**
   * Callback interface notified about the outcome of a connect attempt.
   */
  class ConnectCallback {
   public:
    virtual ~ConnectCallback() = default;

    /**
     * connectSuccess() will be invoked when the connection has been
     * successfully established.
     */
    virtual void connectSuccess() noexcept = 0;

    /**
     * connectErr() will be invoked if the connection attempt fails.
     *
     * @param ex        An exception describing the error that occurred.
     */
    virtual void connectErr(const AsyncSocketException& ex) noexcept = 0;

    /**
     * preConnect() will be invoked just before the actual connect happens;
     *              the default implementation is a no-op.
*
     * @param fd      The underlying socket that was created and will be used
     *                for the connection.
     *
     */
    virtual void preConnect(NetworkSocket /*fd*/) {}
  };

  /**
   * Callback interface notified when the socket is attached to or detached
   * from an EventBase (EVB).
   */
  class EvbChangeCallback {
   public:
    virtual ~EvbChangeCallback() = default;

    // Called when the socket has been attached to a new EVB
    // and is called from within that EVB thread
    virtual void evbAttached(AsyncSocket* socket) = 0;

    // Called when the socket is detached from an EVB and
    // is called from the EVB thread being detached
    virtual void evbDetached(AsyncSocket* socket) = 0;
  };

  /**
   * This interface is implemented only for platforms supporting
   * per-socket error queues.
   */
  class ErrMessageCallback {
   public:
    virtual ~ErrMessageCallback() = default;

    /**
     * errMessage() will be invoked when the kernel puts a message on
     * the error queue associated with the socket.
     *
     * @param cmsg      Reference to cmsghdr structure describing
     *                  a message read from error queue associated
     *                  with the socket.
     */
    virtual void errMessage(const cmsghdr& cmsg) noexcept = 0;

    /**
     * errMessageError() will be invoked if an error occurs reading a message
     * from the socket error stream.
     *
     * @param ex        An exception describing the error that occurred.
     */
    virtual void errMessageError(const AsyncSocketException& ex) noexcept = 0;
  };

  /**
   * Callback interface for receiving ancillary (control) data that arrives
   * together with read data.
   */
  class ReadAncillaryDataCallback {
   public:
    virtual ~ReadAncillaryDataCallback() = default;

    /**
     * ancillaryData() will be invoked when we read a buffer
     * from the socket together with the ancillary data.
     *
     * @param msgh      Reference to msghdr structure describing
     *                  a message read together with the data buffer associated
     *                  with the socket.
*/
    virtual void ancillaryData(struct msghdr& msgh) noexcept = 0;

    /**
     * getAncillaryDataCtrlBuffer() will be invoked in order to fill the
     * ancillary data buffer when it is received.
     * getAncillaryDataCtrlBuffer will never return nullptr.
     */
    virtual folly::MutableByteRange getAncillaryDataCtrlBuffer() = 0;
  };

  /**
   * Callback interface used to compute the parameters (flags and ancillary
   * data) passed to the ::sendmsg() system call.
   */
  class SendMsgParamsCallback {
   public:
    virtual ~SendMsgParamsCallback() = default;

    /**
     * getFlags() will be invoked to retrieve the desired flags to be passed
     * to ::sendmsg() system call. It is responsible for converting flags set in
     * the passed folly::WriteFlags enum into an integer flag bitmask that can
     * be passed to ::sendmsg. Some flags in folly::WriteFlags do not correspond
     * to flags that can be passed to ::sendmsg and may instead be handled via
     * getAncillaryData.
     *
     * This method was intentionally declared non-virtual, so there is no way to
     * override it. Override getFlagsImpl(...) instead, and enjoy the
     * convenience of the defaultFlags passed there.
     *
     * @param flags            Write flags requested for the given write
     *                         operation
     * @param zeroCopyEnabled  Forwarded unchanged to getDefaultFlags().
     *                         NOTE(review): presumably whether zerocopy is
     *                         enabled on the socket - confirm against the
     *                         getDefaultFlags() implementation.
     * @return The value returned by getFlagsImpl() for the default flags.
     */
    int getFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept {
      return getFlagsImpl(flags, getDefaultFlags(flags, zeroCopyEnabled));
    }

    /**
     * getAncillaryData() will be invoked to initialize ancillary data buffer
     * referred by "msg_control" field of msghdr structure passed to ::sendmsg()
     * system call based on the flags set in the passed folly::WriteFlags enum.
     *
     * Some flags in folly::WriteFlags are not relevant during this process;
     * the default implementation only handles timestamping flags.
     *
     * The function requires that the size of buffer passed is equal to the
     * value returned by getAncillaryDataSize() method for the same combination
     * of flags.
*
     * @param flags     Write flags requested for the given write operation
     * @param data      Pointer to ancillary data buffer to initialize.
     * @param byteEventsEnabled      If byte events are enabled for this socket.
     *                               When enabled, flags relevant to socket
     *                               timestamps (e.g., TIMESTAMP_TX) should be
     *                               included in ancillary (msg_control) data.
     */
    virtual void getAncillaryData(
        folly::WriteFlags flags,
        void* data,
        const bool byteEventsEnabled = false) noexcept;

    /**
     * getAncillaryDataSize() will be invoked to retrieve the size of
     * ancillary data buffer which should be passed to ::sendmsg() system call
     *
     * @param flags     Write flags requested for the given write operation
     * @param byteEventsEnabled      If byte events are enabled for this socket.
     *                               When enabled, flags relevant to socket
     *                               timestamps (e.g., TIMESTAMP_TX) should be
     *                               included in ancillary (msg_control) data.
     */
    virtual uint32_t getAncillaryDataSize(
        folly::WriteFlags flags, const bool byteEventsEnabled = false) noexcept;

    // 0x5000 == 20480.
    // NOTE(review): presumably the largest ancillary data buffer size, in
    // bytes, that callers need to provision - confirm against its users.
    static const size_t maxAncillaryDataSize{0x5000};

   private:
    /**
     * getFlagsImpl() will be invoked by getFlags(folly::WriteFlags flags)
     * method to retrieve the flags to be passed to ::sendmsg() system call.
     * SendMsgParamsCallback::getFlags() is calling this method, and returns
     * its results directly to the caller in AsyncSocket.
     * Classes inheriting from SendMsgParamsCallback are welcome to override
     * this method to force SendMsgParamsCallback to return its own set
     * of flags.
     *
     * @param flags        Write flags requested for the given write operation
     * @param defaultFlags A set of message flags returned by getDefaultFlags()
     *                     method for the given "flags" mask.
*/
    // The base implementation returns defaultFlags unchanged.
    virtual int getFlagsImpl(folly::WriteFlags /*flags*/, int defaultFlags) {
      return defaultFlags;
    }

    /**
     * getDefaultFlags() will be invoked by getFlags(folly::WriteFlags flags)
     * to retrieve the default set of flags, and pass them to getFlagsImpl(...)
     *
     * @param flags     Write flags requested for the given write operation
     * @param zeroCopyEnabled  NOTE(review): presumably whether zerocopy is
     *                         enabled on the socket - confirm against the
     *                         implementation.
     */
    int getDefaultFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept;
  };

  /**
   * Container with state and processing logic for ByteEvents.
   */
  struct ByteEventHelper {
    bool byteEventsEnabled{false};
    size_t rawBytesWrittenWhenByteEventsEnabled{0};
    folly::Optional<AsyncSocketException> maybeEx;

    /**
     * Process a Cmsg and return a ByteEvent if available.
     *
     * The kernel will pass two cmsg for each timestamp:
     *   1. ScmTimestamping: Software / Hardware Timestamps.
     *   2. SockExtendedErrTimestamping: Byte offset associated with timestamp.
     *
     * These messages will be passed back-to-back; processCmsg() can handle them
     * in any order (1 then 2, or 2 then 1), as long as the order is consistent
     * across timestamps.
     *
     * processCmsg() gracefully ignores Cmsg unrelated to socket timestamps, but
     * will throw if it receives a sequence of Cmsg that are not compliant with
     * its expectations.
     *
     * @return If the helper has received all components required to generate a
     *         ByteEvent (e.g., ScmTimestamping and SockExtendedErrTimestamping
     *         messages), it returns a ByteEvent and clears its local state.
     *         Otherwise, returns an empty optional.
     *
     *         If the helper has previously thrown a ByteEventHelper::Exception,
     *         it will not process further Cmsg and will continuously return an
     *         empty optional.
*
     * @throw  If the helper receives a sequence of Cmsg that violate its
     *         expectations (e.g., multiple ScmTimestamping messages in a row
     *         without corresponding SockExtendedErrTimestamping messages), it
     *         throws a ByteEventHelper::Exception. Subsequent calls will return
     *         an empty optional.
     */
    folly::Optional<ByteEvent> processCmsg(
        const cmsghdr& cmsg, const size_t rawBytesWritten);

    /**
     * Exception class thrown by processCmsg.
     *
     * ByteEventHelper does not know the socket address and thus cannot
     * construct an AsyncSocketException. Instead, ByteEventHelper throws a
     * custom Exception and AsyncSocket rewraps it as an AsyncSocketException.
     */
    class Exception : public std::runtime_error {
      // Inherit all std::runtime_error constructors.
      using std::runtime_error::runtime_error;
    };

   private:
    // state, reinitialized each time a complete timestamp is processed
    struct TimestampState {
      // Fields filled from the SockExtendedErrTimestamping message.
      // NOTE(review): grouping inferred from field names - confirm against
      // processCmsg().
      bool serrReceived{false};
      uint32_t typeRaw{0};
      uint32_t byteOffsetKernel{0};

      // Fields filled from the ScmTimestamping message (same caveat).
      bool scmTsReceived{false};
      folly::Optional<std::chrono::nanoseconds> maybeSoftwareTs;
      folly::Optional<std::chrono::nanoseconds> maybeHardwareTs;
    };
    folly::Optional<TimestampState> maybeTsState_;
  };

  // Default constructor.
  // NOTE(review): no EventBase is supplied here; presumably one has to be
  // attached (see attachEventBase()) before the socket is used - confirm.
  explicit AsyncSocket();
  /**
   * Create a new unconnected AsyncSocket.
   *
   * connect() must later be called on this socket to establish a connection.
   */
  explicit AsyncSocket(EventBase* evb);

  void setShutdownSocketSet(const std::weak_ptr<ShutdownSocketSet>& wSS);

  /**
   * Create a new AsyncSocket and begin the connection process.
   *
   * @param evb             EventBase that will manage this socket.
   * @param address         The address to connect to.
   * @param connectTimeout  Optional timeout in milliseconds for the connection
   *                        attempt.
* @param useZeroCopy     Optional zerocopy socket mode
   */
  AsyncSocket(
      EventBase* evb,
      const folly::SocketAddress& address,
      uint32_t connectTimeout = 0,
      bool useZeroCopy = false);

  /**
   * Create a new AsyncSocket and begin the connection process.
   *
   * @param evb             EventBase that will manage this socket.
   * @param ip              IP address to connect to (dotted-quad).
   * @param port            Destination port in host byte order.
   * @param connectTimeout  Optional timeout in milliseconds for the connection
   *                        attempt.
   * @param useZeroCopy     Optional zerocopy socket mode
   */
  AsyncSocket(
      EventBase* evb,
      const std::string& ip,
      uint16_t port,
      uint32_t connectTimeout = 0,
      bool useZeroCopy = false);

  /**
   * Create an AsyncSocket from an already connected socket file descriptor.
   *
   * Note that while AsyncSocket enables TCP_NODELAY for sockets it creates
   * when connecting, it does not change the socket options when given an
   * existing file descriptor.  If callers want TCP_NODELAY enabled when using
   * this version of the constructor, they need to explicitly call
   * setNoDelay(true) after the constructor returns.
   *
   * @param evb EventBase that will manage this socket.
   * @param fd  File descriptor to take over (should be a connected socket).
   * @param zeroCopyBufId Zerocopy buf id to start with.
   * @param peerAddress optional peer address (e.g. returned from accept).  If
   *        nullptr, AsyncSocket will lazily attempt to determine it from fd
   *        via a system call
   */
  AsyncSocket(
      EventBase* evb,
      NetworkSocket fd,
      uint32_t zeroCopyBufId = 0,
      const SocketAddress* peerAddress = nullptr);

  /**
   * Create an AsyncSocket from a different, already connected AsyncSocket.
   *
   * Similar to AsyncSocket(evb, fd) when fd was previously owned by an
   * AsyncSocket.
*/
  explicit AsyncSocket(AsyncSocket::UniquePtr);

  /**
   * Create an AsyncSocket from a different, already connected AsyncSocket.
   *
   * Similar to AsyncSocket(evb, fd) when fd was previously owned by an
   * AsyncSocket. Caller must call destroy on old AsyncSocket unless it is
   * in a smart pointer with appropriate destructor.
   */
  explicit AsyncSocket(AsyncSocket*);

  /**
   * Helper function to create an AsyncSocket.
   *
   * This passes in the correct destructor object, since AsyncSocket's
   * destructor is protected and cannot be invoked directly.
   */
  static UniquePtr newSocket(EventBase* evb) {
    return UniquePtr{new AsyncSocket(evb)};
  }

  /**
   * Helper function to create an AsyncSocket.
   *
   * Forwards all arguments to the (evb, address, connectTimeout, useZeroCopy)
   * constructor and wraps the result in a UniquePtr.
   */
  static UniquePtr newSocket(
      EventBase* evb,
      const folly::SocketAddress& address,
      uint32_t connectTimeout = 0,
      bool useZeroCopy = false) {
    return UniquePtr{
        new AsyncSocket(evb, address, connectTimeout, useZeroCopy)};
  }

  /**
   * Helper function to create an AsyncSocket.
   *
   * Forwards all arguments to the (evb, ip, port, connectTimeout, useZeroCopy)
   * constructor and wraps the result in a UniquePtr.
   */
  static UniquePtr newSocket(
      EventBase* evb,
      const std::string& ip,
      uint16_t port,
      uint32_t connectTimeout = 0,
      bool useZeroCopy = false) {
    return UniquePtr{
        new AsyncSocket(evb, ip, port, connectTimeout, useZeroCopy)};
  }

  /**
   * Helper function to create an AsyncSocket.
   *
   * Wraps an existing socket; the zeroCopyBufId constructor argument is
   * always passed as 0 by this helper.
   */
  static UniquePtr newSocket(
      EventBase* evb,
      NetworkSocket fd,
      const SocketAddress* peerAddress = nullptr) {
    return UniquePtr{new AsyncSocket(evb, fd, 0, peerAddress)};
  }

  /**
   * Destroy the socket.
   *
   * AsyncSocket::destroy() must be called to destroy the socket.
   * The normal destructor is private, and should not be invoked directly.
   * (NOTE(review): the newSocket() documentation describes the destructor as
   * protected - confirm which is accurate.)
   * This prevents callers from deleting an AsyncSocket while it is invoking a
   * callback.
*/
  void destroy() override;

  /**
   * Get the EventBase used by this socket.
   */
  EventBase* getEventBase() const override { return eventBase_; }

  /**
   * Get the network socket used by the AsyncSocket.
   */
  virtual NetworkSocket getNetworkSocket() const { return fd_; }

  /**
   * Extract the file descriptor from the AsyncSocket.
   *
   * This will immediately cause any installed callbacks to be invoked with an
   * error.  The AsyncSocket may no longer be used after the file descriptor
   * has been extracted.
   *
   * This method should be used with care as the resulting fd is not guaranteed
   * to perfectly reflect the state of the AsyncSocket (security state,
   * pre-received data, etc.).
   *
   * Returns the file descriptor.  The caller assumes ownership of the
   * descriptor, and it will not be closed when the AsyncSocket is destroyed.
   */
  virtual NetworkSocket detachNetworkSocket();

  // Address used as the default bindAddr argument of connect().
  // NOTE(review): presumably a wildcard ("any") address - confirm against the
  // implementation.
  static const folly::SocketAddress& anyAddress();

  /**
   * Initiate a connection.
   *
   * @param callback  The callback to inform when the connection attempt
   *                  completes.
   * @param address   The address to connect to.
   * @param timeout   A timeout value, in milliseconds.  If the connection
   *                  does not succeed within this period,
   *                  callback->connectErr() will be invoked.
   * @param options   Socket options for the new socket; defaults to
   *                  emptySocketOptionMap.  NOTE(review): presumably applied
   *                  before connecting - confirm against the implementation.
   * @param bindAddr  Local address to bind to; defaults to anyAddress().
   * @param ifName    NOTE(review): presumably the name of the network
   *                  interface to bind to; empty by default - confirm.
   */
  virtual void connect(
      ConnectCallback* callback,
      const folly::SocketAddress& address,
      int timeout = 0,
      const SocketOptionMap& options = emptySocketOptionMap,
      const folly::SocketAddress& bindAddr = anyAddress(),
      const std::string& ifName = "") noexcept;

  /**
   * Initiate a connection to an address given as an IP string and a port.
   *
   * The remaining parameters have the same meaning as in the overload taking
   * a folly::SocketAddress.
   */
  void connect(
      ConnectCallback* callback,
      const std::string& ip,
      uint16_t port,
      int timeout = 0,
      const SocketOptionMap& options = emptySocketOptionMap) noexcept;

  /**
   * If a connect request is in-flight, cancels it and closes the socket
   * immediately.
Otherwise, this is a no-op.
   *
   * This does not invoke any connection related callbacks. Call this to
   * prevent any connect callback while cleaning up, etc.
   */
  virtual void cancelConnect();

  /**
   * Set the send timeout.
   *
   * If write requests do not make any progress for more than the specified
   * number of milliseconds, fail all pending writes and close the socket.
   *
   * If write requests are currently pending when setSendTimeout() is called,
   * the timeout interval is immediately restarted using the new value.
   *
   * (See the comments for AsyncSocket for an explanation of why AsyncSocket
   * provides setSendTimeout() but not setRecvTimeout().)
   *
   * @param milliseconds  The timeout duration, in milliseconds.  If 0, no
   *                      timeout will be used.
   */
  void setSendTimeout(uint32_t milliseconds) override;

  /**
   * Get the send timeout.
   *
   * @return Returns the current send timeout, in milliseconds.  A return value
   *         of 0 indicates that no timeout is set.
   */
  uint32_t getSendTimeout() const override { return sendTimeout_; }

  /**
   * Set the maximum number of reads to execute from the underlying
   * socket each time the EventBase detects that new ingress data is
   * available. The default is unlimited, but callers can use this method
   * to limit the amount of data read from the socket per event loop
   * iteration.
   *
   * @param maxReads  Maximum number of reads per data-available event;
   *                  a value of zero means unlimited.
   */
  void setMaxReadsPerEvent(uint16_t maxReads) { maxReadsPerEvent_ = maxReads; }

  /**
   * Get the maximum number of reads this object will execute from
   * the underlying socket each time the EventBase detects that new
   * ingress data is available.
   *
   * @returns Maximum number of reads per data-available event; a value
   *          of zero means unlimited.
*/
  uint16_t getMaxReadsPerEvent() const { return maxReadsPerEvent_; }

  /**
   * Set a pointer to ErrMessageCallback implementation which will be
   * receiving notifications for messages posted to the error queue
   * associated with the socket.
   * ErrMessageCallback is implemented only for platforms with
   * per-socket error message queue support.
   * NOTE(review): the original comment ended with the truncated fragment
   * "(recvmsg() system call must )"; presumably recvmsg() must support
   * reading from the socket error queue - confirm the intended requirement.
   *
   */
  virtual void setErrMessageCB(ErrMessageCallback* callback);

  /**
   * Get a pointer to ErrMessageCallback implementation currently
   * registered with this socket.
   *
   */
  virtual ErrMessageCallback* getErrMessageCallback() const;

  /**
   * Set a pointer to ReadAncillaryDataCallback implementation which will
   * be invoked with the ancillary data when we read a buffer from the
   * associated socket.
   * ReadAncillaryDataCallback is implemented only for platforms with
   * kernel timestamp support.
   *
   */
  virtual void setReadAncillaryDataCB(ReadAncillaryDataCallback* callback);

  /**
   * Get a pointer to ReadAncillaryDataCallback implementation currently
   * registered with this socket.
   *
   */
  virtual ReadAncillaryDataCallback* getReadAncillaryDataCallback() const;

  /**
   * Set a pointer to SendMsgParamsCallback implementation which
   * will be used to form ::sendmsg() system call parameters
   *
   */
  virtual void setSendMsgParamCB(SendMsgParamsCallback* callback);

  /**
   * Get a pointer to SendMsgParamsCallback implementation currently
   * registered with this socket.
   *
   */
  virtual SendMsgParamsCallback* getSendMsgParamsCB() const;

  /**
   * Override netops::Dispatcher to be used for netops:: calls.
   *
   * Pass empty shared_ptr to reset to default.
   * Override can be used by unit tests to intercept and mock netops:: calls.
626 */ setOverrideNetOpsDispatcher(std::shared_ptr<netops::Dispatcher> dispatcher)627 virtual void setOverrideNetOpsDispatcher( 628 std::shared_ptr<netops::Dispatcher> dispatcher) { 629 netops_.setOverride(std::move(dispatcher)); 630 } 631 632 /** 633 * Returns override netops::Dispatcher being used for netops:: calls. 634 * 635 * Returns empty shared_ptr if no override set. 636 * Override can be used by unit tests to intercept and mock netops:: calls. 637 */ getOverrideNetOpsDispatcher()638 virtual std::shared_ptr<netops::Dispatcher> getOverrideNetOpsDispatcher() 639 const { 640 return netops_.getOverride(); 641 } 642 643 // Read and write methods 644 void setReadCB(ReadCallback* callback) override; 645 ReadCallback* getReadCallback() const override; setEventCallback(EventRecvmsgCallback * cb)646 void setEventCallback(EventRecvmsgCallback* cb) override { 647 if (cb) { 648 ioHandler_.setEventCallback(cb); 649 } else { 650 ioHandler_.resetEventCallback(); 651 } 652 } 653 654 bool setZeroCopy(bool enable) override; getZeroCopy()655 bool getZeroCopy() const override { return zeroCopyEnabled_; } 656 getZeroCopyBufId()657 uint32_t getZeroCopyBufId() const { return zeroCopyBufId_; } 658 getZeroCopyReenableThreshold()659 size_t getZeroCopyReenableThreshold() const { 660 return zeroCopyReenableThreshold_; 661 } 662 663 void setZeroCopyEnableFunc(AsyncWriter::ZeroCopyEnableFunc func) override; 664 665 void setZeroCopyReenableThreshold(size_t threshold); 666 667 void write( 668 WriteCallback* callback, 669 const void* buf, 670 size_t bytes, 671 WriteFlags flags = WriteFlags::NONE) override; 672 void writev( 673 WriteCallback* callback, 674 const iovec* vec, 675 size_t count, 676 WriteFlags flags = WriteFlags::NONE) override; 677 void writeChain( 678 WriteCallback* callback, 679 std::unique_ptr<folly::IOBuf>&& buf, 680 WriteFlags flags = WriteFlags::NONE) override; 681 682 class WriteRequest; 683 virtual void writeRequest(WriteRequest* req); writeRequestReady()684 void 
writeRequestReady() { handleWrite(); } 685 686 // Methods inherited from AsyncTransport 687 void close() override; 688 void closeNow() override; 689 void closeWithReset() override; 690 void shutdownWrite() override; 691 void shutdownWriteNow() override; 692 693 bool readable() const override; 694 bool writable() const override; 695 bool isPending() const override; 696 virtual bool hangup() const; 697 bool good() const override; 698 bool error() const override; 699 void attachEventBase(EventBase* eventBase) override; 700 void detachEventBase() override; 701 bool isDetachable() const override; 702 703 void getLocalAddress(folly::SocketAddress* address) const override; 704 void getPeerAddress(folly::SocketAddress* address) const override; 705 isEorTrackingEnabled()706 bool isEorTrackingEnabled() const override { return trackEor_; } 707 setEorTracking(bool track)708 void setEorTracking(bool track) override { trackEor_ = track; } 709 connecting()710 bool connecting() const override { return (state_ == StateEnum::CONNECTING); } 711 isClosedByPeer()712 virtual bool isClosedByPeer() const { 713 return ( 714 state_ == StateEnum::CLOSED && 715 (readErr_ == READ_EOF || readErr_ == READ_ERROR)); 716 } 717 isClosedBySelf()718 virtual bool isClosedBySelf() const { 719 return ( 720 state_ == StateEnum::CLOSED && 721 (readErr_ != READ_EOF && readErr_ != READ_ERROR)); 722 } 723 getAppBytesWritten()724 size_t getAppBytesWritten() const override { return appBytesWritten_; } 725 getRawBytesWritten()726 size_t getRawBytesWritten() const override { return rawBytesWritten_; } 727 getAppBytesReceived()728 size_t getAppBytesReceived() const override { return appBytesReceived_; } 729 getRawBytesReceived()730 size_t getRawBytesReceived() const override { return getAppBytesReceived(); } 731 getAppBytesBuffered()732 size_t getAppBytesBuffered() const override { 733 return totalAppBytesScheduledForWrite_ - appBytesWritten_; 734 } getRawBytesBuffered()735 size_t getRawBytesBuffered() const 
override { return getAppBytesBuffered(); } 736 getAllocatedBytesBuffered()737 size_t getAllocatedBytesBuffered() const override { 738 return allocatedBytesBuffered_; 739 } 740 741 // End of methods inherited from AsyncTransport 742 getConnectTime()743 std::chrono::nanoseconds getConnectTime() const { 744 return connectEndTime_ - connectStartTime_; 745 } 746 getConnectTimeout()747 std::chrono::milliseconds getConnectTimeout() const { 748 return connectTimeout_; 749 } 750 getConnectStartTime()751 std::chrono::steady_clock::time_point getConnectStartTime() const { 752 return connectStartTime_; 753 } 754 getConnectEndTime()755 std::chrono::steady_clock::time_point getConnectEndTime() const { 756 return connectEndTime_; 757 } 758 getTFOAttempted()759 bool getTFOAttempted() const { return tfoAttempted_; } 760 761 /** 762 * Returns whether or not the attempt to use TFO 763 * finished successfully. This does not necessarily 764 * mean TFO worked, just that trying to use TFO 765 * succeeded. 766 */ getTFOFinished()767 bool getTFOFinished() const { return tfoFinished_; } 768 769 /** 770 * Returns whether or not TFO attempt succeded on this 771 * connection. 772 * For servers this is pretty straightforward API and can 773 * be invoked right after the connection is accepted. This API 774 * will perform one syscall. 775 * This API is a bit tricky to use for clients, since clients 776 * only know this for sure after the SYN-ACK is returned. So it's 777 * appropriate to call this only after the first application 778 * data is read from the socket when the caller knows that 779 * the SYN has been ACKed by the server. 780 */ 781 bool getTFOSucceded() const; 782 783 // Methods controlling socket options 784 785 /** 786 * Force writes to be transmitted immediately. 787 * 788 * This controls the TCP_NODELAY socket option. When enabled, TCP segments 789 * are sent as soon as possible, even if it is not a full frame of data. 
* When disabled, the data may be buffered briefly to try and wait for a full
   * frame of data.
   *
   * By default, TCP_NODELAY is enabled for AsyncSocket objects.
   *
   * This method will fail if the socket is not currently open.
   *
   * @param noDelay  true to enable TCP_NODELAY, false to disable it.
   * @return Returns 0 if the TCP_NODELAY flag was successfully updated,
   *         or a non-zero errno value on error.
   */
  int setNoDelay(bool noDelay);

  /**
   * Set the FD_CLOEXEC flag so that the socket will be closed if the program
   * later forks and execs.
   */
  void setCloseOnExec();

  /*
   * Set the Flavor of Congestion Control to be used for this Socket
   * Please check '/lib/modules/<kernel>/kernel/net/ipv4' for tcp_*.ko
   * first to make sure the module is available for plugging in
   * Alternatively you can choose from net.ipv4.tcp_allowed_congestion_control
   *
   * @param cname  Name of the congestion control flavor to select.
   */
  int setCongestionFlavor(const std::string& cname);

  /*
   * Forces ACKs to be sent immediately
   *
   * @return Returns 0 if the TCP_QUICKACK flag was successfully updated,
   *         or a non-zero errno value on error.
821 */ 822 int setQuickAck(bool quickack); 823 824 /** 825 * Set the send bufsize 826 */ 827 int setSendBufSize(size_t bufsize); 828 829 /** 830 * Set the recv bufsize 831 */ 832 int setRecvBufSize(size_t bufsize); 833 834 #if defined(__linux__) 835 /** 836 * @brief This method is used to get the number of bytes that are currently 837 * stored in the TCP send/tx buffer 838 * 839 * @return the number of bytes in the send/tx buffer or folly::none if there 840 * was a problem 841 */ 842 size_t getSendBufInUse() const; 843 844 /** 845 * @brief This method is used to get the number of bytes that are currently 846 * stored in the TCP receive/rx buffer 847 * 848 * @return the number of bytes in the receive/rx buffer or folly::none if 849 * there was a problem 850 */ 851 size_t getRecvBufInUse() const; 852 #endif 853 854 /** 855 * Sets a specific tcp personality 856 * Available only on kernels 3.2 and greater 857 */ 858 #define SO_SET_NAMESPACE 41 859 int setTCPProfile(int profd); 860 861 /** 862 * Generic API for reading a socket option. 863 * 864 * @param level same as the "level" parameter in getsockopt(). 865 * @param optname same as the "optname" parameter in getsockopt(). 866 * @param optval pointer to the variable in which the option value should 867 * be returned. 868 * @param optlen value-result argument, initially containing the size of 869 * the buffer pointed to by optval, and modified on return 870 * to indicate the actual size of the value returned. 871 * @return same as the return value of getsockopt(). 872 */ 873 template <typename T> getSockOpt(int level,int optname,T * optval,socklen_t * optlen)874 int getSockOpt(int level, int optname, T* optval, socklen_t* optlen) { 875 return netops_->getsockopt(fd_, level, optname, (void*)optval, optlen); 876 } 877 878 /** 879 * Generic API for setting a socket option. 880 * 881 * @param level same as the "level" parameter in getsockopt(). 882 * @param optname same as the "optname" parameter in getsockopt(). 
* @param optval    the option value to set; sizeof(T) is passed as the
   *                  option length.
   * @return          same as the return value of setsockopt().
   */
  template <typename T>
  int setSockOpt(int level, int optname, const T* optval) {
    return netops_->setsockopt(fd_, level, optname, optval, sizeof(T));
  }

  /**
   * Virtual method for reading a socket option returning integer
   * value, which is the most typical case. Convenient for overriding
   * and mocking.
   *
   * @param level     same as the "level" parameter in getsockopt().
   * @param optname   same as the "optname" parameter in getsockopt().
   * @param optval    same as "optval" parameter in getsockopt().
   * @param optlen    same as "optlen" parameter in getsockopt().
   * @return          same as the return value of getsockopt().
   */
  virtual int getSockOptVirtual(
      int level, int optname, void* optval, socklen_t* optlen) {
    return netops_->getsockopt(fd_, level, optname, optval, optlen);
  }

  /**
   * Virtual method for setting a socket option accepting integer
   * value, which is the most typical case. Convenient for overriding
   * and mocking.
   *
   * @param level     same as the "level" parameter in setsockopt().
   * @param optname   same as the "optname" parameter in setsockopt().
   * @param optval    same as "optval" parameter in setsockopt().
   * @param optlen    same as "optlen" parameter in setsockopt().
   * @return          same as the return value of setsockopt().
   */
  virtual int setSockOptVirtual(
      int level, int optname, void const* optval, socklen_t optlen) {
    return netops_->setsockopt(fd_, level, optname, optval, optlen);
  }

  /**
   * Set pre-received data, to be returned to read callback before any data
   * from the socket.
926 */ setPreReceivedData(std::unique_ptr<IOBuf> data)927 virtual void setPreReceivedData(std::unique_ptr<IOBuf> data) { 928 if (preReceivedData_) { 929 preReceivedData_->prependChain(std::move(data)); 930 } else { 931 preReceivedData_ = std::move(data); 932 } 933 } 934 935 /** 936 * Enables TFO behavior on the AsyncSocket if FOLLY_ALLOW_TFO 937 * is set. 938 */ enableTFO()939 void enableTFO() { 940 // No-op if folly does not allow tfo 941 #if FOLLY_ALLOW_TFO 942 tfoEnabled_ = true; 943 #endif 944 } 945 disableTransparentTls()946 void disableTransparentTls() { noTransparentTls_ = true; } 947 disableTSocks()948 void disableTSocks() { noTSocks_ = true; } 949 950 enum class StateEnum : uint8_t { 951 UNINIT, 952 CONNECTING, 953 ESTABLISHED, 954 CLOSED, 955 ERROR, 956 FAST_OPEN, 957 }; 958 959 void setBufferCallback(BufferCallback* cb); 960 961 // Callers should set this prior to connecting the socket for the safest 962 // behavior. setEvbChangedCallback(std::unique_ptr<EvbChangeCallback> cb)963 void setEvbChangedCallback(std::unique_ptr<EvbChangeCallback> cb) { 964 evbChangeCb_ = std::move(cb); 965 } 966 967 /** 968 * Attempt to cache the current local and peer addresses (if not already 969 * cached) so that they are available from getPeerAddress() and 970 * getLocalAddress() even after the socket is closed. 
971 */ 972 void cacheAddresses(); 973 974 /** 975 * Returns true if there is any zero copy write in progress 976 * Needs to be called from within the socket's EVB thread 977 */ 978 bool isZeroCopyWriteInProgress() const noexcept; 979 980 /** 981 * Tries to process the msg error queue 982 * And returns true if there are no more zero copy writes in progress 983 */ 984 bool processZeroCopyWriteInProgress() noexcept; 985 setPeerCertificate(std::unique_ptr<const AsyncTransportCertificate> cert)986 void setPeerCertificate( 987 std::unique_ptr<const AsyncTransportCertificate> cert) { 988 peerCertData_ = std::move(cert); 989 } getPeerCertificate()990 const AsyncTransportCertificate* getPeerCertificate() const override { 991 return peerCertData_.get(); 992 } 993 dropPeerCertificate()994 void dropPeerCertificate() noexcept override { peerCertData_.reset(); } 995 setSelfCertificate(std::unique_ptr<const AsyncTransportCertificate> cert)996 void setSelfCertificate( 997 std::unique_ptr<const AsyncTransportCertificate> cert) { 998 selfCertData_ = std::move(cert); 999 } 1000 dropSelfCertificate()1001 void dropSelfCertificate() noexcept override { selfCertData_.reset(); } 1002 getSelfCertificate()1003 const AsyncTransportCertificate* getSelfCertificate() const override { 1004 return selfCertData_.get(); 1005 } 1006 1007 /** 1008 * Whether socket should be closed on write failure (true by default). 1009 */ setCloseOnFailedWrite(bool closeOnFailedWrite)1010 void setCloseOnFailedWrite(bool closeOnFailedWrite) { 1011 closeOnFailedWrite_ = closeOnFailedWrite; 1012 } 1013 1014 /** 1015 * writeReturn is the total number of bytes written, or WRITE_ERROR on error. 1016 * If no data has been written, 0 is returned. 1017 * exception is a more specific exception that cause a write error. 1018 * Not all writes have exceptions associated with them thus writeReturn 1019 * should be checked to determine whether the operation resulted in an error. 
1020 */ 1021 struct WriteResult { WriteResultWriteResult1022 explicit WriteResult(ssize_t ret) : writeReturn(ret) {} 1023 WriteResultWriteResult1024 WriteResult(ssize_t ret, std::unique_ptr<const AsyncSocketException> e) 1025 : writeReturn(ret), exception(std::move(e)) {} 1026 1027 ssize_t writeReturn; 1028 std::unique_ptr<const AsyncSocketException> exception; 1029 }; 1030 1031 /** 1032 * readReturn is the number of bytes read, or READ_EOF on EOF, or 1033 * READ_ERROR on error, or READ_BLOCKING if the operation will 1034 * block. 1035 * exception is a more specific exception that may have caused a read error. 1036 * Not all read errors have exceptions associated with them thus readReturn 1037 * should be checked to determine whether the operation resulted in an error. 1038 */ 1039 struct ReadResult { ReadResultReadResult1040 explicit ReadResult(ssize_t ret) : readReturn(ret) {} 1041 ReadResultReadResult1042 ReadResult(ssize_t ret, std::unique_ptr<const AsyncSocketException> e) 1043 : readReturn(ret), exception(std::move(e)) {} 1044 1045 ssize_t readReturn; 1046 std::unique_ptr<const AsyncSocketException> exception; 1047 }; 1048 1049 /** 1050 * A WriteRequest object tracks information about a pending write operation. 1051 */ 1052 class WriteRequest { 1053 public: WriteRequest(AsyncSocket * socket,WriteCallback * callback)1054 WriteRequest(AsyncSocket* socket, WriteCallback* callback) 1055 : socket_(socket), 1056 callback_(callback), 1057 releaseIOBufCallback_( 1058 callback ? 
callback->getReleaseIOBufCallback() : nullptr) {} 1059 start()1060 virtual void start() {} 1061 1062 virtual void destroy() = 0; 1063 1064 virtual WriteResult performWrite() = 0; 1065 1066 virtual void consume() = 0; 1067 1068 virtual bool isComplete() = 0; 1069 getNext()1070 WriteRequest* getNext() const { return next_; } 1071 getCallback()1072 WriteCallback* getCallback() const { return callback_; } 1073 getTotalBytesWritten()1074 uint32_t getTotalBytesWritten() const { return totalBytesWritten_; } 1075 append(WriteRequest * next)1076 void append(WriteRequest* next) { 1077 assert(next_ == nullptr); 1078 next_ = next; 1079 } 1080 fail(const char * fn,const AsyncSocketException & ex)1081 void fail(const char* fn, const AsyncSocketException& ex) { 1082 socket_->failWrite(fn, ex); 1083 } 1084 bytesWritten(size_t count)1085 void bytesWritten(size_t count) { 1086 totalBytesWritten_ += uint32_t(count); 1087 socket_->appBytesWritten_ += count; 1088 } 1089 1090 protected: 1091 // protected destructor, to ensure callers use destroy() ~WriteRequest()1092 virtual ~WriteRequest() {} 1093 1094 AsyncSocket* socket_; ///< parent socket 1095 WriteRequest* next_{nullptr}; ///< pointer to next WriteRequest 1096 WriteCallback* callback_; ///< completion callback 1097 ReleaseIOBufCallback* releaseIOBufCallback_; ///< release IOBuf callback 1098 uint32_t totalBytesWritten_{0}; ///< total bytes written 1099 }; 1100 1101 class LifecycleObserver : virtual public AsyncTransport::LifecycleObserver { 1102 public: 1103 using AsyncTransport::LifecycleObserver::LifecycleObserver; 1104 1105 /** 1106 * fdDetach() is invoked if the socket file descriptor is detached. 1107 * 1108 * detachNetworkSocket() will be triggered when a new AsyncSocket is being 1109 * constructed from an old one. See the moved() event for details about 1110 * this special case. 1111 * 1112 * @param socket Socket for which detachNetworkSocket was invoked. 
1113 */ 1114 virtual void fdDetach(AsyncSocket* /* socket */) noexcept = 0; 1115 1116 /** 1117 * fdAttach() is invoked when the socket file descriptor is attached. 1118 * 1119 * @param socket Socket for which handleNetworkSocketAttached was 1120 * invoked. 1121 */ fdAttach(AsyncSocket *)1122 virtual void fdAttach(AsyncSocket* /* socket */) noexcept {} 1123 1124 /** 1125 * move() will be invoked when a new AsyncSocket is being constructed via 1126 * constructor AsyncSocket(AsyncSocket* oldAsyncSocket) from an AsyncSocket 1127 * that has an observer attached. 1128 * 1129 * This type of construction is common during TLS/SSL accept process. 1130 * wangle::Acceptor may transform an AsyncSocket to an AsyncFizzServer, and 1131 * then transform the AsyncFizzServer to an AsyncSSLSocket on fallback. 1132 * AsyncFizzServer and AsyncSSLSocket derive from AsyncSocket and at each 1133 * stage the aforementioned constructor will be called. 1134 * 1135 * Observers may be attached when the initial AsyncSocket is created, before 1136 * TLS/SSL accept handling has completed. As a result, AsyncSocket must 1137 * notify the observer during each transformation so that: 1138 * (1) The observer can track these transformations for debugging. 1139 * (2) The observer does not become separated from the underlying 1140 * operating system socket and corresponding file descriptor. 1141 * 1142 * When a new AsyncSocket is being constructed via the aforementioned 1143 * constructor, the following observer events will be triggered: 1144 * (1) fdDetach 1145 * (2) move 1146 * 1147 * When move is triggered, the observer can CHOOSE to detach the old socket 1148 * and attach to the new socket. This process will not happen automatically; 1149 * the observer must explicitly perform these steps. 1150 * 1151 * @param oldSocket Old socket that fd was detached from. 1152 * @param newSocket New socket being constructed with fd attached. 
1153 */ 1154 virtual void move( 1155 AsyncSocket* /* oldSocket */, 1156 AsyncSocket* /* newSocket */) noexcept = 0; 1157 }; 1158 1159 /** 1160 * Adds a lifecycle observer. 1161 * 1162 * Observers can tie their lifetime to aspects of this socket's lifecycle / 1163 * lifetime and perform inspection at various states. 1164 * 1165 * This enables instrumentation to be added without changing / interfering 1166 * with how the application uses the socket. 1167 * 1168 * Observer should implement AsyncTransport::LifecycleObserver to receive 1169 * additional lifecycle events specific to AsyncSocket. 1170 * 1171 * @param observer Observer to add (implements LifecycleObserver). 1172 */ 1173 void addLifecycleObserver( 1174 AsyncTransport::LifecycleObserver* observer) override; 1175 1176 /** 1177 * Removes a lifecycle observer. 1178 * 1179 * @param observer Observer to remove. 1180 * @return Whether observer found and removed from list. 1181 */ 1182 bool removeLifecycleObserver( 1183 AsyncTransport::LifecycleObserver* observer) override; 1184 1185 /** 1186 * Returns installed lifecycle observers. 1187 * 1188 * @return Vector with installed observers. 1189 */ 1190 FOLLY_NODISCARD virtual std::vector<AsyncTransport::LifecycleObserver*> 1191 getLifecycleObservers() const override; 1192 1193 /** 1194 * Split iovec array at given byte offsets; produce a new array with result. 1195 */ 1196 static void splitIovecArray( 1197 const size_t startOffset, 1198 const size_t endOffset, 1199 const iovec* srcVec, 1200 const size_t srcCount, 1201 iovec* dstVec, 1202 size_t& dstCount); 1203 1204 protected: 1205 enum ReadResultEnum { 1206 READ_EOF = 0, 1207 READ_ERROR = -1, 1208 READ_BLOCKING = -2, 1209 READ_NO_ERROR = -3, 1210 }; 1211 1212 enum WriteResultEnum { 1213 WRITE_ERROR = -1, 1214 }; 1215 1216 /** 1217 * Protected destructor. 1218 * 1219 * Users of AsyncSocket must never delete it directly. Instead, invoke 1220 * destroy() instead. 
(See the documentation in DelayedDestruction.h for 1221 * more details.) 1222 */ 1223 ~AsyncSocket() override; 1224 1225 friend std::ostream& operator<<(std::ostream& os, const StateEnum& state); 1226 1227 enum ShutdownFlags { 1228 /// shutdownWrite() called, but we are still waiting on writes to drain 1229 SHUT_WRITE_PENDING = 0x01, 1230 /// writes have been completely shut down 1231 SHUT_WRITE = 0x02, 1232 /** 1233 * Reads have been shutdown. 1234 * 1235 * At the moment we don't distinguish between remote read shutdown 1236 * (received EOF from the remote end) and local read shutdown. We can 1237 * only receive EOF when a read callback is set, and we immediately inform 1238 * it of the EOF. Therefore there doesn't seem to be any reason to have a 1239 * separate state of "received EOF but the local side may still want to 1240 * read". 1241 * 1242 * We also don't currently provide any API for only shutting down the read 1243 * side of a socket. (This is a no-op as far as TCP is concerned, anyway.) 
1244 */ 1245 SHUT_READ = 0x04, 1246 }; 1247 1248 class BytesWriteRequest; 1249 1250 class WriteTimeout : public AsyncTimeout { 1251 public: WriteTimeout(AsyncSocket * socket,EventBase * eventBase)1252 WriteTimeout(AsyncSocket* socket, EventBase* eventBase) 1253 : AsyncTimeout(eventBase), socket_(socket) {} 1254 timeoutExpired()1255 void timeoutExpired() noexcept override { socket_->timeoutExpired(); } 1256 1257 private: 1258 AsyncSocket* socket_; 1259 }; 1260 1261 class IoHandler : public EventHandler { 1262 public: IoHandler(AsyncSocket * socket,EventBase * eventBase)1263 IoHandler(AsyncSocket* socket, EventBase* eventBase) 1264 : EventHandler(eventBase, NetworkSocket()), socket_(socket) {} IoHandler(AsyncSocket * socket,EventBase * eventBase,NetworkSocket fd)1265 IoHandler(AsyncSocket* socket, EventBase* eventBase, NetworkSocket fd) 1266 : EventHandler(eventBase, fd), socket_(socket) {} 1267 handlerReady(uint16_t events)1268 void handlerReady(uint16_t events) noexcept override { 1269 socket_->ioReady(events); 1270 } 1271 1272 private: 1273 AsyncSocket* socket_; 1274 }; 1275 1276 void init(); 1277 1278 class ImmediateReadCB : public folly::EventBase::LoopCallback { 1279 public: ImmediateReadCB(AsyncSocket * socket)1280 explicit ImmediateReadCB(AsyncSocket* socket) : socket_(socket) {} runLoopCallback()1281 void runLoopCallback() noexcept override { 1282 DestructorGuard dg(socket_); 1283 socket_->checkForImmediateRead(); 1284 } 1285 1286 private: 1287 AsyncSocket* socket_; 1288 }; 1289 1290 /** 1291 * Schedule checkForImmediateRead to be executed in the next loop 1292 * iteration. 1293 */ scheduleImmediateRead()1294 void scheduleImmediateRead() noexcept { 1295 if (good()) { 1296 eventBase_->runInLoop(&immediateReadHandler_); 1297 } 1298 } 1299 1300 /** 1301 * Schedule handleInitalReadWrite to run in the next iteration. 
1302 */ scheduleInitialReadWrite()1303 void scheduleInitialReadWrite() noexcept { 1304 if (good()) { 1305 DestructorGuard dg(this); 1306 eventBase_->runInLoop([this, dg] { 1307 if (good()) { 1308 handleInitialReadWrite(); 1309 } 1310 }); 1311 } 1312 } 1313 1314 // event notification methods 1315 void ioReady(uint16_t events) noexcept; 1316 virtual void checkForImmediateRead() noexcept; 1317 virtual void handleInitialReadWrite() noexcept; 1318 virtual void prepareReadBuffer(void** buf, size_t* buflen); 1319 virtual void prepareReadBuffers(IOBufIovecBuilder::IoVecVec& iovs); 1320 virtual size_t handleErrMessages() noexcept; 1321 virtual void handleRead() noexcept; 1322 virtual void handleWrite() noexcept; 1323 virtual void handleConnect() noexcept; 1324 void timeoutExpired() noexcept; 1325 1326 /** 1327 * Handler for when the file descriptor is attached to the AsyncSocket. 1328 1329 * This updates the EventHandler to start using the fd and notifies all 1330 * observers attached to the socket. This is necessary to let 1331 * observers know about an attached fd immediately (i.e., on connection 1332 * attempt) rather than when the connection succeeds. 1333 */ 1334 virtual void handleNetworkSocketAttached(); 1335 1336 /** 1337 * Attempt to read from the socket into a single buffer 1338 * 1339 * @param buf The buffer to read data into. 1340 * @param buflen The length of the buffer. 1341 * 1342 * @return Returns a read result. See read result for details. 1343 */ 1344 virtual ReadResult performRead(void** buf, size_t* buflen, size_t* offset); 1345 1346 /** 1347 * Attempt to read from the socket into an iovec array 1348 * 1349 * @param iovs The iovec array to read data into. 1350 * @param num The number of elements in the iovec array 1351 * 1352 * @return Returns a read result. See read result for details. 1353 */ 1354 virtual ReadResult performReadv(struct iovec* iovs, size_t num); 1355 1356 /** 1357 * Populate an iovec array from an IOBuf and attempt to write it. 
1358 * 1359 * @param callback Write completion/error callback. 1360 * @param vec Target iovec array; caller retains ownership. 1361 * @param count Number of IOBufs to write, beginning at start of buf. 1362 * @param buf Chain of iovecs. 1363 * @param flags set of flags for the underlying write calls, like cork 1364 */ 1365 void writeChainImpl( 1366 WriteCallback* callback, 1367 iovec* vec, 1368 size_t count, 1369 std::unique_ptr<folly::IOBuf>&& buf, 1370 WriteFlags flags); 1371 1372 /** 1373 * Write as much data as possible to the socket without blocking, 1374 * and queue up any leftover data to send when the socket can 1375 * handle writes again. 1376 * 1377 * @param callback The callback to invoke when the write is completed. 1378 * @param vec Array of buffers to write; this method will make a 1379 * copy of the vector (but not the buffers themselves) 1380 * if the write has to be completed asynchronously. 1381 * @param count Number of elements in vec. 1382 * @param buf The IOBuf that manages the buffers referenced by 1383 * vec, or a pointer to nullptr if the buffers are not 1384 * associated with an IOBuf. Note that ownership of 1385 * the IOBuf is transferred here; upon completion of 1386 * the write, the AsyncSocket deletes the IOBuf. 1387 * @param totalBytes The total number of bytes to be written. 1388 * @param flags Set of write flags. 1389 */ 1390 void writeImpl( 1391 WriteCallback* callback, 1392 const iovec* vec, 1393 size_t count, 1394 std::unique_ptr<folly::IOBuf>&& buf, 1395 size_t totalBytes, 1396 WriteFlags flags = WriteFlags::NONE); 1397 1398 /** 1399 * Attempt to write to the socket. 1400 * 1401 * @param vec The iovec array pointing to the buffers to write. 1402 * @param count The length of the iovec array. 1403 * @param flags Set of write flags. 1404 * @param countWritten On return, the value pointed to by this parameter 1405 * will contain the number of iovec entries that were 1406 * fully written. 
1407 * @param partialWritten On return, the value pointed to by this parameter 1408 * will contain the number of bytes written in the 1409 * partially written iovec entry. 1410 * 1411 * @return Returns a WriteResult. See WriteResult for more details. 1412 */ 1413 virtual WriteResult performWrite( 1414 const iovec* vec, 1415 uint32_t count, 1416 WriteFlags flags, 1417 uint32_t* countWritten, 1418 uint32_t* partialWritten); 1419 1420 /** 1421 * Prepares a msghdr and sends the message over the socket using sendmsg 1422 * 1423 * @param vec The iovec array pointing to the buffers to write. 1424 * @param count The length of the iovec array. 1425 * @param flags Set of write flags. 1426 */ 1427 virtual AsyncSocket::WriteResult sendSocketMessage( 1428 const iovec* vec, size_t count, WriteFlags flags); 1429 1430 /** 1431 * Sends the message over the socket using sendmsg 1432 * 1433 * @param msg Message to send 1434 * @param msg_flags Flags to pass to sendmsg 1435 */ 1436 virtual AsyncSocket::WriteResult sendSocketMessage( 1437 NetworkSocket fd, struct msghdr* msg, int msg_flags); 1438 1439 virtual ssize_t tfoSendMsg( 1440 NetworkSocket fd, struct msghdr* msg, int msg_flags); 1441 1442 int socketConnect(const struct sockaddr* addr, socklen_t len); 1443 1444 virtual void scheduleConnectTimeout(); 1445 void registerForConnectEvents(); 1446 1447 bool updateEventRegistration(); 1448 1449 /** 1450 * Update event registration. 1451 * 1452 * @param enable Flags of events to enable. Set it to 0 if no events 1453 * need to be enabled in this call. 1454 * @param disable Flags of events 1455 * to disable. Set it to 0 if no events need to be disabled in this 1456 * call. 1457 * 1458 * @return true iff the update is successful. 
1459 */ 1460 bool updateEventRegistration(uint16_t enable, uint16_t disable); 1461 1462 // read methods 1463 ReadResult performReadInternal(struct iovec* iovs, size_t num); 1464 1465 // Actually close the file descriptor and set it to -1 so we don't 1466 // accidentally close it again. 1467 void doClose(); 1468 1469 // error handling methods 1470 void startFail(); 1471 void finishFail(); 1472 void finishFail(const AsyncSocketException& ex); 1473 void invokeAllErrors(const AsyncSocketException& ex); 1474 void fail(const char* fn, const AsyncSocketException& ex); 1475 void failConnect(const char* fn, const AsyncSocketException& ex); 1476 void failRead(const char* fn, const AsyncSocketException& ex); 1477 void failErrMessageRead(const char* fn, const AsyncSocketException& ex); 1478 void failWrite( 1479 const char* fn, 1480 WriteCallback* callback, 1481 size_t bytesWritten, 1482 const AsyncSocketException& ex); 1483 void failWrite(const char* fn, const AsyncSocketException& ex); 1484 void failAllWrites(const AsyncSocketException& ex); 1485 void failByteEvents(const AsyncSocketException& ex); 1486 virtual void invokeConnectErr(const AsyncSocketException& ex); 1487 virtual void invokeConnectSuccess(); 1488 virtual void invokeConnectAttempt(); 1489 void invalidState(ConnectCallback* callback); 1490 void invalidState(ErrMessageCallback* callback); 1491 void invalidState(ReadCallback* callback); 1492 void invalidState(WriteCallback* callback); 1493 1494 std::string withAddr(folly::StringPiece s); 1495 1496 void cacheLocalAddress() const; 1497 void cachePeerAddress() const; 1498 1499 void applyOptions( 1500 const SocketOptionMap& options, SocketOptionKey::ApplyPos pos); 1501 1502 bool isZeroCopyRequest(WriteFlags flags); 1503 1504 bool isZeroCopyMsg(const cmsghdr& cmsg) const; 1505 void processZeroCopyMsg(const cmsghdr& cmsg); 1506 getNextZeroCopyBufId()1507 uint32_t getNextZeroCopyBufId() { return zeroCopyBufId_++; } 1508 void adjustZeroCopyFlags(folly::WriteFlags& flags); 
1509 void addZeroCopyBuf( 1510 std::unique_ptr<folly::IOBuf>&& buf, ReleaseIOBufCallback* cb); 1511 void addZeroCopyBuf(folly::IOBuf* ptr); 1512 void setZeroCopyBuf( 1513 std::unique_ptr<folly::IOBuf>&& buf, ReleaseIOBufCallback* cb); 1514 bool containsZeroCopyBuf(folly::IOBuf* ptr); 1515 void releaseZeroCopyBuf(uint32_t id); 1516 1517 void releaseIOBuf( 1518 std::unique_ptr<folly::IOBuf> buf, ReleaseIOBufCallback* callback); 1519 1520 /** 1521 * Attempt to enable Observer ByteEvents for this socket. 1522 * 1523 * Once enabled, ByteEvents rename enabled for the socket's life. 1524 * 1525 * ByteEvents are delivered to Observers; when an observer is added: 1526 * - If this function has already been called, byteEventsEnabled() or 1527 * byteEventsUnavailable() will be called, depending on ByteEvent state. 1528 * - Else if the socket is connected, this function is called immediately. 1529 * - Else if the socket has not yet connected, this function will be called 1530 * after the socket has connected (ByteEvents cannot be set up earlier). 1531 * 1532 * If ByteEvents are successfully enabled, byteEventsEnabled() will be called 1533 * on each Observer that has requested ByteEvents. If unable to enable, or if 1534 * ByteEvents become unavailable (e.g., due to close), byteEventsUnavailable() 1535 * will be called on each Observer that has requested ByteEvents. 1536 * 1537 * This function does need to be explicitly called under other circumstances. 
1538 */ 1539 virtual void enableByteEvents(); 1540 1541 AsyncWriter::ZeroCopyEnableFunc zeroCopyEnableFunc_; 1542 1543 // a folly::IOBuf can be used in multiple partial requests 1544 // there is a that maps a buffer id to a raw folly::IOBuf ptr 1545 // and another one that adds a ref count for a folly::IOBuf that is either 1546 // the original ptr or nullptr 1547 uint32_t zeroCopyBufId_{0}; 1548 1549 struct IOBufInfo { 1550 uint32_t count_{0}; 1551 ReleaseIOBufCallback* cb_{nullptr}; 1552 std::unique_ptr<folly::IOBuf> buf_; 1553 }; 1554 1555 std::unordered_map<uint32_t, folly::IOBuf*> idZeroCopyBufPtrMap_; 1556 std::unordered_map<folly::IOBuf*, IOBufInfo> idZeroCopyBufInfoMap_; 1557 1558 StateEnum state_{StateEnum::UNINIT}; ///< StateEnum describing current state 1559 uint8_t shutdownFlags_{0}; ///< Shutdown state (ShutdownFlags) 1560 uint16_t eventFlags_; ///< EventBase::HandlerFlags settings 1561 NetworkSocket fd_; ///< The socket file descriptor 1562 mutable folly::SocketAddress addr_; ///< The address we tried to connect to 1563 mutable folly::SocketAddress localAddr_; 1564 ///< The address we are connecting from 1565 uint32_t sendTimeout_; ///< The send timeout, in milliseconds 1566 uint16_t maxReadsPerEvent_; ///< Max reads per event loop iteration 1567 1568 int8_t readErr_{READ_NO_ERROR}; ///< The read error encountered, if any 1569 1570 EventBase* eventBase_; ///< The EventBase 1571 WriteTimeout writeTimeout_; ///< A timeout for connect and write 1572 IoHandler ioHandler_; ///< A EventHandler to monitor the fd 1573 ImmediateReadCB immediateReadHandler_; ///< LoopCallback for checking read 1574 1575 ConnectCallback* connectCallback_; ///< ConnectCallback 1576 ErrMessageCallback* errMessageCallback_; ///< TimestampCallback 1577 ReadAncillaryDataCallback* 1578 readAncillaryDataCallback_; ///< AncillaryDataCallback 1579 SendMsgParamsCallback* ///< Callback for retrieving 1580 sendMsgParamCallback_; ///< ::sendmsg() parameters 1581 ReadCallback* readCallback_; 
///< ReadCallback 1582 WriteRequest* writeReqHead_; ///< Chain of WriteRequests 1583 WriteRequest* writeReqTail_; ///< End of WriteRequest chain 1584 std::weak_ptr<ShutdownSocketSet> wShutdownSocketSet_; 1585 size_t appBytesReceived_; ///< Num of bytes received from socket 1586 size_t appBytesWritten_{0}; ///< Num of bytes written to socket 1587 size_t rawBytesWritten_{0}; ///< Num of (raw) bytes written to socket 1588 // The total num of bytes passed to AsyncSocket's write functions. It doesn't 1589 // include failed writes, but it does include buffered writes. 1590 size_t totalAppBytesScheduledForWrite_; 1591 // Num of bytes allocated in IOBufs pending write. 1592 size_t allocatedBytesBuffered_{0}; 1593 1594 // Lifecycle observers. 1595 // 1596 // Use small_vector to avoid heap allocation for up to two observers, unless 1597 // mobile, in which case we fallback to std::vector to prioritize code size. 1598 using LifecycleObserverVecImpl = conditional_t< 1599 !kIsMobile, 1600 folly::small_vector<AsyncTransport::LifecycleObserver*, 2>, 1601 std::vector<AsyncTransport::LifecycleObserver*>>; 1602 LifecycleObserverVecImpl lifecycleObservers_; 1603 1604 // Pre-received data, to be returned to read callback before any data from the 1605 // socket. 1606 std::unique_ptr<IOBuf> preReceivedData_; 1607 1608 std::chrono::steady_clock::time_point connectStartTime_; 1609 std::chrono::steady_clock::time_point connectEndTime_; 1610 1611 std::chrono::milliseconds connectTimeout_{0}; 1612 1613 std::unique_ptr<EvbChangeCallback> evbChangeCb_{nullptr}; 1614 1615 BufferCallback* bufferCallback_{nullptr}; 1616 bool tfoEnabled_{false}; 1617 bool tfoAttempted_{false}; 1618 bool tfoFinished_{false}; 1619 bool noTransparentTls_{false}; 1620 bool noTSocks_{false}; 1621 // Whether to track EOR or not. 
1622 bool trackEor_{false}; 1623 1624 // ByteEvent state 1625 std::unique_ptr<ByteEventHelper> byteEventHelper_; 1626 1627 bool zeroCopyEnabled_{false}; 1628 bool zeroCopyVal_{false}; 1629 // zerocopy re-enable logic 1630 size_t zeroCopyReenableThreshold_{0}; 1631 size_t zeroCopyReenableCounter_{0}; 1632 1633 // subclasses may cache these on first call to get 1634 mutable std::unique_ptr<const AsyncTransportCertificate> peerCertData_{ 1635 nullptr}; 1636 mutable std::unique_ptr<const AsyncTransportCertificate> selfCertData_{ 1637 nullptr}; 1638 1639 bool closeOnFailedWrite_{true}; 1640 1641 netops::DispatcherContainer netops_; 1642 1643 // allow other functions to register for callbacks when 1644 // new AsyncSocket()'s are created 1645 // must be LAST member defined to ensure other members are initialized 1646 // before access; see ConstructorCallback.h for details 1647 ConstructorCallback<AsyncSocket> constructorCallback_{this}; 1648 }; 1649 1650 } // namespace folly 1651