1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree.
7  */
8 
9 #pragma once
10 
11 #include "proxygen/lib/http/HTTPMessage.h"
12 #include <climits>
13 #include <folly/Optional.h>
14 #include <folly/SocketAddress.h>
15 #include <folly/io/async/AsyncTransport.h>
16 #include <folly/io/async/DelayedDestructionBase.h>
17 #include <folly/io/async/HHWheelTimer.h>
18 #include <folly/lang/Assume.h>
19 #include <iosfwd>
20 #include <proxygen/lib/http/HTTPConstants.h>
21 #include <proxygen/lib/http/HTTPHeaderSize.h>
22 #include <proxygen/lib/http/HTTPMessage.h>
23 #include <proxygen/lib/http/ProxygenErrorEnum.h>
24 #include <proxygen/lib/http/Window.h>
25 #include <proxygen/lib/http/codec/HTTPCodec.h>
26 #include <proxygen/lib/http/session/ByteEvents.h>
27 #include <proxygen/lib/http/session/HTTP2PriorityQueue.h>
28 #include <proxygen/lib/http/session/HTTPEvent.h>
29 #include <proxygen/lib/http/session/HTTPTransactionEgressSM.h>
30 #include <proxygen/lib/http/session/HTTPTransactionIngressSM.h>
31 #include <proxygen/lib/utils/Time.h>
32 #include <proxygen/lib/utils/TraceEvent.h>
33 #include <proxygen/lib/utils/TraceEventObserver.h>
34 #include <proxygen/lib/utils/WheelTimerInstance.h>
35 #include <set>
36 #include <wangle/acceptor/TransportInfo.h>
37 
38 namespace proxygen {
39 
40 /**
41  * Experimental
42  *
43  * A sender interface to send out DSR delegated packetization requests.
44  */
45 struct DSRRequestSender {
46   virtual ~DSRRequestSender() = default;
47 };
48 
49 /**
50  * An HTTPTransaction represents a single request/response pair
51  * for some HTTP-like protocol.  It works with a Transport that
52  * performs the network processing and wire-protocol formatting
53  * and a Handler that implements some sort of application logic.
54  *
55  * The typical sequence of events for a simple application is:
56  *
57  *   * The application accepts a connection and creates a Transport.
58  *   * The Transport reads from the connection, parses whatever
59  *     protocol the client is speaking, and creates a Transaction
60  *     to represent the first request.
61  *   * Once the Transport has received the full request headers,
62  *     it creates a Handler, plugs the handler into the Transaction,
63  *     and calls the Transaction's onIngressHeadersComplete() method.
64  *   * The Transaction calls the Handler's onHeadersComplete() method
65  *     and the Handler begins processing the request.
66  *   * If there is a request body, the Transport streams it through
67  *     the Transaction to the Handler.
68  *   * When the Handler is ready to produce a response, it streams
69  *     the response through the Transaction to the Transport.
70  *   * When the Transaction has seen the end of both the request
71  *     and the response, it detaches itself from the Handler and
72  *     Transport and deletes itself.
73  *   * The Handler deletes itself at some point after the Transaction
74  *     has detached from it.
75  *   * The Transport may, depending on the protocol, process other
76  *     requests after -- or even in parallel with -- that first
77  *     request.  Each request gets its own Transaction and Handler.
78  *
79  * For some applications, like proxying, a Handler implementation
80  * may obtain one or more upstream connections, each represented
81  * by another Transport, and create outgoing requests on the upstream
82  * connection(s), with each request represented as a new Transaction.
83  *
84  * With a multiplexing protocol like SPDY on both sides of a proxy,
85  * the cardinality relationship can be:
86  *
87  *                 +-----------+     +-----------+     +-------+
88  *   (Client-side) | Transport |1---*|Transaction|1---1|Handler|
89  *                 +-----------+     +-----------+     +-------+
90  *                                                         1
91  *                                                         |
92  *                                                         |
93  *                                                         1
94  *                                   +---------+     +-----------+
95  *                (Server-side)      |Transport|1---*|Transaction|
96  *                                   +---------+     +-----------+
97  *
98  * A key design goal of HTTPTransaction is to serve as a protocol-
99  * independent abstraction that insulates Handlers from the semantics
100  * different of HTTP-like protocols.
101  */
102 
103 /** Info about Transaction running on this session */
104 class TransactionInfo {
105  public:
TransactionInfo()106   TransactionInfo() {
107   }
108 
TransactionInfo(std::chrono::milliseconds ttfb,std::chrono::milliseconds ttlb,uint64_t eHeader,uint64_t inHeader,uint64_t eBody,uint64_t inBody,bool completed)109   TransactionInfo(std::chrono::milliseconds ttfb,
110                   std::chrono::milliseconds ttlb,
111                   uint64_t eHeader,
112                   uint64_t inHeader,
113                   uint64_t eBody,
114                   uint64_t inBody,
115                   bool completed)
116       : timeToFirstByte(ttfb),
117         timeToLastByte(ttlb),
118         egressHeaderBytes(eHeader),
119         ingressHeaderBytes(inHeader),
120         egressBodyBytes(eBody),
121         ingressBodyBytes(inBody),
122         isCompleted(completed) {
123   }
124 
125   /** Time to first byte */
126   std::chrono::milliseconds timeToFirstByte{0};
127   /** Time to last byte */
128   std::chrono::milliseconds timeToLastByte{0};
129 
130   /** Number of bytes send in headers */
131   uint64_t egressHeaderBytes{0};
132   /** Number of bytes receive headers */
133   uint64_t ingressHeaderBytes{0};
134   /** Number of bytes send in body */
135   uint64_t egressBodyBytes{0};
136   /** Number of bytes receive in body */
137   uint64_t ingressBodyBytes{0};
138 
139   /** Is the transaction was completed without error */
140   bool isCompleted{false};
141 };
142 
143 class HTTPSessionStats;
144 class HTTPTransaction;
145 class HTTPTransactionHandler : public TraceEventObserver {
146  public:
147   /**
148    * Called once per transaction. This notifies the handler of which
149    * transaction it should talk to and will receive callbacks from.
150    */
151   virtual void setTransaction(HTTPTransaction* txn) noexcept = 0;
152 
153   /**
154    * Called once after a transaction successfully completes. It
155    * will be called even if a read or write error happened earlier.
156    * This is a terminal callback, which means that the HTTPTransaction
157    * object that gives this call will be invalid after this function
158    * completes.
159    */
160   virtual void detachTransaction() noexcept = 0;
161 
162   /**
163    * Called at most once per transaction. This is usually the first
164    * ingress callback. It is possible to get a read error before this
165    * however. If you had previously called pauseIngress(), this callback
166    * will be delayed until you call resumeIngress().
167    */
168   virtual void onHeadersComplete(std::unique_ptr<HTTPMessage> msg) noexcept = 0;
169 
170   /**
171    * Can be called multiple times per transaction. If you had previously
172    * called pauseIngress(), this callback will be delayed until you call
173    * resumeIngress().
174    */
175   virtual void onBody(std::unique_ptr<folly::IOBuf> chain) noexcept = 0;
176 
177   /**
178    * Same as onBody() but with additional offset parameter.
179    */
onBodyWithOffset(uint64_t,std::unique_ptr<folly::IOBuf> chain)180   virtual void onBodyWithOffset(uint64_t /* bodyOffset */,
181                                 std::unique_ptr<folly::IOBuf> chain) {
182     onBody(std::move(chain));
183   }
184 
185   /**
186    * Can be called multiple times per transaction. If you had previously
187    * called pauseIngress(), this callback will be delayed until you call
188    * resumeIngress(). This signifies the beginning of a chunk of length
189    * 'length'. You will receive onBody() after this. Also, the length will
190    * be greater than zero.
191    */
onChunkHeader(size_t)192   virtual void onChunkHeader(size_t /* length */) noexcept {
193   }
194 
195   /**
196    * Can be called multiple times per transaction. If you had previously
197    * called pauseIngress(), this callback will be delayed until you call
198    * resumeIngress(). This signifies the end of a chunk.
199    */
onChunkComplete()200   virtual void onChunkComplete() noexcept {
201   }
202 
203   /**
204    * Can be called any number of times per transaction. If you had
205    * previously called pauseIngress(), this callback will be delayed until
206    * you call resumeIngress(). Trailers can be received once right before
207    * the EOM of a chunked HTTP/1.1 reponse or multiple times per
208    * transaction from SPDY and HTTP/2.0 HEADERS frames.
209    */
210   virtual void onTrailers(std::unique_ptr<HTTPHeaders> trailers) noexcept = 0;
211 
212   /**
213    * Can be called once per transaction. If you had previously called
214    * pauseIngress(), this callback will be delayed until you call
215    * resumeIngress(). After this callback is received, there will be no
216    * more normal ingress callbacks received (onEgress*() and onError()
217    * may still be invoked). The Handler should consider
218    * ingress complete after receiving this message. This Transaction is
219    * still valid, and work may still occur on it until detachTransaction
220    * is called.
221    */
222   virtual void onEOM() noexcept = 0;
223 
224   /**
225    * Can be called once per transaction. If you had previously called
226    * pauseIngress(), this callback will be delayed until you call
227    * resumeIngress(). After this callback is invoked, further data
228    * will be forwarded using the onBody() callback. Once the data transfer
229    * is completed (EOF recevied in case of CONNECT), onEOM() callback will
230    * be invoked.
231    */
232   virtual void onUpgrade(UpgradeProtocol protocol) noexcept = 0;
233 
234   /**
235    * Can be called at any time before detachTransaction(). This callback
236    * implies that an error has occurred. To determine if ingress or egress
237    * is affected, check the direciont on the HTTPException. If the
238    * direction is INGRESS, it MAY still be possible to send egress.
239    */
240   virtual void onError(const HTTPException& error) noexcept = 0;
241 
242   /**
243    * Can be called at any time before detachTransaction(). This callback is
244    * invoked in cases that violate an internal invariant that is fatal to the
245    * transaction but can be recoverable for the session or library.  One such
246    * example is mis-use of the egress APIs (sendBody() before sendHeaders()).
247    */
onInvariantViolation(const HTTPException & error)248   virtual void onInvariantViolation(const HTTPException& error) noexcept {
249     LOG(FATAL) << error.what();
250   }
251   /**
252    * If the remote side's receive buffer fills up, this callback will be
253    * invoked so you can attempt to stop sending to the remote side.
254    */
255   virtual void onEgressPaused() noexcept = 0;
256 
257   /**
258    * This callback lets you know that the remote side has resumed reading
259    * and you can now continue to send data.
260    */
261   virtual void onEgressResumed() noexcept = 0;
262 
263   /**
264    * Ask the handler to construct a handler for a pushed transaction associated
265    * with its transaction.
266    *
267    * TODO: Reconsider default implementation here. If the handler
268    * does not implement, better set max initiated to 0 in a settings frame?
269    */
onPushedTransaction(HTTPTransaction *)270   virtual void onPushedTransaction(HTTPTransaction* /* txn */) noexcept {
271   }
272 
273   /**
274    * Ask the handler to construct a handler for a ExTransaction associated
275    * with its transaction.
276    */
onExTransaction(HTTPTransaction *)277   virtual void onExTransaction(HTTPTransaction* /* txn */) noexcept {
278   }
279 
280   /**
281    * Inform the handler that a GOAWAY has been received on the
282    * transport. This callback will only be invoked if the transport is
283    * SPDY or HTTP/2. It may be invoked multiple times, as HTTP/2 allows this.
284    *
285    * @param code The error code received in the GOAWAY frame
286    */
onGoaway(ErrorCode)287   virtual void onGoaway(ErrorCode /* code */) noexcept {
288   }
289 
290   /**
291    * Can be called multiple times per transaction after onHeadersComplete and
292    * before detachTransaction()
293    *
294    * It does not obey pauseIngress/resumeIngress it is up to the handler
295    * to decide whether to buffer/drop datagrams
296    */
onDatagram(std::unique_ptr<folly::IOBuf>)297   virtual void onDatagram(std::unique_ptr<folly::IOBuf> /*datagram*/) noexcept {
298   }
299 
~HTTPTransactionHandler()300   virtual ~HTTPTransactionHandler() {
301   }
302 };
303 
304 class HTTPPushTransactionHandler : public HTTPTransactionHandler {
305  public:
~HTTPPushTransactionHandler()306   ~HTTPPushTransactionHandler() override {
307   }
308 
onHeadersComplete(std::unique_ptr<HTTPMessage>)309   void onHeadersComplete(std::unique_ptr<HTTPMessage>) noexcept final {
310     LOG(FATAL) << "push txn received headers";
311   }
312 
onBody(std::unique_ptr<folly::IOBuf>)313   void onBody(std::unique_ptr<folly::IOBuf>) noexcept final {
314     LOG(FATAL) << "push txn received body";
315   }
316 
onBodyWithOffset(uint64_t,std::unique_ptr<folly::IOBuf>)317   void onBodyWithOffset(uint64_t,
318                         std::unique_ptr<folly::IOBuf>) noexcept final {
319     LOG(FATAL) << "push txn received body with offset";
320   }
321 
onChunkHeader(size_t)322   void onChunkHeader(size_t /* length */) noexcept final {
323     LOG(FATAL) << "push txn received chunk header";
324   }
325 
onChunkComplete()326   void onChunkComplete() noexcept final {
327     LOG(FATAL) << "push txn received chunk complete";
328   }
329 
onTrailers(std::unique_ptr<HTTPHeaders>)330   void onTrailers(std::unique_ptr<HTTPHeaders>) noexcept final {
331     LOG(FATAL) << "push txn received trailers";
332   }
333 
onEOM()334   void onEOM() noexcept final {
335     LOG(FATAL) << "push txn received EOM";
336   }
337 
onUpgrade(UpgradeProtocol)338   void onUpgrade(UpgradeProtocol) noexcept final {
339     LOG(FATAL) << "push txn received upgrade";
340   }
341 
onPushedTransaction(HTTPTransaction *)342   void onPushedTransaction(HTTPTransaction*) noexcept final {
343     LOG(FATAL) << "push txn received push txn";
344   }
345 };
346 
347 /**
348  * Callback interface to be notified of events on the byte stream.
349  */
350 class HTTPTransactionTransportCallback {
351  public:
352   virtual void firstHeaderByteFlushed() noexcept = 0;
353 
354   virtual void firstByteFlushed() noexcept = 0;
355 
356   virtual void lastByteFlushed() noexcept = 0;
357 
trackedByteFlushed()358   virtual void trackedByteFlushed() noexcept {
359   }
360 
361   virtual void lastByteAcked(std::chrono::milliseconds latency) noexcept = 0;
362 
trackedByteEventTX(const ByteEvent &)363   virtual void trackedByteEventTX(const ByteEvent& /* event */) noexcept {
364   }
365 
trackedByteEventAck(const ByteEvent &)366   virtual void trackedByteEventAck(const ByteEvent& /* event */) noexcept {
367   }
368 
egressBufferEmpty()369   virtual void egressBufferEmpty() noexcept {
370   }
371 
372   virtual void headerBytesGenerated(HTTPHeaderSize& size) noexcept = 0;
373 
374   virtual void headerBytesReceived(const HTTPHeaderSize& size) noexcept = 0;
375 
376   // May include extra bytes for EOF/trailers
377   virtual void bodyBytesGenerated(size_t nbytes) noexcept = 0;
378 
379   virtual void bodyBytesReceived(size_t size) noexcept = 0;
380 
lastEgressHeaderByteAcked()381   virtual void lastEgressHeaderByteAcked() noexcept {
382   }
383 
bodyBytesDelivered(uint64_t)384   virtual void bodyBytesDelivered(uint64_t /* bodyOffset */) noexcept {
385   }
386 
bodyBytesDeliveryCancelled(uint64_t)387   virtual void bodyBytesDeliveryCancelled(uint64_t /* bodyOffset */) noexcept {
388   }
389 
transportAppRateLimited()390   virtual void transportAppRateLimited() noexcept {
391   }
392 
datagramBytesGenerated(size_t)393   virtual void datagramBytesGenerated(size_t /* nbytes */) noexcept {
394   }
datagramBytesReceived(size_t)395   virtual void datagramBytesReceived(size_t /* size */) noexcept {
396   }
397 
~HTTPTransactionTransportCallback()398   virtual ~HTTPTransactionTransportCallback() {
399   }
400 };
401 
402 class HTTPSessionBase;
403 class HTTPTransaction
404     : public folly::HHWheelTimer::Callback
405     , public folly::DelayedDestructionBase {
406  public:
407   using Handler = HTTPTransactionHandler;
408   using PushHandler = HTTPPushTransactionHandler;
409 
410   struct FlowControlInfo {
411     bool flowControlEnabled_{false};
412     int64_t sessionSendWindow_{-1};
413     int64_t sessionRecvWindow_{-1};
414     int64_t sessionSendOutstanding_{-1};
415     int64_t sessionRecvOutstanding_{-1};
416     int64_t streamSendWindow_{-1};
417     int64_t streamRecvWindow_{-1};
418     int64_t streamSendOutstanding_{-1};
419     int64_t streamRecvOutstanding_{-1};
420   };
421 
422   /**
423    * Experimental.
424    *
425    * BufferMeta represents a buffer. The real data will be sourced from another
426    * place.
427    */
428   struct BufferMeta {
429     size_t length{0};
430 
431     BufferMeta() = default;
BufferMetaBufferMeta432     explicit BufferMeta(size_t lengthIn) : length(lengthIn) {
433     }
434 
splitBufferMeta435     BufferMeta split(size_t splitLen) {
436       CHECK_GE(length, splitLen);
437       length -= splitLen;
438       return BufferMeta(splitLen);
439     }
440   };
441 
442   /**
443    * Opaque token that identifies the underlying connection.
444    * Transaction handlers can use this token to group different
445    * Transport instances by the distinct underlying connections.
446    * Its uniqueness is not enforced by the Transport.
447    */
448   using ConnectionToken = std::string;
449 
450   class Transport {
451    public:
452     enum class Type : uint8_t { TCP, QUIC };
453 
~Transport()454     virtual ~Transport() {
455     }
456 
457     virtual void pauseIngress(HTTPTransaction* txn) noexcept = 0;
458 
459     virtual void resumeIngress(HTTPTransaction* txn) noexcept = 0;
460 
461     virtual void transactionTimeout(HTTPTransaction* txn) noexcept = 0;
462 
463     virtual void sendHeaders(HTTPTransaction* txn,
464                              const HTTPMessage& headers,
465                              HTTPHeaderSize* size,
466                              bool eom) noexcept = 0;
467 
468     /**
469      * Experimental API
470      *
471      * Send headers and a DSRRequestSender to Transport.
472      * The Transport will generate header for DATA frame.
473      *
474      * dataFrameHeaderSize: An output parameter to get the size of DATA frame
475      * header.
476      */
sendHeadersWithDelegate(HTTPTransaction *,const HTTPMessage &,HTTPHeaderSize *,size_t *,uint64_t,std::unique_ptr<DSRRequestSender>)477     virtual bool sendHeadersWithDelegate(
478         HTTPTransaction*,
479         const HTTPMessage&,
480         HTTPHeaderSize*,
481         size_t* /* dataFrameHeaderSize */,
482         uint64_t /* contentLength */,
483         std::unique_ptr<DSRRequestSender>) noexcept {
484       return false;
485     }
486 
487     // Experimental
488     virtual size_t sendBody(HTTPTransaction*,
489                             const BufferMeta&,
490                             bool /* eom */) noexcept = 0;
491 
492     virtual size_t sendBody(HTTPTransaction* txn,
493                             std::unique_ptr<folly::IOBuf>,
494                             bool eom,
495                             bool trackLastByteFlushed) noexcept = 0;
496 
497     virtual size_t sendChunkHeader(HTTPTransaction* txn,
498                                    size_t length) noexcept = 0;
499 
500     virtual size_t sendChunkTerminator(HTTPTransaction* txn) noexcept = 0;
501 
502     virtual size_t sendEOM(HTTPTransaction* txn,
503                            const HTTPHeaders* trailers) noexcept = 0;
504 
505     virtual size_t sendAbort(HTTPTransaction* txn,
506                              ErrorCode statusCode) noexcept = 0;
507 
508     virtual size_t sendPriority(HTTPTransaction* txn,
509                                 const http2::PriorityUpdate& pri) noexcept = 0;
510     virtual size_t changePriority(HTTPTransaction* txn,
511                                   HTTPPriority pri) noexcept = 0;
512 
513     virtual size_t sendWindowUpdate(HTTPTransaction* txn,
514                                     uint32_t bytes) noexcept = 0;
515 
516     virtual void notifyPendingEgress() noexcept = 0;
517 
518     virtual void detach(HTTPTransaction* txn) noexcept = 0;
519 
520     virtual void notifyIngressBodyProcessed(uint32_t bytes) noexcept = 0;
521 
522     virtual void notifyEgressBodyBuffered(int64_t bytes) noexcept = 0;
523 
524     virtual const folly::SocketAddress& getLocalAddress() const noexcept = 0;
525 
526     virtual const folly::SocketAddress& getPeerAddress() const noexcept = 0;
527 
528     virtual void describe(std::ostream&) const = 0;
529 
530     virtual const wangle::TransportInfo& getSetupTransportInfo()
531         const noexcept = 0;
532 
533     virtual bool getCurrentTransportInfo(wangle::TransportInfo* tinfo) = 0;
534 
535     virtual void getFlowControlInfo(FlowControlInfo* info) = 0;
536 
537     virtual HTTPTransaction::Transport::Type getSessionType()
538         const noexcept = 0;
539 
540     virtual const HTTPCodec& getCodec() const noexcept = 0;
541 
542     /*
543      * Drain the underlying session. This will affect other transactions
544      * running on the same session and is discouraged unless you are confident
545      * that the session is broken.
546      */
547     virtual void drain() = 0;
548 
549     virtual bool isDraining() const = 0;
550 
551     virtual HTTPTransaction* newPushedTransaction(
552         HTTPCodec::StreamID assocStreamId,
553         HTTPTransaction::PushHandler* handler,
554         ProxygenError* error = nullptr) noexcept = 0;
555 
556     virtual HTTPTransaction* newExTransaction(HTTPTransaction::Handler* handler,
557                                               HTTPCodec::StreamID controlStream,
558                                               bool unidirectional) noexcept = 0;
559 
560     virtual std::string getSecurityProtocol() const = 0;
561 
562     virtual void addWaitingForReplaySafety(
563         folly::AsyncTransport::ReplaySafetyCallback* callback) noexcept = 0;
564 
565     virtual void removeWaitingForReplaySafety(
566         folly::AsyncTransport::ReplaySafetyCallback* callback) noexcept = 0;
567 
568     virtual bool needToBlockForReplaySafety() const = 0;
569 
570     virtual const folly::AsyncTransport* getUnderlyingTransport()
571         const noexcept = 0;
572 
573     /**
574      * Returns true if the underlying transport has completed full handshake.
575      */
576     virtual bool isReplaySafe() const = 0;
577 
578     virtual void setHTTP2PrioritiesEnabled(bool enabled) = 0;
579     virtual bool getHTTP2PrioritiesEnabled() const = 0;
580 
581     virtual HTTPSessionBase* getHTTPSessionBase() = 0;
582 
583     virtual folly::Optional<const HTTPMessage::HTTP2Priority> getHTTPPriority(
584         uint8_t level) = 0;
585 
getHTTPPriority()586     virtual folly::Optional<HTTPPriority> getHTTPPriority() {
587       return folly::none;
588     }
589 
getDatagramSizeLimit()590     virtual uint16_t getDatagramSizeLimit() const noexcept {
591       return 0;
592     }
593 
sendDatagram(std::unique_ptr<folly::IOBuf>)594     virtual bool sendDatagram(std::unique_ptr<folly::IOBuf> /*datagram*/) {
595       LOG(FATAL) << __func__ << " not supported";
596       folly::assume_unreachable();
597     }
598 
599     /**
600      * Ask transport to track and ack body delivery.
601      */
trackEgressBodyDelivery(uint64_t)602     virtual void trackEgressBodyDelivery(uint64_t /* bodyOffset */) {
603       LOG(FATAL) << __func__ << " not supported";
604       folly::assume_unreachable();
605     }
606 
607     virtual folly::Optional<HTTPTransaction::ConnectionToken>
608     getConnectionToken() const noexcept = 0;
609   };
610 
611   using TransportCallback = HTTPTransactionTransportCallback;
612 
613   /**
614    * readBufLimit and sendWindow are only used if useFlowControl is
615    * true. Furthermore, if flow control is enabled, no guarantees can be
616    * made on the borders of the L7 chunking/data frames of the outbound
617    * messages.
618    *
619    * priority is only used by SPDY. The -1 default makes sure that all
620    * plain HTTP transactions land up in the same queue as the control data.
621    */
622   HTTPTransaction(
623       TransportDirection direction,
624       HTTPCodec::StreamID id,
625       uint32_t seqNo,
626       Transport& transport,
627       HTTP2PriorityQueueBase& egressQueue,
628       folly::HHWheelTimer* timer = nullptr,
629       const folly::Optional<std::chrono::milliseconds>& defaultIdleTimeout =
630           folly::Optional<std::chrono::milliseconds>(),
631       HTTPSessionStats* stats = nullptr,
632       bool useFlowControl = false,
633       uint32_t receiveInitialWindowSize = 0,
634       uint32_t sendInitialWindowSize = 0,
635       http2::PriorityUpdate = http2::DefaultPriority,
636       folly::Optional<HTTPCodec::StreamID> assocStreamId = HTTPCodec::NoStream,
637       folly::Optional<HTTPCodec::ExAttributes> exAttributes =
638           HTTPCodec::NoExAttributes,
639       bool setIngressTimeoutAfterEom = false);
640 
641   ~HTTPTransaction() override;
642 
643   void reset(bool useFlowControl,
644              uint32_t receiveInitialWindowSize,
645              uint32_t receiveStreamWindowSize,
646              uint32_t sendInitialWindowSize);
647 
getID()648   HTTPCodec::StreamID getID() const {
649     return id_;
650   }
651 
getSequenceNumber()652   uint32_t getSequenceNumber() const {
653     return seqNo_;
654   }
655 
getTransport()656   const Transport& getTransport() const {
657     return transport_;
658   }
659 
getTransport()660   Transport& getTransport() {
661     return transport_;
662   }
663 
setHandler(Handler * handler)664   virtual void setHandler(Handler* handler) {
665     handler_ = handler;
666     if (handler_) {
667       handler_->setTransaction(this);
668     }
669   }
670 
getHandler()671   const Handler* getHandler() const {
672     return handler_;
673   }
674 
getHandler()675   Handler* getHandler() {
676     return handler_;
677   }
678 
getPriority()679   http2::PriorityUpdate getPriority() const {
680     return priority_;
681   }
682 
getPrioritySummary()683   std::tuple<uint64_t, uint64_t, double> getPrioritySummary() const {
684     return std::make_tuple(insertDepth_,
685                            currentDepth_,
686                            egressCalls_ > 0 ? cumulativeRatio_ / egressCalls_
687                                             : 0);
688   }
689 
getHTTPPriority()690   folly::Optional<HTTPPriority> getHTTPPriority() const {
691     return transport_.getHTTPPriority();
692   }
693 
getPriorityFallback()694   bool getPriorityFallback() const {
695     return priorityFallback_;
696   }
697 
getEgressState()698   HTTPTransactionEgressSM::State getEgressState() const {
699     return egressState_;
700   }
701 
getIngressState()702   HTTPTransactionIngressSM::State getIngressState() const {
703     return ingressState_;
704   }
705 
isUpstream()706   bool isUpstream() const {
707     return direction_ == TransportDirection::UPSTREAM;
708   }
709 
isDownstream()710   bool isDownstream() const {
711     return direction_ == TransportDirection::DOWNSTREAM;
712   }
713 
getLocalAddress(folly::SocketAddress & addr)714   void getLocalAddress(folly::SocketAddress& addr) const {
715     addr = transport_.getLocalAddress();
716   }
717 
getPeerAddress(folly::SocketAddress & addr)718   void getPeerAddress(folly::SocketAddress& addr) const {
719     addr = transport_.getPeerAddress();
720   }
721 
getLocalAddress()722   const folly::SocketAddress& getLocalAddress() const noexcept {
723     return transport_.getLocalAddress();
724   }
725 
getPeerAddress()726   const folly::SocketAddress& getPeerAddress() const noexcept {
727     return transport_.getPeerAddress();
728   }
729 
getSetupTransportInfo()730   const wangle::TransportInfo& getSetupTransportInfo() const noexcept {
731     return transport_.getSetupTransportInfo();
732   }
733 
getCurrentTransportInfo(wangle::TransportInfo * tinfo)734   void getCurrentTransportInfo(wangle::TransportInfo* tinfo) const {
735     transport_.getCurrentTransportInfo(tinfo);
736   }
737 
getCurrentFlowControlInfo(FlowControlInfo * info)738   void getCurrentFlowControlInfo(FlowControlInfo* info) const {
739     transport_.getFlowControlInfo(info);
740     info->streamSendWindow_ = sendWindow_.getCapacity();
741     info->streamSendOutstanding_ = sendWindow_.getOutstanding();
742     info->streamRecvWindow_ = recvWindow_.getCapacity();
743     info->streamRecvOutstanding_ = recvWindow_.getOutstanding();
744   }
745 
getSessionStats()746   HTTPSessionStats* getSessionStats() const {
747     return stats_;
748   }
749 
750   /**
751    * Check whether more response is expected. One or more 1xx status
752    * responses can be received prior to the regular response.
753    * Note: 101 is handled by the codec using a separate onUpgrade callback
754    */
extraResponseExpected()755   virtual bool extraResponseExpected() const {
756     return (lastResponseStatus_ >= 100 && lastResponseStatus_ < 200) &&
757            lastResponseStatus_ != 101;
758   }
759 
760   /**
761    * Change the size of the receive window and propagate the change to the
762    * remote end using a window update.
763    *
764    * TODO: when HTTPSession sends a SETTINGS frame indicating a
765    * different initial window, it should call this function on all its
766    * transactions.
767    */
768   virtual void setReceiveWindow(uint32_t capacity);
769 
770   /**
771    * Get the receive window of the transaction
772    */
getReceiveWindow()773   virtual const Window& getReceiveWindow() const {
774     return recvWindow_;
775   }
776 
getMaxDeferredSize()777   uint32_t getMaxDeferredSize() {
778     return maxDeferredIngress_;
779   }
780 
781   /**
782    * Invoked by the session when the ingress headers are complete
783    */
784   void onIngressHeadersComplete(std::unique_ptr<HTTPMessage> msg);
785 
786   /**
787    * Invoked by the session when some or all of the ingress entity-body has
788    * been parsed.
789    */
790   void onIngressBody(std::unique_ptr<folly::IOBuf> chain, uint16_t padding);
791 
792   /**
793    * Invoked by the session when a chunk header has been parsed.
794    */
795   void onIngressChunkHeader(size_t length);
796 
797   /**
798    * Invoked by the session when the CRLF terminating a chunk has been parsed.
799    */
800   void onIngressChunkComplete();
801 
802   /**
803    * Invoked by the session when the ingress trailers have been parsed.
804    */
805   void onIngressTrailers(std::unique_ptr<HTTPHeaders> trailers);
806 
807   /**
808    * Invoked by the session when the session and transaction need to be
809    * upgraded to a different protocol
810    */
811   void onIngressUpgrade(UpgradeProtocol protocol);
812 
813   /**
814    * Invoked by the session when the ingress message is complete.
815    */
816   void onIngressEOM();
817 
818   /**
819    * Invoked by the session when there is an error (e.g., invalid syntax,
820    * TCP RST) in either the ingress or egress stream. Note that this
821    * message is processed immediately even if this transaction normally
822    * would queue ingress.
823    *
824    * @param error Details for the error. This exception also has
825    * information about whether the error applies to the ingress, egress,
826    * or both directions of the transaction
827    */
828   void onError(const HTTPException& error);
829 
830   /**
831    * Invoked by the session when a GOAWAY frame is received.
832    * TODO: we may consider exposing the additional debug data here in the
833    * future.
834    *
835    * @param code The error code received in the GOAWAY frame
836    */
837   void onGoaway(ErrorCode code);
838 
839   /**
840    * Invoked by the session when there is a timeout on the ingress stream.
841    * Note that each transaction has its own timer but the session
842    * is the effective target of the timer.
843    */
844   void onIngressTimeout();
845 
846   /**
847    * Invoked by the session when the remote endpoint of this transaction
848    * signals that it has consumed 'amount' bytes. This is only for
849    * versions of HTTP that support per transaction flow control.
850    */
851   void onIngressWindowUpdate(uint32_t amount);
852 
853   /**
854    * Invoked by the session when the remote endpoint signals that we
855    * should change our send window. This is only for
856    * versions of HTTP that support per transaction flow control.
857    */
858   void onIngressSetSendWindow(uint32_t newWindowSize);
859 
860   /**
861    * Notify this transaction that it is ok to egress.  Returns true if there
862    * is additional pending egress
863    */
864   bool onWriteReady(uint32_t maxEgress, double ratio);
865 
866   /**
867    * Invoked by the session when there is a timeout on the egress stream.
868    */
869   void onEgressTimeout();
870 
871   /**
872    * Invoked by the session when the first header byte is flushed.
873    */
874   void onEgressHeaderFirstByte();
875 
876   /**
877    * Invoked by the session when the first byte is flushed.
878    */
879   void onEgressBodyFirstByte();
880 
881   /**
882    * Invoked by the session when the last byte is flushed.
883    */
884   void onEgressBodyLastByte();
885 
886   /**
887    * Invoked by the session when the tracked byte is flushed.
888    */
889   void onEgressTrackedByte();
890 
891   /**
892    * Invoked when the ACK_LATENCY event is delivered
893    *
894    * @param latency the time between the moment when the last byte was sent
895    *        and the moment when we received the ACK from the client
896    */
897   void onEgressLastByteAck(std::chrono::milliseconds latency);
898 
899   /**
900    * Invoked by the session when last egress headers have been acked by the
901    * peer.
902    */
903   void onLastEgressHeaderByteAcked();
904 
905   /**
906    * Invoked by the session when egress body has been acked by the
907    * peer. Called for each sendBody() call if body bytes tracking is enabled.
908    */
909   void onEgressBodyBytesAcked(uint64_t bodyOffset);
910 
911   /**
912    * Invoked by the session when egress body delivery has been cancelled by the
913    * peer.
914    */
915   void onEgressBodyDeliveryCanceled(uint64_t bodyOffset);
916 
917   /**
918    * Invoked by the session when a tracked ByteEvent is transmitted by NIC.
919    */
920   void onEgressTrackedByteEventTX(const ByteEvent& event);
921 
922   /**
923    * Invoked by the session when a tracked ByteEvent is ACKed by remote peer.
924    *
925    * LAST_BYTE events are processed by legacy functions.
926    */
927   void onEgressTrackedByteEventAck(const ByteEvent& event);
928 
929   /**
930    * Invoked if the egress transport becomes app rate limited.
931    *
932    * TODO(bschlinker): Add support for QUIC.
933    */
934   void onEgressTransportAppRateLimited();
935 
936   /**
937    * Can be called multiple times per transaction after onHeadersComplete and
938    * before detachTransaction()
939    *
940    * It does not obey pauseIngress/resumeIngress it is up to the handler
941    * to decide whether to buffer/drop datagrams
942    */
943   void onDatagram(std::unique_ptr<folly::IOBuf> datagram) noexcept;
944 
945   /**
946    * Invoked by the handlers that are interested in tracking
947    * performance stats.
948    */
setTransportCallback(TransportCallback * cb)949   virtual void setTransportCallback(TransportCallback* cb) {
950     transportCallback_ = cb;
951   }
952 
953   /**
954    * @return true if ingress has started on this transaction.
955    */
isIngressStarted()956   bool isIngressStarted() const {
957     return ingressState_ != HTTPTransactionIngressSM::State::Start;
958   }
959 
960   /**
961    * @return true iff the ingress EOM has been queued in HTTPTransaction
962    * but the handler has not yet been notified of this event.
963    */
isIngressEOMQueued()964   bool isIngressEOMQueued() const {
965     return ingressState_ == HTTPTransactionIngressSM::State::EOMQueued;
966   }
967 
968   /**
969    * @return true iff the handler has been notified of the ingress EOM.
970    */
isIngressComplete()971   bool isIngressComplete() const {
972     return ingressState_ == HTTPTransactionIngressSM::State::ReceivingDone;
973   }
974 
975   /**
976    * @return true iff onIngressEOM() has been called.
977    */
isIngressEOMSeen()978   bool isIngressEOMSeen() const {
979     return isIngressEOMQueued() || isIngressComplete();
980   }
981 
982   /**
983    * @return true if egress has started on this transaction.
984    */
isEgressStarted()985   bool isEgressStarted() const {
986     return egressState_ != HTTPTransactionEgressSM::State::Start;
987   }
988 
989   /**
990    * @return true iff sendEOM() has been called, but the eom has not been
991    * flushed to the socket yet.
992    */
isEgressEOMQueued()993   bool isEgressEOMQueued() const {
994     return egressState_ == HTTPTransactionEgressSM::State::EOMQueued;
995   }
996 
997   /**
998    * @return true iff the egress EOM has been flushed to the socket.
999    */
isEgressComplete()1000   bool isEgressComplete() const {
1001     return egressState_ == HTTPTransactionEgressSM::State::SendingDone;
1002   }
1003 
1004   /**
1005    * @return true iff the remote side initiated this transaction.
1006    */
isRemoteInitiated()1007   bool isRemoteInitiated() const {
1008     return (direction_ == TransportDirection::DOWNSTREAM && id_ % 2 == 1) ||
1009            (direction_ == TransportDirection::UPSTREAM && id_ % 2 == 0);
1010   }
1011 
1012   /**
1013    * @return true iff sendEOM() has been called.
1014    */
isEgressEOMSeen()1015   bool isEgressEOMSeen() const {
1016     return isEgressEOMQueued() || isEgressComplete();
1017   }
1018 
1019   /**
1020    * @return true if we can send headers on this transaction
1021    *
1022    * Here's the logic:
1023    *  1) state machine says sendHeaders is OK AND
1024    *   2a) this is an upstream (allows for mid-stream headers) OR
1025    *   2b) this downstream has not sent a response
1026    *   2c) this downstream has only sent 1xx responses
1027    */
canSendHeaders()1028   virtual bool canSendHeaders() const {
1029     return HTTPTransactionEgressSM::canTransit(
1030                egressState_, HTTPTransactionEgressSM::Event::sendHeaders) &&
1031            (isUpstream() || lastResponseStatus_ == 0 ||
1032             extraResponseExpected());
1033   }
1034 
1035   /**
1036    * Send the egress message headers to the Transport. This method does
1037    * not actually write the message out on the wire immediately. All
1038    * writes happen at the end of the event loop at the earliest.
1039    * Note: This method should be called once per message unless the first
1040    * headers sent indicate a 1xx status.
1041    *
1042    * sendHeaders will not set EOM flag in header frame, whereas
1043    * sendHeadersWithEOM will. sendHeadersWithOptionalEOM backs both of them.
1044    *
1045    * @param headers  Message headers
1046    */
1047   virtual void sendHeaders(const HTTPMessage& headers);
1048   virtual void sendHeadersWithEOM(const HTTPMessage& headers);
1049   virtual void sendHeadersWithOptionalEOM(const HTTPMessage& headers, bool eom);
1050 
1051   /**
1052    * Experimental API
1053    *
1054    * Send the egress message header and a DSRRequestSender to the Transport.
1055    * Handler does NOT have to explicitly sendBody and sendEOM after this.
1056    */
1057   virtual bool sendHeadersWithDelegate(const HTTPMessage& headers,
1058                                        std::unique_ptr<DSRRequestSender>);
1059 
1060   /**
1061    * Send part or all of the egress message body to the Transport. If flow
1062    * control is enabled, the chunk boundaries may not be respected.
1063    * This method does not actually write the message out on the wire
1064    * immediately. All writes happen at the end of the event loop at the
1065    * earliest.
1066    * Note: This method may be called zero or more times per message.
1067    *
1068    * @param body Message body data; the Transport will take care of
1069    *             applying any necessary protocol framing, such as
1070    *             chunk headers.
1071    */
1072   virtual void sendBody(std::unique_ptr<folly::IOBuf> body);
1073 
1074   /**
1075    * Write any protocol framing required for the subsequent call(s)
1076    * to sendBody(). This method does not actually write the message out on
1077    * the wire immediately. All writes happen at the end of the event loop
1078    * at the earliest.
1079    * @param length  Length in bytes of the body data to follow.
1080    */
sendChunkHeader(size_t length)1081   virtual void sendChunkHeader(size_t length) {
1082     if (!validateEgressStateTransition(
1083             HTTPTransactionEgressSM::Event::sendChunkHeader)) {
1084       return;
1085     }
1086     CHECK_EQ(deferredBufferMeta_.length, 0)
1087         << "Chunked-encoding doesn't support BufferMeta write";
1088     // TODO: move this logic down to session/codec
1089     if (!transport_.getCodec().supportsParallelRequests()) {
1090       chunkHeaders_.emplace_back(length);
1091     }
1092   }
1093 
1094   /**
1095    * Write any protocol syntax needed to terminate the data. This method
1096    * does not actually write the message out on the wire immediately. All
1097    * writes happen at the end of the event loop at the earliest.
1098    * Frame begun by the last call to sendChunkHeader().
1099    */
sendChunkTerminator()1100   virtual void sendChunkTerminator() {
1101     validateEgressStateTransition(
1102         HTTPTransactionEgressSM::Event::sendChunkTerminator);
1103     CHECK_EQ(deferredBufferMeta_.length, 0)
1104         << "Chunked-encoding doesn't support BufferMeta write";
1105   }
1106 
1107   /**
1108    * Send message trailers to the Transport. This method does
1109    * not actually write the message out on the wire immediately. All
1110    * writes happen at the end of the event loop at the earliest.
1111    * Note: This method may be called at most once per message.
1112    *
1113    * @param trailers  Message trailers.
1114    */
sendTrailers(const HTTPHeaders & trailers)1115   virtual void sendTrailers(const HTTPHeaders& trailers) {
1116     if (!validateEgressStateTransition(
1117             HTTPTransactionEgressSM::Event::sendTrailers)) {
1118       return;
1119     }
1120     trailers_.reset(new HTTPHeaders(trailers));
1121   }
1122 
1123   /**
1124    * Finalize the egress message; depending on the protocol used
1125    * by the Transport, this may involve sending an explicit "end
1126    * of message" indicator. This method does not actually write the
1127    * message out on the wire immediately. All writes happen at the end
1128    * of the event loop at the earliest.
1129    *
1130    * If the ingress message also is complete, the transaction may
1131    * detach itself from the Handler and Transport and delete itself
1132    * as part of this method.
1133    *
1134    * Note: Either this method or sendAbort() should be called once
1135    *       per message.
1136    */
1137   virtual void sendEOM();
1138 
1139   /**
1140    * Terminate the transaction. Depending on the underlying protocol, this
1141    * may cause the connection to close or write egress bytes. This method
1142    * does not actually write the message out on the wire immediately. All
1143    * writes happen at the end of the event loop at the earliest.
1144    *
1145    * This function may also cause additional callbacks such as
1146    * detachTransaction() to the handler either immediately or after it returns.
1147    */
1148   virtual void sendAbort();
1149 
1150   /**
1151    * Pause ingress processing.  Upon pause, the HTTPTransaction
1152    * will call its Transport's pauseIngress() method.  The Transport
1153    * should make a best effort to stop invoking the HTTPTransaction's
1154    * onIngress* callbacks.  If the Transport does invoke any of those
1155    * methods while the transaction is paused, however, the transaction
1156    * will queue the ingress events and data and delay delivery to the
1157    * Handler until the transaction is unpaused.
1158    */
1159   virtual void pauseIngress();
1160 
1161   /**
1162    * Resume ingress processing. Only useful after a call to pauseIngress().
1163    */
1164   virtual void resumeIngress();
1165 
1166   /**
1167    * @return true iff ingress processing is paused for the handler
1168    */
isIngressPaused()1169   bool isIngressPaused() const {
1170     return ingressPaused_;
1171   }
1172 
1173   /**
1174    * Pause egress generation. HTTPTransaction may call its Handler's
1175    * onEgressPaused() method if there is a state change as a result of
1176    * this call.
1177    *
1178    * On receiving onEgressPaused(), the Handler should make a best effort
1179    * to stop invoking the HTTPTransaction's egress generating methods.  If
1180    * the Handler does invoke any of those methods while the transaction is
1181    * paused, however, the transaction will forward them anyway, unless it
1182    * is a body event. If flow control is enabled, body events will be
1183    * buffered for later transmission when egress is unpaused.
1184    */
1185   void pauseEgress();
1186 
1187   /**
1188    * Resume egress generation. The Handler's onEgressResumed() will not be
1189    * invoked if the HTTP/2 send window is full or there is too much
1190    * buffered egress data on this transaction already. In that case,
1191    * once the send window is not full or the buffer usage decreases, the
1192    * handler will finally get onEgressResumed().
1193    */
1194   void resumeEgress();
1195 
1196   /**
1197    * Specify a rate limit for egressing bytes.
1198    * The transaction will buffer extra bytes if doing so would cause it to go
1199    * over the specified rate limit.  Setting to a value of 0 will cause no
1200    * rate-limiting to occur.
1201    */
1202   void setEgressRateLimit(uint64_t bitsPerSecond);
1203 
1204   /**
1205    * @return true iff egress processing is paused for the handler
1206    */
isEgressPaused()1207   bool isEgressPaused() const {
1208     return handlerEgressPaused_;
1209   }
1210 
1211   /**
1212    * @return true iff egress processing is paused due to flow control
1213    * to the handler
1214    */
isFlowControlPaused()1215   bool isFlowControlPaused() const {
1216     return flowControlPaused_;
1217   }
1218 
1219   /**
1220    * @return true iff this transaction can be used to push resources to
1221    * the remote side.
1222    */
supportsPushTransactions()1223   bool supportsPushTransactions() const {
1224     return direction_ == TransportDirection::DOWNSTREAM &&
1225            transport_.getCodec().supportsPushTransactions();
1226   }
1227 
1228   /**
1229    * Create a new pushed transaction associated with this transaction,
1230    * and assign the given handler and priority.
1231    *
1232    * @return the new transaction for the push, or nullptr if a new push
1233    * transaction is impossible right now.
1234    */
1235   virtual HTTPTransaction* newPushedTransaction(
1236       HTTPPushTransactionHandler* handler, ProxygenError* error = nullptr) {
1237     if (isDelegated_) {
1238       LOG(ERROR)
1239           << "Creating Pushed transaction on a delegated HTTPTransaction "
1240           << "is not supported.";
1241       return nullptr;
1242     }
1243 
1244     if (isEgressEOMSeen()) {
1245       SET_PROXYGEN_ERROR_IF(error,
1246                             ProxygenError::kErrorEgressEOMSeenOnParentStream);
1247       return nullptr;
1248     }
1249     auto txn = transport_.newPushedTransaction(id_, handler, error);
1250     if (txn) {
1251       pushedTransactions_.insert(txn->getID());
1252     }
1253     return txn;
1254   }
1255 
1256   /**
1257    * Create a new extended transaction associated with this transaction,
1258    * and assign the given handler and priority.
1259    *
1260    * @return the new transaction for pubsub, or nullptr if a new push
1261    * transaction is impossible right now.
1262    */
1263   virtual HTTPTransaction* newExTransaction(HTTPTransactionHandler* handler,
1264                                             bool unidirectional = false) {
1265     if (isDelegated_) {
1266       LOG(ERROR) << "Creating ExTransaction on a delegated HTTPTransaction is "
1267                  << "not supported.";
1268       return nullptr;
1269     }
1270     auto txn = transport_.newExTransaction(handler, id_, unidirectional);
1271     if (txn) {
1272       exTransactions_.insert(txn->getID());
1273     }
1274     return txn;
1275   }
1276 
1277   /**
1278    * Invoked by the session (upstream only) when a new pushed transaction
1279    * arrives.  The txn's handler will be notified and is responsible for
1280    * installing a handler.  If no handler is installed in the callback,
1281    * the pushed transaction will be aborted.
1282    */
1283   bool onPushedTransaction(HTTPTransaction* txn);
1284 
1285   /**
1286    * Invoked by the session when a new ExTransaction arrives.  The txn's handler
1287    * will be notified and is responsible for installing a handler.  If no
1288    * handler is installed in the callback, the transaction will be aborted.
1289    */
1290   bool onExTransaction(HTTPTransaction* txn);
1291 
1292   /**
1293    * True if this transaction is a server push transaction
1294    */
isPushed()1295   bool isPushed() const {
1296     return assocStreamId_.has_value();
1297   }
1298 
isExTransaction()1299   bool isExTransaction() const {
1300     return exAttributes_.has_value();
1301   }
1302 
isUnidirectional()1303   bool isUnidirectional() const {
1304     return isExTransaction() && exAttributes_->unidirectional;
1305   }
1306 
1307   /**
1308    * @return true iff we should notify the error occured on EX_TXN
1309    * This logic only applies to EX_TXN with QoS 0
1310    */
shouldNotifyExTxnError(HTTPException::Direction errorDirection)1311   bool shouldNotifyExTxnError(HTTPException::Direction errorDirection) const {
1312     if (isUnidirectional()) {
1313       if (isRemoteInitiated()) {
1314         // We care about EGRESS errors in this case,
1315         // because we marked EGRESS state to be completed
1316         // If EGRESS error is happening, we need to know
1317         // Same for INGRESS direction, when EX_TXN is not remoteInitiated()
1318         return errorDirection == HTTPException::Direction::EGRESS;
1319       } else {
1320         return errorDirection == HTTPException::Direction::INGRESS;
1321       }
1322     }
1323     return false;
1324   }
1325 
1326   /**
1327    * Overrides the default idle timeout value.
1328    */
1329   void setIdleTimeout(std::chrono::milliseconds idleTimeout);
1330 
hasIdleTimeout()1331   bool hasIdleTimeout() const {
1332     return idleTimeout_.has_value();
1333   }
1334 
1335   /**
1336    * Returns the associated transaction ID for pushed transactions, 0 otherwise
1337    */
getAssocTxnId()1338   folly::Optional<HTTPCodec::StreamID> getAssocTxnId() const {
1339     return assocStreamId_;
1340   }
1341 
1342   /**
1343    * Returns the control channel transaction ID for this transaction,
1344    * folly::none otherwise
1345    */
getControlStream()1346   folly::Optional<HTTPCodec::StreamID> getControlStream() const {
1347     return exAttributes_ ? exAttributes_->controlStream : HTTPCodec::NoStream;
1348   }
1349 
1350   /*
1351    * Returns attributes of EX stream (folly::none if not an EX transaction)
1352    */
getExAttributes()1353   folly::Optional<HTTPCodec::ExAttributes> getExAttributes() const {
1354     return exAttributes_;
1355   }
1356 
1357   /**
1358    * Get a set of server-pushed transactions associated with this transaction.
1359    */
getPushedTransactions()1360   const std::set<HTTPCodec::StreamID>& getPushedTransactions() const {
1361     return pushedTransactions_;
1362   }
1363 
1364   /**
1365    * Get a set of exTransactions associated with this transaction.
1366    */
getExTransactions()1367   std::set<HTTPCodec::StreamID> getExTransactions() const {
1368     return exTransactions_;
1369   }
1370 
1371   /**
1372    * Remove the pushed txn ID from the set of pushed txns
1373    * associated with this txn.
1374    */
removePushedTransaction(HTTPCodec::StreamID pushStreamId)1375   void removePushedTransaction(HTTPCodec::StreamID pushStreamId) {
1376     pushedTransactions_.erase(pushStreamId);
1377   }
1378 
1379   /**
1380    * Remove the exTxn ID from the control stream txn.
1381    */
removeExTransaction(HTTPCodec::StreamID exStreamId)1382   void removeExTransaction(HTTPCodec::StreamID exStreamId) {
1383     exTransactions_.erase(exStreamId);
1384   }
1385 
1386   /**
1387    * Schedule or refresh the idle timeout for this transaction
1388    */
refreshTimeout()1389   void refreshTimeout() {
1390     if (timer_ && hasIdleTimeout()) {
1391       timer_->scheduleTimeout(this, idleTimeout_.value());
1392     }
1393   }
1394 
1395   /**
1396    * Tests if the first byte has already been sent, and if it
1397    * hasn't yet then it marks it as sent.
1398    */
testAndSetFirstByteSent()1399   bool testAndSetFirstByteSent() {
1400     bool ret = firstByteSent_;
1401     firstByteSent_ = true;
1402     return ret;
1403   }
1404 
testAndClearIsCountedTowardsStreamLimit()1405   bool testAndClearIsCountedTowardsStreamLimit() {
1406     bool ret = isCountedTowardsStreamLimit_;
1407     isCountedTowardsStreamLimit_ = false;
1408     return ret;
1409   }
1410 
setIsCountedTowardsStreamLimit()1411   void setIsCountedTowardsStreamLimit() {
1412     isCountedTowardsStreamLimit_ = true;
1413   }
1414 
1415   /**
1416    * Tests if the very first byte of Header has already been set.
1417    * If it hasn't yet, it marks it as sent.
1418    */
testAndSetFirstHeaderByteSent()1419   bool testAndSetFirstHeaderByteSent() {
1420     bool ret = firstHeaderByteSent_;
1421     firstHeaderByteSent_ = true;
1422     return ret;
1423   }
1424 
1425   /**
1426    * HTTPTransaction will not detach until it has 0 pending byte events.  If
1427    * you call incrementPendingByteEvents, you must make a corresponding call
1428    * to decrementPendingByteEvents or the transaction will never be destroyed.
1429    */
incrementPendingByteEvents()1430   void incrementPendingByteEvents() {
1431     CHECK_LT(pendingByteEvents_,
1432              std::numeric_limits<decltype(pendingByteEvents_)>::max());
1433     pendingByteEvents_++;
1434   }
1435 
decrementPendingByteEvents()1436   void decrementPendingByteEvents() {
1437     DestructorGuard dg(this);
1438     CHECK_GT(pendingByteEvents_, 0);
1439     pendingByteEvents_--;
1440   }
1441 
getNumPendingByteEvents()1442   uint64_t getNumPendingByteEvents() const {
1443     return pendingByteEvents_;
1444   }
1445 
1446   /**
1447    * Timeout callback for this transaction.  The timer is active
1448    * until the ingress message is complete or terminated by error.
1449    */
timeoutExpired()1450   void timeoutExpired() noexcept override {
1451     transport_.transactionTimeout(this);
1452   }
1453 
1454   /**
1455    * Write a description of the transaction to a stream
1456    */
1457   void describe(std::ostream& os) const;
1458 
1459   /**
1460    * Change the priority of this transaction, may generate a PRIORITY frame.
1461    * The first variant is SPDY priority. The second is HTTP/2 priority. The
1462    * third one is a new proposal in a draft for both HTTP/2 and HTTP/3.
1463    */
1464   void updateAndSendPriority(int8_t newPriority);
1465   void updateAndSendPriority(const http2::PriorityUpdate& pri);
1466   virtual void updateAndSendPriority(uint8_t urgency, bool incremental);
1467 
1468   /**
1469    * Notify of priority change, will not generate a PRIORITY frame
1470    */
1471   void onPriorityUpdate(const http2::PriorityUpdate& priority);
1472 
1473   /**
1474    * Add a callback waiting for this transaction to have a transport with
1475    * replay protection.
1476    */
addWaitingForReplaySafety(folly::AsyncTransport::ReplaySafetyCallback * callback)1477   virtual void addWaitingForReplaySafety(
1478       folly::AsyncTransport::ReplaySafetyCallback* callback) {
1479     transport_.addWaitingForReplaySafety(callback);
1480   }
1481 
1482   /**
1483    * Remove a callback waiting for replay protection (if it was canceled).
1484    */
removeWaitingForReplaySafety(folly::AsyncTransport::ReplaySafetyCallback * callback)1485   virtual void removeWaitingForReplaySafety(
1486       folly::AsyncTransport::ReplaySafetyCallback* callback) {
1487     transport_.removeWaitingForReplaySafety(callback);
1488   }
1489 
needToBlockForReplaySafety()1490   virtual bool needToBlockForReplaySafety() const {
1491     return transport_.needToBlockForReplaySafety();
1492   }
1493 
1494   int32_t getRecvToAck() const;
1495 
isPrioritySampled()1496   bool isPrioritySampled() const {
1497     return prioritySample_ != nullptr;
1498   }
1499 
1500   void setPrioritySampled(bool sampled);
1501   void updateContentionsCount(uint64_t contentions);
1502   void updateRelativeWeight(double ratio);
1503   void updateSessionBytesSheduled(uint64_t bytes);
1504   void updateTransactionBytesSent(uint64_t bytes);
1505   void checkIfEgressRateLimitedByUpstream();
1506 
1507   struct PrioritySampleSummary {
1508     struct WeightedAverage {
1509       double byTransactionBytes_{0};
1510       double bySessionBytes_{0};
1511     };
1512     WeightedAverage contentions_;
1513     WeightedAverage depth_;
1514     double expected_weight_;
1515     double measured_weight_;
1516   };
1517 
1518   bool getPrioritySampleSummary(PrioritySampleSummary& summary) const;
1519 
1520   const CompressionInfo& getCompressionInfo() const;
1521 
hasPendingBody()1522   bool hasPendingBody() const {
1523     return getOutstandingEgressBodyBytes() > 0;
1524   }
1525 
getOutstandingEgressBodyBytes()1526   size_t getOutstandingEgressBodyBytes() const {
1527     return deferredEgressBody_.chainLength() + deferredBufferMeta_.length;
1528   }
1529 
setLastByteFlushedTrackingEnabled(bool enabled)1530   void setLastByteFlushedTrackingEnabled(bool enabled) {
1531     enableLastByteFlushedTracking_ = enabled;
1532   }
1533 
setBodyLastByteDeliveryTrackingEnabled(bool enabled)1534   bool setBodyLastByteDeliveryTrackingEnabled(bool enabled) {
1535     if (transport_.getSessionType() != Transport::Type::QUIC) {
1536       return false;
1537     }
1538 
1539     enableBodyLastByteDeliveryTracking_ = enabled;
1540     return true;
1541   }
1542 
1543   uint16_t getDatagramSizeLimit() const noexcept;
1544   virtual bool sendDatagram(std::unique_ptr<folly::IOBuf> datagram);
1545 
1546   folly::Optional<ConnectionToken> getConnectionToken() const noexcept;
1547 
setEgressBufferLimit(uint64_t limit)1548   static void setEgressBufferLimit(uint64_t limit) {
1549     egressBufferLimit_ = limit;
1550   }
1551 
1552  private:
1553   HTTPTransaction(const HTTPTransaction&) = delete;
1554   HTTPTransaction& operator=(const HTTPTransaction&) = delete;
1555 
dequeue()1556   void dequeue() {
1557     CHECK(isEnqueued());
1558     egressQueue_.clearPendingEgress(queueHandle_);
1559   }
1560 
1561   bool delegatedTransactionChecks(const HTTPMessage& headers) noexcept;
1562   bool delegatedTransactionChecks() noexcept;
1563 
1564   bool addBufferMeta() noexcept;
1565 
1566   void onDelayedDestroy(bool delayed) override;
1567 
1568   /**
1569    * Invokes the handler's onEgressPaused/Resumed if the handler's pause
1570    * state needs updating
1571    */
1572   void updateHandlerPauseState();
1573 
1574   /**
1575    * Update the CompressionInfo (tableInfo_) struct
1576    */
1577   void updateEgressCompressionInfo(const CompressionInfo&);
1578 
1579   void updateIngressCompressionInfo(const CompressionInfo&);
1580 
1581   bool mustQueueIngress() const;
1582 
1583   /**
1584    * Check if deferredIngress_ points to some queue before pushing HTTPEvent
1585    * to it.
1586    */
1587   void checkCreateDeferredIngress();
1588 
1589   /**
1590    * Implementation of sending an abort for this transaction.
1591    */
1592   void sendAbort(ErrorCode statusCode);
1593 
1594   // Internal implementations of the ingress-related callbacks
1595   // that work whether the ingress events are immediate or deferred.
1596   void processIngressHeadersComplete(std::unique_ptr<HTTPMessage> msg);
1597   void processIngressBody(std::unique_ptr<folly::IOBuf> chain, size_t len);
1598   void processIngressChunkHeader(size_t length);
1599   void processIngressChunkComplete();
1600   void processIngressTrailers(std::unique_ptr<HTTPHeaders> trailers);
1601   void processIngressUpgrade(UpgradeProtocol protocol);
1602   void processIngressEOM();
1603 
1604   void sendBodyFlowControlled(std::unique_ptr<folly::IOBuf> body = nullptr);
1605   size_t sendBodyNow(std::unique_ptr<folly::IOBuf> body,
1606                      size_t bodyLen,
1607                      bool eom);
1608   size_t sendEOMNow();
1609   void onDeltaSendWindowSize(int32_t windowDelta);
1610 
1611   void notifyTransportPendingEgress();
1612 
1613   size_t sendDeferredBody(uint32_t maxEgress);
1614 
1615   size_t sendDeferredBufferMeta(uint32_t maxEgress);
1616 
1617   bool maybeDelayForRateLimit();
1618 
isEnqueued()1619   bool isEnqueued() const {
1620     return queueHandle_ ? queueHandle_->isEnqueued() : false;
1621   }
1622 
1623   // Whther the txn has a pending EOM that can be send out (i.e., no more body
1624   // bytes need to go before it.)
hasPendingEOM()1625   bool hasPendingEOM() const {
1626     return isEgressEOMQueued() && getOutstandingEgressBodyBytes() == 0;
1627   }
1628 
1629   bool isExpectingIngress() const;
1630 
1631   bool isExpectingWindowUpdate() const;
1632 
1633   void updateReadTimeout();
1634 
1635   /**
1636    * Causes isIngressComplete() to return true, removes any queued
1637    * ingress, and cancels the read timeout.
1638    */
1639   void markIngressComplete();
1640 
1641   /**
1642    * Causes isEgressComplete() to return true, removes any queued egress,
1643    * and cancels the write timeout.
1644    */
1645   void markEgressComplete();
1646 
1647   /**
1648    * Validates the ingress state transition. Returns false and sends an
1649    * abort with INTERNAL_ERROR if the transition fails. Otherwise it
1650    * returns true.
1651    */
1652   bool validateIngressStateTransition(HTTPTransactionIngressSM::Event);
1653 
1654   /**
1655    * Validates the egress state transition.
1656    *
1657    * If the transition fails, it will invoke onInvariantViolation, and the
1658    * default implementation is to CHECK/crash.  If you have a custom
1659    * onInvariantViolation handler, this function can return false.
1660    */
1661   bool validateEgressStateTransition(HTTPTransactionEgressSM::Event);
1662 
1663   void invariantViolation(HTTPException ex);
1664 
1665   /**
1666    * Flushes any pending window updates.  This can happen from setReceiveWindow
1667    * or sendHeaders depending on transaction state.
1668    */
1669   void flushWindowUpdate();
1670 
1671   bool updateContentLengthRemaining(size_t len);
1672 
1673   void rateLimitTimeoutExpired();
1674 
1675   class RateLimitCallback : public folly::HHWheelTimer::Callback {
1676    public:
RateLimitCallback(HTTPTransaction & txn)1677     explicit RateLimitCallback(HTTPTransaction& txn) : txn_(txn) {
1678     }
1679 
timeoutExpired()1680     void timeoutExpired() noexcept override {
1681       txn_.rateLimitTimeoutExpired();
1682     }
callbackCanceled()1683     void callbackCanceled() noexcept override {
1684       // no op
1685     }
1686 
1687    private:
1688     HTTPTransaction& txn_;
1689   };
1690 
1691   RateLimitCallback rateLimitCallback_{*this};
1692 
1693   /**
1694    * Queue to hold any events that we receive from the Transaction
1695    * while the ingress is supposed to be paused.
1696    */
1697   std::unique_ptr<std::queue<HTTPEvent>> deferredIngress_;
1698 
1699   /**
1700    * Queue to hold any body bytes to be sent out
1701    * while egress to the remote is supposed to be paused.
1702    */
1703   folly::IOBufQueue deferredEgressBody_{folly::IOBufQueue::cacheChainLength()};
1704 
1705   /**
1706    * BufferMeta queued at this HTTPTransaction to be sent to Transport.
1707    */
1708   BufferMeta deferredBufferMeta_;
1709 
1710   const TransportDirection direction_;
1711   HTTPTransactionEgressSM::State egressState_{
1712       HTTPTransactionEgressSM::getNewInstance()};
1713   HTTPTransactionIngressSM::State ingressState_{
1714       HTTPTransactionIngressSM::getNewInstance()};
1715   /**
1716    * bytes we need to acknowledge to the remote end using a window update
1717    */
1718   int32_t recvToAck_{0};
1719 
1720   HTTPCodec::StreamID id_;
1721   uint32_t seqNo_;
1722   uint32_t maxDeferredIngress_{0};
1723   Handler* handler_{nullptr};
1724   Transport& transport_;
1725 
1726   HTTPSessionStats* stats_{nullptr};
1727 
1728   CompressionInfo tableInfo_;
1729 
1730   /**
1731    * The recv window and associated data. This keeps track of how many
1732    * bytes we are allowed to buffer.
1733    */
1734   Window recvWindow_;
1735 
1736   /**
1737    * The send window and associated data. This keeps track of how many
1738    * bytes we are allowed to send and have outstanding.
1739    */
1740   Window sendWindow_;
1741 
1742   TransportCallback* transportCallback_{nullptr};
1743 
1744   /**
1745    * Trailers to send, if any.
1746    */
1747   std::unique_ptr<HTTPHeaders> trailers_;
1748 
1749   struct Chunk {
ChunkChunk1750     explicit Chunk(size_t inLength) : length(inLength), headerSent(false) {
1751     }
1752     size_t length;
1753     bool headerSent;
1754   };
1755   std::list<Chunk> chunkHeaders_;
1756 
1757   /**
1758    * Reference to our priority queue
1759    */
1760   HTTP2PriorityQueueBase& egressQueue_;
1761 
1762   /**
1763    * Handle to our position in the priority queue.
1764    */
1765   HTTP2PriorityQueueBase::Handle queueHandle_{nullptr};
1766 
1767   /**
1768    * ID of request transaction (for pushed txns only)
1769    */
1770   folly::Optional<HTTPCodec::StreamID> assocStreamId_;
1771 
1772   /**
1773    * Attributes of http2 Ex_HEADERS
1774    */
1775   folly::Optional<HTTPCodec::ExAttributes> exAttributes_;
1776 
1777   /**
1778    * Set of all push transactions IDs associated with this transaction.
1779    */
1780   std::set<HTTPCodec::StreamID> pushedTransactions_;
1781 
1782   /**
1783    * Set of all exTransaction IDs associated with this transaction.
1784    */
1785   std::set<HTTPCodec::StreamID> exTransactions_;
1786 
1787   /**
1788    * Priority of this transaction
1789    */
1790   http2::PriorityUpdate priority_;
1791 
1792   /**
1793    * Information about this transaction's priority.
1794    *
1795    * insertDepth_ is the depth of this node in the tree when the txn was created
1796    * currentDepth_ is the depth of this node in the tree after the last
1797    *               onPriorityUpdate. It may not reflect its real position in
1798    *               realtime, since after the last onPriorityUpdate, it may get
1799    *               reparented as parent transactions complete.
1800    * cumulativeRatio_ / egressCalls_ is the average relative weight of this
1801    *                                 txn during egress
1802    */
1803   uint64_t insertDepth_{0};
1804   uint64_t currentDepth_{0};
1805   double cumulativeRatio_{0};
1806   uint64_t egressCalls_{0};
1807 
1808   uint64_t pendingByteEvents_{0};
1809   folly::Optional<uint64_t> expectedIngressContentLength_;
1810   folly::Optional<uint64_t> expectedIngressContentLengthRemaining_;
1811   folly::Optional<uint64_t> expectedResponseLength_;
1812   folly::Optional<uint64_t> actualResponseLength_{0};
1813 
1814   bool ingressPaused_ : 1;
1815   bool egressPaused_ : 1;
1816   bool flowControlPaused_ : 1;
1817   bool handlerEgressPaused_ : 1;
1818   bool egressRateLimited_ : 1;
1819   bool useFlowControl_ : 1;
1820   bool aborted_ : 1;
1821   bool deleting_ : 1;
1822   bool firstByteSent_ : 1;
1823   bool firstHeaderByteSent_ : 1;
1824   bool inResume_ : 1;
1825   bool isCountedTowardsStreamLimit_ : 1;
1826   bool ingressErrorSeen_ : 1;
1827   bool priorityFallback_ : 1;
1828   bool headRequest_ : 1;
1829   bool enableLastByteFlushedTracking_ : 1;
1830   bool enableBodyLastByteDeliveryTracking_ : 1;
1831 
1832   // Prevents the application from calling skipBodyTo() before egress
1833   // headers have been delivered.
1834   bool egressHeadersDelivered_ : 1;
1835 
1836   // Whether the HTTPTransaction has sent out a 1xx response HTTPMessage.
1837   bool has1xxResponse_ : 1;
1838 
1839   // Whether this HTTPTransaction delegates body sending to another entity.
1840   bool isDelegated_ : 1;
1841 
1842   /**
1843    * If this transaction represents a request (ie, it is backed by an
1844    * HTTPUpstreamSession) , this field indicates the last response status
1845    * received from the server. If this transaction represents a response,
1846    * this field indicates the last status we've sent. For instances, this
1847    * could take on multiple 1xx values, and then take on 200.
1848    */
1849   uint16_t lastResponseStatus_{0};
1850 
1851   // Maximum size of egress buffer before invoking onEgressPaused
1852   static uint64_t egressBufferLimit_;
1853 
1854   uint64_t egressLimitBytesPerMs_{0};
1855   proxygen::TimePoint startRateLimit_;
1856   uint64_t numLimitedBytesEgressed_{0};
1857 
1858   folly::Optional<std::chrono::milliseconds> idleTimeout_;
1859 
1860   folly::HHWheelTimer* timer_;
1861 
1862   class PrioritySample;
1863   std::unique_ptr<PrioritySample> prioritySample_;
1864 
1865   // Keeps track for body offset processed so far.
1866   uint64_t ingressBodyOffset_{0};
1867 
1868   bool setIngressTimeoutAfterEom_{false};
1869 };
1870 
1871 /**
1872  * Write a description of an HTTPTransaction to an ostream
1873  */
1874 std::ostream& operator<<(std::ostream& os, const HTTPTransaction& txn);
1875 
1876 } // namespace proxygen
1877