1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree.
7  */
8 
9 #pragma once
10 
11 #include <proxygen/lib/http/codec/HQControlCodec.h>
12 #include <proxygen/lib/http/codec/HQStreamCodec.h>
13 #include <proxygen/lib/http/codec/compress/QPACKCodec.h>
14 
15 namespace proxygen { namespace hq {
16 
17 class HQMultiCodec : public HQControlCodec {
18 
19  public:
HQMultiCodec(TransportDirection direction)20   explicit HQMultiCodec(TransportDirection direction)
21       : HQControlCodec(HTTPCodec::MaxStreamID,
22                        direction,
23                        StreamDirection::INGRESS, /* to match settings */
24                        ingressSettings_,
25                        UnidirectionalStreamType::CONTROL) {
26     VLOG(4) << "creating " << getTransportDirectionString(direction)
27             << " HQMultiCodec for stream " << streamId_;
28     // Has to be explicitly enabled
29     doubleGoaway_ = false;
30     minUnseenStreamID_ = 0;
31     minUnseenPushID_ = 0;
32   }
33 
34   ~HQMultiCodec() override = default;
35 
setControlStreamID(StreamID controlID)36   void setControlStreamID(StreamID controlID) {
37     streamId_ = controlID;
38   }
39 
setQPACKEncoderMaxDataFn(std::function<uint64_t ()> qpackEncoderMaxData)40   void setQPACKEncoderMaxDataFn(std::function<uint64_t()> qpackEncoderMaxData) {
41     qpackEncoderMaxDataFn_ = std::move(qpackEncoderMaxData);
42   }
43 
setCurrentStream(StreamID currentStream)44   bool setCurrentStream(StreamID currentStream) {
45     if (codecs_.find(currentStream) == codecs_.end()) {
46       return false;
47     }
48     currentStream_ = currentStream;
49     return true;
50   }
51 
addCodec(StreamID streamId)52   HTTPCodec& addCodec(StreamID streamId) {
53     if (transportDirection_ == TransportDirection::DOWNSTREAM &&
54         (streamId & 0x3) == 0 && streamId >= minUnseenStreamID_) {
55       // only bump for client initiated bidi streams, for now
56       minUnseenStreamID_ = streamId + 4;
57     }
58     auto res =
59         codecs_.emplace(streamId,
60                         std::make_unique<HQStreamCodec>(streamId,
61                                                         transportDirection_,
62                                                         qpackCodec_,
63                                                         qpackEncoderWriteBuf_,
64                                                         qpackDecoderWriteBuf_,
65                                                         qpackEncoderMaxDataFn_,
66                                                         settings_));
67     auto& codec = res.first->second;
68     codec->setCallback(callback_);
69     return *codec;
70   }
71 
removeCodec(StreamID streamId)72   void removeCodec(StreamID streamId) {
73     codecs_.erase(streamId);
74   }
75 
76   void setResumeHook(StreamID streamId,
77                      folly::Function<void()> hook = nullptr) {
78     getCodec(streamId).setResumeHook(std::move(hook));
79   }
80 
getQPACKCodec()81   QPACKCodec& getQPACKCodec() {
82     return qpackCodec_;
83   }
84 
getQPACKEncoderWriteBuf()85   folly::IOBufQueue& getQPACKEncoderWriteBuf() {
86     return qpackEncoderWriteBuf_;
87   }
88 
getQPACKDecoderWriteBuf()89   folly::IOBufQueue& getQPACKDecoderWriteBuf() {
90     return qpackDecoderWriteBuf_;
91   }
92 
encodeCancelStream(quic::StreamId id)93   void encodeCancelStream(quic::StreamId id) {
94     auto cancel = qpackCodec_.encodeCancelStream(id);
95     qpackDecoderWriteBuf_.append(std::move(cancel));
96   }
97 
encodeInsertCountIncrement()98   bool encodeInsertCountIncrement() {
99     auto ici = qpackCodec_.encodeInsertCountInc();
100     if (ici) {
101       qpackDecoderWriteBuf_.append(std::move(ici));
102       return true;
103     }
104     return false;
105   }
106 
setCallback(proxygen::HTTPCodec::Callback * callback)107   void setCallback(proxygen::HTTPCodec::Callback* callback) override {
108     HQControlCodec::setCallback(callback);
109     for (const auto& codec : codecs_) {
110       codec.second->setCallback(callback);
111     }
112   }
113 
getUserAgent()114   const std::string& getUserAgent() const override {
115     // TODO
116     static const std::string empty;
117     return empty;
118   }
119 
onIngress(const folly::IOBuf & buf)120   size_t onIngress(const folly::IOBuf& buf) override {
121     auto res = getCurrentCodec().onIngress(buf);
122     currentStream_ = HTTPCodec::MaxStreamID;
123     return res;
124   }
125 
onIngressEOF()126   void onIngressEOF() override {
127     getCurrentCodec().onIngressEOF();
128     currentStream_ = HTTPCodec::MaxStreamID;
129   }
130 
isReusable()131   bool isReusable() const override {
132     return !sentGoaway_;
133   }
134 
isParserPaused()135   bool isParserPaused() const override {
136     auto res = getCurrentCodec().isParserPaused();
137     currentStream_ = HTTPCodec::MaxStreamID;
138     return res;
139   }
140 
supportsParallelRequests()141   bool supportsParallelRequests() const override {
142     return true;
143   }
144 
generateConnectionPreface(folly::IOBufQueue &)145   size_t generateConnectionPreface(folly::IOBufQueue& /*writeBuf*/) override {
146     return 0;
147   }
148 
generateSettingsAck(folly::IOBufQueue &)149   size_t generateSettingsAck(folly::IOBufQueue& /*writeBuf*/) override {
150     return 0;
151   }
152 
153   // It is possible to make HQStreamCodec egress stateless and avoid the
154   // hashtable lookup in the generate* functions.
155   void generateHeader(
156       folly::IOBufQueue& writeBuf,
157       StreamID stream,
158       const HTTPMessage& msg,
159       bool eom = false,
160       HTTPHeaderSize* size = nullptr,
161       const folly::Optional<HTTPHeaders>& extraHeaders = folly::none) override {
162     getCodec(stream).generateHeader(
163         writeBuf, stream, msg, eom, size, extraHeaders);
164   }
165 
166   void generatePushPromise(folly::IOBufQueue& writeBuf,
167                            StreamID stream,
168                            const HTTPMessage& msg,
169                            StreamID pushID,
170                            bool eom = false,
171                            HTTPHeaderSize* size = nullptr) override {
172     getCodec(stream).generatePushPromise(
173         writeBuf, stream, msg, pushID, eom, size);
174   }
175 
generateBody(folly::IOBufQueue & writeBuf,StreamID stream,std::unique_ptr<folly::IOBuf> chain,folly::Optional<uint8_t> padding,bool eom)176   size_t generateBody(folly::IOBufQueue& writeBuf,
177                       StreamID stream,
178                       std::unique_ptr<folly::IOBuf> chain,
179                       folly::Optional<uint8_t> padding,
180                       bool eom) override {
181     return getCodec(stream).generateBody(
182         writeBuf, stream, std::move(chain), padding, eom);
183   }
184 
generateTrailers(folly::IOBufQueue & writeBuf,StreamID stream,const HTTPHeaders & trailers)185   size_t generateTrailers(folly::IOBufQueue& writeBuf,
186                           StreamID stream,
187                           const HTTPHeaders& trailers) override {
188     return getCodec(stream).generateTrailers(writeBuf, stream, trailers);
189   }
190 
generateEOM(folly::IOBufQueue & writeBuf,StreamID stream)191   size_t generateEOM(folly::IOBufQueue& writeBuf, StreamID stream) override {
192     return getCodec(stream).generateEOM(writeBuf, stream);
193   }
194 
getCompressionInfo()195   CompressionInfo getCompressionInfo() const override {
196     return qpackCodec_.getCompressionInfo();
197   }
198 
199   // HTTPCodec API
getDefaultWindowSize()200   uint32_t getDefaultWindowSize() const override {
201     return std::numeric_limits<uint32_t>::max();
202   }
203 
getEgressSettings()204   HTTPSettings* getEgressSettings() override {
205     return &egressSettings_;
206   }
207 
nextPushID()208   uint64_t nextPushID() {
209     CHECK_EQ(transportDirection_, TransportDirection::DOWNSTREAM);
210     return nextPushID_++;
211   }
212 
onIngressPushId(uint64_t pushId)213   void onIngressPushId(uint64_t pushId) {
214     minUnseenPushID_ = std::max(minUnseenPushID_, pushId + 1);
215   }
216 
217  protected:
getCurrentCodec()218   HTTPCodec& getCurrentCodec() {
219     return getCodec(currentStream_);
220   }
221 
getCurrentCodec()222   const HTTPCodec& getCurrentCodec() const {
223     return getCodec(currentStream_);
224   }
225 
getCodec(StreamID stream)226   HQStreamCodec& getCodec(StreamID stream) {
227     auto it = codecs_.find(stream);
228     CHECK(it != codecs_.end()) << "stream=" << stream;
229     return *it->second;
230   }
231 
getCodec(StreamID stream)232   const HQStreamCodec& getCodec(StreamID stream) const {
233     auto it = codecs_.find(stream);
234     CHECK(it != codecs_.end()) << "stream=" << stream;
235     return *it->second;
236   }
237 
238   HTTPSettings ingressSettings_;
239   // Turn peer's QPACK dynamic table on by default
240   HTTPSettings egressSettings_{
241       {SettingsId::HEADER_TABLE_SIZE, kDefaultEgressHeaderTableSize},
242       {SettingsId::MAX_HEADER_LIST_SIZE, kDefaultEgressMaxHeaderListSize},
243       {SettingsId::_HQ_QPACK_BLOCKED_STREAMS,
244        hq::kDefaultEgressQpackBlockedStream}};
245   mutable StreamID currentStream_{HTTPCodec::MaxStreamID};
246   folly::F14FastMap<StreamID, std::unique_ptr<HQStreamCodec>> codecs_;
247   QPACKCodec qpackCodec_;
248   folly::IOBufQueue qpackEncoderWriteBuf_{
249       folly::IOBufQueue::cacheChainLength()};
250   folly::IOBufQueue qpackDecoderWriteBuf_{
251       folly::IOBufQueue::cacheChainLength()};
252   std::function<uint64_t()> qpackEncoderMaxDataFn_;
253   uint64_t nextPushID_{0};
254 };
255 
256 }} // namespace proxygen::hq
257