1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * All rights reserved. 4 * 5 * This source code is licensed under the BSD-style license found in the 6 * LICENSE file in the root directory of this source tree. 7 */ 8 9 #pragma once 10 11 #include "proxygen/lib/http/HTTPMessage.h" 12 #include <climits> 13 #include <folly/Optional.h> 14 #include <folly/SocketAddress.h> 15 #include <folly/io/async/AsyncTransport.h> 16 #include <folly/io/async/DelayedDestructionBase.h> 17 #include <folly/io/async/HHWheelTimer.h> 18 #include <folly/lang/Assume.h> 19 #include <iosfwd> 20 #include <proxygen/lib/http/HTTPConstants.h> 21 #include <proxygen/lib/http/HTTPHeaderSize.h> 22 #include <proxygen/lib/http/HTTPMessage.h> 23 #include <proxygen/lib/http/ProxygenErrorEnum.h> 24 #include <proxygen/lib/http/Window.h> 25 #include <proxygen/lib/http/codec/HTTPCodec.h> 26 #include <proxygen/lib/http/session/ByteEvents.h> 27 #include <proxygen/lib/http/session/HTTP2PriorityQueue.h> 28 #include <proxygen/lib/http/session/HTTPEvent.h> 29 #include <proxygen/lib/http/session/HTTPTransactionEgressSM.h> 30 #include <proxygen/lib/http/session/HTTPTransactionIngressSM.h> 31 #include <proxygen/lib/utils/Time.h> 32 #include <proxygen/lib/utils/TraceEvent.h> 33 #include <proxygen/lib/utils/TraceEventObserver.h> 34 #include <proxygen/lib/utils/WheelTimerInstance.h> 35 #include <set> 36 #include <wangle/acceptor/TransportInfo.h> 37 38 namespace proxygen { 39 40 /** 41 * Experimental 42 * 43 * A sender interface to send out DSR delegated packetization requests. 44 */ 45 struct DSRRequestSender { 46 virtual ~DSRRequestSender() = default; 47 }; 48 49 /** 50 * An HTTPTransaction represents a single request/response pair 51 * for some HTTP-like protocol. It works with a Transport that 52 * performs the network processing and wire-protocol formatting 53 * and a Handler that implements some sort of application logic. 
 *
 * The typical sequence of events for a simple application is:
 *
 *   * The application accepts a connection and creates a Transport.
 *   * The Transport reads from the connection, parses whatever
 *     protocol the client is speaking, and creates a Transaction
 *     to represent the first request.
 *   * Once the Transport has received the full request headers,
 *     it creates a Handler, plugs the handler into the Transaction,
 *     and calls the Transaction's onIngressHeadersComplete() method.
 *   * The Transaction calls the Handler's onHeadersComplete() method
 *     and the Handler begins processing the request.
 *   * If there is a request body, the Transport streams it through
 *     the Transaction to the Handler.
 *   * When the Handler is ready to produce a response, it streams
 *     the response through the Transaction to the Transport.
 *   * When the Transaction has seen the end of both the request
 *     and the response, it detaches itself from the Handler and
 *     Transport and deletes itself.
 *   * The Handler deletes itself at some point after the Transaction
 *     has detached from it.
 *   * The Transport may, depending on the protocol, process other
 *     requests after -- or even in parallel with -- that first
 *     request.  Each request gets its own Transaction and Handler.
 *
 * For some applications, like proxying, a Handler implementation
 * may obtain one or more upstream connections, each represented
 * by another Transport, and create outgoing requests on the upstream
 * connection(s), with each request represented as a new Transaction.
83 * 84 * With a multiplexing protocol like SPDY on both sides of a proxy, 85 * the cardinality relationship can be: 86 * 87 * +-----------+ +-----------+ +-------+ 88 * (Client-side) | Transport |1---*|Transaction|1---1|Handler| 89 * +-----------+ +-----------+ +-------+ 90 * 1 91 * | 92 * | 93 * 1 94 * +---------+ +-----------+ 95 * (Server-side) |Transport|1---*|Transaction| 96 * +---------+ +-----------+ 97 * 98 * A key design goal of HTTPTransaction is to serve as a protocol- 99 * independent abstraction that insulates Handlers from the semantics 100 * different of HTTP-like protocols. 101 */ 102 103 /** Info about Transaction running on this session */ 104 class TransactionInfo { 105 public: TransactionInfo()106 TransactionInfo() { 107 } 108 TransactionInfo(std::chrono::milliseconds ttfb,std::chrono::milliseconds ttlb,uint64_t eHeader,uint64_t inHeader,uint64_t eBody,uint64_t inBody,bool completed)109 TransactionInfo(std::chrono::milliseconds ttfb, 110 std::chrono::milliseconds ttlb, 111 uint64_t eHeader, 112 uint64_t inHeader, 113 uint64_t eBody, 114 uint64_t inBody, 115 bool completed) 116 : timeToFirstByte(ttfb), 117 timeToLastByte(ttlb), 118 egressHeaderBytes(eHeader), 119 ingressHeaderBytes(inHeader), 120 egressBodyBytes(eBody), 121 ingressBodyBytes(inBody), 122 isCompleted(completed) { 123 } 124 125 /** Time to first byte */ 126 std::chrono::milliseconds timeToFirstByte{0}; 127 /** Time to last byte */ 128 std::chrono::milliseconds timeToLastByte{0}; 129 130 /** Number of bytes send in headers */ 131 uint64_t egressHeaderBytes{0}; 132 /** Number of bytes receive headers */ 133 uint64_t ingressHeaderBytes{0}; 134 /** Number of bytes send in body */ 135 uint64_t egressBodyBytes{0}; 136 /** Number of bytes receive in body */ 137 uint64_t ingressBodyBytes{0}; 138 139 /** Is the transaction was completed without error */ 140 bool isCompleted{false}; 141 }; 142 143 class HTTPSessionStats; 144 class HTTPTransaction; 145 class HTTPTransactionHandler : public 
TraceEventObserver { 146 public: 147 /** 148 * Called once per transaction. This notifies the handler of which 149 * transaction it should talk to and will receive callbacks from. 150 */ 151 virtual void setTransaction(HTTPTransaction* txn) noexcept = 0; 152 153 /** 154 * Called once after a transaction successfully completes. It 155 * will be called even if a read or write error happened earlier. 156 * This is a terminal callback, which means that the HTTPTransaction 157 * object that gives this call will be invalid after this function 158 * completes. 159 */ 160 virtual void detachTransaction() noexcept = 0; 161 162 /** 163 * Called at most once per transaction. This is usually the first 164 * ingress callback. It is possible to get a read error before this 165 * however. If you had previously called pauseIngress(), this callback 166 * will be delayed until you call resumeIngress(). 167 */ 168 virtual void onHeadersComplete(std::unique_ptr<HTTPMessage> msg) noexcept = 0; 169 170 /** 171 * Can be called multiple times per transaction. If you had previously 172 * called pauseIngress(), this callback will be delayed until you call 173 * resumeIngress(). 174 */ 175 virtual void onBody(std::unique_ptr<folly::IOBuf> chain) noexcept = 0; 176 177 /** 178 * Same as onBody() but with additional offset parameter. 179 */ onBodyWithOffset(uint64_t,std::unique_ptr<folly::IOBuf> chain)180 virtual void onBodyWithOffset(uint64_t /* bodyOffset */, 181 std::unique_ptr<folly::IOBuf> chain) { 182 onBody(std::move(chain)); 183 } 184 185 /** 186 * Can be called multiple times per transaction. If you had previously 187 * called pauseIngress(), this callback will be delayed until you call 188 * resumeIngress(). This signifies the beginning of a chunk of length 189 * 'length'. You will receive onBody() after this. Also, the length will 190 * be greater than zero. 
191 */ onChunkHeader(size_t)192 virtual void onChunkHeader(size_t /* length */) noexcept { 193 } 194 195 /** 196 * Can be called multiple times per transaction. If you had previously 197 * called pauseIngress(), this callback will be delayed until you call 198 * resumeIngress(). This signifies the end of a chunk. 199 */ onChunkComplete()200 virtual void onChunkComplete() noexcept { 201 } 202 203 /** 204 * Can be called any number of times per transaction. If you had 205 * previously called pauseIngress(), this callback will be delayed until 206 * you call resumeIngress(). Trailers can be received once right before 207 * the EOM of a chunked HTTP/1.1 reponse or multiple times per 208 * transaction from SPDY and HTTP/2.0 HEADERS frames. 209 */ 210 virtual void onTrailers(std::unique_ptr<HTTPHeaders> trailers) noexcept = 0; 211 212 /** 213 * Can be called once per transaction. If you had previously called 214 * pauseIngress(), this callback will be delayed until you call 215 * resumeIngress(). After this callback is received, there will be no 216 * more normal ingress callbacks received (onEgress*() and onError() 217 * may still be invoked). The Handler should consider 218 * ingress complete after receiving this message. This Transaction is 219 * still valid, and work may still occur on it until detachTransaction 220 * is called. 221 */ 222 virtual void onEOM() noexcept = 0; 223 224 /** 225 * Can be called once per transaction. If you had previously called 226 * pauseIngress(), this callback will be delayed until you call 227 * resumeIngress(). After this callback is invoked, further data 228 * will be forwarded using the onBody() callback. Once the data transfer 229 * is completed (EOF recevied in case of CONNECT), onEOM() callback will 230 * be invoked. 231 */ 232 virtual void onUpgrade(UpgradeProtocol protocol) noexcept = 0; 233 234 /** 235 * Can be called at any time before detachTransaction(). This callback 236 * implies that an error has occurred. 
To determine if ingress or egress 237 * is affected, check the direciont on the HTTPException. If the 238 * direction is INGRESS, it MAY still be possible to send egress. 239 */ 240 virtual void onError(const HTTPException& error) noexcept = 0; 241 242 /** 243 * Can be called at any time before detachTransaction(). This callback is 244 * invoked in cases that violate an internal invariant that is fatal to the 245 * transaction but can be recoverable for the session or library. One such 246 * example is mis-use of the egress APIs (sendBody() before sendHeaders()). 247 */ onInvariantViolation(const HTTPException & error)248 virtual void onInvariantViolation(const HTTPException& error) noexcept { 249 LOG(FATAL) << error.what(); 250 } 251 /** 252 * If the remote side's receive buffer fills up, this callback will be 253 * invoked so you can attempt to stop sending to the remote side. 254 */ 255 virtual void onEgressPaused() noexcept = 0; 256 257 /** 258 * This callback lets you know that the remote side has resumed reading 259 * and you can now continue to send data. 260 */ 261 virtual void onEgressResumed() noexcept = 0; 262 263 /** 264 * Ask the handler to construct a handler for a pushed transaction associated 265 * with its transaction. 266 * 267 * TODO: Reconsider default implementation here. If the handler 268 * does not implement, better set max initiated to 0 in a settings frame? 269 */ onPushedTransaction(HTTPTransaction *)270 virtual void onPushedTransaction(HTTPTransaction* /* txn */) noexcept { 271 } 272 273 /** 274 * Ask the handler to construct a handler for a ExTransaction associated 275 * with its transaction. 276 */ onExTransaction(HTTPTransaction *)277 virtual void onExTransaction(HTTPTransaction* /* txn */) noexcept { 278 } 279 280 /** 281 * Inform the handler that a GOAWAY has been received on the 282 * transport. This callback will only be invoked if the transport is 283 * SPDY or HTTP/2. It may be invoked multiple times, as HTTP/2 allows this. 
284 * 285 * @param code The error code received in the GOAWAY frame 286 */ onGoaway(ErrorCode)287 virtual void onGoaway(ErrorCode /* code */) noexcept { 288 } 289 290 /** 291 * Can be called multiple times per transaction after onHeadersComplete and 292 * before detachTransaction() 293 * 294 * It does not obey pauseIngress/resumeIngress it is up to the handler 295 * to decide whether to buffer/drop datagrams 296 */ onDatagram(std::unique_ptr<folly::IOBuf>)297 virtual void onDatagram(std::unique_ptr<folly::IOBuf> /*datagram*/) noexcept { 298 } 299 ~HTTPTransactionHandler()300 virtual ~HTTPTransactionHandler() { 301 } 302 }; 303 304 class HTTPPushTransactionHandler : public HTTPTransactionHandler { 305 public: ~HTTPPushTransactionHandler()306 ~HTTPPushTransactionHandler() override { 307 } 308 onHeadersComplete(std::unique_ptr<HTTPMessage>)309 void onHeadersComplete(std::unique_ptr<HTTPMessage>) noexcept final { 310 LOG(FATAL) << "push txn received headers"; 311 } 312 onBody(std::unique_ptr<folly::IOBuf>)313 void onBody(std::unique_ptr<folly::IOBuf>) noexcept final { 314 LOG(FATAL) << "push txn received body"; 315 } 316 onBodyWithOffset(uint64_t,std::unique_ptr<folly::IOBuf>)317 void onBodyWithOffset(uint64_t, 318 std::unique_ptr<folly::IOBuf>) noexcept final { 319 LOG(FATAL) << "push txn received body with offset"; 320 } 321 onChunkHeader(size_t)322 void onChunkHeader(size_t /* length */) noexcept final { 323 LOG(FATAL) << "push txn received chunk header"; 324 } 325 onChunkComplete()326 void onChunkComplete() noexcept final { 327 LOG(FATAL) << "push txn received chunk complete"; 328 } 329 onTrailers(std::unique_ptr<HTTPHeaders>)330 void onTrailers(std::unique_ptr<HTTPHeaders>) noexcept final { 331 LOG(FATAL) << "push txn received trailers"; 332 } 333 onEOM()334 void onEOM() noexcept final { 335 LOG(FATAL) << "push txn received EOM"; 336 } 337 onUpgrade(UpgradeProtocol)338 void onUpgrade(UpgradeProtocol) noexcept final { 339 LOG(FATAL) << "push txn received upgrade"; 
340 } 341 onPushedTransaction(HTTPTransaction *)342 void onPushedTransaction(HTTPTransaction*) noexcept final { 343 LOG(FATAL) << "push txn received push txn"; 344 } 345 }; 346 347 /** 348 * Callback interface to be notified of events on the byte stream. 349 */ 350 class HTTPTransactionTransportCallback { 351 public: 352 virtual void firstHeaderByteFlushed() noexcept = 0; 353 354 virtual void firstByteFlushed() noexcept = 0; 355 356 virtual void lastByteFlushed() noexcept = 0; 357 trackedByteFlushed()358 virtual void trackedByteFlushed() noexcept { 359 } 360 361 virtual void lastByteAcked(std::chrono::milliseconds latency) noexcept = 0; 362 trackedByteEventTX(const ByteEvent &)363 virtual void trackedByteEventTX(const ByteEvent& /* event */) noexcept { 364 } 365 trackedByteEventAck(const ByteEvent &)366 virtual void trackedByteEventAck(const ByteEvent& /* event */) noexcept { 367 } 368 egressBufferEmpty()369 virtual void egressBufferEmpty() noexcept { 370 } 371 372 virtual void headerBytesGenerated(HTTPHeaderSize& size) noexcept = 0; 373 374 virtual void headerBytesReceived(const HTTPHeaderSize& size) noexcept = 0; 375 376 // May include extra bytes for EOF/trailers 377 virtual void bodyBytesGenerated(size_t nbytes) noexcept = 0; 378 379 virtual void bodyBytesReceived(size_t size) noexcept = 0; 380 lastEgressHeaderByteAcked()381 virtual void lastEgressHeaderByteAcked() noexcept { 382 } 383 bodyBytesDelivered(uint64_t)384 virtual void bodyBytesDelivered(uint64_t /* bodyOffset */) noexcept { 385 } 386 bodyBytesDeliveryCancelled(uint64_t)387 virtual void bodyBytesDeliveryCancelled(uint64_t /* bodyOffset */) noexcept { 388 } 389 transportAppRateLimited()390 virtual void transportAppRateLimited() noexcept { 391 } 392 datagramBytesGenerated(size_t)393 virtual void datagramBytesGenerated(size_t /* nbytes */) noexcept { 394 } datagramBytesReceived(size_t)395 virtual void datagramBytesReceived(size_t /* size */) noexcept { 396 } 397 ~HTTPTransactionTransportCallback()398 
virtual ~HTTPTransactionTransportCallback() { 399 } 400 }; 401 402 class HTTPSessionBase; 403 class HTTPTransaction 404 : public folly::HHWheelTimer::Callback 405 , public folly::DelayedDestructionBase { 406 public: 407 using Handler = HTTPTransactionHandler; 408 using PushHandler = HTTPPushTransactionHandler; 409 410 struct FlowControlInfo { 411 bool flowControlEnabled_{false}; 412 int64_t sessionSendWindow_{-1}; 413 int64_t sessionRecvWindow_{-1}; 414 int64_t sessionSendOutstanding_{-1}; 415 int64_t sessionRecvOutstanding_{-1}; 416 int64_t streamSendWindow_{-1}; 417 int64_t streamRecvWindow_{-1}; 418 int64_t streamSendOutstanding_{-1}; 419 int64_t streamRecvOutstanding_{-1}; 420 }; 421 422 /** 423 * Experimental. 424 * 425 * BufferMeta represents a buffer. The real data will be sourced from another 426 * place. 427 */ 428 struct BufferMeta { 429 size_t length{0}; 430 431 BufferMeta() = default; BufferMetaBufferMeta432 explicit BufferMeta(size_t lengthIn) : length(lengthIn) { 433 } 434 splitBufferMeta435 BufferMeta split(size_t splitLen) { 436 CHECK_GE(length, splitLen); 437 length -= splitLen; 438 return BufferMeta(splitLen); 439 } 440 }; 441 442 /** 443 * Opaque token that identifies the underlying connection. 444 * Transaction handlers can use this token to group different 445 * Transport instances by the distinct underlying connections. 446 * Its uniqueness is not enforced by the Transport. 
447 */ 448 using ConnectionToken = std::string; 449 450 class Transport { 451 public: 452 enum class Type : uint8_t { TCP, QUIC }; 453 ~Transport()454 virtual ~Transport() { 455 } 456 457 virtual void pauseIngress(HTTPTransaction* txn) noexcept = 0; 458 459 virtual void resumeIngress(HTTPTransaction* txn) noexcept = 0; 460 461 virtual void transactionTimeout(HTTPTransaction* txn) noexcept = 0; 462 463 virtual void sendHeaders(HTTPTransaction* txn, 464 const HTTPMessage& headers, 465 HTTPHeaderSize* size, 466 bool eom) noexcept = 0; 467 468 /** 469 * Experimental API 470 * 471 * Send headers and a DSRRequestSender to Transport. 472 * The Transport will generate header for DATA frame. 473 * 474 * dataFrameHeaderSize: An output parameter to get the size of DATA frame 475 * header. 476 */ sendHeadersWithDelegate(HTTPTransaction *,const HTTPMessage &,HTTPHeaderSize *,size_t *,uint64_t,std::unique_ptr<DSRRequestSender>)477 virtual bool sendHeadersWithDelegate( 478 HTTPTransaction*, 479 const HTTPMessage&, 480 HTTPHeaderSize*, 481 size_t* /* dataFrameHeaderSize */, 482 uint64_t /* contentLength */, 483 std::unique_ptr<DSRRequestSender>) noexcept { 484 return false; 485 } 486 487 // Experimental 488 virtual size_t sendBody(HTTPTransaction*, 489 const BufferMeta&, 490 bool /* eom */) noexcept = 0; 491 492 virtual size_t sendBody(HTTPTransaction* txn, 493 std::unique_ptr<folly::IOBuf>, 494 bool eom, 495 bool trackLastByteFlushed) noexcept = 0; 496 497 virtual size_t sendChunkHeader(HTTPTransaction* txn, 498 size_t length) noexcept = 0; 499 500 virtual size_t sendChunkTerminator(HTTPTransaction* txn) noexcept = 0; 501 502 virtual size_t sendEOM(HTTPTransaction* txn, 503 const HTTPHeaders* trailers) noexcept = 0; 504 505 virtual size_t sendAbort(HTTPTransaction* txn, 506 ErrorCode statusCode) noexcept = 0; 507 508 virtual size_t sendPriority(HTTPTransaction* txn, 509 const http2::PriorityUpdate& pri) noexcept = 0; 510 virtual size_t changePriority(HTTPTransaction* txn, 511 
HTTPPriority pri) noexcept = 0; 512 513 virtual size_t sendWindowUpdate(HTTPTransaction* txn, 514 uint32_t bytes) noexcept = 0; 515 516 virtual void notifyPendingEgress() noexcept = 0; 517 518 virtual void detach(HTTPTransaction* txn) noexcept = 0; 519 520 virtual void notifyIngressBodyProcessed(uint32_t bytes) noexcept = 0; 521 522 virtual void notifyEgressBodyBuffered(int64_t bytes) noexcept = 0; 523 524 virtual const folly::SocketAddress& getLocalAddress() const noexcept = 0; 525 526 virtual const folly::SocketAddress& getPeerAddress() const noexcept = 0; 527 528 virtual void describe(std::ostream&) const = 0; 529 530 virtual const wangle::TransportInfo& getSetupTransportInfo() 531 const noexcept = 0; 532 533 virtual bool getCurrentTransportInfo(wangle::TransportInfo* tinfo) = 0; 534 535 virtual void getFlowControlInfo(FlowControlInfo* info) = 0; 536 537 virtual HTTPTransaction::Transport::Type getSessionType() 538 const noexcept = 0; 539 540 virtual const HTTPCodec& getCodec() const noexcept = 0; 541 542 /* 543 * Drain the underlying session. This will affect other transactions 544 * running on the same session and is discouraged unless you are confident 545 * that the session is broken. 
546 */ 547 virtual void drain() = 0; 548 549 virtual bool isDraining() const = 0; 550 551 virtual HTTPTransaction* newPushedTransaction( 552 HTTPCodec::StreamID assocStreamId, 553 HTTPTransaction::PushHandler* handler, 554 ProxygenError* error = nullptr) noexcept = 0; 555 556 virtual HTTPTransaction* newExTransaction(HTTPTransaction::Handler* handler, 557 HTTPCodec::StreamID controlStream, 558 bool unidirectional) noexcept = 0; 559 560 virtual std::string getSecurityProtocol() const = 0; 561 562 virtual void addWaitingForReplaySafety( 563 folly::AsyncTransport::ReplaySafetyCallback* callback) noexcept = 0; 564 565 virtual void removeWaitingForReplaySafety( 566 folly::AsyncTransport::ReplaySafetyCallback* callback) noexcept = 0; 567 568 virtual bool needToBlockForReplaySafety() const = 0; 569 570 virtual const folly::AsyncTransport* getUnderlyingTransport() 571 const noexcept = 0; 572 573 /** 574 * Returns true if the underlying transport has completed full handshake. 575 */ 576 virtual bool isReplaySafe() const = 0; 577 578 virtual void setHTTP2PrioritiesEnabled(bool enabled) = 0; 579 virtual bool getHTTP2PrioritiesEnabled() const = 0; 580 581 virtual HTTPSessionBase* getHTTPSessionBase() = 0; 582 583 virtual folly::Optional<const HTTPMessage::HTTP2Priority> getHTTPPriority( 584 uint8_t level) = 0; 585 getHTTPPriority()586 virtual folly::Optional<HTTPPriority> getHTTPPriority() { 587 return folly::none; 588 } 589 getDatagramSizeLimit()590 virtual uint16_t getDatagramSizeLimit() const noexcept { 591 return 0; 592 } 593 sendDatagram(std::unique_ptr<folly::IOBuf>)594 virtual bool sendDatagram(std::unique_ptr<folly::IOBuf> /*datagram*/) { 595 LOG(FATAL) << __func__ << " not supported"; 596 folly::assume_unreachable(); 597 } 598 599 /** 600 * Ask transport to track and ack body delivery. 
601 */ trackEgressBodyDelivery(uint64_t)602 virtual void trackEgressBodyDelivery(uint64_t /* bodyOffset */) { 603 LOG(FATAL) << __func__ << " not supported"; 604 folly::assume_unreachable(); 605 } 606 607 virtual folly::Optional<HTTPTransaction::ConnectionToken> 608 getConnectionToken() const noexcept = 0; 609 }; 610 611 using TransportCallback = HTTPTransactionTransportCallback; 612 613 /** 614 * readBufLimit and sendWindow are only used if useFlowControl is 615 * true. Furthermore, if flow control is enabled, no guarantees can be 616 * made on the borders of the L7 chunking/data frames of the outbound 617 * messages. 618 * 619 * priority is only used by SPDY. The -1 default makes sure that all 620 * plain HTTP transactions land up in the same queue as the control data. 621 */ 622 HTTPTransaction( 623 TransportDirection direction, 624 HTTPCodec::StreamID id, 625 uint32_t seqNo, 626 Transport& transport, 627 HTTP2PriorityQueueBase& egressQueue, 628 folly::HHWheelTimer* timer = nullptr, 629 const folly::Optional<std::chrono::milliseconds>& defaultIdleTimeout = 630 folly::Optional<std::chrono::milliseconds>(), 631 HTTPSessionStats* stats = nullptr, 632 bool useFlowControl = false, 633 uint32_t receiveInitialWindowSize = 0, 634 uint32_t sendInitialWindowSize = 0, 635 http2::PriorityUpdate = http2::DefaultPriority, 636 folly::Optional<HTTPCodec::StreamID> assocStreamId = HTTPCodec::NoStream, 637 folly::Optional<HTTPCodec::ExAttributes> exAttributes = 638 HTTPCodec::NoExAttributes, 639 bool setIngressTimeoutAfterEom = false); 640 641 ~HTTPTransaction() override; 642 643 void reset(bool useFlowControl, 644 uint32_t receiveInitialWindowSize, 645 uint32_t receiveStreamWindowSize, 646 uint32_t sendInitialWindowSize); 647 getID()648 HTTPCodec::StreamID getID() const { 649 return id_; 650 } 651 getSequenceNumber()652 uint32_t getSequenceNumber() const { 653 return seqNo_; 654 } 655 getTransport()656 const Transport& getTransport() const { 657 return transport_; 658 } 659 
getTransport()660 Transport& getTransport() { 661 return transport_; 662 } 663 setHandler(Handler * handler)664 virtual void setHandler(Handler* handler) { 665 handler_ = handler; 666 if (handler_) { 667 handler_->setTransaction(this); 668 } 669 } 670 getHandler()671 const Handler* getHandler() const { 672 return handler_; 673 } 674 getHandler()675 Handler* getHandler() { 676 return handler_; 677 } 678 getPriority()679 http2::PriorityUpdate getPriority() const { 680 return priority_; 681 } 682 getPrioritySummary()683 std::tuple<uint64_t, uint64_t, double> getPrioritySummary() const { 684 return std::make_tuple(insertDepth_, 685 currentDepth_, 686 egressCalls_ > 0 ? cumulativeRatio_ / egressCalls_ 687 : 0); 688 } 689 getHTTPPriority()690 folly::Optional<HTTPPriority> getHTTPPriority() const { 691 return transport_.getHTTPPriority(); 692 } 693 getPriorityFallback()694 bool getPriorityFallback() const { 695 return priorityFallback_; 696 } 697 getEgressState()698 HTTPTransactionEgressSM::State getEgressState() const { 699 return egressState_; 700 } 701 getIngressState()702 HTTPTransactionIngressSM::State getIngressState() const { 703 return ingressState_; 704 } 705 isUpstream()706 bool isUpstream() const { 707 return direction_ == TransportDirection::UPSTREAM; 708 } 709 isDownstream()710 bool isDownstream() const { 711 return direction_ == TransportDirection::DOWNSTREAM; 712 } 713 getLocalAddress(folly::SocketAddress & addr)714 void getLocalAddress(folly::SocketAddress& addr) const { 715 addr = transport_.getLocalAddress(); 716 } 717 getPeerAddress(folly::SocketAddress & addr)718 void getPeerAddress(folly::SocketAddress& addr) const { 719 addr = transport_.getPeerAddress(); 720 } 721 getLocalAddress()722 const folly::SocketAddress& getLocalAddress() const noexcept { 723 return transport_.getLocalAddress(); 724 } 725 getPeerAddress()726 const folly::SocketAddress& getPeerAddress() const noexcept { 727 return transport_.getPeerAddress(); 728 } 729 
getSetupTransportInfo()730 const wangle::TransportInfo& getSetupTransportInfo() const noexcept { 731 return transport_.getSetupTransportInfo(); 732 } 733 getCurrentTransportInfo(wangle::TransportInfo * tinfo)734 void getCurrentTransportInfo(wangle::TransportInfo* tinfo) const { 735 transport_.getCurrentTransportInfo(tinfo); 736 } 737 getCurrentFlowControlInfo(FlowControlInfo * info)738 void getCurrentFlowControlInfo(FlowControlInfo* info) const { 739 transport_.getFlowControlInfo(info); 740 info->streamSendWindow_ = sendWindow_.getCapacity(); 741 info->streamSendOutstanding_ = sendWindow_.getOutstanding(); 742 info->streamRecvWindow_ = recvWindow_.getCapacity(); 743 info->streamRecvOutstanding_ = recvWindow_.getOutstanding(); 744 } 745 getSessionStats()746 HTTPSessionStats* getSessionStats() const { 747 return stats_; 748 } 749 750 /** 751 * Check whether more response is expected. One or more 1xx status 752 * responses can be received prior to the regular response. 753 * Note: 101 is handled by the codec using a separate onUpgrade callback 754 */ extraResponseExpected()755 virtual bool extraResponseExpected() const { 756 return (lastResponseStatus_ >= 100 && lastResponseStatus_ < 200) && 757 lastResponseStatus_ != 101; 758 } 759 760 /** 761 * Change the size of the receive window and propagate the change to the 762 * remote end using a window update. 763 * 764 * TODO: when HTTPSession sends a SETTINGS frame indicating a 765 * different initial window, it should call this function on all its 766 * transactions. 
767 */ 768 virtual void setReceiveWindow(uint32_t capacity); 769 770 /** 771 * Get the receive window of the transaction 772 */ getReceiveWindow()773 virtual const Window& getReceiveWindow() const { 774 return recvWindow_; 775 } 776 getMaxDeferredSize()777 uint32_t getMaxDeferredSize() { 778 return maxDeferredIngress_; 779 } 780 781 /** 782 * Invoked by the session when the ingress headers are complete 783 */ 784 void onIngressHeadersComplete(std::unique_ptr<HTTPMessage> msg); 785 786 /** 787 * Invoked by the session when some or all of the ingress entity-body has 788 * been parsed. 789 */ 790 void onIngressBody(std::unique_ptr<folly::IOBuf> chain, uint16_t padding); 791 792 /** 793 * Invoked by the session when a chunk header has been parsed. 794 */ 795 void onIngressChunkHeader(size_t length); 796 797 /** 798 * Invoked by the session when the CRLF terminating a chunk has been parsed. 799 */ 800 void onIngressChunkComplete(); 801 802 /** 803 * Invoked by the session when the ingress trailers have been parsed. 804 */ 805 void onIngressTrailers(std::unique_ptr<HTTPHeaders> trailers); 806 807 /** 808 * Invoked by the session when the session and transaction need to be 809 * upgraded to a different protocol 810 */ 811 void onIngressUpgrade(UpgradeProtocol protocol); 812 813 /** 814 * Invoked by the session when the ingress message is complete. 815 */ 816 void onIngressEOM(); 817 818 /** 819 * Invoked by the session when there is an error (e.g., invalid syntax, 820 * TCP RST) in either the ingress or egress stream. Note that this 821 * message is processed immediately even if this transaction normally 822 * would queue ingress. 823 * 824 * @param error Details for the error. This exception also has 825 * information about whether the error applies to the ingress, egress, 826 * or both directions of the transaction 827 */ 828 void onError(const HTTPException& error); 829 830 /** 831 * Invoked by the session when a GOAWAY frame is received. 
832 * TODO: we may consider exposing the additional debug data here in the 833 * future. 834 * 835 * @param code The error code received in the GOAWAY frame 836 */ 837 void onGoaway(ErrorCode code); 838 839 /** 840 * Invoked by the session when there is a timeout on the ingress stream. 841 * Note that each transaction has its own timer but the session 842 * is the effective target of the timer. 843 */ 844 void onIngressTimeout(); 845 846 /** 847 * Invoked by the session when the remote endpoint of this transaction 848 * signals that it has consumed 'amount' bytes. This is only for 849 * versions of HTTP that support per transaction flow control. 850 */ 851 void onIngressWindowUpdate(uint32_t amount); 852 853 /** 854 * Invoked by the session when the remote endpoint signals that we 855 * should change our send window. This is only for 856 * versions of HTTP that support per transaction flow control. 857 */ 858 void onIngressSetSendWindow(uint32_t newWindowSize); 859 860 /** 861 * Notify this transaction that it is ok to egress. Returns true if there 862 * is additional pending egress 863 */ 864 bool onWriteReady(uint32_t maxEgress, double ratio); 865 866 /** 867 * Invoked by the session when there is a timeout on the egress stream. 868 */ 869 void onEgressTimeout(); 870 871 /** 872 * Invoked by the session when the first header byte is flushed. 873 */ 874 void onEgressHeaderFirstByte(); 875 876 /** 877 * Invoked by the session when the first byte is flushed. 878 */ 879 void onEgressBodyFirstByte(); 880 881 /** 882 * Invoked by the session when the last byte is flushed. 883 */ 884 void onEgressBodyLastByte(); 885 886 /** 887 * Invoked by the session when the tracked byte is flushed. 
888 */ 889 void onEgressTrackedByte(); 890 891 /** 892 * Invoked when the ACK_LATENCY event is delivered 893 * 894 * @param latency the time between the moment when the last byte was sent 895 * and the moment when we received the ACK from the client 896 */ 897 void onEgressLastByteAck(std::chrono::milliseconds latency); 898 899 /** 900 * Invoked by the session when last egress headers have been acked by the 901 * peer. 902 */ 903 void onLastEgressHeaderByteAcked(); 904 905 /** 906 * Invoked by the session when egress body has been acked by the 907 * peer. Called for each sendBody() call if body bytes tracking is enabled. 908 */ 909 void onEgressBodyBytesAcked(uint64_t bodyOffset); 910 911 /** 912 * Invoked by the session when egress body delivery has been cancelled by the 913 * peer. 914 */ 915 void onEgressBodyDeliveryCanceled(uint64_t bodyOffset); 916 917 /** 918 * Invoked by the session when a tracked ByteEvent is transmitted by NIC. 919 */ 920 void onEgressTrackedByteEventTX(const ByteEvent& event); 921 922 /** 923 * Invoked by the session when a tracked ByteEvent is ACKed by remote peer. 924 * 925 * LAST_BYTE events are processed by legacy functions. 926 */ 927 void onEgressTrackedByteEventAck(const ByteEvent& event); 928 929 /** 930 * Invoked if the egress transport becomes app rate limited. 931 * 932 * TODO(bschlinker): Add support for QUIC. 933 */ 934 void onEgressTransportAppRateLimited(); 935 936 /** 937 * Can be called multiple times per transaction after onHeadersComplete and 938 * before detachTransaction() 939 * 940 * It does not obey pauseIngress/resumeIngress it is up to the handler 941 * to decide whether to buffer/drop datagrams 942 */ 943 void onDatagram(std::unique_ptr<folly::IOBuf> datagram) noexcept; 944 945 /** 946 * Invoked by the handlers that are interested in tracking 947 * performance stats. 
948 */ setTransportCallback(TransportCallback * cb)949 virtual void setTransportCallback(TransportCallback* cb) { 950 transportCallback_ = cb; 951 } 952 953 /** 954 * @return true if ingress has started on this transaction. 955 */ isIngressStarted()956 bool isIngressStarted() const { 957 return ingressState_ != HTTPTransactionIngressSM::State::Start; 958 } 959 960 /** 961 * @return true iff the ingress EOM has been queued in HTTPTransaction 962 * but the handler has not yet been notified of this event. 963 */ isIngressEOMQueued()964 bool isIngressEOMQueued() const { 965 return ingressState_ == HTTPTransactionIngressSM::State::EOMQueued; 966 } 967 968 /** 969 * @return true iff the handler has been notified of the ingress EOM. 970 */ isIngressComplete()971 bool isIngressComplete() const { 972 return ingressState_ == HTTPTransactionIngressSM::State::ReceivingDone; 973 } 974 975 /** 976 * @return true iff onIngressEOM() has been called. 977 */ isIngressEOMSeen()978 bool isIngressEOMSeen() const { 979 return isIngressEOMQueued() || isIngressComplete(); 980 } 981 982 /** 983 * @return true if egress has started on this transaction. 984 */ isEgressStarted()985 bool isEgressStarted() const { 986 return egressState_ != HTTPTransactionEgressSM::State::Start; 987 } 988 989 /** 990 * @return true iff sendEOM() has been called, but the eom has not been 991 * flushed to the socket yet. 992 */ isEgressEOMQueued()993 bool isEgressEOMQueued() const { 994 return egressState_ == HTTPTransactionEgressSM::State::EOMQueued; 995 } 996 997 /** 998 * @return true iff the egress EOM has been flushed to the socket. 999 */ isEgressComplete()1000 bool isEgressComplete() const { 1001 return egressState_ == HTTPTransactionEgressSM::State::SendingDone; 1002 } 1003 1004 /** 1005 * @return true iff the remote side initiated this transaction. 
1006 */ isRemoteInitiated()1007 bool isRemoteInitiated() const { 1008 return (direction_ == TransportDirection::DOWNSTREAM && id_ % 2 == 1) || 1009 (direction_ == TransportDirection::UPSTREAM && id_ % 2 == 0); 1010 } 1011 1012 /** 1013 * @return true iff sendEOM() has been called. 1014 */ isEgressEOMSeen()1015 bool isEgressEOMSeen() const { 1016 return isEgressEOMQueued() || isEgressComplete(); 1017 } 1018 1019 /** 1020 * @return true if we can send headers on this transaction 1021 * 1022 * Here's the logic: 1023 * 1) state machine says sendHeaders is OK AND 1024 * 2a) this is an upstream (allows for mid-stream headers) OR 1025 * 2b) this downstream has not sent a response 1026 * 2c) this downstream has only sent 1xx responses 1027 */ canSendHeaders()1028 virtual bool canSendHeaders() const { 1029 return HTTPTransactionEgressSM::canTransit( 1030 egressState_, HTTPTransactionEgressSM::Event::sendHeaders) && 1031 (isUpstream() || lastResponseStatus_ == 0 || 1032 extraResponseExpected()); 1033 } 1034 1035 /** 1036 * Send the egress message headers to the Transport. This method does 1037 * not actually write the message out on the wire immediately. All 1038 * writes happen at the end of the event loop at the earliest. 1039 * Note: This method should be called once per message unless the first 1040 * headers sent indicate a 1xx status. 1041 * 1042 * sendHeaders will not set EOM flag in header frame, whereas 1043 * sendHeadersWithEOM will. sendHeadersWithOptionalEOM backs both of them. 1044 * 1045 * @param headers Message headers 1046 */ 1047 virtual void sendHeaders(const HTTPMessage& headers); 1048 virtual void sendHeadersWithEOM(const HTTPMessage& headers); 1049 virtual void sendHeadersWithOptionalEOM(const HTTPMessage& headers, bool eom); 1050 1051 /** 1052 * Experimental API 1053 * 1054 * Send the egress message header and a DSRRequestSender to the Transport. 1055 * Handler does NOT have to explicitly sendBody and sendEOM after this. 
1056 */ 1057 virtual bool sendHeadersWithDelegate(const HTTPMessage& headers, 1058 std::unique_ptr<DSRRequestSender>); 1059 1060 /** 1061 * Send part or all of the egress message body to the Transport. If flow 1062 * control is enabled, the chunk boundaries may not be respected. 1063 * This method does not actually write the message out on the wire 1064 * immediately. All writes happen at the end of the event loop at the 1065 * earliest. 1066 * Note: This method may be called zero or more times per message. 1067 * 1068 * @param body Message body data; the Transport will take care of 1069 * applying any necessary protocol framing, such as 1070 * chunk headers. 1071 */ 1072 virtual void sendBody(std::unique_ptr<folly::IOBuf> body); 1073 1074 /** 1075 * Write any protocol framing required for the subsequent call(s) 1076 * to sendBody(). This method does not actually write the message out on 1077 * the wire immediately. All writes happen at the end of the event loop 1078 * at the earliest. 1079 * @param length Length in bytes of the body data to follow. 1080 */ sendChunkHeader(size_t length)1081 virtual void sendChunkHeader(size_t length) { 1082 if (!validateEgressStateTransition( 1083 HTTPTransactionEgressSM::Event::sendChunkHeader)) { 1084 return; 1085 } 1086 CHECK_EQ(deferredBufferMeta_.length, 0) 1087 << "Chunked-encoding doesn't support BufferMeta write"; 1088 // TODO: move this logic down to session/codec 1089 if (!transport_.getCodec().supportsParallelRequests()) { 1090 chunkHeaders_.emplace_back(length); 1091 } 1092 } 1093 1094 /** 1095 * Write any protocol syntax needed to terminate the data. This method 1096 * does not actually write the message out on the wire immediately. All 1097 * writes happen at the end of the event loop at the earliest. 1098 * Frame begun by the last call to sendChunkHeader(). 
1099 */ sendChunkTerminator()1100 virtual void sendChunkTerminator() { 1101 validateEgressStateTransition( 1102 HTTPTransactionEgressSM::Event::sendChunkTerminator); 1103 CHECK_EQ(deferredBufferMeta_.length, 0) 1104 << "Chunked-encoding doesn't support BufferMeta write"; 1105 } 1106 1107 /** 1108 * Send message trailers to the Transport. This method does 1109 * not actually write the message out on the wire immediately. All 1110 * writes happen at the end of the event loop at the earliest. 1111 * Note: This method may be called at most once per message. 1112 * 1113 * @param trailers Message trailers. 1114 */ sendTrailers(const HTTPHeaders & trailers)1115 virtual void sendTrailers(const HTTPHeaders& trailers) { 1116 if (!validateEgressStateTransition( 1117 HTTPTransactionEgressSM::Event::sendTrailers)) { 1118 return; 1119 } 1120 trailers_.reset(new HTTPHeaders(trailers)); 1121 } 1122 1123 /** 1124 * Finalize the egress message; depending on the protocol used 1125 * by the Transport, this may involve sending an explicit "end 1126 * of message" indicator. This method does not actually write the 1127 * message out on the wire immediately. All writes happen at the end 1128 * of the event loop at the earliest. 1129 * 1130 * If the ingress message also is complete, the transaction may 1131 * detach itself from the Handler and Transport and delete itself 1132 * as part of this method. 1133 * 1134 * Note: Either this method or sendAbort() should be called once 1135 * per message. 1136 */ 1137 virtual void sendEOM(); 1138 1139 /** 1140 * Terminate the transaction. Depending on the underlying protocol, this 1141 * may cause the connection to close or write egress bytes. This method 1142 * does not actually write the message out on the wire immediately. All 1143 * writes happen at the end of the event loop at the earliest. 1144 * 1145 * This function may also cause additional callbacks such as 1146 * detachTransaction() to the handler either immediately or after it returns. 
*/
  virtual void sendAbort();

  /**
   * Pause ingress processing.  Upon pause, the HTTPTransaction
   * will call its Transport's pauseIngress() method.  The Transport
   * should make a best effort to stop invoking the HTTPTransaction's
   * onIngress* callbacks.  If the Transport does invoke any of those
   * methods while the transaction is paused, however, the transaction
   * will queue the ingress events and data and delay delivery to the
   * Handler until the transaction is unpaused.
   */
  virtual void pauseIngress();

  /**
   * Resume ingress processing. Only useful after a call to pauseIngress().
   */
  virtual void resumeIngress();

  /**
   * @return true iff ingress processing is paused for the handler
   *         (the current value of the ingressPaused_ flag).
   */
  bool isIngressPaused() const {
    return ingressPaused_;
  }

  /**
   * Pause egress generation. HTTPTransaction may call its Handler's
   * onEgressPaused() method if there is a state change as a result of
   * this call.
   *
   * On receiving onEgressPaused(), the Handler should make a best effort
   * to stop invoking the HTTPTransaction's egress generating methods.  If
   * the Handler does invoke any of those methods while the transaction is
   * paused, however, the transaction will forward them anyway, unless it
   * is a body event. If flow control is enabled, body events will be
   * buffered for later transmission when egress is unpaused.
   */
  void pauseEgress();

  /**
   * Resume egress generation. The Handler's onEgressResumed() will not be
   * invoked if the HTTP/2 send window is full or there is too much
   * buffered egress data on this transaction already. In that case,
   * once the send window is not full or the buffer usage decreases, the
   * handler will finally get onEgressResumed().
   */
  void resumeEgress();

  /**
   * Specify a rate limit for egressing bytes.
1198 * The transaction will buffer extra bytes if doing so would cause it to go 1199 * over the specified rate limit. Setting to a value of 0 will cause no 1200 * rate-limiting to occur. 1201 */ 1202 void setEgressRateLimit(uint64_t bitsPerSecond); 1203 1204 /** 1205 * @return true iff egress processing is paused for the handler 1206 */ isEgressPaused()1207 bool isEgressPaused() const { 1208 return handlerEgressPaused_; 1209 } 1210 1211 /** 1212 * @return true iff egress processing is paused due to flow control 1213 * to the handler 1214 */ isFlowControlPaused()1215 bool isFlowControlPaused() const { 1216 return flowControlPaused_; 1217 } 1218 1219 /** 1220 * @return true iff this transaction can be used to push resources to 1221 * the remote side. 1222 */ supportsPushTransactions()1223 bool supportsPushTransactions() const { 1224 return direction_ == TransportDirection::DOWNSTREAM && 1225 transport_.getCodec().supportsPushTransactions(); 1226 } 1227 1228 /** 1229 * Create a new pushed transaction associated with this transaction, 1230 * and assign the given handler and priority. 1231 * 1232 * @return the new transaction for the push, or nullptr if a new push 1233 * transaction is impossible right now. 1234 */ 1235 virtual HTTPTransaction* newPushedTransaction( 1236 HTTPPushTransactionHandler* handler, ProxygenError* error = nullptr) { 1237 if (isDelegated_) { 1238 LOG(ERROR) 1239 << "Creating Pushed transaction on a delegated HTTPTransaction " 1240 << "is not supported."; 1241 return nullptr; 1242 } 1243 1244 if (isEgressEOMSeen()) { 1245 SET_PROXYGEN_ERROR_IF(error, 1246 ProxygenError::kErrorEgressEOMSeenOnParentStream); 1247 return nullptr; 1248 } 1249 auto txn = transport_.newPushedTransaction(id_, handler, error); 1250 if (txn) { 1251 pushedTransactions_.insert(txn->getID()); 1252 } 1253 return txn; 1254 } 1255 1256 /** 1257 * Create a new extended transaction associated with this transaction, 1258 * and assign the given handler and priority. 
1259 * 1260 * @return the new transaction for pubsub, or nullptr if a new push 1261 * transaction is impossible right now. 1262 */ 1263 virtual HTTPTransaction* newExTransaction(HTTPTransactionHandler* handler, 1264 bool unidirectional = false) { 1265 if (isDelegated_) { 1266 LOG(ERROR) << "Creating ExTransaction on a delegated HTTPTransaction is " 1267 << "not supported."; 1268 return nullptr; 1269 } 1270 auto txn = transport_.newExTransaction(handler, id_, unidirectional); 1271 if (txn) { 1272 exTransactions_.insert(txn->getID()); 1273 } 1274 return txn; 1275 } 1276 1277 /** 1278 * Invoked by the session (upstream only) when a new pushed transaction 1279 * arrives. The txn's handler will be notified and is responsible for 1280 * installing a handler. If no handler is installed in the callback, 1281 * the pushed transaction will be aborted. 1282 */ 1283 bool onPushedTransaction(HTTPTransaction* txn); 1284 1285 /** 1286 * Invoked by the session when a new ExTransaction arrives. The txn's handler 1287 * will be notified and is responsible for installing a handler. If no 1288 * handler is installed in the callback, the transaction will be aborted. 
1289 */ 1290 bool onExTransaction(HTTPTransaction* txn); 1291 1292 /** 1293 * True if this transaction is a server push transaction 1294 */ isPushed()1295 bool isPushed() const { 1296 return assocStreamId_.has_value(); 1297 } 1298 isExTransaction()1299 bool isExTransaction() const { 1300 return exAttributes_.has_value(); 1301 } 1302 isUnidirectional()1303 bool isUnidirectional() const { 1304 return isExTransaction() && exAttributes_->unidirectional; 1305 } 1306 1307 /** 1308 * @return true iff we should notify the error occured on EX_TXN 1309 * This logic only applies to EX_TXN with QoS 0 1310 */ shouldNotifyExTxnError(HTTPException::Direction errorDirection)1311 bool shouldNotifyExTxnError(HTTPException::Direction errorDirection) const { 1312 if (isUnidirectional()) { 1313 if (isRemoteInitiated()) { 1314 // We care about EGRESS errors in this case, 1315 // because we marked EGRESS state to be completed 1316 // If EGRESS error is happening, we need to know 1317 // Same for INGRESS direction, when EX_TXN is not remoteInitiated() 1318 return errorDirection == HTTPException::Direction::EGRESS; 1319 } else { 1320 return errorDirection == HTTPException::Direction::INGRESS; 1321 } 1322 } 1323 return false; 1324 } 1325 1326 /** 1327 * Overrides the default idle timeout value. 1328 */ 1329 void setIdleTimeout(std::chrono::milliseconds idleTimeout); 1330 hasIdleTimeout()1331 bool hasIdleTimeout() const { 1332 return idleTimeout_.has_value(); 1333 } 1334 1335 /** 1336 * Returns the associated transaction ID for pushed transactions, 0 otherwise 1337 */ getAssocTxnId()1338 folly::Optional<HTTPCodec::StreamID> getAssocTxnId() const { 1339 return assocStreamId_; 1340 } 1341 1342 /** 1343 * Returns the control channel transaction ID for this transaction, 1344 * folly::none otherwise 1345 */ getControlStream()1346 folly::Optional<HTTPCodec::StreamID> getControlStream() const { 1347 return exAttributes_ ? 
exAttributes_->controlStream : HTTPCodec::NoStream; 1348 } 1349 1350 /* 1351 * Returns attributes of EX stream (folly::none if not an EX transaction) 1352 */ getExAttributes()1353 folly::Optional<HTTPCodec::ExAttributes> getExAttributes() const { 1354 return exAttributes_; 1355 } 1356 1357 /** 1358 * Get a set of server-pushed transactions associated with this transaction. 1359 */ getPushedTransactions()1360 const std::set<HTTPCodec::StreamID>& getPushedTransactions() const { 1361 return pushedTransactions_; 1362 } 1363 1364 /** 1365 * Get a set of exTransactions associated with this transaction. 1366 */ getExTransactions()1367 std::set<HTTPCodec::StreamID> getExTransactions() const { 1368 return exTransactions_; 1369 } 1370 1371 /** 1372 * Remove the pushed txn ID from the set of pushed txns 1373 * associated with this txn. 1374 */ removePushedTransaction(HTTPCodec::StreamID pushStreamId)1375 void removePushedTransaction(HTTPCodec::StreamID pushStreamId) { 1376 pushedTransactions_.erase(pushStreamId); 1377 } 1378 1379 /** 1380 * Remove the exTxn ID from the control stream txn. 1381 */ removeExTransaction(HTTPCodec::StreamID exStreamId)1382 void removeExTransaction(HTTPCodec::StreamID exStreamId) { 1383 exTransactions_.erase(exStreamId); 1384 } 1385 1386 /** 1387 * Schedule or refresh the idle timeout for this transaction 1388 */ refreshTimeout()1389 void refreshTimeout() { 1390 if (timer_ && hasIdleTimeout()) { 1391 timer_->scheduleTimeout(this, idleTimeout_.value()); 1392 } 1393 } 1394 1395 /** 1396 * Tests if the first byte has already been sent, and if it 1397 * hasn't yet then it marks it as sent. 
1398 */ testAndSetFirstByteSent()1399 bool testAndSetFirstByteSent() { 1400 bool ret = firstByteSent_; 1401 firstByteSent_ = true; 1402 return ret; 1403 } 1404 testAndClearIsCountedTowardsStreamLimit()1405 bool testAndClearIsCountedTowardsStreamLimit() { 1406 bool ret = isCountedTowardsStreamLimit_; 1407 isCountedTowardsStreamLimit_ = false; 1408 return ret; 1409 } 1410 setIsCountedTowardsStreamLimit()1411 void setIsCountedTowardsStreamLimit() { 1412 isCountedTowardsStreamLimit_ = true; 1413 } 1414 1415 /** 1416 * Tests if the very first byte of Header has already been set. 1417 * If it hasn't yet, it marks it as sent. 1418 */ testAndSetFirstHeaderByteSent()1419 bool testAndSetFirstHeaderByteSent() { 1420 bool ret = firstHeaderByteSent_; 1421 firstHeaderByteSent_ = true; 1422 return ret; 1423 } 1424 1425 /** 1426 * HTTPTransaction will not detach until it has 0 pending byte events. If 1427 * you call incrementPendingByteEvents, you must make a corresponding call 1428 * to decrementPendingByteEvents or the transaction will never be destroyed. 1429 */ incrementPendingByteEvents()1430 void incrementPendingByteEvents() { 1431 CHECK_LT(pendingByteEvents_, 1432 std::numeric_limits<decltype(pendingByteEvents_)>::max()); 1433 pendingByteEvents_++; 1434 } 1435 decrementPendingByteEvents()1436 void decrementPendingByteEvents() { 1437 DestructorGuard dg(this); 1438 CHECK_GT(pendingByteEvents_, 0); 1439 pendingByteEvents_--; 1440 } 1441 getNumPendingByteEvents()1442 uint64_t getNumPendingByteEvents() const { 1443 return pendingByteEvents_; 1444 } 1445 1446 /** 1447 * Timeout callback for this transaction. The timer is active 1448 * until the ingress message is complete or terminated by error. 
*/
  void timeoutExpired() noexcept override {
    // The timeout is handled by the transport, which is told which
    // transaction timed out.
    transport_.transactionTimeout(this);
  }

  /**
   * Write a description of the transaction to a stream
   */
  void describe(std::ostream& os) const;

  /**
   * Change the priority of this transaction, may generate a PRIORITY frame.
   * The first variant is SPDY priority. The second is HTTP/2 priority. The
   * third one is a new proposal in a draft for both HTTP/2 and HTTP/3.
   */
  void updateAndSendPriority(int8_t newPriority);
  void updateAndSendPriority(const http2::PriorityUpdate& pri);
  virtual void updateAndSendPriority(uint8_t urgency, bool incremental);

  /**
   * Notify of priority change, will not generate a PRIORITY frame
   */
  void onPriorityUpdate(const http2::PriorityUpdate& priority);

  /**
   * Add a callback waiting for this transaction to have a transport with
   * replay protection. The request is forwarded to the transport.
   */
  virtual void addWaitingForReplaySafety(
      folly::AsyncTransport::ReplaySafetyCallback* callback) {
    transport_.addWaitingForReplaySafety(callback);
  }

  /**
   * Remove a callback waiting for replay protection (if it was canceled).
1484 */ removeWaitingForReplaySafety(folly::AsyncTransport::ReplaySafetyCallback * callback)1485 virtual void removeWaitingForReplaySafety( 1486 folly::AsyncTransport::ReplaySafetyCallback* callback) { 1487 transport_.removeWaitingForReplaySafety(callback); 1488 } 1489 needToBlockForReplaySafety()1490 virtual bool needToBlockForReplaySafety() const { 1491 return transport_.needToBlockForReplaySafety(); 1492 } 1493 1494 int32_t getRecvToAck() const; 1495 isPrioritySampled()1496 bool isPrioritySampled() const { 1497 return prioritySample_ != nullptr; 1498 } 1499 1500 void setPrioritySampled(bool sampled); 1501 void updateContentionsCount(uint64_t contentions); 1502 void updateRelativeWeight(double ratio); 1503 void updateSessionBytesSheduled(uint64_t bytes); 1504 void updateTransactionBytesSent(uint64_t bytes); 1505 void checkIfEgressRateLimitedByUpstream(); 1506 1507 struct PrioritySampleSummary { 1508 struct WeightedAverage { 1509 double byTransactionBytes_{0}; 1510 double bySessionBytes_{0}; 1511 }; 1512 WeightedAverage contentions_; 1513 WeightedAverage depth_; 1514 double expected_weight_; 1515 double measured_weight_; 1516 }; 1517 1518 bool getPrioritySampleSummary(PrioritySampleSummary& summary) const; 1519 1520 const CompressionInfo& getCompressionInfo() const; 1521 hasPendingBody()1522 bool hasPendingBody() const { 1523 return getOutstandingEgressBodyBytes() > 0; 1524 } 1525 getOutstandingEgressBodyBytes()1526 size_t getOutstandingEgressBodyBytes() const { 1527 return deferredEgressBody_.chainLength() + deferredBufferMeta_.length; 1528 } 1529 setLastByteFlushedTrackingEnabled(bool enabled)1530 void setLastByteFlushedTrackingEnabled(bool enabled) { 1531 enableLastByteFlushedTracking_ = enabled; 1532 } 1533 setBodyLastByteDeliveryTrackingEnabled(bool enabled)1534 bool setBodyLastByteDeliveryTrackingEnabled(bool enabled) { 1535 if (transport_.getSessionType() != Transport::Type::QUIC) { 1536 return false; 1537 } 1538 1539 enableBodyLastByteDeliveryTracking_ = 
enabled; 1540 return true; 1541 } 1542 1543 uint16_t getDatagramSizeLimit() const noexcept; 1544 virtual bool sendDatagram(std::unique_ptr<folly::IOBuf> datagram); 1545 1546 folly::Optional<ConnectionToken> getConnectionToken() const noexcept; 1547 setEgressBufferLimit(uint64_t limit)1548 static void setEgressBufferLimit(uint64_t limit) { 1549 egressBufferLimit_ = limit; 1550 } 1551 1552 private: 1553 HTTPTransaction(const HTTPTransaction&) = delete; 1554 HTTPTransaction& operator=(const HTTPTransaction&) = delete; 1555 dequeue()1556 void dequeue() { 1557 CHECK(isEnqueued()); 1558 egressQueue_.clearPendingEgress(queueHandle_); 1559 } 1560 1561 bool delegatedTransactionChecks(const HTTPMessage& headers) noexcept; 1562 bool delegatedTransactionChecks() noexcept; 1563 1564 bool addBufferMeta() noexcept; 1565 1566 void onDelayedDestroy(bool delayed) override; 1567 1568 /** 1569 * Invokes the handler's onEgressPaused/Resumed if the handler's pause 1570 * state needs updating 1571 */ 1572 void updateHandlerPauseState(); 1573 1574 /** 1575 * Update the CompressionInfo (tableInfo_) struct 1576 */ 1577 void updateEgressCompressionInfo(const CompressionInfo&); 1578 1579 void updateIngressCompressionInfo(const CompressionInfo&); 1580 1581 bool mustQueueIngress() const; 1582 1583 /** 1584 * Check if deferredIngress_ points to some queue before pushing HTTPEvent 1585 * to it. 1586 */ 1587 void checkCreateDeferredIngress(); 1588 1589 /** 1590 * Implementation of sending an abort for this transaction. 1591 */ 1592 void sendAbort(ErrorCode statusCode); 1593 1594 // Internal implementations of the ingress-related callbacks 1595 // that work whether the ingress events are immediate or deferred. 
1596 void processIngressHeadersComplete(std::unique_ptr<HTTPMessage> msg); 1597 void processIngressBody(std::unique_ptr<folly::IOBuf> chain, size_t len); 1598 void processIngressChunkHeader(size_t length); 1599 void processIngressChunkComplete(); 1600 void processIngressTrailers(std::unique_ptr<HTTPHeaders> trailers); 1601 void processIngressUpgrade(UpgradeProtocol protocol); 1602 void processIngressEOM(); 1603 1604 void sendBodyFlowControlled(std::unique_ptr<folly::IOBuf> body = nullptr); 1605 size_t sendBodyNow(std::unique_ptr<folly::IOBuf> body, 1606 size_t bodyLen, 1607 bool eom); 1608 size_t sendEOMNow(); 1609 void onDeltaSendWindowSize(int32_t windowDelta); 1610 1611 void notifyTransportPendingEgress(); 1612 1613 size_t sendDeferredBody(uint32_t maxEgress); 1614 1615 size_t sendDeferredBufferMeta(uint32_t maxEgress); 1616 1617 bool maybeDelayForRateLimit(); 1618 isEnqueued()1619 bool isEnqueued() const { 1620 return queueHandle_ ? queueHandle_->isEnqueued() : false; 1621 } 1622 1623 // Whther the txn has a pending EOM that can be send out (i.e., no more body 1624 // bytes need to go before it.) hasPendingEOM()1625 bool hasPendingEOM() const { 1626 return isEgressEOMQueued() && getOutstandingEgressBodyBytes() == 0; 1627 } 1628 1629 bool isExpectingIngress() const; 1630 1631 bool isExpectingWindowUpdate() const; 1632 1633 void updateReadTimeout(); 1634 1635 /** 1636 * Causes isIngressComplete() to return true, removes any queued 1637 * ingress, and cancels the read timeout. 1638 */ 1639 void markIngressComplete(); 1640 1641 /** 1642 * Causes isEgressComplete() to return true, removes any queued egress, 1643 * and cancels the write timeout. 1644 */ 1645 void markEgressComplete(); 1646 1647 /** 1648 * Validates the ingress state transition. Returns false and sends an 1649 * abort with INTERNAL_ERROR if the transition fails. Otherwise it 1650 * returns true. 
*/
  bool validateIngressStateTransition(HTTPTransactionIngressSM::Event);

  /**
   * Validates the egress state transition.
   *
   * If the transition fails, it will invoke onInvariantViolation, and the
   * default implementation is to CHECK/crash. If you have a custom
   * onInvariantViolation handler, this function can return false.
   */
  bool validateEgressStateTransition(HTTPTransactionEgressSM::Event);

  void invariantViolation(HTTPException ex);

  /**
   * Flushes any pending window updates.  This can happen from setReceiveWindow
   * or sendHeaders depending on transaction state.
   */
  void flushWindowUpdate();

  bool updateContentLengthRemaining(size_t len);

  void rateLimitTimeoutExpired();

  /**
   * Timer callback that forwards expiry to the owning transaction's
   * rateLimitTimeoutExpired(). Cancellation is a no-op.
   */
  class RateLimitCallback : public folly::HHWheelTimer::Callback {
   public:
    explicit RateLimitCallback(HTTPTransaction& txn) : txn_(txn) {
    }

    void timeoutExpired() noexcept override {
      txn_.rateLimitTimeoutExpired();
    }
    void callbackCanceled() noexcept override {
      // no op
    }

   private:
    // Transaction notified on expiry; held by reference.
    HTTPTransaction& txn_;
  };

  // Callback instance bound to this transaction.
  RateLimitCallback rateLimitCallback_{*this};

  /**
   * Queue to hold any events that we receive from the Transport
   * while the ingress is supposed to be paused.
   */
  std::unique_ptr<std::queue<HTTPEvent>> deferredIngress_;

  /**
   * Queue to hold any body bytes to be sent out
   * while egress to the remote is supposed to be paused.
   */
  folly::IOBufQueue deferredEgressBody_{folly::IOBufQueue::cacheChainLength()};

  /**
   * BufferMeta queued at this HTTPTransaction to be sent to Transport.
*/
  BufferMeta deferredBufferMeta_;

  // Direction of this transaction (DOWNSTREAM or UPSTREAM); fixed for its
  // lifetime.
  const TransportDirection direction_;
  // Current state of the egress state machine.
  HTTPTransactionEgressSM::State egressState_{
      HTTPTransactionEgressSM::getNewInstance()};
  // Current state of the ingress state machine.
  HTTPTransactionIngressSM::State ingressState_{
      HTTPTransactionIngressSM::getNewInstance()};
  /**
   * bytes we need to acknowledge to the remote end using a window update
   */
  int32_t recvToAck_{0};

  // Stream ID of this transaction.
  HTTPCodec::StreamID id_;
  uint32_t seqNo_;
  uint32_t maxDeferredIngress_{0};
  Handler* handler_{nullptr};
  // Transport this transaction talks to; held by reference.
  Transport& transport_;

  HTTPSessionStats* stats_{nullptr};

  // Compression info, updated by update{Egress,Ingress}CompressionInfo().
  CompressionInfo tableInfo_;

  /**
   * The recv window and associated data. This keeps track of how many
   * bytes we are allowed to buffer.
   */
  Window recvWindow_;

  /**
   * The send window and associated data. This keeps track of how many
   * bytes we are allowed to send and have outstanding.
   */
  Window sendWindow_;

  // Callback installed via setTransportCallback(); may be null.
  TransportCallback* transportCallback_{nullptr};

  /**
   * Trailers to send, if any.
   */
  std::unique_ptr<HTTPHeaders> trailers_;

  /**
   * A chunk announced via sendChunkHeader(): its length and whether its
   * header has been sent yet (initially false).
   */
  struct Chunk {
    explicit Chunk(size_t inLength) : length(inLength), headerSent(false) {
    }
    size_t length;
    bool headerSent;
  };
  // Chunks recorded by sendChunkHeader() when the codec does not support
  // parallel requests.
  std::list<Chunk> chunkHeaders_;

  /**
   * Reference to our priority queue
   */
  HTTP2PriorityQueueBase& egressQueue_;

  /**
   * Handle to our position in the priority queue.
   */
  HTTP2PriorityQueueBase::Handle queueHandle_{nullptr};

  /**
   * ID of request transaction (for pushed txns only)
   */
  folly::Optional<HTTPCodec::StreamID> assocStreamId_;

  /**
   * Attributes of http2 Ex_HEADERS
   */
  folly::Optional<HTTPCodec::ExAttributes> exAttributes_;

  /**
   * Set of all push transactions IDs associated with this transaction.
*/
  std::set<HTTPCodec::StreamID> pushedTransactions_;

  /**
   * Set of all exTransaction IDs associated with this transaction.
   */
  std::set<HTTPCodec::StreamID> exTransactions_;

  /**
   * Priority of this transaction
   */
  http2::PriorityUpdate priority_;

  /**
   * Information about this transaction's priority.
   *
   * insertDepth_ is the depth of this node in the tree when the txn was created
   * currentDepth_ is the depth of this node in the tree after the last
   *               onPriorityUpdate. It may not reflect its real position in
   *               realtime, since after the last onPriorityUpdate, it may get
   *               reparented as parent transactions complete.
   * cumulativeRatio_ / egressCalls_ is the average relative weight of this
   *                                 txn during egress
   */
  uint64_t insertDepth_{0};
  uint64_t currentDepth_{0};
  double cumulativeRatio_{0};
  uint64_t egressCalls_{0};

  // Number of outstanding byte events; see incrementPendingByteEvents().
  uint64_t pendingByteEvents_{0};
  folly::Optional<uint64_t> expectedIngressContentLength_;
  folly::Optional<uint64_t> expectedIngressContentLengthRemaining_;
  folly::Optional<uint64_t> expectedResponseLength_;
  folly::Optional<uint64_t> actualResponseLength_{0};

  // Single-bit state flags. They have no in-class initializer here, so they
  // are presumably set by the constructor (not visible in this part of the
  // file) -- confirm before relying on a default.
  bool ingressPaused_ : 1;
  bool egressPaused_ : 1;
  bool flowControlPaused_ : 1;
  bool handlerEgressPaused_ : 1;
  bool egressRateLimited_ : 1;
  bool useFlowControl_ : 1;
  bool aborted_ : 1;
  bool deleting_ : 1;
  bool firstByteSent_ : 1;
  bool firstHeaderByteSent_ : 1;
  bool inResume_ : 1;
  bool isCountedTowardsStreamLimit_ : 1;
  bool ingressErrorSeen_ : 1;
  bool priorityFallback_ : 1;
  bool headRequest_ : 1;
  bool enableLastByteFlushedTracking_ : 1;
  bool enableBodyLastByteDeliveryTracking_ : 1;

  // Prevents the application from calling skipBodyTo() before egress
  // headers have been delivered.
bool egressHeadersDelivered_ : 1;

  // Whether the HTTPTransaction has sent out a 1xx response HTTPMessage.
  bool has1xxResponse_ : 1;

  // Whether this HTTPTransaction delegates body sending to another entity.
  bool isDelegated_ : 1;

  /**
   * If this transaction represents a request (ie, it is backed by an
   * HTTPUpstreamSession) , this field indicates the last response status
   * received from the server. If this transaction represents a response,
   * this field indicates the last status we've sent. For instance, this
   * could take on multiple 1xx values, and then take on 200.
   */
  uint16_t lastResponseStatus_{0};

  // Maximum size of egress buffer before invoking onEgressPaused
  static uint64_t egressBufferLimit_;

  // Egress rate-limit bookkeeping; see setEgressRateLimit().
  uint64_t egressLimitBytesPerMs_{0};
  proxygen::TimePoint startRateLimit_;
  uint64_t numLimitedBytesEgressed_{0};

  // Idle timeout override; empty when none is set (see hasIdleTimeout()).
  folly::Optional<std::chrono::milliseconds> idleTimeout_;

  // Timer used by refreshTimeout(); may be null. No in-class initializer, so
  // presumably set by the constructor -- confirm.
  folly::HHWheelTimer* timer_;

  class PrioritySample;
  // Non-null iff priority sampling is enabled (see isPrioritySampled()).
  std::unique_ptr<PrioritySample> prioritySample_;

  // Keeps track for body offset processed so far.
  uint64_t ingressBodyOffset_{0};

  bool setIngressTimeoutAfterEom_{false};
};

/**
 * Write a description of an HTTPTransaction to an ostream
 */
std::ostream& operator<<(std::ostream& os, const HTTPTransaction& txn);

} // namespace proxygen