/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
8 
9 #pragma once
10 
11 #include <proxygen/lib/http/session/HQSession.h>
12 
13 namespace proxygen {
14 
15 class HQDownstreamSession : public HQSession {
16  public:
17   HQDownstreamSession(const std::chrono::milliseconds transactionsTimeout,
18                       HTTPSessionController* controller,
19                       const wangle::TransportInfo& tinfo,
20                       InfoCallback* sessionInfoCb,
21                       folly::Function<void(HTTPCodecFilterChain& chain)>
22                       /* codecFilterCallbackFn */
23                       = nullptr)
HQSession(transactionsTimeout,controller,proxygen::TransportDirection::DOWNSTREAM,tinfo,sessionInfoCb)24       : HQSession(transactionsTimeout,
25                   controller,
26                   proxygen::TransportDirection::DOWNSTREAM,
27                   tinfo,
28                   sessionInfoCb) {
29   }
30 
31   void onTransportReady() noexcept override;
32 
33   void onAppRateLimited() noexcept override;
34 
getTransactionTimeoutHandler(HTTPTransaction * txn)35   HTTPTransaction::Handler* getTransactionTimeoutHandler(
36       HTTPTransaction* txn) override {
37     return getController()->getTransactionTimeoutHandler(txn,
38                                                          getLocalAddress());
39   }
40 
41   void setupOnHeadersComplete(HTTPTransaction* txn, HTTPMessage* msg) override;
42 
43   void onConnectionErrorHandler(
44       std::pair<quic::QuicErrorCode, std::string>) noexcept override;
45 
46   bool isDetachable(bool) const override;
47 
48   void attachThreadLocals(folly::EventBase*,
49                           folly::SSLContextPtr,
50                           const WheelTimerInstance&,
51                           HTTPSessionStats*,
52                           FilterIteratorFn,
53                           HeaderCodec::Stats*,
54                           HTTPSessionController*) override;
55 
56   void detachThreadLocals(bool) override;
57 
isReplaySafe()58   bool isReplaySafe() const override {
59     LOG(FATAL) << __func__ << " is an upstream interface";
60     return false;
61   }
62   // Create a new pushed transaction.
63   HTTPTransaction* newPushedTransaction(
64       HTTPCodec::StreamID,           /* parentRequestStreamId */
65       HTTPTransaction::PushHandler*, /* handler */
66       ProxygenError* error = nullptr) override;
67 
getNumOutgoingStreams()68   uint32_t getNumOutgoingStreams() const override {
69     // need transport API
70     return static_cast<uint32_t>(numberOfEgressPushStreams());
71   }
72 
getNumIncomingStreams()73   uint32_t getNumIncomingStreams() const override {
74     // need transport API
75     return static_cast<uint32_t>(streams_.size());
76   }
77 
78   folly::Optional<HTTPHeaders> getExtraHeaders(
79       const HTTPMessage& haeders, quic::StreamId streamId) override;
80 
81  private:
~HQDownstreamSession()82   ~HQDownstreamSession() override {
83     CHECK_EQ(getNumStreams(), 0);
84   }
85 
86 #ifdef _MSC_VER
87 #pragma warning(push)
88 #pragma warning(disable : 4250) // inherits 'proxygen::detail::..' via dominance
89 #endif
90 
91   /**
92    * Server side representation of a push stream
93    * Does not support ingress
94    */
95   class HQEgressPushStream
96       : public detail::singlestream::SSEgress
97       , public HQStreamTransportBase {
98    public:
99     HQEgressPushStream(HQSession& session,
100                        quic::StreamId streamId,
101                        hq::PushId pushId,
102                        folly::Optional<HTTPCodec::StreamID> parentTxnId,
103                        uint32_t seqNo,
104                        std::unique_ptr<HTTPCodec> codec,
105                        const WheelTimerInstance& timeout,
106                        HTTPSessionStats* stats = nullptr,
107                        http2::PriorityUpdate priority = hqDefaultPriority)
SSEgress(streamId)108         : detail::singlestream::SSEgress(streamId),
109           HQStreamTransportBase(session,
110                                 TransportDirection::DOWNSTREAM,
111                                 streamId,
112                                 seqNo,
113                                 timeout,
114                                 stats,
115                                 priority,
116                                 parentTxnId,
117                                 hq::UnidirectionalStreamType::PUSH),
118           pushId_(pushId) {
119       // Request streams are eagerly initialized
120       initCodec(std::move(codec), __func__);
121       // DONT init ingress on egress-only stream
122     }
123 
getPushId()124     hq::PushId getPushId() const {
125       return pushId_;
126     }
127 
128     // Unlike request streams and ingres push streams,
129     // the egress push stream does not have to flush
130     // ingress queues
transactionTimeout(HTTPTransaction * txn)131     void transactionTimeout(HTTPTransaction* txn) noexcept override {
132       VLOG(4) << __func__ << " txn=" << txn_;
133       DCHECK(txn == &txn_);
134     }
135 
136     void sendPushPromise(HTTPTransaction* /* txn */,
137                          folly::Optional<hq::PushId> /* pushId */,
138                          const HTTPMessage& /* headers */,
139                          HTTPHeaderSize* /* outSize */,
140                          bool /* includeEOM */) override;
141 
142     /**
143      * Write the encoded push id to the egress stream.
144      */
145     size_t generateStreamPushId();
146 
147     // Egress only stream should not pause ingress
pauseIngress(HTTPTransaction *)148     void pauseIngress(HTTPTransaction* /* txn */) noexcept override {
149       VLOG(4) << __func__
150               << " Ingress function called on egress-only stream, ignoring";
151     }
152 
153     // Egress only stream should not pause ingress
resumeIngress(HTTPTransaction *)154     void resumeIngress(HTTPTransaction* /* txn */) noexcept override {
155       VLOG(4) << __func__
156               << " Ingress function called on egress-only stream, ignoring";
157     }
158 
159    private:
160     hq::PushId pushId_; // The push id in context of which this stream is sent
161   };                    // HQEgressPushStream
162 #ifdef _MSC_VER
163 #pragma warning(pop)
164 #endif
165 
166   std::unordered_map<quic::StreamId, HQEgressPushStream> egressPushStreams_;
167 
168   // Find an egress push stream
169   HQEgressPushStream* findEgressPushStream(quic::StreamId);
170 
171   uint32_t numberOfEgressPushStreams() const;
172 
173   HQEgressPushStream* FOLLY_NULLABLE
174   createEgressPushStream(hq::PushId pushId,
175                          quic::StreamId streamId,
176                          quic::StreamId parentStreamId);
177 
178   HQStreamTransportBase* findPushStream(quic::StreamId id) override;
179 
180   // Only need to search ingress push streams, so this is a no-op
findPushStreams(std::unordered_set<HQStreamTransportBase * > & streams)181   void findPushStreams(
182       std::unordered_set<HQStreamTransportBase*>& streams) override {
183     for (auto& pstream : egressPushStreams_) {
184       streams.insert(&pstream.second);
185     }
186   }
187 
188   bool erasePushStream(quic::StreamId streamId) override;
189 
onNewPushStream(quic::StreamId,hq::PushId,size_t)190   void onNewPushStream(quic::StreamId /* pushStreamId */,
191                        hq::PushId /* pushId */,
192                        size_t /* toConsume */) override {
193     LOG(DFATAL) << "nope";
194   }
195 
196   // This is the current method of creating new push IDs.
197   hq::PushId createNewPushId();
198 
199   // Value of the next pushId, used for outgoing push transactions
200   hq::PushId nextAvailablePushId_{0};
201 
202   // Whether or not we have already received an onTransportReady callback.
203   bool transportReadyNotified_{false};
204 };
205 
206 } // namespace proxygen
207