1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * All rights reserved. 4 * 5 * This source code is licensed under the BSD-style license found in the 6 * LICENSE file in the root directory of this source tree. 7 */ 8 9 #pragma once 10 11 #include <proxygen/lib/http/session/HQSession.h> 12 13 namespace proxygen { 14 15 class HQDownstreamSession : public HQSession { 16 public: 17 HQDownstreamSession(const std::chrono::milliseconds transactionsTimeout, 18 HTTPSessionController* controller, 19 const wangle::TransportInfo& tinfo, 20 InfoCallback* sessionInfoCb, 21 folly::Function<void(HTTPCodecFilterChain& chain)> 22 /* codecFilterCallbackFn */ 23 = nullptr) HQSession(transactionsTimeout,controller,proxygen::TransportDirection::DOWNSTREAM,tinfo,sessionInfoCb)24 : HQSession(transactionsTimeout, 25 controller, 26 proxygen::TransportDirection::DOWNSTREAM, 27 tinfo, 28 sessionInfoCb) { 29 } 30 31 void onTransportReady() noexcept override; 32 33 void onAppRateLimited() noexcept override; 34 getTransactionTimeoutHandler(HTTPTransaction * txn)35 HTTPTransaction::Handler* getTransactionTimeoutHandler( 36 HTTPTransaction* txn) override { 37 return getController()->getTransactionTimeoutHandler(txn, 38 getLocalAddress()); 39 } 40 41 void setupOnHeadersComplete(HTTPTransaction* txn, HTTPMessage* msg) override; 42 43 void onConnectionErrorHandler( 44 std::pair<quic::QuicErrorCode, std::string>) noexcept override; 45 46 bool isDetachable(bool) const override; 47 48 void attachThreadLocals(folly::EventBase*, 49 folly::SSLContextPtr, 50 const WheelTimerInstance&, 51 HTTPSessionStats*, 52 FilterIteratorFn, 53 HeaderCodec::Stats*, 54 HTTPSessionController*) override; 55 56 void detachThreadLocals(bool) override; 57 isReplaySafe()58 bool isReplaySafe() const override { 59 LOG(FATAL) << __func__ << " is an upstream interface"; 60 return false; 61 } 62 // Create a new pushed transaction. 63 HTTPTransaction* newPushedTransaction( 64 HTTPCodec::StreamID, /* parentRequestStreamId */ 65 HTTPTransaction::PushHandler*, /* handler */ 66 ProxygenError* error = nullptr) override; 67 getNumOutgoingStreams()68 uint32_t getNumOutgoingStreams() const override { 69 // need transport API 70 return static_cast<uint32_t>(numberOfEgressPushStreams()); 71 } 72 getNumIncomingStreams()73 uint32_t getNumIncomingStreams() const override { 74 // need transport API 75 return static_cast<uint32_t>(streams_.size()); 76 } 77 78 folly::Optional<HTTPHeaders> getExtraHeaders( 79 const HTTPMessage& haeders, quic::StreamId streamId) override; 80 81 private: ~HQDownstreamSession()82 ~HQDownstreamSession() override { 83 CHECK_EQ(getNumStreams(), 0); 84 } 85 86 #ifdef _MSC_VER 87 #pragma warning(push) 88 #pragma warning(disable : 4250) // inherits 'proxygen::detail::..' via dominance 89 #endif 90 91 /** 92 * Server side representation of a push stream 93 * Does not support ingress 94 */ 95 class HQEgressPushStream 96 : public detail::singlestream::SSEgress 97 , public HQStreamTransportBase { 98 public: 99 HQEgressPushStream(HQSession& session, 100 quic::StreamId streamId, 101 hq::PushId pushId, 102 folly::Optional<HTTPCodec::StreamID> parentTxnId, 103 uint32_t seqNo, 104 std::unique_ptr<HTTPCodec> codec, 105 const WheelTimerInstance& timeout, 106 HTTPSessionStats* stats = nullptr, 107 http2::PriorityUpdate priority = hqDefaultPriority) SSEgress(streamId)108 : detail::singlestream::SSEgress(streamId), 109 HQStreamTransportBase(session, 110 TransportDirection::DOWNSTREAM, 111 streamId, 112 seqNo, 113 timeout, 114 stats, 115 priority, 116 parentTxnId, 117 hq::UnidirectionalStreamType::PUSH), 118 pushId_(pushId) { 119 // Request streams are eagerly initialized 120 initCodec(std::move(codec), __func__); 121 // DONT init ingress on egress-only stream 122 } 123 getPushId()124 hq::PushId getPushId() const { 125 return pushId_; 126 } 127 128 // Unlike request streams and ingres push streams, 129 // the egress push stream does not have to flush 130 // ingress queues transactionTimeout(HTTPTransaction * txn)131 void transactionTimeout(HTTPTransaction* txn) noexcept override { 132 VLOG(4) << __func__ << " txn=" << txn_; 133 DCHECK(txn == &txn_); 134 } 135 136 void sendPushPromise(HTTPTransaction* /* txn */, 137 folly::Optional<hq::PushId> /* pushId */, 138 const HTTPMessage& /* headers */, 139 HTTPHeaderSize* /* outSize */, 140 bool /* includeEOM */) override; 141 142 /** 143 * Write the encoded push id to the egress stream. 144 */ 145 size_t generateStreamPushId(); 146 147 // Egress only stream should not pause ingress pauseIngress(HTTPTransaction *)148 void pauseIngress(HTTPTransaction* /* txn */) noexcept override { 149 VLOG(4) << __func__ 150 << " Ingress function called on egress-only stream, ignoring"; 151 } 152 153 // Egress only stream should not pause ingress resumeIngress(HTTPTransaction *)154 void resumeIngress(HTTPTransaction* /* txn */) noexcept override { 155 VLOG(4) << __func__ 156 << " Ingress function called on egress-only stream, ignoring"; 157 } 158 159 private: 160 hq::PushId pushId_; // The push id in context of which this stream is sent 161 }; // HQEgressPushStream 162 #ifdef _MSC_VER 163 #pragma warning(pop) 164 #endif 165 166 std::unordered_map<quic::StreamId, HQEgressPushStream> egressPushStreams_; 167 168 // Find an egress push stream 169 HQEgressPushStream* findEgressPushStream(quic::StreamId); 170 171 uint32_t numberOfEgressPushStreams() const; 172 173 HQEgressPushStream* FOLLY_NULLABLE 174 createEgressPushStream(hq::PushId pushId, 175 quic::StreamId streamId, 176 quic::StreamId parentStreamId); 177 178 HQStreamTransportBase* findPushStream(quic::StreamId id) override; 179 180 // Only need to search ingress push streams, so this is a no-op findPushStreams(std::unordered_set<HQStreamTransportBase * > & streams)181 void findPushStreams( 182 std::unordered_set<HQStreamTransportBase*>& streams) override { 183 for (auto& pstream : egressPushStreams_) { 184 streams.insert(&pstream.second); 185 } 186 } 187 188 bool erasePushStream(quic::StreamId streamId) override; 189 onNewPushStream(quic::StreamId,hq::PushId,size_t)190 void onNewPushStream(quic::StreamId /* pushStreamId */, 191 hq::PushId /* pushId */, 192 size_t /* toConsume */) override { 193 LOG(DFATAL) << "nope"; 194 } 195 196 // This is the current method of creating new push IDs. 197 hq::PushId createNewPushId(); 198 199 // Value of the next pushId, used for outgoing push transactions 200 hq::PushId nextAvailablePushId_{0}; 201 202 // Whether or not we have already received an onTransportReady callback. 203 bool transportReadyNotified_{false}; 204 }; 205 206 } // namespace proxygen 207