1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree.
7  */
8 
9 #pragma once
10 
11 #include <folly/container/EvictingCacheMap.h>
12 #include <folly/io/IOBufQueue.h>
13 #include <folly/io/async/AsyncSocket.h>
14 #include <folly/io/async/DelayedDestructionBase.h>
15 #include <folly/io/async/EventBase.h>
16 #include <folly/lang/Assume.h>
17 #include <proxygen/lib/http/codec/HQControlCodec.h>
18 #include <proxygen/lib/http/codec/HQUnidirectionalCodec.h>
19 #include <proxygen/lib/http/codec/HQUtils.h>
20 #include <proxygen/lib/http/codec/HTTP1xCodec.h>
21 #include <proxygen/lib/http/codec/HTTP2Framer.h>
22 #include <proxygen/lib/http/codec/HTTPChecks.h>
23 #include <proxygen/lib/http/codec/HTTPCodec.h>
24 #include <proxygen/lib/http/codec/HTTPCodecFilter.h>
25 #include <proxygen/lib/http/codec/HTTPSettings.h>
26 #include <proxygen/lib/http/session/HQByteEventTracker.h>
27 #include <proxygen/lib/http/session/HQStreamBase.h>
28 #include <proxygen/lib/http/session/HQUnidirectionalCallbacks.h>
29 #include <proxygen/lib/http/session/HTTPSessionBase.h>
30 #include <proxygen/lib/http/session/HTTPSessionController.h>
31 #include <proxygen/lib/http/session/HTTPTransaction.h>
32 #include <proxygen/lib/http/session/ServerPushLifecycle.h>
33 #include <proxygen/lib/utils/ConditionalGate.h>
34 #include <quic/api/QuicSocket.h>
35 #include <quic/common/BufUtil.h>
36 
37 namespace proxygen {
38 
39 class HTTPSessionController;
40 class HQSession;
41 class VersionUtils;
42 
43 namespace hq {
44 class HQStreamCodec;
45 }
46 
47 std::ostream& operator<<(std::ostream& os, const HQSession& session);
48 
49 enum class HQVersion : uint8_t {
50   H1Q_FB_V1, // HTTP1.1 on each stream, no control stream
51   H1Q_FB_V2, // HTTP1.1 on each stream, control stream for GOAWAY
52   HQ,        // The real McCoy
53 };
54 
55 extern const std::string kH3;
56 extern const std::string kHQ;
57 extern const std::string kH3FBCurrentDraft;
58 extern const std::string kH3CurrentDraft;
59 extern const std::string kH3LegacyDraft;
60 extern const std::string kHQCurrentDraft;
61 
62 // Default Priority Node
63 extern const proxygen::http2::PriorityUpdate hqDefaultPriority;
64 
65 using HQVersionType = std::underlying_type<HQVersion>::type;
66 
67 constexpr uint8_t kMaxDatagramHeaderSize = 16;
68 // Maximum number of datagrams to buffer per stream
69 constexpr uint8_t kDefaultMaxBufferedDatagrams = 5;
70 // Maximum number of streams with datagrams buffered
71 constexpr uint8_t kMaxStreamsWithBufferedDatagrams = 10;
72 // Maximum number of priority updates received when stream is not available
73 constexpr uint8_t kMaxBufferedPriorityUpdates = 10;
74 
75 /**
76  * Session-level protocol info.
77  */
78 struct QuicProtocolInfo : public wangle::ProtocolInfo {
79   virtual ~QuicProtocolInfo() override = default;
80 
81   folly::Optional<quic::ConnectionId> clientChosenDestConnectionId;
82   folly::Optional<quic::ConnectionId> clientConnectionId;
83   folly::Optional<quic::ConnectionId> serverConnectionId;
84   folly::Optional<quic::TransportSettings> transportSettings;
85 
86   uint32_t ptoCount{0};
87   uint32_t totalPTOCount{0};
88   uint64_t totalTransportBytesSent{0};
89   uint64_t totalTransportBytesRecvd{0};
90   bool usedZeroRtt{false};
91 };
92 
93 /**
94  *  Stream level protocol info. Contains all data from
95  *  the sessinon info, plus stream-specific information.
96  *  This structure is owned by each individual stream,
97  *  and is updated when requested.
98  *  If instance of HQ Transport Stream outlives the corresponding QUIC socket,
99  *  has been destroyed, this structure will contain the last snapshot
100  *  of the data received from the QUIC socket.
101  *
102  * Usage:
103  *   TransportInfo tinfo;
104  *   txn.getCurrentTransportInfo(&tinfo); // txn is the HTTP transaction object
105  *   auto streamInfo = dynamic_cast<QuicStreamProtocolInfo>(tinfo.protocolInfo);
106  *   if (streamInfo) {
107  *      // stream level AND connection level info is available
108  *   };
109  *   auto connectionInfo = dynamic_cast<QuicProtocolInfo>(tinfo.protocolInfo);
110  *   if (connectionInfo) {
111  *     // ONLY connection level info is available. No stream level info.
112  *   }
113  *
114  */
115 struct QuicStreamProtocolInfo : public QuicProtocolInfo {
116 
117   // Slicing assignment operator to initialize the per-stream protocol info
118   // with the values of the per-session protocol info.
119   QuicStreamProtocolInfo& operator=(const QuicProtocolInfo& other) {
120     if (this != &other) {
121       *(static_cast<QuicProtocolInfo*>(this)) = other;
122     }
123     return *this;
124   }
125 
126   quic::QuicSocket::StreamTransportInfo streamTransportInfo;
127   // NOTE: when the control stream latency stats will be reintroduced,
128   // collect it here.
129 };
130 
131 class HQSession
132     : public quic::QuicSocket::ConnectionCallback
133     , public quic::QuicSocket::ReadCallback
134     , public quic::QuicSocket::WriteCallback
135     , public quic::QuicSocket::DeliveryCallback
136     , public quic::QuicSocket::DatagramCallback
137     , public HTTPSessionBase
138     , public folly::EventBase::LoopCallback
139     , public HQUnidirStreamDispatcher::Callback {
140   // Forward declarations
141  public:
142   class HQStreamTransportBase;
143 
144  protected:
145   class HQStreamTransport;
146 
147  private:
148   class HQControlStream;
149   class H1QFBV1VersionUtils;
150   class H1QFBV2VersionUtils;
151   class HQVersionUtils;
152 
153   static constexpr uint8_t kMaxCodecStackDepth = 3;
154 
155  public:
getMaxAllowedPushId()156   folly::Optional<hq::PushId> getMaxAllowedPushId() {
157     return maxAllowedPushId_;
158   }
159 
setServerPushLifecycleCallback(ServerPushLifecycleCallback * cb)160   void setServerPushLifecycleCallback(ServerPushLifecycleCallback* cb) {
161     serverPushLifecycleCb_ = cb;
162   }
163 
164   class ConnectCallback {
165    public:
~ConnectCallback()166     virtual ~ConnectCallback() {
167     }
168 
169     /**
170      * This function is not terminal of the callback, downstream should expect
171      * onReplaySafe to be invoked after connectSuccess.
172      * onReplaySafe is invoked right after connectSuccess if zero rtt is not
173      * attempted.
174      * In zero rtt case, onReplaySafe might never be invoked if e.g. server
175      * does not respond.
176      */
connectSuccess()177     virtual void connectSuccess() {
178       // Default empty implementation is provided in case downstream does not
179       // attempt zero rtt data.
180     }
181 
182     /**
183      * Terminal callback.
184      */
185     virtual void onReplaySafe() = 0;
186 
187     /**
188      * Terminal callback.
189      */
190     virtual void connectError(
191         std::pair<quic::QuicErrorCode, std::string> code) = 0;
192 
193     /**
194      * Callback for the first time transport has processed a packet from peer.
195      */
onFirstPeerPacketProcessed()196     virtual void onFirstPeerPacketProcessed() {
197     }
198   };
199 
200   virtual ~HQSession();
201 
getType()202   HTTPTransaction::Transport::Type getType() const noexcept override {
203     return HTTPTransaction::Transport::Type::QUIC;
204   }
205 
setSocket(std::shared_ptr<quic::QuicSocket> sock)206   void setSocket(std::shared_ptr<quic::QuicSocket> sock) noexcept {
207     sock_ = sock;
208     if (infoCallback_) {
209       infoCallback_->onCreate(*this);
210     }
211 
212     if (quicInfo_) {
213       quicInfo_->transportSettings = sock_->getTransportSettings();
214     }
215   }
216 
setForceUpstream1_1(bool force)217   void setForceUpstream1_1(bool force) {
218     forceUpstream1_1_ = force;
219   }
setStrictValidation(bool strictValidation)220   void setStrictValidation(bool strictValidation) {
221     strictValidation_ = strictValidation;
222   }
223 
224   void setSessionStats(HTTPSessionStats* stats) override;
225 
226   void onNewBidirectionalStream(quic::StreamId id) noexcept override;
227 
228   void onNewUnidirectionalStream(quic::StreamId id) noexcept override;
229 
230   void onStopSending(quic::StreamId id,
231                      quic::ApplicationErrorCode error) noexcept override;
232 
233   void onConnectionEnd() noexcept override;
234 
235   void onConnectionSetupError(
236       std::pair<quic::QuicErrorCode, std::string> code) noexcept override;
237 
238   void onConnectionError(
239       std::pair<quic::QuicErrorCode, std::string> code) noexcept override;
240 
241   void onKnob(uint64_t knobSpace, uint64_t knobId, quic::Buf knobBlob) override;
242 
243   // returns false in case of failure
244   bool onTransportReadyCommon() noexcept;
245 
246   void onReplaySafe() noexcept override;
247 
248   void onFlowControlUpdate(quic::StreamId id) noexcept override;
249 
250   // quic::QuicSocket::ReadCallback
251   void readAvailable(quic::StreamId id) noexcept override;
252 
253   void readError(
254       quic::StreamId id,
255       std::pair<quic::QuicErrorCode, folly::Optional<folly::StringPiece>>
256           error) noexcept override;
257 
258   // quic::QuicSocket::WriteCallback
259   void onConnectionWriteReady(uint64_t maxToSend) noexcept override;
260 
261   void onConnectionWriteError(
262       std::pair<quic::QuicErrorCode, folly::Optional<folly::StringPiece>>
263           error) noexcept override;
264 
265   // quic::QuicSocket::DatagramCallback
266   void onDatagramsAvailable() noexcept override;
267 
268   // Only for UpstreamSession
269   HTTPTransaction* newTransaction(HTTPTransaction::Handler* handler) override;
270 
271   void startNow() override;
272 
describe(std::ostream & os)273   void describe(std::ostream& os) const override {
274     using quic::operator<<;
275     os << "proto=" << alpn_;
276     auto clientCid = (sock_ && sock_->getClientConnectionId())
277                          ? *sock_->getClientConnectionId()
278                          : quic::ConnectionId({0, 0, 0, 0});
279     auto serverCid = (sock_ && sock_->getServerConnectionId())
280                          ? *sock_->getServerConnectionId()
281                          : quic::ConnectionId({0, 0, 0, 0});
282     if (direction_ == TransportDirection::DOWNSTREAM) {
283       os << ", UA=" << userAgent_ << ", client CID=" << clientCid
284          << ", server CID=" << serverCid << ", downstream=" << getPeerAddress()
285          << ", " << getLocalAddress() << "=local";
286     } else {
287       os << ", client CID=" << clientCid << ", server CID=" << serverCid
288          << ", local=" << getLocalAddress() << ", " << getPeerAddress()
289          << "=upstream";
290     }
291     os << ", drain=" << drainState_;
292   }
293 
294   void onGoaway(uint64_t lastGoodStreamID,
295                 ErrorCode code,
296                 std::unique_ptr<folly::IOBuf> debugData = nullptr);
297 
298   void onSettings(const SettingsList& settings);
299 
300   void onPriority(quic::StreamId streamId, const HTTPPriority& pri);
301   void onPushPriority(hq::PushId pushId, const HTTPPriority& pri);
302 
getTransport()303   folly::AsyncTransport* getTransport() override {
304     return nullptr;
305   }
306 
getEventBase()307   folly::EventBase* getEventBase() const override {
308     if (sock_) {
309       return sock_->getEventBase();
310     }
311     return nullptr;
312   }
313 
getTransport()314   const folly::AsyncTransport* getTransport() const override {
315     return nullptr;
316   }
317 
hasActiveTransactions()318   bool hasActiveTransactions() const override {
319     return getNumStreams() > 0;
320   }
321 
getNumStreams()322   uint32_t getNumStreams() const override {
323     return getNumOutgoingStreams() + getNumIncomingStreams();
324   }
325 
getCodecProtocol()326   CodecProtocol getCodecProtocol() const override {
327     if (!versionUtils_) {
328       // return a default protocol before alpn is set
329       return CodecProtocol::HTTP_1_1;
330     }
331     return versionUtils_->getCodecProtocol();
332   }
333 
334   // for testing only
getDispatcher()335   HQUnidirStreamDispatcher* getDispatcher() {
336     return &unidirectionalReadDispatcher_;
337   }
338 
339   /**
340    * Set flow control properties on an already started session.
341    * QUIC requires both stream and connection flow control window sizes to be
342    * specified in the initial transport handshake. Specifying
343    * SETTINGS_INITIAL_WINDOW_SIZE in the SETTINGS frame is an error.
344    *
345    * @param initialReceiveWindow      (unused)
346    * @param receiveStreamWindowSize   per-stream receive window for NEW streams;
347    * @param receiveSessionWindowSize  per-session receive window;
348    */
setFlowControl(size_t,size_t receiveStreamWindowSize,size_t receiveSessionWindowSize)349   void setFlowControl(size_t /* initialReceiveWindow */,
350                       size_t receiveStreamWindowSize,
351                       size_t receiveSessionWindowSize) override {
352     if (sock_) {
353       sock_->setConnectionFlowControlWindow(receiveSessionWindowSize);
354     }
355     receiveStreamWindowSize_ = (uint32_t)receiveStreamWindowSize;
356     HTTPSessionBase::setReadBufferLimit((uint32_t)receiveSessionWindowSize);
357   }
358 
359   /**
360    * Set outgoing settings for this session
361    */
setEgressSettings(const SettingsList & settings)362   void setEgressSettings(const SettingsList& settings) override {
363     for (const auto& setting : settings) {
364       egressSettings_.setSetting(setting.id, setting.value);
365     }
366     const auto maxHeaderListSize =
367         egressSettings_.getSetting(SettingsId::MAX_HEADER_LIST_SIZE);
368     if (maxHeaderListSize) {
369       versionUtilsReady_.then([this, size = maxHeaderListSize->value] {
370         versionUtils_->setMaxUncompressed(size);
371       });
372     }
373     auto datagramEnabled = egressSettings_.getSetting(SettingsId::_HQ_DATAGRAM);
374     // if enabling H3 datagrams check that the transport supports datagrams
375     if (datagramEnabled && datagramEnabled->value) {
376       datagramEnabled_ = true;
377     }
378   }
379 
setMaxConcurrentIncomingStreams(uint32_t)380   void setMaxConcurrentIncomingStreams(uint32_t /*num*/) override {
381     // need transport API
382   }
383 
384   /**
385    * Send a settings frame
386    */
387   size_t sendSettings() override;
388 
389   /**
390    * Causes a ping to be sent on the session. If the underlying protocol
391    * doesn't support pings, this will return 0. Otherwise, it will return
392    * the number of bytes written on the transport to send the ping.
393    */
sendPing()394   size_t sendPing() override {
395     sock_->sendPing(nullptr, std::chrono::milliseconds(0));
396     return 0;
397   }
398 
399   /**
400    * Sends a knob frame on the session.
401    */
sendKnob(uint64_t knobSpace,uint64_t knobId,quic::Buf knobBlob)402   folly::Expected<folly::Unit, quic::LocalErrorCode> sendKnob(
403       uint64_t knobSpace, uint64_t knobId, quic::Buf knobBlob) {
404     return sock_->setKnob(knobSpace, knobId, std::move(knobBlob));
405   }
406 
407   /**
408    * Sends a priority message on this session.  If the underlying protocol
409    * doesn't support priority, this is a no-op.  A new stream identifier will
410    * be selected and returned.
411    */
sendPriority(http2::PriorityUpdate)412   HTTPCodec::StreamID sendPriority(http2::PriorityUpdate /*pri*/) override {
413     return 0;
414   }
415 
416   /**
417    * As above, but updates an existing priority node.  Do not use for
418    * real nodes, prefer HTTPTransaction::changePriority.
419    */
sendPriority(HTTPCodec::StreamID,http2::PriorityUpdate)420   size_t sendPriority(HTTPCodec::StreamID /*id*/,
421                       http2::PriorityUpdate /*pri*/) override {
422     return 0;
423   }
424 
425   size_t sendPriority(HTTPCodec::StreamID id, HTTPPriority pri);
426   size_t sendPushPriority(hq::PushId pushId, HTTPPriority pri);
427 
428   /**
429    * Get session-level transport info.
430    * NOTE: The protocolInfo will be set to connection-level pointer.
431    */
432   bool getCurrentTransportInfo(wangle::TransportInfo* /*tinfo*/) override;
433 
434   /**
435    *  Get session level AND stream level transport info.
436    *  NOTE: the protocolInfo will be set to stream-level pointer.
437    */
438   bool getCurrentStreamTransportInfo(QuicStreamProtocolInfo* /*qspinfo*/,
439                                      quic::StreamId /*streamId*/);
440 
connCloseByRemote()441   bool connCloseByRemote() override {
442     return false;
443   }
444 
445   // From ManagedConnection
446   void timeoutExpired() noexcept override;
447 
isBusy()448   bool isBusy() const override {
449     return getNumStreams() > 0;
450   }
451   void notifyPendingShutdown() override;
452   void closeWhenIdle() override;
453   void dropConnection(const std::string& errorMsg = "") override;
dumpConnectionState(uint8_t)454   void dumpConnectionState(uint8_t /*loglevel*/) override {
455   }
456 
457   virtual void injectTraceEventIntoAllTransactions(TraceEvent& event) override;
458 
459   /*
460    * dropConnectionSync drops the connection immediately.
461    * This means that when invoked internally may need a destructor guard and
462    * the socket will be invalid after it is invoked.
463    *
464    * errorCode is passed to transport CLOSE_CONNNECTION frame
465    *
466    * proxygenError is delivered to open transactions
467    */
468   void dropConnectionSync(std::pair<quic::QuicErrorCode, std::string> errorCode,
469                           ProxygenError proxygenError);
470 
471   // Invokes dropConnectionSync at the beginning of the next loopCallback
472   void dropConnectionAsync(
473       std::pair<quic::QuicErrorCode, std::string> errorCode,
474       ProxygenError proxygenError);
475 
476   bool getCurrentTransportInfoWithoutUpdate(
477       wangle::TransportInfo* /*tinfo*/) const override;
478 
setHeaderCodecStats(HeaderCodec::Stats * stats)479   void setHeaderCodecStats(HeaderCodec::Stats* stats) override {
480     versionUtilsReady_.then(
481         [this, stats] { versionUtils_->setHeaderCodecStats(stats); });
482   }
483 
enableDoubleGoawayDrain()484   void enableDoubleGoawayDrain() override {
485   }
486 
487   // Upstream interface
isReusable()488   bool isReusable() const override {
489     VLOG(4) << __func__ << " sess=" << *this;
490     return !isClosing();
491   }
492 
isClosing()493   bool isClosing() const override {
494     VLOG(4) << __func__ << " sess=" << *this;
495     return (drainState_ != DrainState::NONE || dropping_);
496   }
497 
drain()498   void drain() override {
499     notifyPendingShutdown();
500   }
501 
getHTTPPriority(uint8_t)502   folly::Optional<const HTTPMessage::HTTP2Priority> getHTTPPriority(
503       uint8_t /*level*/) override {
504     return folly::none;
505   }
506 
getQuicSocket()507   quic::QuicSocket* getQuicSocket() const {
508     return sock_.get();
509   }
510 
511   // Override HTTPSessionBase address getter functions
getLocalAddress()512   const folly::SocketAddress& getLocalAddress() const noexcept override {
513     return sock_ && sock_->good() ? sock_->getLocalAddress() : localAddr_;
514   }
515 
getPeerAddress()516   const folly::SocketAddress& getPeerAddress() const noexcept override {
517     return sock_ && sock_->good() ? sock_->getPeerAddress() : peerAddr_;
518   }
519 
520   // Returns creation time point for logging of handshake duration
getCreatedTime()521   const std::chrono::steady_clock::time_point& getCreatedTime() const {
522     return createTime_;
523   }
524 
enablePingProbes(std::chrono::seconds,std::chrono::seconds,bool,bool)525   void enablePingProbes(std::chrono::seconds /*interval*/,
526                         std::chrono::seconds /*timeout*/,
527                         bool /*extendIntervalOnIngress*/,
528                         bool /*immediate*/) override {
529     // TODO
530   }
531 
532  protected:
533   // Finds any transport-like stream that has not been detached
534   // by quic stream id
535   HQStreamTransportBase* findNonDetachedStream(quic::StreamId streamId);
536 
537   //  Find any transport-like stream by quic stream id
538   HQStreamTransportBase* findStream(quic::StreamId streamId);
539 
540   // Find any transport-like stream suitable for ingress (request/push-ingress)
541   HQStreamTransportBase* findIngressStream(quic::StreamId streamId,
542                                            bool includeDetached = false);
543   // Find any transport-like stream suitable for egress (request/push-egress)
544   HQStreamTransportBase* findEgressStream(quic::StreamId streamId,
545                                           bool includeDetached = false);
546 
547   /**
548    * The following functions invoke a callback on all or on all non-detached
549    * request streams. It does an extra lookup per stream but it is safe. Note
550    * that if the callback *adds* streams, they will not get the callback.
551    */
invokeOnAllStreams(std::function<void (HQStreamTransportBase *)> fn)552   void invokeOnAllStreams(std::function<void(HQStreamTransportBase*)> fn) {
553     invokeOnStreamsImpl(
554         std::move(fn),
555         [this](quic::StreamId id) { return this->findStream(id); },
556         true);
557   }
558 
559   void invokeOnEgressStreams(std::function<void(HQStreamTransportBase*)> fn,
560                              bool includeDetached = false) {
561     invokeOnStreamsImpl(std::move(fn),
562                         [this, includeDetached](quic::StreamId id) {
563                           return this->findEgressStream(id, includeDetached);
564                         });
565   }
566 
567   void invokeOnIngressStreams(std::function<void(HQStreamTransportBase*)> fn,
568                               bool includeDetached = false) {
569     invokeOnStreamsImpl(
570         std::move(fn),
571         [this, includeDetached](quic::StreamId id) {
572           return this->findIngressStream(id, includeDetached);
573         },
574         true);
575   }
576 
invokeOnNonDetachedStreams(std::function<void (HQStreamTransportBase *)> fn)577   void invokeOnNonDetachedStreams(
578       std::function<void(HQStreamTransportBase*)> fn) {
579     invokeOnStreamsImpl(std::move(fn), [this](quic::StreamId id) {
580       return this->findNonDetachedStream(id);
581     });
582   }
583 
584   virtual HQStreamTransportBase* findPushStream(quic::StreamId) = 0;
585 
586   virtual void findPushStreams(
587       std::unordered_set<HQStreamTransportBase*>& streams) = 0;
588 
589   // Apply the function on the streams found by the two locators.
590   // Note that same stream can be returned by a find-by-stream-id
591   // and find-by-push-id locators.
592   // This is mitigated by collecting the streams in an unordered set
593   // prior to application of the funtion
594   // Note that the function is allowed to delete a stream by invoking
595   // erase stream, but the locators are not allowed to do so.
596   // Note that neither the locators nor the function are allowed
597   // to call "invokeOnStreamsImpl"
598   void invokeOnStreamsImpl(
599       std::function<void(HQStreamTransportBase*)> fn,
600       std::function<HQStreamTransportBase*(quic::StreamId)> findByStreamIdFn,
601       bool includePush = false) {
602     DestructorGuard g(this);
603     std::unordered_set<HQStreamTransportBase*> streams;
604     streams.reserve(getNumStreams());
605 
606     for (const auto& txn : streams_) {
607       HQStreamTransportBase* pstream = findByStreamIdFn(txn.first);
608       if (pstream) {
609         streams.insert(pstream);
610       }
611     }
612 
613     if (includePush) {
614       findPushStreams(streams);
615     }
616 
617     for (HQStreamTransportBase* pstream : streams) {
618       CHECK(pstream);
619       fn(pstream);
620     }
621   }
622 
623   // Erase the stream. Returns true if the stream
624   // has been erased
625   bool eraseStream(quic::StreamId);
626 
627   virtual bool erasePushStream(quic::StreamId streamId) = 0;
628 
resumeReadsForPushStream(quic::StreamId streamId)629   void resumeReadsForPushStream(quic::StreamId streamId) {
630     pendingProcessReadSet_.insert(streamId);
631     resumeReads(streamId);
632   }
633 
634   // Find a control stream by type
635   HQControlStream* findControlStream(hq::UnidirectionalStreamType streamType);
636 
637   // Find a control stream by stream id (either ingress or egress)
638   HQControlStream* findControlStream(quic::StreamId streamId);
639 
createIngressPushStream(quic::StreamId,hq::PushId)640   virtual HQStreamTransportBase* createIngressPushStream(quic::StreamId,
641                                                          hq::PushId) {
642     return nullptr;
643   }
644 
eraseUnboundStream(HQStreamTransportBase *)645   virtual void eraseUnboundStream(HQStreamTransportBase*) {
646   }
647 
newPushedTransaction(HTTPCodec::StreamID,HTTPTransaction::PushHandler *,ProxygenError *)648   virtual HTTPTransaction* newPushedTransaction(
649       HTTPCodec::StreamID,           /* parentRequestStreamId */
650       HTTPTransaction::PushHandler*, /* handler */
651       ProxygenError*) {
652     return nullptr;
653   }
654 
655   /*
656    * for HQ we need a read callback for unidirectional streams to read the
657    * stream type from the the wire to decide whether a stream is
658    * a control stream, a header codec/decoder stream or a push stream
659    *
660    * This part is now implemented in HQUnidirStreamDispatcher
661    */
662 
663   // Callback methods that are invoked by the dispatcher
664   void assignPeekCallback(
665       quic::StreamId /* id */,
666       hq::UnidirectionalStreamType /* type */,
667       size_t /* toConsume */,
668       quic::QuicSocket::PeekCallback* const /* cb */) override;
669 
670   void assignReadCallback(
671       quic::StreamId /* id */,
672       hq::UnidirectionalStreamType /* type */,
673       size_t /* toConsume */,
674       quic::QuicSocket::ReadCallback* const /* cb */) override;
675 
676   void rejectStream(quic::StreamId /* id */) override;
677 
678   folly::Optional<hq::UnidirectionalStreamType> parseStreamPreface(
679       uint64_t preface) override;
680 
681   void controlStreamReadAvailable(quic::StreamId /* id */) override;
682 
683   void controlStreamReadError(
684       quic::StreamId /* id */,
685       const HQUnidirStreamDispatcher::Callback::ReadError& /* err */) override;
686 
687   /**
688    * HQSession is an HTTPSessionBase that uses QUIC as the underlying transport
689    *
690    * HQSession is an abstract base class and cannot be instantiated
691    * directly. If you want to handle requests and send responses (act as a
692    * server), construct a HQDownstreamSession. If you want to make
693    * requests and handle responses (act as a client), construct a
694    * HQUpstreamSession.
695    */
696   HQSession(const std::chrono::milliseconds transactionsTimeout,
697             HTTPSessionController* controller,
698             proxygen::TransportDirection direction,
699             const wangle::TransportInfo& tinfo,
700             InfoCallback* sessionInfoCb,
701             folly::Function<void(HTTPCodecFilterChain& chain)>
702             /* codecFilterCallbackFn */
703             = nullptr)
HTTPSessionBase(folly::SocketAddress (),folly::SocketAddress (),controller,tinfo,sessionInfoCb,std::make_unique<HTTP1xCodec> (direction),WheelTimerInstance (),hq::kSessionStreamId)704       : HTTPSessionBase(folly::SocketAddress(),
705                         folly::SocketAddress(),
706                         controller,
707                         tinfo,
708                         sessionInfoCb,
709                         std::make_unique<HTTP1xCodec>(direction),
710                         WheelTimerInstance(),
711                         hq::kSessionStreamId),
712         direction_(direction),
713         transactionsTimeout_(transactionsTimeout),
714         started_(false),
715         dropping_(false),
716         inLoopCallback_(false),
717         unidirectionalReadDispatcher_(*this, direction),
718         createTime_(std::chrono::steady_clock::now()) {
719     codec_.add<HTTPChecks>();
720     // dummy, ingress, egress
721     codecStack_.reserve(kMaxCodecStackDepth);
722     codecStack_.emplace_back(nullptr, nullptr, nullptr);
723 
724     attachToSessionController();
725     nextEgressResults_.reserve(maxConcurrentIncomingStreams_);
726     quicInfo_ = std::make_shared<QuicProtocolInfo>();
727   }
728 
729   // EventBase::LoopCallback methods
730   void runLoopCallback() noexcept override;
731 
732   /**
733    * Called by transactionTimeout if the transaction has no handler.
734    */
735   virtual HTTPTransaction::Handler* getTransactionTimeoutHandler(
736       HTTPTransaction* txn) = 0;
737 
738   /**
739    * Called by onHeadersComplete(). This function allows downstream and
740    * upstream to do any setup (like preparing a handler) when headers are
741    * first received from the remote side on a given transaction.
742    */
743   virtual void setupOnHeadersComplete(HTTPTransaction* txn,
744                                       HTTPMessage* msg) = 0;
745 
746   virtual void onConnectionErrorHandler(
747       std::pair<quic::QuicErrorCode, std::string> error) noexcept = 0;
748 
749   void applySettings(const SettingsList& settings);
750 
connectSuccess()751   virtual void connectSuccess() noexcept {
752   }
753 
isPeerUniStream(quic::StreamId id)754   bool isPeerUniStream(quic::StreamId id) {
755     return sock_->isUnidirectionalStream(id) &&
756            ((direction_ == TransportDirection::DOWNSTREAM &&
757              sock_->isClientStream(id)) ||
758             (direction_ == TransportDirection::UPSTREAM &&
759              sock_->isServerStream(id)));
760   }
761 
isSelfUniStream(quic::StreamId id)762   bool isSelfUniStream(quic::StreamId id) {
763     return sock_->isUnidirectionalStream(id) &&
764            ((direction_ == TransportDirection::DOWNSTREAM &&
765              sock_->isServerStream(id)) ||
766             (direction_ == TransportDirection::UPSTREAM &&
767              sock_->isClientStream(id)));
768   }
769 
770   void abortStream(HTTPException::Direction dir,
771                    quic::StreamId id,
772                    HTTP3::ErrorCode err);
773 
774   // Get extra HTTP headers we want to add to the HTTPMessage in sendHeaders.
getExtraHeaders(const HTTPMessage &,quic::StreamId)775   virtual folly::Optional<HTTPHeaders> getExtraHeaders(const HTTPMessage&,
776                                                        quic::StreamId) {
777     return folly::none;
778   }
779 
780   proxygen::TransportDirection direction_;
781   std::chrono::milliseconds transactionsTimeout_;
782   TimePoint transportStart_;
783 
784   std::shared_ptr<quic::QuicSocket> sock_;
785 
786   // Callback pointer used for correctness testing. Not used
787   // for session logic.
788   ServerPushLifecycleCallback* serverPushLifecycleCb_{nullptr};
789 
790  private:
791   std::unique_ptr<HTTPCodec> createStreamCodec(quic::StreamId streamId);
792 
793   // Creates a request stream. All streams that are not control streams
794   // or Push streams are request streams.
795   HQStreamTransport* createStreamTransport(quic::StreamId streamId);
796 
797   bool createEgressControlStreams();
798   HQControlStream* tryCreateIngressControlStream(quic::StreamId id,
799                                                  uint64_t preface);
800 
801   // Creates outgoing control stream.
802   bool createEgressControlStream(hq::UnidirectionalStreamType streamType);
803 
804   // Creates incoming control stream
805   HQControlStream* createIngressControlStream(
806       quic::StreamId id, hq::UnidirectionalStreamType streamType);
807 
cleanupUnboundPushStreams(std::vector<quic::StreamId> &)808   virtual void cleanupUnboundPushStreams(std::vector<quic::StreamId>&) {
809   }
810 
811   // gets the ALPN from the transport and returns whether the protocol is
812   // supported. Drops the connection if not supported
813   bool getAndCheckApplicationProtocol();
814 
815   // Use ALPN to set the correct version utils strategy.
816   void setVersionUtils();
817 
818   // Used during 2-phased GOAWAY messages, and EOF sending.
819   void onDeliveryAck(quic::StreamId id,
820                      uint64_t offset,
821                      std::chrono::microseconds rtt) override;
822 
823   void onCanceled(quic::StreamId id, uint64_t offset) override;
824 
825   // helper functions for reads
826   void readRequestStream(quic::StreamId id) noexcept;
827   void readControlStream(HQControlStream* controlStream);
828 
829   // Runs the codecs on all request streams that have received data
830   // during the last event loop
831   void processReadData();
832 
833   // Pausing reads prevents the read callback to be invoked on the stream
834   void resumeReads(quic::StreamId id);
835 
836   // Resume all ingress transactions
837   void resumeReads();
838 
839   // Resuming the reads allows the read callback to be involved
840   void pauseReads(quic::StreamId id);
841 
842   // Pause all ingress transactions
843   void pauseReads();
844 
845   void notifyEgressBodyBuffered(int64_t bytes);
846 
847   // The max allowed push id value. Value folly::none indicates that
848   // a. For downstream session: MAX_PUSH_ID has not been received
849   // b. For upstream session: MAX_PUSH_ID has been explicitly set to none
850   // In both cases, maxAllowedPushId_ == folly::none means that no push id
851   // is allowed. Default to kEightByteLimit assuming this session will
852   // be using push.
853   folly::Optional<hq::PushId> maxAllowedPushId_{folly::none};
854 
855   // Schedule the loop callback.
856   // To keep this consistent with EventBase::runInLoop run in the next loop
857   // by default
858   void scheduleLoopCallback(bool thisIteration = false);
859 
860   // helper functions for writes
861   uint64_t writeRequestStreams(uint64_t maxEgress) noexcept;
862   void scheduleWrite();
863   void handleWriteError(HQStreamTransportBase* hqStream,
864                         quic::QuicErrorCode err);
865 
866   /**
867    * Handles the write to the socket and errors for a request stream.
868    * Returns the number of bytes written from data.
869    */
870   template <typename WriteFunc, typename DataType>
871   size_t handleWrite(WriteFunc writeFunc,
872                      HQStreamTransportBase* hqStream,
873                      DataType dataType,
874                      size_t dataChainLen,
875                      bool sendEof);
876 
877   /**
878    * Helper function to perform writes on a single request stream
879    * The first argument defines whether the implementation should
880    * call onWriteReady on the transaction to get data allocated
881    * in the write buffer.
882    * Returns the number of bytes written to the transport
883    */
884   uint64_t requestStreamWriteImpl(HQStreamTransportBase* hqStream,
885                                   uint64_t maxEgress,
886                                   double ratio);
887 
888   uint64_t writeControlStreams(uint64_t maxEgress);
889   uint64_t controlStreamWriteImpl(HQControlStream* ctrlStream,
890                                   uint64_t maxEgress);
891   void handleSessionError(HQStreamBase* stream,
892                           hq::StreamDirection streamDir,
893                           quic::QuicErrorCode err,
894                           ProxygenError proxygenError);
895 
896   void detachStreamTransport(HQStreamTransportBase* hqStream);
897 
898   void drainImpl();
899 
900   void checkForShutdown();
901   void onGoawayAck();
902   quic::StreamId getGoawayStreamId();
903 
904   void errorOnTransactionId(quic::StreamId id, HTTPException ex);
905 
906   /**
907    * Shared implementation of "findXXXstream" methods
908    */
909   HQStreamTransportBase* findStreamImpl(quic::StreamId streamId,
910                                         bool includeEgress = true,
911                                         bool includeIngress = true,
912                                         bool includeDetached = true);
913 
914   /**
915    * Shared implementation of "numberOfXXX" methods
916    */
917   uint32_t countStreamsImpl(bool includeEgress = true,
918                             bool includeIngress = true) const;
919 
920   std::list<folly::AsyncTransport::ReplaySafetyCallback*>
921       waitingForReplaySafety_;
922 
923   /**
924    * With HTTP/1.1 codecs, graceful shutdown happens when the session has sent
925    * and received a Connection: close header, and all streams have completed.
926    *
927    * The application can signal intent to drain by calling notifyPendingShutdown
928    * (or its alias, drain).  The peer can signal intent to drain by including
929    * a Connection: close header.
930    *
931    * closeWhenIdle will bypass the requirement to send/receive Connection:
932    * close, and the socket will terminate as soon as the stream count reaches 0.
933    *
934    * dropConnection will forcibly close all streams and guarantee that the
935    * HQSession has been deleted before exiting.
936    *
937    * The intent is that an application will first notifyPendingShutdown() all
938    * open sessions.  Then after some period of time, it will call closeWhenIdle.
939    * As a last resort, it will call dropConnection.
940    *
941    * Note we allow the peer to create streams after draining because of out
942    * of order delivery.
943    *
944    * drainState_ tracks the progress towards shutdown.
945    *
946    *  NONE - no shutdown requested
947    *  PENDING - shutdown requested but no Connection: close seen
948    *  CLOSE_SENT - sent Connection: close but not received
949    *  CLOSE_RECEIVED - received Connection: close but not sent
950    *  DONE - sent and received Connection: close.
951    *
952    *  NONE ---> PENDING ---> CLOSE_SENT --+--> DONE
953    *    |          |                      |
954    *    +----------+-------> CLOSE_RECV --+
955    *
956    * For sessions with a control stream shutdown is driven by GOAWAYs.
957    * Only the server can send GOAWAYs so the behavior is asymmetric between
958    * upstream and downstream
959    *
960    *  NONE - no shutdown requested
961    *  PENDING - shutdown requested but no GOAWAY sent/received yet
962    *  FIRST_GOAWAY - first GOAWAY received/sent
963    *  SECOND_GOAWAY - downstream only - second GOAWAY sent
964    *  DONE - two GOAWAYs sent/received. can close when request streams are done
965    *
966    */
967   enum DrainState : uint8_t {
968     NONE = 0,
969     PENDING = 1,
970     CLOSE_SENT = 2,
971     CLOSE_RECEIVED = 3,
972     FIRST_GOAWAY = 4,
973     SECOND_GOAWAY = 5,
974     DONE = 6
975   };
976 
977   DrainState drainState_{DrainState::NONE};
978   bool started_ : 1;
979   bool dropping_ : 1;
980   bool inLoopCallback_ : 1;
981   folly::Optional<
982       std::pair<std::pair<quic::QuicErrorCode, std::string>, ProxygenError>>
983       dropInNextLoop_;
984 
985 #ifdef _MSC_VER
986 #pragma warning(push)
987 #pragma warning(disable : 4250) // inherits 'proxygen::detail::..' via dominance
988 #endif
989 
990   // A control stream is created as egress first, then the ingress counterpart
991   // is linked as soon as we read the stream preface on the associated stream
992   class HQControlStream
993       : public detail::composite::CSBidir
994       , public HQStreamBase
995       , public hq::HQUnidirectionalCodec::Callback
996       , public quic::QuicSocket::DeliveryCallback {
997    public:
998     HQControlStream() = delete;
HQControlStream(HQSession & session,quic::StreamId egressStreamId,hq::UnidirectionalStreamType type)999     HQControlStream(HQSession& session,
1000                     quic::StreamId egressStreamId,
1001                     hq::UnidirectionalStreamType type)
1002         : detail::composite::CSBidir(egressStreamId, folly::none),
1003           HQStreamBase(session, session.codec_, type) {
1004       createEgressCodec();
1005     }
1006 
createEgressCodec()1007     void createEgressCodec() {
1008       CHECK(type_.has_value());
1009       switch (*type_) {
1010         case hq::UnidirectionalStreamType::H1Q_CONTROL:
1011         case hq::UnidirectionalStreamType::CONTROL:
1012           realCodec_ =
1013               std::make_unique<hq::HQControlCodec>(getEgressStreamId(),
1014                                                    session_.direction_,
1015                                                    hq::StreamDirection::EGRESS,
1016                                                    session_.egressSettings_,
1017                                                    *type_);
1018           break;
1019         case hq::UnidirectionalStreamType::QPACK_ENCODER:
1020         case hq::UnidirectionalStreamType::QPACK_DECODER:
1021           // These are statically allocated in the session
1022           break;
1023         default:
1024           LOG(FATAL)
1025               << "Failed to create egress codec."
1026               << " unrecognized stream type=" << static_cast<uint64_t>(*type_);
1027       }
1028     }
1029 
setIngressCodec(std::unique_ptr<hq::HQUnidirectionalCodec> codec)1030     void setIngressCodec(std::unique_ptr<hq::HQUnidirectionalCodec> codec) {
1031       ingressCodec_ = std::move(codec);
1032     }
1033 
1034     void processReadData();
1035 
1036     // QuicSocket::DeliveryCallback
1037     void onDeliveryAck(quic::StreamId id,
1038                        uint64_t offset,
1039                        std::chrono::microseconds rtt) override;
1040     void onCanceled(quic::StreamId id, uint64_t offset) override;
1041 
1042     // HTTPCodec::Callback
onMessageBegin(HTTPCodec::StreamID,HTTPMessage *)1043     void onMessageBegin(HTTPCodec::StreamID /*stream*/,
1044                         HTTPMessage* /*msg*/) override {
1045       LOG(FATAL) << __func__ << " called on a Control Stream.";
1046     }
1047 
onHeadersComplete(HTTPCodec::StreamID,std::unique_ptr<HTTPMessage>)1048     void onHeadersComplete(HTTPCodec::StreamID /*stream*/,
1049                            std::unique_ptr<HTTPMessage> /*msg*/) override {
1050       LOG(FATAL) << __func__ << " called on a Control Stream.";
1051     }
1052 
onBody(HTTPCodec::StreamID,std::unique_ptr<folly::IOBuf>,uint16_t)1053     void onBody(HTTPCodec::StreamID /*stream*/,
1054                 std::unique_ptr<folly::IOBuf> /*chain*/,
1055                 uint16_t /*padding*/) override {
1056       LOG(FATAL) << __func__ << " called on a Control Stream.";
1057     }
1058 
onTrailersComplete(HTTPCodec::StreamID,std::unique_ptr<HTTPHeaders>)1059     void onTrailersComplete(
1060         HTTPCodec::StreamID /*stream*/,
1061         std::unique_ptr<HTTPHeaders> /*trailers*/) override {
1062       LOG(FATAL) << __func__ << " called on a Control Stream.";
1063     }
1064 
onMessageComplete(HTTPCodec::StreamID,bool)1065     void onMessageComplete(HTTPCodec::StreamID /*stream*/,
1066                            bool /*upgrade*/) override {
1067       LOG(FATAL) << __func__ << " called on a Control Stream.";
1068     }
1069 
1070     void onError(HTTPCodec::StreamID /*stream*/,
1071                  const HTTPException& /*error*/,
1072                  bool /* newTxn */ = false) override;
1073 
1074     void onGoaway(uint64_t lastGoodStreamID,
1075                   ErrorCode code,
1076                   std::unique_ptr<folly::IOBuf> debugData = nullptr) override {
1077       session_.onGoaway(lastGoodStreamID, code, std::move(debugData));
1078     }
1079 
onSettings(const SettingsList & settings)1080     void onSettings(const SettingsList& settings) override {
1081       session_.onSettings(settings);
1082     }
1083 
onPriority(HTTPCodec::StreamID id,const HTTPPriority & pri)1084     void onPriority(HTTPCodec::StreamID id, const HTTPPriority& pri) override {
1085       session_.onPriority(id, pri);
1086     }
1087 
onPushPriority(HTTPCodec::StreamID id,const HTTPPriority & pri)1088     void onPushPriority(HTTPCodec::StreamID id,
1089                         const HTTPPriority& pri) override {
1090       session_.onPushPriority(id, pri);
1091     }
1092 
1093     std::unique_ptr<hq::HQUnidirectionalCodec> ingressCodec_;
1094     bool readEOF_{false};
1095   }; // HQControlStream
1096 
1097  public:
1098   class HQStreamTransportBase
1099       : public HQStreamBase
1100       , public HTTPTransaction::Transport
1101       , public HTTP2PriorityQueueBase
1102       , public quic::QuicSocket::DeliveryCallback {
1103    protected:
1104     HQStreamTransportBase(
1105         HQSession& session,
1106         TransportDirection direction,
1107         quic::StreamId streamId,
1108         uint32_t seqNo,
1109         const WheelTimerInstance& wheelTimer,
1110         HTTPSessionStats* stats = nullptr,
1111         http2::PriorityUpdate priority = hqDefaultPriority,
1112         folly::Optional<HTTPCodec::StreamID> parentTxnId = HTTPCodec::NoStream,
1113         folly::Optional<hq::UnidirectionalStreamType> type = folly::none);
1114 
1115     void initCodec(std::unique_ptr<HTTPCodec> /* codec */,
1116                    const std::string& /* where */);
1117 
1118     void initIngress(const std::string& /* where */);
1119 
getHTTPSessionBase()1120     HTTPSessionBase* getHTTPSessionBase() override {
1121       return &(getSession());
1122     }
1123 
1124    public:
1125     HQStreamTransportBase() = delete;
1126 
hasCodec()1127     bool hasCodec() const {
1128       return hasCodec_;
1129     }
1130 
hasIngress()1131     bool hasIngress() const {
1132       return hasIngress_;
1133     }
1134 
1135     // process data in the read buffer, returns true if the codec is blocked
1136     bool processReadData();
1137 
1138     // Process data from QUIC onDataAvailable callback.
1139     void processPeekData(
1140         const folly::Range<quic::QuicSocket::PeekIterator>& peekData);
1141 
1142     // QuicSocket::DeliveryCallback
1143     void onDeliveryAck(quic::StreamId id,
1144                        uint64_t offset,
1145                        std::chrono::microseconds rtt) override;
1146 
1147     void onCanceled(quic::StreamId id, uint64_t offset) override;
1148 
1149     // HTTPCodec::Callback methods
1150     void onMessageBegin(HTTPCodec::StreamID streamID,
1151                         HTTPMessage* /* msg */) override;
1152 
1153     void onPushMessageBegin(HTTPCodec::StreamID /* pushID */,
1154                             HTTPCodec::StreamID /* parentTxnId */,
1155                             HTTPMessage* /* msg */) override;
1156 
onExMessageBegin(HTTPCodec::StreamID,HTTPCodec::StreamID,bool,HTTPMessage *)1157     void onExMessageBegin(HTTPCodec::StreamID /* streamID */,
1158                           HTTPCodec::StreamID /* controlStream */,
1159                           bool /* unidirectional */,
1160                           HTTPMessage* /* msg */) override {
1161       LOG(ERROR) << "exMessage: txn=" << txn_ << " TODO";
1162     }
1163 
onPushPromiseHeadersComplete(hq::PushId,HTTPCodec::StreamID,std::unique_ptr<HTTPMessage>)1164     virtual void onPushPromiseHeadersComplete(
1165         hq::PushId /* pushID */,
1166         HTTPCodec::StreamID /* assoc streamID */,
1167         std::unique_ptr<HTTPMessage> /* msg */) {
1168       LOG(ERROR) << "push promise: txn=" << txn_ << " TODO";
1169     }
1170 
1171     void onHeadersComplete(HTTPCodec::StreamID streamID,
1172                            std::unique_ptr<HTTPMessage> msg) override;
1173 
onBody(HTTPCodec::StreamID,std::unique_ptr<folly::IOBuf> chain,uint16_t padding)1174     void onBody(HTTPCodec::StreamID /* streamID */,
1175                 std::unique_ptr<folly::IOBuf> chain,
1176                 uint16_t padding) override {
1177       VLOG(4) << __func__ << " txn=" << txn_;
1178       CHECK(chain);
1179       auto len = chain->computeChainDataLength();
1180       if (session_.onBodyImpl(std::move(chain), len, padding, &txn_)) {
1181         session_.pauseReads();
1182       };
1183     }
1184 
onChunkHeader(HTTPCodec::StreamID,size_t length)1185     void onChunkHeader(HTTPCodec::StreamID /* stream */,
1186                        size_t length) override {
1187       VLOG(4) << __func__ << " txn=" << txn_;
1188       txn_.onIngressChunkHeader(length);
1189     }
1190 
onChunkComplete(HTTPCodec::StreamID)1191     void onChunkComplete(HTTPCodec::StreamID /* stream */) override {
1192       VLOG(4) << __func__ << " txn=" << txn_;
1193       txn_.onIngressChunkComplete();
1194     }
1195 
onTrailersComplete(HTTPCodec::StreamID,std::unique_ptr<HTTPHeaders> trailers)1196     void onTrailersComplete(HTTPCodec::StreamID /* streamID */,
1197                             std::unique_ptr<HTTPHeaders> trailers) override {
1198       VLOG(4) << __func__ << " txn=" << txn_;
1199       txn_.onIngressTrailers(std::move(trailers));
1200     }
1201 
onMessageComplete(HTTPCodec::StreamID,bool)1202     void onMessageComplete(HTTPCodec::StreamID /* streamID */,
1203                            bool /* upgrade */) override {
1204       VLOG(4) << __func__ << " txn=" << txn_;
1205       // for 1xx responses (excluding 101) onMessageComplete may be called
1206       // more than once
1207       if (txn_.isUpstream() && txn_.extraResponseExpected()) {
1208         return;
1209       }
1210       if (session_.infoCallback_) {
1211         session_.infoCallback_->onRequestEnd(session_,
1212                                              txn_.getMaxDeferredSize());
1213       }
1214       // Pause the parser, which will prevent more than one message from being
1215       // processed
1216       auto g = folly::makeGuard(setActiveCodec(__func__));
1217       codecFilterChain->setParserPaused(true);
1218       eomGate_.set(EOMType::CODEC);
1219     }
1220 
onIngressEOF()1221     void onIngressEOF() {
1222       // Can only call this once
1223       CHECK(!eomGate_.get(EOMType::TRANSPORT));
1224       if (ingressError_) {
1225         // This codec has already errored, no need to give it more input
1226         return;
1227       }
1228       auto g = folly::makeGuard(setActiveCodec(__func__));
1229       codecFilterChain->onIngressEOF();
1230       eomGate_.set(EOMType::TRANSPORT);
1231     }
1232 
1233     void onError(HTTPCodec::StreamID streamID,
1234                  const HTTPException& error,
1235                  bool newTxn) override;
1236 
1237     // Invoked when we get a RST_STREAM from the transport
1238     void onResetStream(HTTP3::ErrorCode error, HTTPException ex);
1239 
onAbort(HTTPCodec::StreamID,ErrorCode)1240     void onAbort(HTTPCodec::StreamID /* streamID */,
1241                  ErrorCode /* code */) override {
1242       VLOG(4) << __func__ << " txn=" << txn_;
1243       // Can't really get here since no HQ codecs can produce aborts.
1244       // The entry point is onResetStream via readError()
1245       LOG(DFATAL) << "Unexpected abort";
1246     }
1247 
1248     void onFrameHeader(HTTPCodec::StreamID /* stream_id */,
1249                        uint8_t /* flags */,
1250                        uint64_t /* length */,
1251                        uint64_t /* type */,
1252                        uint16_t /* version */ = 0) override {
1253       VLOG(4) << __func__ << " txn=" << txn_;
1254     }
1255 
1256     void onGoaway(
1257         uint64_t /* lastGoodStreamID */,
1258         ErrorCode /* code */,
1259         std::unique_ptr<folly::IOBuf> /* debugData */ = nullptr) override {
1260       VLOG(4) << __func__ << " txn=" << txn_;
1261     }
1262 
onPingRequest(uint64_t)1263     void onPingRequest(uint64_t /* data */) override {
1264       VLOG(4) << __func__ << " txn=" << txn_;
1265     }
1266 
onPingReply(uint64_t)1267     void onPingReply(uint64_t /* data */) override {
1268       // This method should not get called
1269       LOG(FATAL) << __func__ << " txn=" << txn_;
1270     }
1271 
onWindowUpdate(HTTPCodec::StreamID,uint32_t)1272     void onWindowUpdate(HTTPCodec::StreamID /* stream */,
1273                         uint32_t /* amount */) override {
1274       VLOG(4) << __func__ << " txn=" << txn_;
1275     }
1276 
onSettings(const SettingsList &)1277     void onSettings(const SettingsList& /*settings*/) override {
1278       VLOG(4) << __func__ << " txn=" << txn_;
1279     }
1280 
onSettingsAck()1281     void onSettingsAck() override {
1282       VLOG(4) << __func__ << " txn=" << txn_;
1283     }
1284 
onPriority(HTTPCodec::StreamID,const HTTPMessage::HTTP2Priority &)1285     void onPriority(HTTPCodec::StreamID /* stream */,
1286                     const HTTPMessage::HTTP2Priority& /* priority */) override {
1287       VLOG(4) << __func__ << " txn=" << txn_;
1288     }
1289 
onNativeProtocolUpgrade(HTTPCodec::StreamID,CodecProtocol,const std::string &,HTTPMessage &)1290     bool onNativeProtocolUpgrade(HTTPCodec::StreamID /* stream */,
1291                                  CodecProtocol /* protocol */,
1292                                  const std::string& /* protocolString */,
1293                                  HTTPMessage& /* msg */) override {
1294       VLOG(4) << __func__ << " txn=" << txn_;
1295       return false;
1296     }
1297 
numOutgoingStreams()1298     uint32_t numOutgoingStreams() const override {
1299       VLOG(4) << __func__ << " txn=" << txn_;
1300       return 0;
1301     }
1302 
numIncomingStreams()1303     uint32_t numIncomingStreams() const override {
1304       VLOG(4) << __func__ << " txn=" << txn_;
1305       return 0;
1306     }
1307 
1308     // HTTPTransaction::Transport methods
1309 
1310     // For parity with H2, pause/resumeIngress now a no-op.  All transactions
1311     // will pause when total buffered egress exceeds the configured limit, which
1312     // should be equal to the recv flow control window
pauseIngress(HTTPTransaction *)1313     void pauseIngress(HTTPTransaction* /* txn */) noexcept override {
1314       VLOG(4) << __func__ << " txn=" << txn_;
1315     }
1316 
resumeIngress(HTTPTransaction *)1317     void resumeIngress(HTTPTransaction* /* txn */) noexcept override {
1318       VLOG(4) << __func__ << " txn=" << txn_;
1319     }
1320 
1321     void transactionTimeout(HTTPTransaction* /* txn */) noexcept override;
1322 
1323     void sendHeaders(HTTPTransaction* txn,
1324                      const HTTPMessage& headers,
1325                      HTTPHeaderSize* size,
1326                      bool includeEOM) noexcept override;
1327 
1328     bool sendHeadersWithDelegate(
1329         HTTPTransaction* txn,
1330         const HTTPMessage& headers,
1331         HTTPHeaderSize* size,
1332         size_t* dataFrameHeaderSize,
1333         uint64_t contentLength,
1334         std::unique_ptr<DSRRequestSender> dsrSender) noexcept override;
1335 
1336     size_t sendBody(HTTPTransaction* txn,
1337                     std::unique_ptr<folly::IOBuf> body,
1338                     bool includeEOM,
1339                     bool trackLastByteFlushed) noexcept override;
1340 
1341     size_t sendBody(HTTPTransaction* txn,
1342                     const HTTPTransaction::BufferMeta& body,
1343                     bool eom) noexcept override;
1344 
1345     size_t sendChunkHeader(HTTPTransaction* txn,
1346                            size_t length) noexcept override;
1347 
1348     size_t sendChunkTerminator(HTTPTransaction* txn) noexcept override;
1349 
1350     size_t sendEOM(HTTPTransaction* txn,
1351                    const HTTPHeaders* trailers) noexcept override;
1352 
1353     size_t sendAbort(HTTPTransaction* txn,
1354                      ErrorCode statusCode) noexcept override;
1355 
1356     size_t sendAbortImpl(HTTP3::ErrorCode errorCode, std::string errorMsg);
1357 
1358     size_t sendPriority(
1359         HTTPTransaction* /* txn */,
1360         const http2::PriorityUpdate& /* pri */) noexcept override;
1361     size_t changePriority(HTTPTransaction* txn,
1362                           HTTPPriority pri) noexcept override;
1363 
sendWindowUpdate(HTTPTransaction *,uint32_t)1364     size_t sendWindowUpdate(HTTPTransaction* /* txn */,
1365                             uint32_t /* bytes */) noexcept override {
1366       VLOG(4) << __func__ << " txn=" << txn_;
1367       CHECK(hasEgressStreamId())
1368           << __func__ << " invoked on stream without egress";
1369       return 0;
1370     }
1371 
1372     // Send a push promise. Has different implementations in
1373     // request streams / push streams
sendPushPromise(HTTPTransaction *,folly::Optional<hq::PushId>,const HTTPMessage &,HTTPHeaderSize *,bool)1374     virtual void sendPushPromise(HTTPTransaction* /* txn */,
1375                                  folly::Optional<hq::PushId> /* pushId */,
1376                                  const HTTPMessage& /* headers */,
1377                                  HTTPHeaderSize* /* outSize */,
1378                                  bool /* includeEOM */) {
1379       VLOG(4) << __func__ << " txn=" << txn_;
1380       CHECK(hasEgressStreamId())
1381           << __func__ << " invoked on stream without egress";
1382     }
1383 
1384     void notifyPendingEgress() noexcept override;
1385 
detach(HTTPTransaction *)1386     void detach(HTTPTransaction* /* txn */) noexcept override {
1387       VLOG(4) << __func__ << " txn=" << txn_;
1388       detached_ = true;
1389       session_.scheduleLoopCallback();
1390     }
1391     void checkForDetach();
1392 
notifyIngressBodyProcessed(uint32_t bytes)1393     void notifyIngressBodyProcessed(uint32_t bytes) noexcept override {
1394       VLOG(4) << __func__ << " txn=" << txn_;
1395       if (session_.notifyBodyProcessed(bytes)) {
1396         session_.resumeReads();
1397       }
1398     }
1399 
notifyEgressBodyBuffered(int64_t bytes)1400     void notifyEgressBodyBuffered(int64_t bytes) noexcept override {
1401       session_.notifyEgressBodyBuffered(bytes);
1402     }
1403 
getLocalAddress()1404     const folly::SocketAddress& getLocalAddress() const noexcept override {
1405       return session_.getLocalAddress();
1406     }
1407 
getPeerAddress()1408     const folly::SocketAddress& getPeerAddress() const noexcept override {
1409       return session_.getPeerAddress();
1410     }
1411 
describe(std::ostream & os)1412     void describe(std::ostream& os) const override {
1413       session_.describe(os);
1414     }
1415 
getSetupTransportInfo()1416     const wangle::TransportInfo& getSetupTransportInfo()
1417         const noexcept override {
1418       VLOG(4) << __func__ << " txn=" << txn_;
1419       return session_.transportInfo_;
1420     }
1421 
1422     bool getCurrentTransportInfo(wangle::TransportInfo* tinfo) override;
1423 
getFlowControlInfo(HTTPTransaction::FlowControlInfo *)1424     void getFlowControlInfo(
1425         HTTPTransaction::FlowControlInfo* /*info*/) override {
1426       // Not implemented
1427     }
1428 
1429     HTTPTransaction::Transport::Type getSessionType() const noexcept override;
1430 
getCodec()1431     virtual const HTTPCodec& getCodec() const noexcept override {
1432       return HQStreamBase::getCodec();
1433     }
1434 
drain()1435     void drain() override {
1436       VLOG(4) << __func__ << " txn=" << txn_;
1437     }
1438 
isDraining()1439     bool isDraining() const override {
1440       VLOG(4) << __func__ << " txn=" << txn_;
1441       return false;
1442     }
1443 
1444     HTTPTransaction* newPushedTransaction(
1445         HTTPCodec::StreamID /* parentTxnId */,
1446         HTTPTransaction::PushHandler* /* handler */,
1447         ProxygenError* /* error */ = nullptr) noexcept override {
1448       LOG(FATAL) << __func__ << " Only available via request stream";
1449       folly::assume_unreachable();
1450     }
1451 
newExTransaction(HTTPTransactionHandler *,HTTPCodec::StreamID,bool)1452     HTTPTransaction* newExTransaction(
1453         HTTPTransactionHandler* /* handler */,
1454         HTTPCodec::StreamID /* controlStream */,
1455         bool /* unidirectional */) noexcept override {
1456       VLOG(4) << __func__ << " txn=" << txn_;
1457       return nullptr;
1458     }
1459 
getSecurityProtocol()1460     std::string getSecurityProtocol() const override {
1461       VLOG(4) << __func__ << " txn=" << txn_;
1462       return "quic/tls1.3";
1463     }
1464 
addWaitingForReplaySafety(folly::AsyncTransport::ReplaySafetyCallback * callback)1465     void addWaitingForReplaySafety(folly::AsyncTransport::ReplaySafetyCallback*
1466                                        callback) noexcept override {
1467       VLOG(4) << __func__ << " txn=" << txn_;
1468       if (session_.sock_->replaySafe()) {
1469         callback->onReplaySafe();
1470       } else {
1471         session_.waitingForReplaySafety_.push_back(callback);
1472       }
1473     }
1474 
removeWaitingForReplaySafety(folly::AsyncTransport::ReplaySafetyCallback * callback)1475     void removeWaitingForReplaySafety(
1476         folly::AsyncTransport::ReplaySafetyCallback* callback) noexcept
1477         override {
1478       VLOG(4) << __func__ << " txn=" << txn_;
1479       session_.waitingForReplaySafety_.remove(callback);
1480     }
1481 
needToBlockForReplaySafety()1482     bool needToBlockForReplaySafety() const override {
1483       VLOG(4) << __func__ << " txn=" << txn_;
1484       return false;
1485     }
1486 
getUnderlyingTransport()1487     const folly::AsyncTransport* getUnderlyingTransport()
1488         const noexcept override {
1489       VLOG(4) << __func__ << " txn=" << txn_;
1490       return nullptr;
1491     }
1492 
isReplaySafe()1493     bool isReplaySafe() const override {
1494       return session_.isReplaySafe();
1495     }
1496 
setHTTP2PrioritiesEnabled(bool)1497     void setHTTP2PrioritiesEnabled(bool /* enabled */) override {
1498     }
getHTTP2PrioritiesEnabled()1499     bool getHTTP2PrioritiesEnabled() const override {
1500       return false;
1501     }
1502 
getHTTPPriority(uint8_t)1503     folly::Optional<const HTTPMessage::HTTP2Priority> getHTTPPriority(
1504         uint8_t /* pri */) override {
1505       VLOG(4) << __func__ << " txn=" << txn_;
1506       return HTTPMessage::HTTP2Priority(hqDefaultPriority.streamDependency,
1507                                         hqDefaultPriority.exclusive,
1508                                         hqDefaultPriority.weight);
1509     }
1510 
getHTTPPriority()1511     folly::Optional<HTTPPriority> getHTTPPriority() override {
1512       if (session_.sock_ && hasStreamId()) {
1513         auto sp = session_.sock_->getStreamPriority(getStreamId());
1514         if (sp) {
1515           return HTTPPriority(sp.value().level, sp.value().incremental);
1516         }
1517       }
1518       return folly::none;
1519     }
1520 
getConnectionToken()1521     folly::Optional<HTTPTransaction::ConnectionToken> getConnectionToken()
1522         const noexcept override {
1523       return session_.connectionToken_;
1524     }
1525 
1526     void generateGoaway();
1527 
1528     void trackEgressBodyDelivery(uint64_t bodyOffset) override;
1529 
1530     /**
1531      * Returns whether or no we have any body bytes buffered in the stream, or
1532      * the txn has any body bytes buffered.
1533      */
1534     size_t writeBufferSize() const;
1535     bool hasWriteBuffer() const;
1536     bool hasPendingBody() const;
1537     bool hasPendingEOM() const;
1538     bool hasPendingEgress() const;
1539 
1540     /**
1541      * Adapter class for managing different enqueued state between
1542      * HTTPTransaction and HQStreamTransport.  The decouples whether the
1543      * transaction thinks it is enqueued for egress (which impacts txn lifetime)
1544      * and whether the HQStreamTransport is enqueued (which impacts the
1545      * actual egress algorithm).  Note all 4 states are possible.
1546      */
1547     class HQPriHandle : public HTTP2PriorityQueueBase::BaseNode {
1548      public:
init(HTTP2PriorityQueueBase::Handle handle)1549       void init(HTTP2PriorityQueueBase::Handle handle) {
1550         egressQueueHandle_ = handle;
1551         enqueued_ = handle->isEnqueued();
1552       }
1553 
getHandle()1554       HTTP2PriorityQueueBase::Handle FOLLY_NULLABLE getHandle() const {
1555         return egressQueueHandle_;
1556       }
1557 
clearHandle()1558       void clearHandle() {
1559         egressQueueHandle_ = nullptr;
1560       }
1561 
1562       // HQStreamTransport is enqueued
isStreamTransportEnqueued()1563       bool isStreamTransportEnqueued() const {
1564         return egressQueueHandle_ ? egressQueueHandle_->isEnqueued() : false;
1565       }
1566 
isTransactionEnqueued()1567       bool isTransactionEnqueued() const {
1568         return isEnqueued();
1569       }
1570 
setEnqueued(bool enqueued)1571       void setEnqueued(bool enqueued) {
1572         enqueued_ = enqueued;
1573       }
1574 
isEnqueued()1575       bool isEnqueued() const override {
1576         return enqueued_;
1577       }
1578 
1579       uint64_t calculateDepth(bool includeVirtual = true) const override {
1580         return egressQueueHandle_->calculateDepth(includeVirtual);
1581       }
1582 
1583      private:
1584       HTTP2PriorityQueueBase::Handle egressQueueHandle_{nullptr};
1585       bool enqueued_;
1586     };
1587 
addTransaction(HTTPCodec::StreamID id,http2::PriorityUpdate pri,HTTPTransaction * txn,bool permanent,uint64_t * depth)1588     HTTP2PriorityQueueBase::Handle addTransaction(HTTPCodec::StreamID id,
1589                                                   http2::PriorityUpdate pri,
1590                                                   HTTPTransaction* txn,
1591                                                   bool permanent,
1592                                                   uint64_t* depth) override {
1593       queueHandle_.init(session_.txnEgressQueue_.addTransaction(
1594           id, pri, txn, permanent, depth));
1595       return &queueHandle_;
1596     }
1597 
1598     // update the priority of an existing node
updatePriority(HTTP2PriorityQueueBase::Handle handle,http2::PriorityUpdate pri,uint64_t * depth)1599     HTTP2PriorityQueueBase::Handle updatePriority(
1600         HTTP2PriorityQueueBase::Handle handle,
1601         http2::PriorityUpdate pri,
1602         uint64_t* depth) override {
1603       CHECK_EQ(handle, &queueHandle_);
1604       CHECK(queueHandle_.getHandle());
1605       return session_.txnEgressQueue_.updatePriority(
1606           queueHandle_.getHandle(), pri, depth);
1607     }
1608 
1609     // Remove the transaction from the priority tree
removeTransaction(HTTP2PriorityQueueBase::Handle handle)1610     void removeTransaction(HTTP2PriorityQueueBase::Handle handle) override {
1611       CHECK_EQ(handle, &queueHandle_);
1612       CHECK(queueHandle_.getHandle());
1613       session_.txnEgressQueue_.removeTransaction(queueHandle_.getHandle());
1614       queueHandle_.clearHandle();
1615     }
1616 
1617     // Notify the queue when a transaction has egress
signalPendingEgress(HTTP2PriorityQueueBase::Handle h)1618     void signalPendingEgress(HTTP2PriorityQueueBase::Handle h) override {
1619       CHECK_EQ(h, &queueHandle_);
1620       queueHandle_.setEnqueued(true);
1621       signalPendingEgressImpl();
1622     }
1623 
signalPendingEgressImpl()1624     void signalPendingEgressImpl() {
1625       auto flowControl =
1626           session_.sock_->getStreamFlowControl(getEgressStreamId());
1627       if (!flowControl.hasError() && flowControl->sendWindowAvailable > 0) {
1628         session_.txnEgressQueue_.signalPendingEgress(queueHandle_.getHandle());
1629       } else {
1630         VLOG(4) << "Delay pending egress signal on blocked txn=" << txn_;
1631       }
1632     }
1633 
1634     // Notify the queue when a transaction no longer has egress
clearPendingEgress(HTTP2PriorityQueueBase::Handle h)1635     void clearPendingEgress(HTTP2PriorityQueueBase::Handle h) override {
1636       CHECK_EQ(h, &queueHandle_);
1637       CHECK(queueHandle_.isTransactionEnqueued());
1638       queueHandle_.setEnqueued(false);
1639       if (pendingEOM_ || hasWriteBuffer()) {
1640         // no-op
1641         // Only HQSession can clearPendingEgress for these cases
1642         return;
1643       }
1644       // The transaction has pending body data, but it decided to remove itself
1645       // from the egress queue since it's rate-limited
1646       if (queueHandle_.isStreamTransportEnqueued()) {
1647         session_.txnEgressQueue_.clearPendingEgress(queueHandle_.getHandle());
1648       }
1649     }
1650 
addPriorityNode(HTTPCodec::StreamID id,HTTPCodec::StreamID parent)1651     void addPriorityNode(HTTPCodec::StreamID id,
1652                          HTTPCodec::StreamID parent) override {
1653       session_.txnEgressQueue_.addPriorityNode(id, parent);
1654     }
1655 
1656     /**
1657      * How many egress bytes we committed to transport, both written and
1658      * skipped.
1659      */
streamEgressCommittedByteOffset()1660     uint64_t streamEgressCommittedByteOffset() const {
1661       return bytesWritten_;
1662     }
1663 
1664     /**
1665      * streamEgressCommittedByteOffset() plus any pending bytes in the egress
1666      * queue.
1667      */
streamWriteByteOffset()1668     uint64_t streamWriteByteOffset() const {
1669       return streamEgressCommittedByteOffset() + writeBuf_.chainLength() +
1670              bufMeta_.length;
1671     }
1672 
1673     void abortIngress();
1674 
1675     void abortEgress(bool checkForDetach);
1676 
1677     void errorOnTransaction(ProxygenError err, const std::string& errorMsg);
1678     void errorOnTransaction(HTTPException ex);
1679 
1680     bool wantsOnWriteReady(size_t canSend) const;
1681 
1682     HQPriHandle queueHandle_;
1683     HTTPTransaction txn_;
1684     // need to send EOM
1685     bool pendingEOM_{false};
1686     // have read EOF
1687     bool readEOF_{false};
1688     bool hasCodec_{false};
1689     bool hasIngress_{false};
1690     bool detached_{false};
1691     bool ingressError_{false};
1692     bool hasHeaders_{false};
1693     enum class EOMType { CODEC, TRANSPORT };
1694     ConditionalGate<EOMType, 2> eomGate_;
1695 
1696     folly::Optional<HTTPCodec::StreamID> codecStreamId_;
1697 
1698     HQByteEventTracker byteEventTracker_;
1699 
1700     // Stream + session protocol info
1701     std::shared_ptr<QuicStreamProtocolInfo> quicStreamProtocolInfo_;
1702 
1703     // BufferMeta represents a buffer that isn't owned by this stream but a
1704     // remote entity.
1705     HTTPTransaction::BufferMeta bufMeta_;
1706 
1707     void armStreamAckCb(uint64_t streamOffset);
1708     void armEgressHeadersAckCb(uint64_t streamOffset);
1709     void armEgressBodyAckCb(uint64_t streamOffset);
resetEgressHeadersAckOffset()1710     void resetEgressHeadersAckOffset() {
1711       egressHeadersAckOffset_ = folly::none;
1712     }
resetEgressBodyAckOffset(uint64_t offset)1713     void resetEgressBodyAckOffset(uint64_t offset) {
1714       egressBodyAckOffsets_.erase(offset);
1715     }
1716 
numActiveDeliveryCallbacks()1717     uint64_t numActiveDeliveryCallbacks() const {
1718       return numActiveDeliveryCallbacks_;
1719     }
1720 
1721    private:
1722     void updatePriority(const HTTPMessage& headers) noexcept;
1723 
1724     // Return the old and new offset of the stream
1725     std::pair<uint64_t, uint64_t> generateHeadersCommon(
1726         quic::StreamId streamId,
1727         const HTTPMessage& headers,
1728         bool includeEOM,
1729         HTTPHeaderSize* size) noexcept;
1730     void coalesceEOM(size_t encodedBodySize);
1731     void handleHeadersAcked(uint64_t streamOffset);
1732     void handleBodyAcked(uint64_t streamOffset);
1733     void handleBodyCancelled(uint64_t streamOffset);
1734     // Egress headers offset.
1735     // This is updated every time we send headers. Needed for body delivery
1736     // callbacks to calculate body offset properly.
1737     uint64_t egressHeadersStreamOffset_{0};
1738     folly::Optional<uint64_t> egressHeadersAckOffset_;
1739     std::unordered_set<uint64_t> egressBodyAckOffsets_;
1740     // Track number of armed QUIC delivery callbacks.
1741     uint64_t numActiveDeliveryCallbacks_{0};
1742 
1743     // Used to store last seen ingress push ID between
1744     // the invocations of onPushPromiseBegin / onHeadersComplete.
1745     // It is being reset by
1746     //  - "onNewMessage" (in which case the push promise is being abandoned),
1747     //  - "onPushMessageBegin" (which may be abandonned / duplicate message id)
1748     //  - "onHeadersComplete" (not pending anymore)
1749     folly::Optional<hq::PushId> ingressPushId_;
1750   }; // HQStreamTransportBase
1751 
1752  protected:
1753   class HQStreamTransport
1754       : public detail::singlestream::SSBidir
1755       , public HQStreamTransportBase {
1756    public:
1757     HQStreamTransport(
1758         HQSession& session,
1759         TransportDirection direction,
1760         quic::StreamId streamId,
1761         uint32_t seqNo,
1762         std::unique_ptr<HTTPCodec> codec,
1763         const WheelTimerInstance& wheelTimer,
1764         HTTPSessionStats* stats = nullptr,
1765         http2::PriorityUpdate priority = hqDefaultPriority,
1766         folly::Optional<HTTPCodec::StreamID> parentTxnId = HTTPCodec::NoStream)
SSBidir(streamId)1767         : detail::singlestream::SSBidir(streamId),
1768           HQStreamTransportBase(session,
1769                                 direction,
1770                                 streamId,
1771                                 seqNo,
1772                                 wheelTimer,
1773                                 stats,
1774                                 priority,
1775                                 parentTxnId) {
1776       // Request streams are eagerly initialized
1777       initCodec(std::move(codec), __func__);
1778       initIngress(__func__);
1779     }
1780 
1781     HTTPTransaction* newPushedTransaction(
1782         HTTPCodec::StreamID /* parentTxnId */,
1783         HTTPTransaction::PushHandler* /* handler */,
1784         ProxygenError* error = nullptr) noexcept override;
1785 
1786     void sendPushPromise(HTTPTransaction* /* txn */,
1787                          folly::Optional<hq::PushId> /* pushId */,
1788                          const HTTPMessage& /* headers */,
1789                          HTTPHeaderSize* /* outSize */,
1790                          bool /* includeEOM */) override;
1791 
1792     void onPushPromiseHeadersComplete(
1793         hq::PushId /* pushID */,
1794         HTTPCodec::StreamID /* assoc streamID */,
1795         std::unique_ptr<HTTPMessage> /* promise */) override;
1796 
1797     uint16_t getDatagramSizeLimit() const noexcept override;
1798     bool sendDatagram(std::unique_ptr<folly::IOBuf> datagram) override;
1799   }; // HQStreamTransport
1800 
1801 #ifdef _MSC_VER
1802 #pragma warning(pop)
1803 #endif
1804 
1805  private:
1806   class VersionUtils {
1807    public:
VersionUtils(HQSession & session)1808     explicit VersionUtils(HQSession& session) : session_(session) {
1809     }
~VersionUtils()1810     virtual ~VersionUtils() {
1811     }
1812 
1813     // Checks whether it is allowed to process a new stream, depending on the
1814     // stream type, draining state/goaway. If not allowed, it resets the stream
1815     virtual CodecProtocol getCodecProtocol() const = 0;
1816     virtual bool checkNewStream(quic::StreamId id) = 0;
1817     virtual std::unique_ptr<HTTPCodec> createCodec(quic::StreamId id) = 0;
1818     virtual std::unique_ptr<hq::HQUnidirectionalCodec> createControlCodec(
1819         hq::UnidirectionalStreamType type, HQControlStream& controlStream) = 0;
1820     virtual folly::Optional<hq::UnidirectionalStreamType> parseStreamPreface(
1821         uint64_t preface) = 0;
1822     virtual void sendGoaway() = 0;
1823     virtual void sendGoawayOnRequestStream(
1824         HQSession::HQStreamTransport& stream) = 0;
1825     virtual void headersComplete(HTTPMessage* msg) = 0;
1826     virtual void checkSendingGoaway(const HTTPMessage& msg) = 0;
1827     virtual size_t sendSettings() = 0;
1828     virtual bool createEgressControlStreams() = 0;
1829     virtual void applySettings(const SettingsList& settings) = 0;
1830     virtual void onSettings(const SettingsList& settings) = 0;
1831     virtual void readDataProcessed() = 0;
1832     virtual void abortStream(quic::StreamId id) = 0;
setMaxUncompressed(uint64_t)1833     virtual void setMaxUncompressed(uint64_t) {
1834     }
setHeaderCodecStats(HeaderCodec::Stats *)1835     virtual void setHeaderCodecStats(HeaderCodec::Stats*) {
1836     }
onConnectionEnd()1837     virtual void onConnectionEnd() {
1838     }
1839 
1840     HQSession& session_;
1841   };
1842 
1843   class H1QFBV1VersionUtils : public VersionUtils {
1844    public:
H1QFBV1VersionUtils(HQSession & session)1845     explicit H1QFBV1VersionUtils(HQSession& session) : VersionUtils(session) {
1846     }
1847 
getCodecProtocol()1848     CodecProtocol getCodecProtocol() const override {
1849       return CodecProtocol::HTTP_1_1;
1850     }
1851 
1852     bool checkNewStream(quic::StreamId id) override;
1853 
1854     std::unique_ptr<HTTPCodec> createCodec(quic::StreamId id) override;
1855 
createControlCodec(hq::UnidirectionalStreamType,HQControlStream &)1856     std::unique_ptr<hq::HQUnidirectionalCodec> createControlCodec(
1857         hq::UnidirectionalStreamType, HQControlStream&) override {
1858       return nullptr; // no control streams
1859     }
1860 
parseStreamPreface(uint64_t)1861     folly::Optional<hq::UnidirectionalStreamType> parseStreamPreface(
1862         uint64_t /*preface*/) override {
1863       LOG(FATAL) << "H1Q does not use stream preface";
1864       folly::assume_unreachable();
1865     }
1866 
1867     void sendGoaway() override;
1868     void sendGoawayOnRequestStream(
1869         HQSession::HQStreamTransport& stream) override;
1870     void headersComplete(HTTPMessage* msg) override;
1871     void checkSendingGoaway(const HTTPMessage& msg) override;
1872 
sendSettings()1873     size_t sendSettings() override {
1874       return 0;
1875     }
1876 
createEgressControlStreams()1877     bool createEgressControlStreams() override {
1878       return true;
1879     }
1880 
applySettings(const SettingsList &)1881     void applySettings(const SettingsList& /*settings*/) override {
1882     }
1883 
onSettings(const SettingsList &)1884     void onSettings(const SettingsList& /*settings*/) override {
1885       CHECK(false) << "SETTINGS frame received for h1q-fb-v1 protocol";
1886     }
1887 
readDataProcessed()1888     void readDataProcessed() override {
1889     }
1890 
abortStream(quic::StreamId)1891     void abortStream(quic::StreamId /*id*/) override {
1892     }
1893   };
1894 
1895   class GoawayUtils {
1896    public:
1897     static bool checkNewStream(HQSession& session, quic::StreamId id);
1898     static void sendGoaway(HQSession& session);
1899   };
1900 
1901   class HQVersionUtils : public VersionUtils {
1902    public:
HQVersionUtils(HQSession & session)1903     explicit HQVersionUtils(HQSession& session) : VersionUtils(session) {
1904     }
1905 
getCodecProtocol()1906     CodecProtocol getCodecProtocol() const override {
1907       return CodecProtocol::HQ;
1908     }
1909 
1910     std::unique_ptr<HTTPCodec> createCodec(quic::StreamId id) override;
1911 
1912     std::unique_ptr<hq::HQUnidirectionalCodec> createControlCodec(
1913         hq::UnidirectionalStreamType type,
1914         HQControlStream& controlStream) override;
1915 
checkNewStream(quic::StreamId id)1916     bool checkNewStream(quic::StreamId id) override {
1917       return GoawayUtils::checkNewStream(session_, id);
1918     }
1919     folly::Optional<hq::UnidirectionalStreamType> parseStreamPreface(
1920         uint64_t preface) override;
1921 
sendGoaway()1922     void sendGoaway() override {
1923       GoawayUtils::sendGoaway(session_);
1924     }
1925 
sendGoawayOnRequestStream(HQSession::HQStreamTransport &)1926     void sendGoawayOnRequestStream(
1927         HQSession::HQStreamTransport& /*stream*/) override {
1928     }
1929 
1930     void headersComplete(HTTPMessage* /*msg*/) override;
1931 
checkSendingGoaway(const HTTPMessage &)1932     void checkSendingGoaway(const HTTPMessage& /*msg*/) override {
1933     }
1934 
1935     size_t sendSettings() override;
1936 
1937     bool createEgressControlStreams() override;
1938 
1939     void applySettings(const SettingsList& settings) override;
1940 
1941     void onSettings(const SettingsList& settings) override;
1942 
1943     void readDataProcessed() override;
1944 
1945     void abortStream(quic::StreamId /*id*/) override;
1946 
setMaxUncompressed(uint64_t value)1947     void setMaxUncompressed(uint64_t value) override {
1948       qpackCodec_.setMaxUncompressed(value);
1949     }
1950 
setHeaderCodecStats(HeaderCodec::Stats * stats)1951     void setHeaderCodecStats(HeaderCodec::Stats* stats) override {
1952       qpackCodec_.setStats(stats);
1953     }
1954     void onConnectionEnd() override;
1955 
1956    private:
1957     QPACKCodec qpackCodec_;
1958     hq::HQStreamCodec* hqStreamCodecPtr_{nullptr};
1959   };
1960 
1961   class H1QFBV2VersionUtils : public H1QFBV1VersionUtils {
1962    public:
H1QFBV2VersionUtils(HQSession & session)1963     explicit H1QFBV2VersionUtils(HQSession& session)
1964         : H1QFBV1VersionUtils(session) {
1965     }
1966 
checkNewStream(quic::StreamId id)1967     bool checkNewStream(quic::StreamId id) override {
1968       return GoawayUtils::checkNewStream(session_, id);
1969     }
1970     std::unique_ptr<hq::HQUnidirectionalCodec> createControlCodec(
1971         hq::UnidirectionalStreamType, HQControlStream&) override;
1972 
1973     folly::Optional<hq::UnidirectionalStreamType> parseStreamPreface(
1974         uint64_t preface) override;
1975 
1976     bool createEgressControlStreams() override;
1977 
onSettings(const SettingsList &)1978     void onSettings(const SettingsList& /*settings*/) override {
1979       session_.handleSessionError(
1980           CHECK_NOTNULL(session_.findControlStream(
1981               hq::UnidirectionalStreamType::H1Q_CONTROL)),
1982           hq::StreamDirection::INGRESS,
1983           HTTP3::ErrorCode::HTTP_GENERAL_PROTOCOL_ERROR,
1984           kErrorConnection);
1985     }
1986 
sendGoaway()1987     void sendGoaway() override {
1988       GoawayUtils::sendGoaway(session_);
1989     }
1990 
sendGoawayOnRequestStream(HQSession::HQStreamTransport &)1991     void sendGoawayOnRequestStream(
1992         HQSession::HQStreamTransport& /*stream*/) override {
1993     }
1994 
headersComplete(HTTPMessage *)1995     void headersComplete(HTTPMessage* /*msg*/) override {
1996     }
1997 
checkSendingGoaway(const HTTPMessage &)1998     void checkSendingGoaway(const HTTPMessage& /*msg*/) override {
1999     }
2000   };
2001 
getMaxConcurrentOutgoingStreamsRemote()2002   uint32_t getMaxConcurrentOutgoingStreamsRemote() const override {
2003     // need transport API
2004     return 100;
2005   }
2006 
2007   using HTTPCodecPtr = std::unique_ptr<HTTPCodec>;
2008   struct CodecStackEntry {
2009     HTTPCodecPtr* codecPtr;
2010     HTTPCodecPtr codec;
2011     HTTPCodec::Callback* callback;
CodecStackEntryCodecStackEntry2012     CodecStackEntry(HTTPCodecPtr* p, HTTPCodecPtr c, HTTPCodec::Callback* cb)
2013         : codecPtr(p), codec(std::move(c)), callback(cb) {
2014     }
2015   };
2016   std::vector<CodecStackEntry> codecStack_;
2017 
2018   /**
2019    * Container to hold the results of HTTP2PriorityQueue::nextEgress
2020    */
2021   HTTP2PriorityQueue::NextEgressResult nextEgressResults_;
2022 
2023   // Cleanup all pending streams. Invoked in session timeout
2024   size_t cleanupPendingStreams();
2025 
2026   // Remove all callbacks from a stream during cleanup
2027   void clearStreamCallbacks(quic::StreamId /* id */);
2028 
2029   using ControlStreamsKey = std::pair<quic::StreamId, hq::StreamDirection>;
2030   std::unordered_map<hq::UnidirectionalStreamType, HQControlStream>
2031       controlStreams_;
2032   HQUnidirStreamDispatcher unidirectionalReadDispatcher_;
2033 
2034   // Min Stream ID we haven't seen so far
2035   quic::StreamId minUnseenIncomingStreamId_{0};
2036   // Maximum Stream ID that we are allowed to open, according to the remote
2037   quic::StreamId minPeerUnseenId_{hq::kMaxClientBidiStreamId};
2038   // Whether SETTINGS have been received
2039   bool receivedSettings_{false};
2040 
2041   /**
2042    * The maximum number of concurrent transactions that this session's peer
2043    * may create.
2044    */
2045   uint32_t maxConcurrentIncomingStreams_{100};
2046   folly::Optional<uint32_t> receiveStreamWindowSize_;
2047 
2048   uint64_t maxToSend_{0};
2049   bool scheduledWrite_{false};
2050 
2051   bool forceUpstream1_1_{true};
2052   // Default to false for now to match existing behavior
2053   bool strictValidation_{false};
2054   bool datagramEnabled_{false};
2055 
2056   /** Reads in the current loop iteration */
2057   uint16_t readsPerLoop_{0};
2058   std::unordered_set<quic::StreamId> pendingProcessReadSet_;
2059   std::shared_ptr<QuicProtocolInfo> quicInfo_;
2060   folly::Optional<HQVersion> version_;
2061   std::string alpn_;
2062 
2063  protected:
2064   HTTPSettings egressSettings_{
2065       {SettingsId::HEADER_TABLE_SIZE, hq::kDefaultEgressHeaderTableSize},
2066       {SettingsId::MAX_HEADER_LIST_SIZE, hq::kDefaultEgressMaxHeaderListSize},
2067       {SettingsId::_HQ_QPACK_BLOCKED_STREAMS,
2068        hq::kDefaultEgressQpackBlockedStream},
2069   };
2070   HTTPSettings ingressSettings_;
2071   uint64_t minUnseenIncomingPushId_{0};
2072 
2073   std::unique_ptr<VersionUtils> versionUtils_;
2074   ReadyGate versionUtilsReady_;
2075 
2076   // NOTE: introduce better decoupling between the streams
2077   // and the containing session, then remove the friendship.
2078   friend class HQStreamBase;
2079 
2080   // To let the operator<< access DrainState which is private
2081   friend std::ostream& operator<<(std::ostream&, DrainState);
2082 
2083   // Bidirectional transport streams
2084   std::unordered_map<quic::StreamId, HQStreamTransport> streams_;
2085 
2086   // Buffer for datagrams waiting for a stream to be assigned to
2087   folly::EvictingCacheMap<
2088       quic::StreamId,
2089       folly::small_vector<std::unique_ptr<folly::IOBuf>,
2090                           kDefaultMaxBufferedDatagrams,
2091                           folly::small_vector_policy::NoHeap>>
2092       datagramsBuffer_{kMaxStreamsWithBufferedDatagrams};
2093 
2094   // Buffer for priority updates without an active stream
2095   folly::EvictingCacheMap<quic::StreamId, HTTPPriority> priorityUpdatesBuffer_{
2096       kMaxBufferedPriorityUpdates};
2097 
2098   // Creation time (for handshake time tracking)
2099   std::chrono::steady_clock::time_point createTime_;
2100 
2101   // Lookup maps for matching PushIds to StreamIds
2102   folly::F14FastMap<hq::PushId, quic::StreamId> pushIdToStreamId_;
2103   // Lookup maps for matching ingress push streams to push ids
2104   folly::F14FastMap<quic::StreamId, hq::PushId> streamIdToPushId_;
2105   std::string userAgent_;
2106 }; // HQSession
2107 
2108 std::ostream& operator<<(std::ostream& os, HQSession::DrainState drainState);
2109 
2110 } // namespace proxygen
2111