1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * All rights reserved. 4 * 5 * This source code is licensed under the BSD-style license found in the 6 * LICENSE file in the root directory of this source tree. 7 */ 8 9 #pragma once 10 11 #include <folly/container/EvictingCacheMap.h> 12 #include <folly/io/IOBufQueue.h> 13 #include <folly/io/async/AsyncSocket.h> 14 #include <folly/io/async/DelayedDestructionBase.h> 15 #include <folly/io/async/EventBase.h> 16 #include <folly/lang/Assume.h> 17 #include <proxygen/lib/http/codec/HQControlCodec.h> 18 #include <proxygen/lib/http/codec/HQUnidirectionalCodec.h> 19 #include <proxygen/lib/http/codec/HQUtils.h> 20 #include <proxygen/lib/http/codec/HTTP1xCodec.h> 21 #include <proxygen/lib/http/codec/HTTP2Framer.h> 22 #include <proxygen/lib/http/codec/HTTPChecks.h> 23 #include <proxygen/lib/http/codec/HTTPCodec.h> 24 #include <proxygen/lib/http/codec/HTTPCodecFilter.h> 25 #include <proxygen/lib/http/codec/HTTPSettings.h> 26 #include <proxygen/lib/http/session/HQByteEventTracker.h> 27 #include <proxygen/lib/http/session/HQStreamBase.h> 28 #include <proxygen/lib/http/session/HQUnidirectionalCallbacks.h> 29 #include <proxygen/lib/http/session/HTTPSessionBase.h> 30 #include <proxygen/lib/http/session/HTTPSessionController.h> 31 #include <proxygen/lib/http/session/HTTPTransaction.h> 32 #include <proxygen/lib/http/session/ServerPushLifecycle.h> 33 #include <proxygen/lib/utils/ConditionalGate.h> 34 #include <quic/api/QuicSocket.h> 35 #include <quic/common/BufUtil.h> 36 37 namespace proxygen { 38 39 class HTTPSessionController; 40 class HQSession; 41 class VersionUtils; 42 43 namespace hq { 44 class HQStreamCodec; 45 } 46 47 std::ostream& operator<<(std::ostream& os, const HQSession& session); 48 49 enum class HQVersion : uint8_t { 50 H1Q_FB_V1, // HTTP1.1 on each stream, no control stream 51 H1Q_FB_V2, // HTTP1.1 on each stream, control stream for GOAWAY 52 HQ, // The real McCoy 53 }; 54 55 extern const std::string kH3; 56 extern const std::string kHQ; 57 extern const std::string kH3FBCurrentDraft; 58 extern const std::string kH3CurrentDraft; 59 extern const std::string kH3LegacyDraft; 60 extern const std::string kHQCurrentDraft; 61 62 // Default Priority Node 63 extern const proxygen::http2::PriorityUpdate hqDefaultPriority; 64 65 using HQVersionType = std::underlying_type<HQVersion>::type; 66 67 constexpr uint8_t kMaxDatagramHeaderSize = 16; 68 // Maximum number of datagrams to buffer per stream 69 constexpr uint8_t kDefaultMaxBufferedDatagrams = 5; 70 // Maximum number of streams with datagrams buffered 71 constexpr uint8_t kMaxStreamsWithBufferedDatagrams = 10; 72 // Maximum number of priority updates received when stream is not available 73 constexpr uint8_t kMaxBufferedPriorityUpdates = 10; 74 75 /** 76 * Session-level protocol info. 77 */ 78 struct QuicProtocolInfo : public wangle::ProtocolInfo { 79 virtual ~QuicProtocolInfo() override = default; 80 81 folly::Optional<quic::ConnectionId> clientChosenDestConnectionId; 82 folly::Optional<quic::ConnectionId> clientConnectionId; 83 folly::Optional<quic::ConnectionId> serverConnectionId; 84 folly::Optional<quic::TransportSettings> transportSettings; 85 86 uint32_t ptoCount{0}; 87 uint32_t totalPTOCount{0}; 88 uint64_t totalTransportBytesSent{0}; 89 uint64_t totalTransportBytesRecvd{0}; 90 bool usedZeroRtt{false}; 91 }; 92 93 /** 94 * Stream level protocol info. Contains all data from 95 * the sessinon info, plus stream-specific information. 96 * This structure is owned by each individual stream, 97 * and is updated when requested. 98 * If instance of HQ Transport Stream outlives the corresponding QUIC socket, 99 * has been destroyed, this structure will contain the last snapshot 100 * of the data received from the QUIC socket. 101 * 102 * Usage: 103 * TransportInfo tinfo; 104 * txn.getCurrentTransportInfo(&tinfo); // txn is the HTTP transaction object 105 * auto streamInfo = dynamic_cast<QuicStreamProtocolInfo>(tinfo.protocolInfo); 106 * if (streamInfo) { 107 * // stream level AND connection level info is available 108 * }; 109 * auto connectionInfo = dynamic_cast<QuicProtocolInfo>(tinfo.protocolInfo); 110 * if (connectionInfo) { 111 * // ONLY connection level info is available. No stream level info. 112 * } 113 * 114 */ 115 struct QuicStreamProtocolInfo : public QuicProtocolInfo { 116 117 // Slicing assignment operator to initialize the per-stream protocol info 118 // with the values of the per-session protocol info. 119 QuicStreamProtocolInfo& operator=(const QuicProtocolInfo& other) { 120 if (this != &other) { 121 *(static_cast<QuicProtocolInfo*>(this)) = other; 122 } 123 return *this; 124 } 125 126 quic::QuicSocket::StreamTransportInfo streamTransportInfo; 127 // NOTE: when the control stream latency stats will be reintroduced, 128 // collect it here. 129 }; 130 131 class HQSession 132 : public quic::QuicSocket::ConnectionCallback 133 , public quic::QuicSocket::ReadCallback 134 , public quic::QuicSocket::WriteCallback 135 , public quic::QuicSocket::DeliveryCallback 136 , public quic::QuicSocket::DatagramCallback 137 , public HTTPSessionBase 138 , public folly::EventBase::LoopCallback 139 , public HQUnidirStreamDispatcher::Callback { 140 // Forward declarations 141 public: 142 class HQStreamTransportBase; 143 144 protected: 145 class HQStreamTransport; 146 147 private: 148 class HQControlStream; 149 class H1QFBV1VersionUtils; 150 class H1QFBV2VersionUtils; 151 class HQVersionUtils; 152 153 static constexpr uint8_t kMaxCodecStackDepth = 3; 154 155 public: getMaxAllowedPushId()156 folly::Optional<hq::PushId> getMaxAllowedPushId() { 157 return maxAllowedPushId_; 158 } 159 setServerPushLifecycleCallback(ServerPushLifecycleCallback * cb)160 void setServerPushLifecycleCallback(ServerPushLifecycleCallback* cb) { 161 serverPushLifecycleCb_ = cb; 162 } 163 164 class ConnectCallback { 165 public: ~ConnectCallback()166 virtual ~ConnectCallback() { 167 } 168 169 /** 170 * This function is not terminal of the callback, downstream should expect 171 * onReplaySafe to be invoked after connectSuccess. 172 * onReplaySafe is invoked right after connectSuccess if zero rtt is not 173 * attempted. 174 * In zero rtt case, onReplaySafe might never be invoked if e.g. server 175 * does not respond. 176 */ connectSuccess()177 virtual void connectSuccess() { 178 // Default empty implementation is provided in case downstream does not 179 // attempt zero rtt data. 180 } 181 182 /** 183 * Terminal callback. 184 */ 185 virtual void onReplaySafe() = 0; 186 187 /** 188 * Terminal callback. 189 */ 190 virtual void connectError( 191 std::pair<quic::QuicErrorCode, std::string> code) = 0; 192 193 /** 194 * Callback for the first time transport has processed a packet from peer. 195 */ onFirstPeerPacketProcessed()196 virtual void onFirstPeerPacketProcessed() { 197 } 198 }; 199 200 virtual ~HQSession(); 201 getType()202 HTTPTransaction::Transport::Type getType() const noexcept override { 203 return HTTPTransaction::Transport::Type::QUIC; 204 } 205 setSocket(std::shared_ptr<quic::QuicSocket> sock)206 void setSocket(std::shared_ptr<quic::QuicSocket> sock) noexcept { 207 sock_ = sock; 208 if (infoCallback_) { 209 infoCallback_->onCreate(*this); 210 } 211 212 if (quicInfo_) { 213 quicInfo_->transportSettings = sock_->getTransportSettings(); 214 } 215 } 216 setForceUpstream1_1(bool force)217 void setForceUpstream1_1(bool force) { 218 forceUpstream1_1_ = force; 219 } setStrictValidation(bool strictValidation)220 void setStrictValidation(bool strictValidation) { 221 strictValidation_ = strictValidation; 222 } 223 224 void setSessionStats(HTTPSessionStats* stats) override; 225 226 void onNewBidirectionalStream(quic::StreamId id) noexcept override; 227 228 void onNewUnidirectionalStream(quic::StreamId id) noexcept override; 229 230 void onStopSending(quic::StreamId id, 231 quic::ApplicationErrorCode error) noexcept override; 232 233 void onConnectionEnd() noexcept override; 234 235 void onConnectionSetupError( 236 std::pair<quic::QuicErrorCode, std::string> code) noexcept override; 237 238 void onConnectionError( 239 std::pair<quic::QuicErrorCode, std::string> code) noexcept override; 240 241 void onKnob(uint64_t knobSpace, uint64_t knobId, quic::Buf knobBlob) override; 242 243 // returns false in case of failure 244 bool onTransportReadyCommon() noexcept; 245 246 void onReplaySafe() noexcept override; 247 248 void onFlowControlUpdate(quic::StreamId id) noexcept override; 249 250 // quic::QuicSocket::ReadCallback 251 void readAvailable(quic::StreamId id) noexcept override; 252 253 void readError( 254 quic::StreamId id, 255 std::pair<quic::QuicErrorCode, folly::Optional<folly::StringPiece>> 256 error) noexcept override; 257 258 // quic::QuicSocket::WriteCallback 259 void onConnectionWriteReady(uint64_t maxToSend) noexcept override; 260 261 void onConnectionWriteError( 262 std::pair<quic::QuicErrorCode, folly::Optional<folly::StringPiece>> 263 error) noexcept override; 264 265 // quic::QuicSocket::DatagramCallback 266 void onDatagramsAvailable() noexcept override; 267 268 // Only for UpstreamSession 269 HTTPTransaction* newTransaction(HTTPTransaction::Handler* handler) override; 270 271 void startNow() override; 272 describe(std::ostream & os)273 void describe(std::ostream& os) const override { 274 using quic::operator<<; 275 os << "proto=" << alpn_; 276 auto clientCid = (sock_ && sock_->getClientConnectionId()) 277 ? *sock_->getClientConnectionId() 278 : quic::ConnectionId({0, 0, 0, 0}); 279 auto serverCid = (sock_ && sock_->getServerConnectionId()) 280 ? *sock_->getServerConnectionId() 281 : quic::ConnectionId({0, 0, 0, 0}); 282 if (direction_ == TransportDirection::DOWNSTREAM) { 283 os << ", UA=" << userAgent_ << ", client CID=" << clientCid 284 << ", server CID=" << serverCid << ", downstream=" << getPeerAddress() 285 << ", " << getLocalAddress() << "=local"; 286 } else { 287 os << ", client CID=" << clientCid << ", server CID=" << serverCid 288 << ", local=" << getLocalAddress() << ", " << getPeerAddress() 289 << "=upstream"; 290 } 291 os << ", drain=" << drainState_; 292 } 293 294 void onGoaway(uint64_t lastGoodStreamID, 295 ErrorCode code, 296 std::unique_ptr<folly::IOBuf> debugData = nullptr); 297 298 void onSettings(const SettingsList& settings); 299 300 void onPriority(quic::StreamId streamId, const HTTPPriority& pri); 301 void onPushPriority(hq::PushId pushId, const HTTPPriority& pri); 302 getTransport()303 folly::AsyncTransport* getTransport() override { 304 return nullptr; 305 } 306 getEventBase()307 folly::EventBase* getEventBase() const override { 308 if (sock_) { 309 return sock_->getEventBase(); 310 } 311 return nullptr; 312 } 313 getTransport()314 const folly::AsyncTransport* getTransport() const override { 315 return nullptr; 316 } 317 hasActiveTransactions()318 bool hasActiveTransactions() const override { 319 return getNumStreams() > 0; 320 } 321 getNumStreams()322 uint32_t getNumStreams() const override { 323 return getNumOutgoingStreams() + getNumIncomingStreams(); 324 } 325 getCodecProtocol()326 CodecProtocol getCodecProtocol() const override { 327 if (!versionUtils_) { 328 // return a default protocol before alpn is set 329 return CodecProtocol::HTTP_1_1; 330 } 331 return versionUtils_->getCodecProtocol(); 332 } 333 334 // for testing only getDispatcher()335 HQUnidirStreamDispatcher* getDispatcher() { 336 return &unidirectionalReadDispatcher_; 337 } 338 339 /** 340 * Set flow control properties on an already started session. 341 * QUIC requires both stream and connection flow control window sizes to be 342 * specified in the initial transport handshake. Specifying 343 * SETTINGS_INITIAL_WINDOW_SIZE in the SETTINGS frame is an error. 344 * 345 * @param initialReceiveWindow (unused) 346 * @param receiveStreamWindowSize per-stream receive window for NEW streams; 347 * @param receiveSessionWindowSize per-session receive window; 348 */ setFlowControl(size_t,size_t receiveStreamWindowSize,size_t receiveSessionWindowSize)349 void setFlowControl(size_t /* initialReceiveWindow */, 350 size_t receiveStreamWindowSize, 351 size_t receiveSessionWindowSize) override { 352 if (sock_) { 353 sock_->setConnectionFlowControlWindow(receiveSessionWindowSize); 354 } 355 receiveStreamWindowSize_ = (uint32_t)receiveStreamWindowSize; 356 HTTPSessionBase::setReadBufferLimit((uint32_t)receiveSessionWindowSize); 357 } 358 359 /** 360 * Set outgoing settings for this session 361 */ setEgressSettings(const SettingsList & settings)362 void setEgressSettings(const SettingsList& settings) override { 363 for (const auto& setting : settings) { 364 egressSettings_.setSetting(setting.id, setting.value); 365 } 366 const auto maxHeaderListSize = 367 egressSettings_.getSetting(SettingsId::MAX_HEADER_LIST_SIZE); 368 if (maxHeaderListSize) { 369 versionUtilsReady_.then([this, size = maxHeaderListSize->value] { 370 versionUtils_->setMaxUncompressed(size); 371 }); 372 } 373 auto datagramEnabled = egressSettings_.getSetting(SettingsId::_HQ_DATAGRAM); 374 // if enabling H3 datagrams check that the transport supports datagrams 375 if (datagramEnabled && datagramEnabled->value) { 376 datagramEnabled_ = true; 377 } 378 } 379 setMaxConcurrentIncomingStreams(uint32_t)380 void setMaxConcurrentIncomingStreams(uint32_t /*num*/) override { 381 // need transport API 382 } 383 384 /** 385 * Send a settings frame 386 */ 387 size_t sendSettings() override; 388 389 /** 390 * Causes a ping to be sent on the session. If the underlying protocol 391 * doesn't support pings, this will return 0. Otherwise, it will return 392 * the number of bytes written on the transport to send the ping. 393 */ sendPing()394 size_t sendPing() override { 395 sock_->sendPing(nullptr, std::chrono::milliseconds(0)); 396 return 0; 397 } 398 399 /** 400 * Sends a knob frame on the session. 401 */ sendKnob(uint64_t knobSpace,uint64_t knobId,quic::Buf knobBlob)402 folly::Expected<folly::Unit, quic::LocalErrorCode> sendKnob( 403 uint64_t knobSpace, uint64_t knobId, quic::Buf knobBlob) { 404 return sock_->setKnob(knobSpace, knobId, std::move(knobBlob)); 405 } 406 407 /** 408 * Sends a priority message on this session. If the underlying protocol 409 * doesn't support priority, this is a no-op. A new stream identifier will 410 * be selected and returned. 411 */ sendPriority(http2::PriorityUpdate)412 HTTPCodec::StreamID sendPriority(http2::PriorityUpdate /*pri*/) override { 413 return 0; 414 } 415 416 /** 417 * As above, but updates an existing priority node. Do not use for 418 * real nodes, prefer HTTPTransaction::changePriority. 419 */ sendPriority(HTTPCodec::StreamID,http2::PriorityUpdate)420 size_t sendPriority(HTTPCodec::StreamID /*id*/, 421 http2::PriorityUpdate /*pri*/) override { 422 return 0; 423 } 424 425 size_t sendPriority(HTTPCodec::StreamID id, HTTPPriority pri); 426 size_t sendPushPriority(hq::PushId pushId, HTTPPriority pri); 427 428 /** 429 * Get session-level transport info. 430 * NOTE: The protocolInfo will be set to connection-level pointer. 431 */ 432 bool getCurrentTransportInfo(wangle::TransportInfo* /*tinfo*/) override; 433 434 /** 435 * Get session level AND stream level transport info. 436 * NOTE: the protocolInfo will be set to stream-level pointer. 437 */ 438 bool getCurrentStreamTransportInfo(QuicStreamProtocolInfo* /*qspinfo*/, 439 quic::StreamId /*streamId*/); 440 connCloseByRemote()441 bool connCloseByRemote() override { 442 return false; 443 } 444 445 // From ManagedConnection 446 void timeoutExpired() noexcept override; 447 isBusy()448 bool isBusy() const override { 449 return getNumStreams() > 0; 450 } 451 void notifyPendingShutdown() override; 452 void closeWhenIdle() override; 453 void dropConnection(const std::string& errorMsg = "") override; dumpConnectionState(uint8_t)454 void dumpConnectionState(uint8_t /*loglevel*/) override { 455 } 456 457 virtual void injectTraceEventIntoAllTransactions(TraceEvent& event) override; 458 459 /* 460 * dropConnectionSync drops the connection immediately. 461 * This means that when invoked internally may need a destructor guard and 462 * the socket will be invalid after it is invoked. 463 * 464 * errorCode is passed to transport CLOSE_CONNNECTION frame 465 * 466 * proxygenError is delivered to open transactions 467 */ 468 void dropConnectionSync(std::pair<quic::QuicErrorCode, std::string> errorCode, 469 ProxygenError proxygenError); 470 471 // Invokes dropConnectionSync at the beginning of the next loopCallback 472 void dropConnectionAsync( 473 std::pair<quic::QuicErrorCode, std::string> errorCode, 474 ProxygenError proxygenError); 475 476 bool getCurrentTransportInfoWithoutUpdate( 477 wangle::TransportInfo* /*tinfo*/) const override; 478 setHeaderCodecStats(HeaderCodec::Stats * stats)479 void setHeaderCodecStats(HeaderCodec::Stats* stats) override { 480 versionUtilsReady_.then( 481 [this, stats] { versionUtils_->setHeaderCodecStats(stats); }); 482 } 483 enableDoubleGoawayDrain()484 void enableDoubleGoawayDrain() override { 485 } 486 487 // Upstream interface isReusable()488 bool isReusable() const override { 489 VLOG(4) << __func__ << " sess=" << *this; 490 return !isClosing(); 491 } 492 isClosing()493 bool isClosing() const override { 494 VLOG(4) << __func__ << " sess=" << *this; 495 return (drainState_ != DrainState::NONE || dropping_); 496 } 497 drain()498 void drain() override { 499 notifyPendingShutdown(); 500 } 501 getHTTPPriority(uint8_t)502 folly::Optional<const HTTPMessage::HTTP2Priority> getHTTPPriority( 503 uint8_t /*level*/) override { 504 return folly::none; 505 } 506 getQuicSocket()507 quic::QuicSocket* getQuicSocket() const { 508 return sock_.get(); 509 } 510 511 // Override HTTPSessionBase address getter functions getLocalAddress()512 const folly::SocketAddress& getLocalAddress() const noexcept override { 513 return sock_ && sock_->good() ? sock_->getLocalAddress() : localAddr_; 514 } 515 getPeerAddress()516 const folly::SocketAddress& getPeerAddress() const noexcept override { 517 return sock_ && sock_->good() ? sock_->getPeerAddress() : peerAddr_; 518 } 519 520 // Returns creation time point for logging of handshake duration getCreatedTime()521 const std::chrono::steady_clock::time_point& getCreatedTime() const { 522 return createTime_; 523 } 524 enablePingProbes(std::chrono::seconds,std::chrono::seconds,bool,bool)525 void enablePingProbes(std::chrono::seconds /*interval*/, 526 std::chrono::seconds /*timeout*/, 527 bool /*extendIntervalOnIngress*/, 528 bool /*immediate*/) override { 529 // TODO 530 } 531 532 protected: 533 // Finds any transport-like stream that has not been detached 534 // by quic stream id 535 HQStreamTransportBase* findNonDetachedStream(quic::StreamId streamId); 536 537 // Find any transport-like stream by quic stream id 538 HQStreamTransportBase* findStream(quic::StreamId streamId); 539 540 // Find any transport-like stream suitable for ingress (request/push-ingress) 541 HQStreamTransportBase* findIngressStream(quic::StreamId streamId, 542 bool includeDetached = false); 543 // Find any transport-like stream suitable for egress (request/push-egress) 544 HQStreamTransportBase* findEgressStream(quic::StreamId streamId, 545 bool includeDetached = false); 546 547 /** 548 * The following functions invoke a callback on all or on all non-detached 549 * request streams. It does an extra lookup per stream but it is safe. Note 550 * that if the callback *adds* streams, they will not get the callback. 551 */ invokeOnAllStreams(std::function<void (HQStreamTransportBase *)> fn)552 void invokeOnAllStreams(std::function<void(HQStreamTransportBase*)> fn) { 553 invokeOnStreamsImpl( 554 std::move(fn), 555 [this](quic::StreamId id) { return this->findStream(id); }, 556 true); 557 } 558 559 void invokeOnEgressStreams(std::function<void(HQStreamTransportBase*)> fn, 560 bool includeDetached = false) { 561 invokeOnStreamsImpl(std::move(fn), 562 [this, includeDetached](quic::StreamId id) { 563 return this->findEgressStream(id, includeDetached); 564 }); 565 } 566 567 void invokeOnIngressStreams(std::function<void(HQStreamTransportBase*)> fn, 568 bool includeDetached = false) { 569 invokeOnStreamsImpl( 570 std::move(fn), 571 [this, includeDetached](quic::StreamId id) { 572 return this->findIngressStream(id, includeDetached); 573 }, 574 true); 575 } 576 invokeOnNonDetachedStreams(std::function<void (HQStreamTransportBase *)> fn)577 void invokeOnNonDetachedStreams( 578 std::function<void(HQStreamTransportBase*)> fn) { 579 invokeOnStreamsImpl(std::move(fn), [this](quic::StreamId id) { 580 return this->findNonDetachedStream(id); 581 }); 582 } 583 584 virtual HQStreamTransportBase* findPushStream(quic::StreamId) = 0; 585 586 virtual void findPushStreams( 587 std::unordered_set<HQStreamTransportBase*>& streams) = 0; 588 589 // Apply the function on the streams found by the two locators. 590 // Note that same stream can be returned by a find-by-stream-id 591 // and find-by-push-id locators. 592 // This is mitigated by collecting the streams in an unordered set 593 // prior to application of the funtion 594 // Note that the function is allowed to delete a stream by invoking 595 // erase stream, but the locators are not allowed to do so. 596 // Note that neither the locators nor the function are allowed 597 // to call "invokeOnStreamsImpl" 598 void invokeOnStreamsImpl( 599 std::function<void(HQStreamTransportBase*)> fn, 600 std::function<HQStreamTransportBase*(quic::StreamId)> findByStreamIdFn, 601 bool includePush = false) { 602 DestructorGuard g(this); 603 std::unordered_set<HQStreamTransportBase*> streams; 604 streams.reserve(getNumStreams()); 605 606 for (const auto& txn : streams_) { 607 HQStreamTransportBase* pstream = findByStreamIdFn(txn.first); 608 if (pstream) { 609 streams.insert(pstream); 610 } 611 } 612 613 if (includePush) { 614 findPushStreams(streams); 615 } 616 617 for (HQStreamTransportBase* pstream : streams) { 618 CHECK(pstream); 619 fn(pstream); 620 } 621 } 622 623 // Erase the stream. Returns true if the stream 624 // has been erased 625 bool eraseStream(quic::StreamId); 626 627 virtual bool erasePushStream(quic::StreamId streamId) = 0; 628 resumeReadsForPushStream(quic::StreamId streamId)629 void resumeReadsForPushStream(quic::StreamId streamId) { 630 pendingProcessReadSet_.insert(streamId); 631 resumeReads(streamId); 632 } 633 634 // Find a control stream by type 635 HQControlStream* findControlStream(hq::UnidirectionalStreamType streamType); 636 637 // Find a control stream by stream id (either ingress or egress) 638 HQControlStream* findControlStream(quic::StreamId streamId); 639 createIngressPushStream(quic::StreamId,hq::PushId)640 virtual HQStreamTransportBase* createIngressPushStream(quic::StreamId, 641 hq::PushId) { 642 return nullptr; 643 } 644 eraseUnboundStream(HQStreamTransportBase *)645 virtual void eraseUnboundStream(HQStreamTransportBase*) { 646 } 647 newPushedTransaction(HTTPCodec::StreamID,HTTPTransaction::PushHandler *,ProxygenError *)648 virtual HTTPTransaction* newPushedTransaction( 649 HTTPCodec::StreamID, /* parentRequestStreamId */ 650 HTTPTransaction::PushHandler*, /* handler */ 651 ProxygenError*) { 652 return nullptr; 653 } 654 655 /* 656 * for HQ we need a read callback for unidirectional streams to read the 657 * stream type from the the wire to decide whether a stream is 658 * a control stream, a header codec/decoder stream or a push stream 659 * 660 * This part is now implemented in HQUnidirStreamDispatcher 661 */ 662 663 // Callback methods that are invoked by the dispatcher 664 void assignPeekCallback( 665 quic::StreamId /* id */, 666 hq::UnidirectionalStreamType /* type */, 667 size_t /* toConsume */, 668 quic::QuicSocket::PeekCallback* const /* cb */) override; 669 670 void assignReadCallback( 671 quic::StreamId /* id */, 672 hq::UnidirectionalStreamType /* type */, 673 size_t /* toConsume */, 674 quic::QuicSocket::ReadCallback* const /* cb */) override; 675 676 void rejectStream(quic::StreamId /* id */) override; 677 678 folly::Optional<hq::UnidirectionalStreamType> parseStreamPreface( 679 uint64_t preface) override; 680 681 void controlStreamReadAvailable(quic::StreamId /* id */) override; 682 683 void controlStreamReadError( 684 quic::StreamId /* id */, 685 const HQUnidirStreamDispatcher::Callback::ReadError& /* err */) override; 686 687 /** 688 * HQSession is an HTTPSessionBase that uses QUIC as the underlying transport 689 * 690 * HQSession is an abstract base class and cannot be instantiated 691 * directly. If you want to handle requests and send responses (act as a 692 * server), construct a HQDownstreamSession. If you want to make 693 * requests and handle responses (act as a client), construct a 694 * HQUpstreamSession. 695 */ 696 HQSession(const std::chrono::milliseconds transactionsTimeout, 697 HTTPSessionController* controller, 698 proxygen::TransportDirection direction, 699 const wangle::TransportInfo& tinfo, 700 InfoCallback* sessionInfoCb, 701 folly::Function<void(HTTPCodecFilterChain& chain)> 702 /* codecFilterCallbackFn */ 703 = nullptr) HTTPSessionBase(folly::SocketAddress (),folly::SocketAddress (),controller,tinfo,sessionInfoCb,std::make_unique<HTTP1xCodec> (direction),WheelTimerInstance (),hq::kSessionStreamId)704 : HTTPSessionBase(folly::SocketAddress(), 705 folly::SocketAddress(), 706 controller, 707 tinfo, 708 sessionInfoCb, 709 std::make_unique<HTTP1xCodec>(direction), 710 WheelTimerInstance(), 711 hq::kSessionStreamId), 712 direction_(direction), 713 transactionsTimeout_(transactionsTimeout), 714 started_(false), 715 dropping_(false), 716 inLoopCallback_(false), 717 unidirectionalReadDispatcher_(*this, direction), 718 createTime_(std::chrono::steady_clock::now()) { 719 codec_.add<HTTPChecks>(); 720 // dummy, ingress, egress 721 codecStack_.reserve(kMaxCodecStackDepth); 722 codecStack_.emplace_back(nullptr, nullptr, nullptr); 723 724 attachToSessionController(); 725 nextEgressResults_.reserve(maxConcurrentIncomingStreams_); 726 quicInfo_ = std::make_shared<QuicProtocolInfo>(); 727 } 728 729 // EventBase::LoopCallback methods 730 void runLoopCallback() noexcept override; 731 732 /** 733 * Called by transactionTimeout if the transaction has no handler. 734 */ 735 virtual HTTPTransaction::Handler* getTransactionTimeoutHandler( 736 HTTPTransaction* txn) = 0; 737 738 /** 739 * Called by onHeadersComplete(). This function allows downstream and 740 * upstream to do any setup (like preparing a handler) when headers are 741 * first received from the remote side on a given transaction. 742 */ 743 virtual void setupOnHeadersComplete(HTTPTransaction* txn, 744 HTTPMessage* msg) = 0; 745 746 virtual void onConnectionErrorHandler( 747 std::pair<quic::QuicErrorCode, std::string> error) noexcept = 0; 748 749 void applySettings(const SettingsList& settings); 750 connectSuccess()751 virtual void connectSuccess() noexcept { 752 } 753 isPeerUniStream(quic::StreamId id)754 bool isPeerUniStream(quic::StreamId id) { 755 return sock_->isUnidirectionalStream(id) && 756 ((direction_ == TransportDirection::DOWNSTREAM && 757 sock_->isClientStream(id)) || 758 (direction_ == TransportDirection::UPSTREAM && 759 sock_->isServerStream(id))); 760 } 761 isSelfUniStream(quic::StreamId id)762 bool isSelfUniStream(quic::StreamId id) { 763 return sock_->isUnidirectionalStream(id) && 764 ((direction_ == TransportDirection::DOWNSTREAM && 765 sock_->isServerStream(id)) || 766 (direction_ == TransportDirection::UPSTREAM && 767 sock_->isClientStream(id))); 768 } 769 770 void abortStream(HTTPException::Direction dir, 771 quic::StreamId id, 772 HTTP3::ErrorCode err); 773 774 // Get extra HTTP headers we want to add to the HTTPMessage in sendHeaders. getExtraHeaders(const HTTPMessage &,quic::StreamId)775 virtual folly::Optional<HTTPHeaders> getExtraHeaders(const HTTPMessage&, 776 quic::StreamId) { 777 return folly::none; 778 } 779 780 proxygen::TransportDirection direction_; 781 std::chrono::milliseconds transactionsTimeout_; 782 TimePoint transportStart_; 783 784 std::shared_ptr<quic::QuicSocket> sock_; 785 786 // Callback pointer used for correctness testing. Not used 787 // for session logic. 788 ServerPushLifecycleCallback* serverPushLifecycleCb_{nullptr}; 789 790 private: 791 std::unique_ptr<HTTPCodec> createStreamCodec(quic::StreamId streamId); 792 793 // Creates a request stream. All streams that are not control streams 794 // or Push streams are request streams. 795 HQStreamTransport* createStreamTransport(quic::StreamId streamId); 796 797 bool createEgressControlStreams(); 798 HQControlStream* tryCreateIngressControlStream(quic::StreamId id, 799 uint64_t preface); 800 801 // Creates outgoing control stream. 802 bool createEgressControlStream(hq::UnidirectionalStreamType streamType); 803 804 // Creates incoming control stream 805 HQControlStream* createIngressControlStream( 806 quic::StreamId id, hq::UnidirectionalStreamType streamType); 807 cleanupUnboundPushStreams(std::vector<quic::StreamId> &)808 virtual void cleanupUnboundPushStreams(std::vector<quic::StreamId>&) { 809 } 810 811 // gets the ALPN from the transport and returns whether the protocol is 812 // supported. Drops the connection if not supported 813 bool getAndCheckApplicationProtocol(); 814 815 // Use ALPN to set the correct version utils strategy. 816 void setVersionUtils(); 817 818 // Used during 2-phased GOAWAY messages, and EOF sending. 819 void onDeliveryAck(quic::StreamId id, 820 uint64_t offset, 821 std::chrono::microseconds rtt) override; 822 823 void onCanceled(quic::StreamId id, uint64_t offset) override; 824 825 // helper functions for reads 826 void readRequestStream(quic::StreamId id) noexcept; 827 void readControlStream(HQControlStream* controlStream); 828 829 // Runs the codecs on all request streams that have received data 830 // during the last event loop 831 void processReadData(); 832 833 // Pausing reads prevents the read callback to be invoked on the stream 834 void resumeReads(quic::StreamId id); 835 836 // Resume all ingress transactions 837 void resumeReads(); 838 839 // Resuming the reads allows the read callback to be involved 840 void pauseReads(quic::StreamId id); 841 842 // Pause all ingress transactions 843 void pauseReads(); 844 845 void notifyEgressBodyBuffered(int64_t bytes); 846 847 // The max allowed push id value. Value folly::none indicates that 848 // a. For downstream session: MAX_PUSH_ID has not been received 849 // b. For upstream session: MAX_PUSH_ID has been explicitly set to none 850 // In both cases, maxAllowedPushId_ == folly::none means that no push id 851 // is allowed. Default to kEightByteLimit assuming this session will 852 // be using push. 853 folly::Optional<hq::PushId> maxAllowedPushId_{folly::none}; 854 855 // Schedule the loop callback. 856 // To keep this consistent with EventBase::runInLoop run in the next loop 857 // by default 858 void scheduleLoopCallback(bool thisIteration = false); 859 860 // helper functions for writes 861 uint64_t writeRequestStreams(uint64_t maxEgress) noexcept; 862 void scheduleWrite(); 863 void handleWriteError(HQStreamTransportBase* hqStream, 864 quic::QuicErrorCode err); 865 866 /** 867 * Handles the write to the socket and errors for a request stream. 868 * Returns the number of bytes written from data. 869 */ 870 template <typename WriteFunc, typename DataType> 871 size_t handleWrite(WriteFunc writeFunc, 872 HQStreamTransportBase* hqStream, 873 DataType dataType, 874 size_t dataChainLen, 875 bool sendEof); 876 877 /** 878 * Helper function to perform writes on a single request stream 879 * The first argument defines whether the implementation should 880 * call onWriteReady on the transaction to get data allocated 881 * in the write buffer. 882 * Returns the number of bytes written to the transport 883 */ 884 uint64_t requestStreamWriteImpl(HQStreamTransportBase* hqStream, 885 uint64_t maxEgress, 886 double ratio); 887 888 uint64_t writeControlStreams(uint64_t maxEgress); 889 uint64_t controlStreamWriteImpl(HQControlStream* ctrlStream, 890 uint64_t maxEgress); 891 void handleSessionError(HQStreamBase* stream, 892 hq::StreamDirection streamDir, 893 quic::QuicErrorCode err, 894 ProxygenError proxygenError); 895 896 void detachStreamTransport(HQStreamTransportBase* hqStream); 897 898 void drainImpl(); 899 900 void checkForShutdown(); 901 void onGoawayAck(); 902 quic::StreamId getGoawayStreamId(); 903 904 void errorOnTransactionId(quic::StreamId id, HTTPException ex); 905 906 /** 907 * Shared implementation of "findXXXstream" methods 908 */ 909 HQStreamTransportBase* findStreamImpl(quic::StreamId streamId, 910 bool includeEgress = true, 911 bool includeIngress = true, 912 bool includeDetached = true); 913 914 /** 915 * Shared implementation of "numberOfXXX" methods 916 */ 917 uint32_t countStreamsImpl(bool includeEgress = true, 918 bool includeIngress = true) const; 919 920 std::list<folly::AsyncTransport::ReplaySafetyCallback*> 921 waitingForReplaySafety_; 922 923 /** 924 * With HTTP/1.1 codecs, graceful shutdown happens when the session has sent 925 * and received a Connection: close header, and all streams have completed. 926 * 927 * The application can signal intent to drain by calling notifyPendingShutdown 928 * (or its alias, drain). The peer can signal intent to drain by including 929 * a Connection: close header. 930 * 931 * closeWhenIdle will bypass the requirement to send/receive Connection: 932 * close, and the socket will terminate as soon as the stream count reaches 0. 933 * 934 * dropConnection will forcibly close all streams and guarantee that the 935 * HQSession has been deleted before exiting. 936 * 937 * The intent is that an application will first notifyPendingShutdown() all 938 * open sessions. Then after some period of time, it will call closeWhenIdle. 939 * As a last resort, it will call dropConnection. 940 * 941 * Note we allow the peer to create streams after draining because of out 942 * of order delivery. 943 * 944 * drainState_ tracks the progress towards shutdown. 945 * 946 * NONE - no shutdown requested 947 * PENDING - shutdown requested but no Connection: close seen 948 * CLOSE_SENT - sent Connection: close but not received 949 * CLOSE_RECEIVED - received Connection: close but not sent 950 * DONE - sent and received Connection: close. 951 * 952 * NONE ---> PENDING ---> CLOSE_SENT --+--> DONE 953 * | | | 954 * +----------+-------> CLOSE_RECV --+ 955 * 956 * For sessions with a control stream shutdown is driven by GOAWAYs. 957 * Only the server can send GOAWAYs so the behavior is asymmetric between 958 * upstream and downstream 959 * 960 * NONE - no shutdown requested 961 * PENDING - shutdown requested but no GOAWAY sent/received yet 962 * FIRST_GOAWAY - first GOAWAY received/sent 963 * SECOND_GOAWAY - downstream only - second GOAWAY sent 964 * DONE - two GOAWAYs sent/received. can close when request streams are done 965 * 966 */ 967 enum DrainState : uint8_t { 968 NONE = 0, 969 PENDING = 1, 970 CLOSE_SENT = 2, 971 CLOSE_RECEIVED = 3, 972 FIRST_GOAWAY = 4, 973 SECOND_GOAWAY = 5, 974 DONE = 6 975 }; 976 977 DrainState drainState_{DrainState::NONE}; 978 bool started_ : 1; 979 bool dropping_ : 1; 980 bool inLoopCallback_ : 1; 981 folly::Optional< 982 std::pair<std::pair<quic::QuicErrorCode, std::string>, ProxygenError>> 983 dropInNextLoop_; 984 985 #ifdef _MSC_VER 986 #pragma warning(push) 987 #pragma warning(disable : 4250) // inherits 'proxygen::detail::..' via dominance 988 #endif 989 990 // A control stream is created as egress first, then the ingress counterpart 991 // is linked as soon as we read the stream preface on the associated stream 992 class HQControlStream 993 : public detail::composite::CSBidir 994 , public HQStreamBase 995 , public hq::HQUnidirectionalCodec::Callback 996 , public quic::QuicSocket::DeliveryCallback { 997 public: 998 HQControlStream() = delete; HQControlStream(HQSession & session,quic::StreamId egressStreamId,hq::UnidirectionalStreamType type)999 HQControlStream(HQSession& session, 1000 quic::StreamId egressStreamId, 1001 hq::UnidirectionalStreamType type) 1002 : detail::composite::CSBidir(egressStreamId, folly::none), 1003 HQStreamBase(session, session.codec_, type) { 1004 createEgressCodec(); 1005 } 1006 createEgressCodec()1007 void createEgressCodec() { 1008 CHECK(type_.has_value()); 1009 switch (*type_) { 1010 case hq::UnidirectionalStreamType::H1Q_CONTROL: 1011 case hq::UnidirectionalStreamType::CONTROL: 1012 realCodec_ = 1013 std::make_unique<hq::HQControlCodec>(getEgressStreamId(), 1014 session_.direction_, 1015 hq::StreamDirection::EGRESS, 1016 session_.egressSettings_, 1017 *type_); 1018 break; 1019 case hq::UnidirectionalStreamType::QPACK_ENCODER: 1020 case hq::UnidirectionalStreamType::QPACK_DECODER: 1021 // These are statically allocated in the session 1022 break; 1023 default: 1024 LOG(FATAL) 1025 << "Failed to create egress codec." 1026 << " unrecognized stream type=" << static_cast<uint64_t>(*type_); 1027 } 1028 } 1029 setIngressCodec(std::unique_ptr<hq::HQUnidirectionalCodec> codec)1030 void setIngressCodec(std::unique_ptr<hq::HQUnidirectionalCodec> codec) { 1031 ingressCodec_ = std::move(codec); 1032 } 1033 1034 void processReadData(); 1035 1036 // QuicSocket::DeliveryCallback 1037 void onDeliveryAck(quic::StreamId id, 1038 uint64_t offset, 1039 std::chrono::microseconds rtt) override; 1040 void onCanceled(quic::StreamId id, uint64_t offset) override; 1041 1042 // HTTPCodec::Callback onMessageBegin(HTTPCodec::StreamID,HTTPMessage *)1043 void onMessageBegin(HTTPCodec::StreamID /*stream*/, 1044 HTTPMessage* /*msg*/) override { 1045 LOG(FATAL) << __func__ << " called on a Control Stream."; 1046 } 1047 onHeadersComplete(HTTPCodec::StreamID,std::unique_ptr<HTTPMessage>)1048 void onHeadersComplete(HTTPCodec::StreamID /*stream*/, 1049 std::unique_ptr<HTTPMessage> /*msg*/) override { 1050 LOG(FATAL) << __func__ << " called on a Control Stream."; 1051 } 1052 onBody(HTTPCodec::StreamID,std::unique_ptr<folly::IOBuf>,uint16_t)1053 void onBody(HTTPCodec::StreamID /*stream*/, 1054 std::unique_ptr<folly::IOBuf> /*chain*/, 1055 uint16_t /*padding*/) override { 1056 LOG(FATAL) << __func__ << " called on a Control Stream."; 1057 } 1058 onTrailersComplete(HTTPCodec::StreamID,std::unique_ptr<HTTPHeaders>)1059 void onTrailersComplete( 1060 HTTPCodec::StreamID /*stream*/, 1061 std::unique_ptr<HTTPHeaders> /*trailers*/) override { 1062 LOG(FATAL) << __func__ << " called on a Control Stream."; 1063 } 1064 onMessageComplete(HTTPCodec::StreamID,bool)1065 void onMessageComplete(HTTPCodec::StreamID /*stream*/, 1066 bool /*upgrade*/) override { 1067 LOG(FATAL) << __func__ << " called on a Control Stream."; 1068 } 1069 1070 void onError(HTTPCodec::StreamID /*stream*/, 1071 const HTTPException& /*error*/, 1072 bool /* newTxn */ = false) override; 1073 1074 void onGoaway(uint64_t lastGoodStreamID, 1075 ErrorCode code, 1076 std::unique_ptr<folly::IOBuf> debugData = nullptr) override { 1077 session_.onGoaway(lastGoodStreamID, code, std::move(debugData)); 1078 } 1079 onSettings(const SettingsList & settings)1080 void onSettings(const SettingsList& settings) override { 1081 session_.onSettings(settings); 1082 } 1083 onPriority(HTTPCodec::StreamID id,const HTTPPriority & pri)1084 void onPriority(HTTPCodec::StreamID id, const HTTPPriority& pri) override { 1085 session_.onPriority(id, pri); 1086 } 1087 onPushPriority(HTTPCodec::StreamID id,const HTTPPriority & pri)1088 void onPushPriority(HTTPCodec::StreamID id, 1089 const HTTPPriority& pri) override { 1090 session_.onPushPriority(id, pri); 1091 } 1092 1093 std::unique_ptr<hq::HQUnidirectionalCodec> ingressCodec_; 1094 bool readEOF_{false}; 1095 }; // HQControlStream 1096 1097 public: 1098 class HQStreamTransportBase 1099 : public HQStreamBase 1100 , public HTTPTransaction::Transport 1101 , public HTTP2PriorityQueueBase 1102 , public quic::QuicSocket::DeliveryCallback { 1103 protected: 1104 HQStreamTransportBase( 1105 HQSession& session, 1106 TransportDirection direction, 1107 quic::StreamId streamId, 1108 uint32_t seqNo, 1109 const WheelTimerInstance& wheelTimer, 1110 HTTPSessionStats* stats = nullptr, 1111 http2::PriorityUpdate priority = hqDefaultPriority, 1112 folly::Optional<HTTPCodec::StreamID> parentTxnId = HTTPCodec::NoStream, 1113 folly::Optional<hq::UnidirectionalStreamType> type = folly::none); 1114 1115 void initCodec(std::unique_ptr<HTTPCodec> /* codec */, 1116 const std::string& /* where */); 1117 1118 void initIngress(const std::string& /* where */); 1119 getHTTPSessionBase()1120 HTTPSessionBase* getHTTPSessionBase() override { 1121 return &(getSession()); 1122 } 1123 1124 public: 1125 HQStreamTransportBase() = delete; 1126 hasCodec()1127 bool hasCodec() const { 1128 return hasCodec_; 1129 } 1130 hasIngress()1131 bool hasIngress() const { 1132 return hasIngress_; 1133 } 1134 1135 // process data in the read buffer, returns true if the codec is blocked 1136 bool processReadData(); 1137 1138 // Process data from QUIC onDataAvailable callback. 1139 void processPeekData( 1140 const folly::Range<quic::QuicSocket::PeekIterator>& peekData); 1141 1142 // QuicSocket::DeliveryCallback 1143 void onDeliveryAck(quic::StreamId id, 1144 uint64_t offset, 1145 std::chrono::microseconds rtt) override; 1146 1147 void onCanceled(quic::StreamId id, uint64_t offset) override; 1148 1149 // HTTPCodec::Callback methods 1150 void onMessageBegin(HTTPCodec::StreamID streamID, 1151 HTTPMessage* /* msg */) override; 1152 1153 void onPushMessageBegin(HTTPCodec::StreamID /* pushID */, 1154 HTTPCodec::StreamID /* parentTxnId */, 1155 HTTPMessage* /* msg */) override; 1156 onExMessageBegin(HTTPCodec::StreamID,HTTPCodec::StreamID,bool,HTTPMessage *)1157 void onExMessageBegin(HTTPCodec::StreamID /* streamID */, 1158 HTTPCodec::StreamID /* controlStream */, 1159 bool /* unidirectional */, 1160 HTTPMessage* /* msg */) override { 1161 LOG(ERROR) << "exMessage: txn=" << txn_ << " TODO"; 1162 } 1163 onPushPromiseHeadersComplete(hq::PushId,HTTPCodec::StreamID,std::unique_ptr<HTTPMessage>)1164 virtual void onPushPromiseHeadersComplete( 1165 hq::PushId /* pushID */, 1166 HTTPCodec::StreamID /* assoc streamID */, 1167 std::unique_ptr<HTTPMessage> /* msg */) { 1168 LOG(ERROR) << "push promise: txn=" << txn_ << " TODO"; 1169 } 1170 1171 void onHeadersComplete(HTTPCodec::StreamID streamID, 1172 std::unique_ptr<HTTPMessage> msg) override; 1173 onBody(HTTPCodec::StreamID,std::unique_ptr<folly::IOBuf> chain,uint16_t padding)1174 void onBody(HTTPCodec::StreamID /* streamID */, 1175 std::unique_ptr<folly::IOBuf> chain, 1176 uint16_t padding) override { 1177 VLOG(4) << __func__ << " txn=" << txn_; 1178 CHECK(chain); 1179 auto len = chain->computeChainDataLength(); 1180 if (session_.onBodyImpl(std::move(chain), len, padding, &txn_)) { 1181 session_.pauseReads(); 1182 }; 1183 } 1184 onChunkHeader(HTTPCodec::StreamID,size_t length)1185 void onChunkHeader(HTTPCodec::StreamID /* stream */, 1186 size_t length) override { 1187 VLOG(4) << __func__ << " txn=" << txn_; 1188 txn_.onIngressChunkHeader(length); 1189 } 1190 onChunkComplete(HTTPCodec::StreamID)1191 void onChunkComplete(HTTPCodec::StreamID /* stream */) override { 1192 VLOG(4) << __func__ << " txn=" << txn_; 1193 txn_.onIngressChunkComplete(); 1194 } 1195 onTrailersComplete(HTTPCodec::StreamID,std::unique_ptr<HTTPHeaders> trailers)1196 void onTrailersComplete(HTTPCodec::StreamID /* streamID */, 1197 std::unique_ptr<HTTPHeaders> trailers) override { 1198 VLOG(4) << __func__ << " txn=" << txn_; 1199 txn_.onIngressTrailers(std::move(trailers)); 1200 } 1201 onMessageComplete(HTTPCodec::StreamID,bool)1202 void onMessageComplete(HTTPCodec::StreamID /* streamID */, 1203 bool /* upgrade */) override { 1204 VLOG(4) << __func__ << " txn=" << txn_; 1205 // for 1xx responses (excluding 101) onMessageComplete may be called 1206 // more than once 1207 if (txn_.isUpstream() && txn_.extraResponseExpected()) { 1208 return; 1209 } 1210 if (session_.infoCallback_) { 1211 session_.infoCallback_->onRequestEnd(session_, 1212 txn_.getMaxDeferredSize()); 1213 } 1214 // Pause the parser, which will prevent more than one message from being 1215 // processed 1216 auto g = folly::makeGuard(setActiveCodec(__func__)); 1217 codecFilterChain->setParserPaused(true); 1218 eomGate_.set(EOMType::CODEC); 1219 } 1220 onIngressEOF()1221 void onIngressEOF() { 1222 // Can only call this once 1223 CHECK(!eomGate_.get(EOMType::TRANSPORT)); 1224 if (ingressError_) { 1225 // This codec has already errored, no need to give it more input 1226 return; 1227 } 1228 auto g = folly::makeGuard(setActiveCodec(__func__)); 1229 codecFilterChain->onIngressEOF(); 1230 eomGate_.set(EOMType::TRANSPORT); 1231 } 1232 1233 void onError(HTTPCodec::StreamID streamID, 1234 const HTTPException& error, 1235 bool newTxn) override; 1236 1237 // Invoked when we get a RST_STREAM from the transport 1238 void onResetStream(HTTP3::ErrorCode error, HTTPException ex); 1239 onAbort(HTTPCodec::StreamID,ErrorCode)1240 void onAbort(HTTPCodec::StreamID /* streamID */, 1241 ErrorCode /* code */) override { 1242 VLOG(4) << __func__ << " txn=" << txn_; 1243 // Can't really get here since no HQ codecs can produce aborts. 1244 // The entry point is onResetStream via readError() 1245 LOG(DFATAL) << "Unexpected abort"; 1246 } 1247 1248 void onFrameHeader(HTTPCodec::StreamID /* stream_id */, 1249 uint8_t /* flags */, 1250 uint64_t /* length */, 1251 uint64_t /* type */, 1252 uint16_t /* version */ = 0) override { 1253 VLOG(4) << __func__ << " txn=" << txn_; 1254 } 1255 1256 void onGoaway( 1257 uint64_t /* lastGoodStreamID */, 1258 ErrorCode /* code */, 1259 std::unique_ptr<folly::IOBuf> /* debugData */ = nullptr) override { 1260 VLOG(4) << __func__ << " txn=" << txn_; 1261 } 1262 onPingRequest(uint64_t)1263 void onPingRequest(uint64_t /* data */) override { 1264 VLOG(4) << __func__ << " txn=" << txn_; 1265 } 1266 onPingReply(uint64_t)1267 void onPingReply(uint64_t /* data */) override { 1268 // This method should not get called 1269 LOG(FATAL) << __func__ << " txn=" << txn_; 1270 } 1271 onWindowUpdate(HTTPCodec::StreamID,uint32_t)1272 void onWindowUpdate(HTTPCodec::StreamID /* stream */, 1273 uint32_t /* amount */) override { 1274 VLOG(4) << __func__ << " txn=" << txn_; 1275 } 1276 onSettings(const SettingsList &)1277 void onSettings(const SettingsList& /*settings*/) override { 1278 VLOG(4) << __func__ << " txn=" << txn_; 1279 } 1280 onSettingsAck()1281 void onSettingsAck() override { 1282 VLOG(4) << __func__ << " txn=" << txn_; 1283 } 1284 onPriority(HTTPCodec::StreamID,const HTTPMessage::HTTP2Priority &)1285 void onPriority(HTTPCodec::StreamID /* stream */, 1286 const HTTPMessage::HTTP2Priority& /* priority */) override { 1287 VLOG(4) << __func__ << " txn=" << txn_; 1288 } 1289 onNativeProtocolUpgrade(HTTPCodec::StreamID,CodecProtocol,const std::string &,HTTPMessage &)1290 bool onNativeProtocolUpgrade(HTTPCodec::StreamID /* stream */, 1291 CodecProtocol /* protocol */, 1292 const std::string& /* protocolString */, 1293 HTTPMessage& /* msg */) override { 1294 VLOG(4) << __func__ << " txn=" << txn_; 1295 return false; 1296 } 1297 numOutgoingStreams()1298 uint32_t numOutgoingStreams() const override { 1299 VLOG(4) << __func__ << " txn=" << txn_; 1300 return 0; 1301 } 1302 numIncomingStreams()1303 uint32_t numIncomingStreams() const override { 1304 VLOG(4) << __func__ << " txn=" << txn_; 1305 return 0; 1306 } 1307 1308 // HTTPTransaction::Transport methods 1309 1310 // For parity with H2, pause/resumeIngress now a no-op. All transactions 1311 // will pause when total buffered egress exceeds the configured limit, which 1312 // should be equal to the recv flow control window pauseIngress(HTTPTransaction *)1313 void pauseIngress(HTTPTransaction* /* txn */) noexcept override { 1314 VLOG(4) << __func__ << " txn=" << txn_; 1315 } 1316 resumeIngress(HTTPTransaction *)1317 void resumeIngress(HTTPTransaction* /* txn */) noexcept override { 1318 VLOG(4) << __func__ << " txn=" << txn_; 1319 } 1320 1321 void transactionTimeout(HTTPTransaction* /* txn */) noexcept override; 1322 1323 void sendHeaders(HTTPTransaction* txn, 1324 const HTTPMessage& headers, 1325 HTTPHeaderSize* size, 1326 bool includeEOM) noexcept override; 1327 1328 bool sendHeadersWithDelegate( 1329 HTTPTransaction* txn, 1330 const HTTPMessage& headers, 1331 HTTPHeaderSize* size, 1332 size_t* dataFrameHeaderSize, 1333 uint64_t contentLength, 1334 std::unique_ptr<DSRRequestSender> dsrSender) noexcept override; 1335 1336 size_t sendBody(HTTPTransaction* txn, 1337 std::unique_ptr<folly::IOBuf> body, 1338 bool includeEOM, 1339 bool trackLastByteFlushed) noexcept override; 1340 1341 size_t sendBody(HTTPTransaction* txn, 1342 const HTTPTransaction::BufferMeta& body, 1343 bool eom) noexcept override; 1344 1345 size_t sendChunkHeader(HTTPTransaction* txn, 1346 size_t length) noexcept override; 1347 1348 size_t sendChunkTerminator(HTTPTransaction* txn) noexcept override; 1349 1350 size_t sendEOM(HTTPTransaction* txn, 1351 const HTTPHeaders* trailers) noexcept override; 1352 1353 size_t sendAbort(HTTPTransaction* txn, 1354 ErrorCode statusCode) noexcept override; 1355 1356 size_t sendAbortImpl(HTTP3::ErrorCode errorCode, std::string errorMsg); 1357 1358 size_t sendPriority( 1359 HTTPTransaction* /* txn */, 1360 const http2::PriorityUpdate& /* pri */) noexcept override; 1361 size_t changePriority(HTTPTransaction* txn, 1362 HTTPPriority pri) noexcept override; 1363 sendWindowUpdate(HTTPTransaction *,uint32_t)1364 size_t sendWindowUpdate(HTTPTransaction* /* txn */, 1365 uint32_t /* bytes */) noexcept override { 1366 VLOG(4) << __func__ << " txn=" << txn_; 1367 CHECK(hasEgressStreamId()) 1368 << __func__ << " invoked on stream without egress"; 1369 return 0; 1370 } 1371 1372 // Send a push promise. Has different implementations in 1373 // request streams / push streams sendPushPromise(HTTPTransaction *,folly::Optional<hq::PushId>,const HTTPMessage &,HTTPHeaderSize *,bool)1374 virtual void sendPushPromise(HTTPTransaction* /* txn */, 1375 folly::Optional<hq::PushId> /* pushId */, 1376 const HTTPMessage& /* headers */, 1377 HTTPHeaderSize* /* outSize */, 1378 bool /* includeEOM */) { 1379 VLOG(4) << __func__ << " txn=" << txn_; 1380 CHECK(hasEgressStreamId()) 1381 << __func__ << " invoked on stream without egress"; 1382 } 1383 1384 void notifyPendingEgress() noexcept override; 1385 detach(HTTPTransaction *)1386 void detach(HTTPTransaction* /* txn */) noexcept override { 1387 VLOG(4) << __func__ << " txn=" << txn_; 1388 detached_ = true; 1389 session_.scheduleLoopCallback(); 1390 } 1391 void checkForDetach(); 1392 notifyIngressBodyProcessed(uint32_t bytes)1393 void notifyIngressBodyProcessed(uint32_t bytes) noexcept override { 1394 VLOG(4) << __func__ << " txn=" << txn_; 1395 if (session_.notifyBodyProcessed(bytes)) { 1396 session_.resumeReads(); 1397 } 1398 } 1399 notifyEgressBodyBuffered(int64_t bytes)1400 void notifyEgressBodyBuffered(int64_t bytes) noexcept override { 1401 session_.notifyEgressBodyBuffered(bytes); 1402 } 1403 getLocalAddress()1404 const folly::SocketAddress& getLocalAddress() const noexcept override { 1405 return session_.getLocalAddress(); 1406 } 1407 getPeerAddress()1408 const folly::SocketAddress& getPeerAddress() const noexcept override { 1409 return session_.getPeerAddress(); 1410 } 1411 describe(std::ostream & os)1412 void describe(std::ostream& os) const override { 1413 session_.describe(os); 1414 } 1415 getSetupTransportInfo()1416 const wangle::TransportInfo& getSetupTransportInfo() 1417 const noexcept override { 1418 VLOG(4) << __func__ << " txn=" << txn_; 1419 return session_.transportInfo_; 1420 } 1421 1422 bool getCurrentTransportInfo(wangle::TransportInfo* tinfo) override; 1423 getFlowControlInfo(HTTPTransaction::FlowControlInfo *)1424 void getFlowControlInfo( 1425 HTTPTransaction::FlowControlInfo* /*info*/) override { 1426 // Not implemented 1427 } 1428 1429 HTTPTransaction::Transport::Type getSessionType() const noexcept override; 1430 getCodec()1431 virtual const HTTPCodec& getCodec() const noexcept override { 1432 return HQStreamBase::getCodec(); 1433 } 1434 drain()1435 void drain() override { 1436 VLOG(4) << __func__ << " txn=" << txn_; 1437 } 1438 isDraining()1439 bool isDraining() const override { 1440 VLOG(4) << __func__ << " txn=" << txn_; 1441 return false; 1442 } 1443 1444 HTTPTransaction* newPushedTransaction( 1445 HTTPCodec::StreamID /* parentTxnId */, 1446 HTTPTransaction::PushHandler* /* handler */, 1447 ProxygenError* /* error */ = nullptr) noexcept override { 1448 LOG(FATAL) << __func__ << " Only available via request stream"; 1449 folly::assume_unreachable(); 1450 } 1451 newExTransaction(HTTPTransactionHandler *,HTTPCodec::StreamID,bool)1452 HTTPTransaction* newExTransaction( 1453 HTTPTransactionHandler* /* handler */, 1454 HTTPCodec::StreamID /* controlStream */, 1455 bool /* unidirectional */) noexcept override { 1456 VLOG(4) << __func__ << " txn=" << txn_; 1457 return nullptr; 1458 } 1459 getSecurityProtocol()1460 std::string getSecurityProtocol() const override { 1461 VLOG(4) << __func__ << " txn=" << txn_; 1462 return "quic/tls1.3"; 1463 } 1464 addWaitingForReplaySafety(folly::AsyncTransport::ReplaySafetyCallback * callback)1465 void addWaitingForReplaySafety(folly::AsyncTransport::ReplaySafetyCallback* 1466 callback) noexcept override { 1467 VLOG(4) << __func__ << " txn=" << txn_; 1468 if (session_.sock_->replaySafe()) { 1469 callback->onReplaySafe(); 1470 } else { 1471 session_.waitingForReplaySafety_.push_back(callback); 1472 } 1473 } 1474 removeWaitingForReplaySafety(folly::AsyncTransport::ReplaySafetyCallback * callback)1475 void removeWaitingForReplaySafety( 1476 folly::AsyncTransport::ReplaySafetyCallback* callback) noexcept 1477 override { 1478 VLOG(4) << __func__ << " txn=" << txn_; 1479 session_.waitingForReplaySafety_.remove(callback); 1480 } 1481 needToBlockForReplaySafety()1482 bool needToBlockForReplaySafety() const override { 1483 VLOG(4) << __func__ << " txn=" << txn_; 1484 return false; 1485 } 1486 getUnderlyingTransport()1487 const folly::AsyncTransport* getUnderlyingTransport() 1488 const noexcept override { 1489 VLOG(4) << __func__ << " txn=" << txn_; 1490 return nullptr; 1491 } 1492 isReplaySafe()1493 bool isReplaySafe() const override { 1494 return session_.isReplaySafe(); 1495 } 1496 setHTTP2PrioritiesEnabled(bool)1497 void setHTTP2PrioritiesEnabled(bool /* enabled */) override { 1498 } getHTTP2PrioritiesEnabled()1499 bool getHTTP2PrioritiesEnabled() const override { 1500 return false; 1501 } 1502 getHTTPPriority(uint8_t)1503 folly::Optional<const HTTPMessage::HTTP2Priority> getHTTPPriority( 1504 uint8_t /* pri */) override { 1505 VLOG(4) << __func__ << " txn=" << txn_; 1506 return HTTPMessage::HTTP2Priority(hqDefaultPriority.streamDependency, 1507 hqDefaultPriority.exclusive, 1508 hqDefaultPriority.weight); 1509 } 1510 getHTTPPriority()1511 folly::Optional<HTTPPriority> getHTTPPriority() override { 1512 if (session_.sock_ && hasStreamId()) { 1513 auto sp = session_.sock_->getStreamPriority(getStreamId()); 1514 if (sp) { 1515 return HTTPPriority(sp.value().level, sp.value().incremental); 1516 } 1517 } 1518 return folly::none; 1519 } 1520 getConnectionToken()1521 folly::Optional<HTTPTransaction::ConnectionToken> getConnectionToken() 1522 const noexcept override { 1523 return session_.connectionToken_; 1524 } 1525 1526 void generateGoaway(); 1527 1528 void trackEgressBodyDelivery(uint64_t bodyOffset) override; 1529 1530 /** 1531 * Returns whether or no we have any body bytes buffered in the stream, or 1532 * the txn has any body bytes buffered. 1533 */ 1534 size_t writeBufferSize() const; 1535 bool hasWriteBuffer() const; 1536 bool hasPendingBody() const; 1537 bool hasPendingEOM() const; 1538 bool hasPendingEgress() const; 1539 1540 /** 1541 * Adapter class for managing different enqueued state between 1542 * HTTPTransaction and HQStreamTransport. The decouples whether the 1543 * transaction thinks it is enqueued for egress (which impacts txn lifetime) 1544 * and whether the HQStreamTransport is enqueued (which impacts the 1545 * actual egress algorithm). Note all 4 states are possible. 1546 */ 1547 class HQPriHandle : public HTTP2PriorityQueueBase::BaseNode { 1548 public: init(HTTP2PriorityQueueBase::Handle handle)1549 void init(HTTP2PriorityQueueBase::Handle handle) { 1550 egressQueueHandle_ = handle; 1551 enqueued_ = handle->isEnqueued(); 1552 } 1553 getHandle()1554 HTTP2PriorityQueueBase::Handle FOLLY_NULLABLE getHandle() const { 1555 return egressQueueHandle_; 1556 } 1557 clearHandle()1558 void clearHandle() { 1559 egressQueueHandle_ = nullptr; 1560 } 1561 1562 // HQStreamTransport is enqueued isStreamTransportEnqueued()1563 bool isStreamTransportEnqueued() const { 1564 return egressQueueHandle_ ? egressQueueHandle_->isEnqueued() : false; 1565 } 1566 isTransactionEnqueued()1567 bool isTransactionEnqueued() const { 1568 return isEnqueued(); 1569 } 1570 setEnqueued(bool enqueued)1571 void setEnqueued(bool enqueued) { 1572 enqueued_ = enqueued; 1573 } 1574 isEnqueued()1575 bool isEnqueued() const override { 1576 return enqueued_; 1577 } 1578 1579 uint64_t calculateDepth(bool includeVirtual = true) const override { 1580 return egressQueueHandle_->calculateDepth(includeVirtual); 1581 } 1582 1583 private: 1584 HTTP2PriorityQueueBase::Handle egressQueueHandle_{nullptr}; 1585 bool enqueued_; 1586 }; 1587 addTransaction(HTTPCodec::StreamID id,http2::PriorityUpdate pri,HTTPTransaction * txn,bool permanent,uint64_t * depth)1588 HTTP2PriorityQueueBase::Handle addTransaction(HTTPCodec::StreamID id, 1589 http2::PriorityUpdate pri, 1590 HTTPTransaction* txn, 1591 bool permanent, 1592 uint64_t* depth) override { 1593 queueHandle_.init(session_.txnEgressQueue_.addTransaction( 1594 id, pri, txn, permanent, depth)); 1595 return &queueHandle_; 1596 } 1597 1598 // update the priority of an existing node updatePriority(HTTP2PriorityQueueBase::Handle handle,http2::PriorityUpdate pri,uint64_t * depth)1599 HTTP2PriorityQueueBase::Handle updatePriority( 1600 HTTP2PriorityQueueBase::Handle handle, 1601 http2::PriorityUpdate pri, 1602 uint64_t* depth) override { 1603 CHECK_EQ(handle, &queueHandle_); 1604 CHECK(queueHandle_.getHandle()); 1605 return session_.txnEgressQueue_.updatePriority( 1606 queueHandle_.getHandle(), pri, depth); 1607 } 1608 1609 // Remove the transaction from the priority tree removeTransaction(HTTP2PriorityQueueBase::Handle handle)1610 void removeTransaction(HTTP2PriorityQueueBase::Handle handle) override { 1611 CHECK_EQ(handle, &queueHandle_); 1612 CHECK(queueHandle_.getHandle()); 1613 session_.txnEgressQueue_.removeTransaction(queueHandle_.getHandle()); 1614 queueHandle_.clearHandle(); 1615 } 1616 1617 // Notify the queue when a transaction has egress signalPendingEgress(HTTP2PriorityQueueBase::Handle h)1618 void signalPendingEgress(HTTP2PriorityQueueBase::Handle h) override { 1619 CHECK_EQ(h, &queueHandle_); 1620 queueHandle_.setEnqueued(true); 1621 signalPendingEgressImpl(); 1622 } 1623 signalPendingEgressImpl()1624 void signalPendingEgressImpl() { 1625 auto flowControl = 1626 session_.sock_->getStreamFlowControl(getEgressStreamId()); 1627 if (!flowControl.hasError() && flowControl->sendWindowAvailable > 0) { 1628 session_.txnEgressQueue_.signalPendingEgress(queueHandle_.getHandle()); 1629 } else { 1630 VLOG(4) << "Delay pending egress signal on blocked txn=" << txn_; 1631 } 1632 } 1633 1634 // Notify the queue when a transaction no longer has egress clearPendingEgress(HTTP2PriorityQueueBase::Handle h)1635 void clearPendingEgress(HTTP2PriorityQueueBase::Handle h) override { 1636 CHECK_EQ(h, &queueHandle_); 1637 CHECK(queueHandle_.isTransactionEnqueued()); 1638 queueHandle_.setEnqueued(false); 1639 if (pendingEOM_ || hasWriteBuffer()) { 1640 // no-op 1641 // Only HQSession can clearPendingEgress for these cases 1642 return; 1643 } 1644 // The transaction has pending body data, but it decided to remove itself 1645 // from the egress queue since it's rate-limited 1646 if (queueHandle_.isStreamTransportEnqueued()) { 1647 session_.txnEgressQueue_.clearPendingEgress(queueHandle_.getHandle()); 1648 } 1649 } 1650 addPriorityNode(HTTPCodec::StreamID id,HTTPCodec::StreamID parent)1651 void addPriorityNode(HTTPCodec::StreamID id, 1652 HTTPCodec::StreamID parent) override { 1653 session_.txnEgressQueue_.addPriorityNode(id, parent); 1654 } 1655 1656 /** 1657 * How many egress bytes we committed to transport, both written and 1658 * skipped. 1659 */ streamEgressCommittedByteOffset()1660 uint64_t streamEgressCommittedByteOffset() const { 1661 return bytesWritten_; 1662 } 1663 1664 /** 1665 * streamEgressCommittedByteOffset() plus any pending bytes in the egress 1666 * queue. 1667 */ streamWriteByteOffset()1668 uint64_t streamWriteByteOffset() const { 1669 return streamEgressCommittedByteOffset() + writeBuf_.chainLength() + 1670 bufMeta_.length; 1671 } 1672 1673 void abortIngress(); 1674 1675 void abortEgress(bool checkForDetach); 1676 1677 void errorOnTransaction(ProxygenError err, const std::string& errorMsg); 1678 void errorOnTransaction(HTTPException ex); 1679 1680 bool wantsOnWriteReady(size_t canSend) const; 1681 1682 HQPriHandle queueHandle_; 1683 HTTPTransaction txn_; 1684 // need to send EOM 1685 bool pendingEOM_{false}; 1686 // have read EOF 1687 bool readEOF_{false}; 1688 bool hasCodec_{false}; 1689 bool hasIngress_{false}; 1690 bool detached_{false}; 1691 bool ingressError_{false}; 1692 bool hasHeaders_{false}; 1693 enum class EOMType { CODEC, TRANSPORT }; 1694 ConditionalGate<EOMType, 2> eomGate_; 1695 1696 folly::Optional<HTTPCodec::StreamID> codecStreamId_; 1697 1698 HQByteEventTracker byteEventTracker_; 1699 1700 // Stream + session protocol info 1701 std::shared_ptr<QuicStreamProtocolInfo> quicStreamProtocolInfo_; 1702 1703 // BufferMeta represents a buffer that isn't owned by this stream but a 1704 // remote entity. 1705 HTTPTransaction::BufferMeta bufMeta_; 1706 1707 void armStreamAckCb(uint64_t streamOffset); 1708 void armEgressHeadersAckCb(uint64_t streamOffset); 1709 void armEgressBodyAckCb(uint64_t streamOffset); resetEgressHeadersAckOffset()1710 void resetEgressHeadersAckOffset() { 1711 egressHeadersAckOffset_ = folly::none; 1712 } resetEgressBodyAckOffset(uint64_t offset)1713 void resetEgressBodyAckOffset(uint64_t offset) { 1714 egressBodyAckOffsets_.erase(offset); 1715 } 1716 numActiveDeliveryCallbacks()1717 uint64_t numActiveDeliveryCallbacks() const { 1718 return numActiveDeliveryCallbacks_; 1719 } 1720 1721 private: 1722 void updatePriority(const HTTPMessage& headers) noexcept; 1723 1724 // Return the old and new offset of the stream 1725 std::pair<uint64_t, uint64_t> generateHeadersCommon( 1726 quic::StreamId streamId, 1727 const HTTPMessage& headers, 1728 bool includeEOM, 1729 HTTPHeaderSize* size) noexcept; 1730 void coalesceEOM(size_t encodedBodySize); 1731 void handleHeadersAcked(uint64_t streamOffset); 1732 void handleBodyAcked(uint64_t streamOffset); 1733 void handleBodyCancelled(uint64_t streamOffset); 1734 // Egress headers offset. 1735 // This is updated every time we send headers. Needed for body delivery 1736 // callbacks to calculate body offset properly. 1737 uint64_t egressHeadersStreamOffset_{0}; 1738 folly::Optional<uint64_t> egressHeadersAckOffset_; 1739 std::unordered_set<uint64_t> egressBodyAckOffsets_; 1740 // Track number of armed QUIC delivery callbacks. 1741 uint64_t numActiveDeliveryCallbacks_{0}; 1742 1743 // Used to store last seen ingress push ID between 1744 // the invocations of onPushPromiseBegin / onHeadersComplete. 1745 // It is being reset by 1746 // - "onNewMessage" (in which case the push promise is being abandoned), 1747 // - "onPushMessageBegin" (which may be abandonned / duplicate message id) 1748 // - "onHeadersComplete" (not pending anymore) 1749 folly::Optional<hq::PushId> ingressPushId_; 1750 }; // HQStreamTransportBase 1751 1752 protected: 1753 class HQStreamTransport 1754 : public detail::singlestream::SSBidir 1755 , public HQStreamTransportBase { 1756 public: 1757 HQStreamTransport( 1758 HQSession& session, 1759 TransportDirection direction, 1760 quic::StreamId streamId, 1761 uint32_t seqNo, 1762 std::unique_ptr<HTTPCodec> codec, 1763 const WheelTimerInstance& wheelTimer, 1764 HTTPSessionStats* stats = nullptr, 1765 http2::PriorityUpdate priority = hqDefaultPriority, 1766 folly::Optional<HTTPCodec::StreamID> parentTxnId = HTTPCodec::NoStream) SSBidir(streamId)1767 : detail::singlestream::SSBidir(streamId), 1768 HQStreamTransportBase(session, 1769 direction, 1770 streamId, 1771 seqNo, 1772 wheelTimer, 1773 stats, 1774 priority, 1775 parentTxnId) { 1776 // Request streams are eagerly initialized 1777 initCodec(std::move(codec), __func__); 1778 initIngress(__func__); 1779 } 1780 1781 HTTPTransaction* newPushedTransaction( 1782 HTTPCodec::StreamID /* parentTxnId */, 1783 HTTPTransaction::PushHandler* /* handler */, 1784 ProxygenError* error = nullptr) noexcept override; 1785 1786 void sendPushPromise(HTTPTransaction* /* txn */, 1787 folly::Optional<hq::PushId> /* pushId */, 1788 const HTTPMessage& /* headers */, 1789 HTTPHeaderSize* /* outSize */, 1790 bool /* includeEOM */) override; 1791 1792 void onPushPromiseHeadersComplete( 1793 hq::PushId /* pushID */, 1794 HTTPCodec::StreamID /* assoc streamID */, 1795 std::unique_ptr<HTTPMessage> /* promise */) override; 1796 1797 uint16_t getDatagramSizeLimit() const noexcept override; 1798 bool sendDatagram(std::unique_ptr<folly::IOBuf> datagram) override; 1799 }; // HQStreamTransport 1800 1801 #ifdef _MSC_VER 1802 #pragma warning(pop) 1803 #endif 1804 1805 private: 1806 class VersionUtils { 1807 public: VersionUtils(HQSession & session)1808 explicit VersionUtils(HQSession& session) : session_(session) { 1809 } ~VersionUtils()1810 virtual ~VersionUtils() { 1811 } 1812 1813 // Checks whether it is allowed to process a new stream, depending on the 1814 // stream type, draining state/goaway. If not allowed, it resets the stream 1815 virtual CodecProtocol getCodecProtocol() const = 0; 1816 virtual bool checkNewStream(quic::StreamId id) = 0; 1817 virtual std::unique_ptr<HTTPCodec> createCodec(quic::StreamId id) = 0; 1818 virtual std::unique_ptr<hq::HQUnidirectionalCodec> createControlCodec( 1819 hq::UnidirectionalStreamType type, HQControlStream& controlStream) = 0; 1820 virtual folly::Optional<hq::UnidirectionalStreamType> parseStreamPreface( 1821 uint64_t preface) = 0; 1822 virtual void sendGoaway() = 0; 1823 virtual void sendGoawayOnRequestStream( 1824 HQSession::HQStreamTransport& stream) = 0; 1825 virtual void headersComplete(HTTPMessage* msg) = 0; 1826 virtual void checkSendingGoaway(const HTTPMessage& msg) = 0; 1827 virtual size_t sendSettings() = 0; 1828 virtual bool createEgressControlStreams() = 0; 1829 virtual void applySettings(const SettingsList& settings) = 0; 1830 virtual void onSettings(const SettingsList& settings) = 0; 1831 virtual void readDataProcessed() = 0; 1832 virtual void abortStream(quic::StreamId id) = 0; setMaxUncompressed(uint64_t)1833 virtual void setMaxUncompressed(uint64_t) { 1834 } setHeaderCodecStats(HeaderCodec::Stats *)1835 virtual void setHeaderCodecStats(HeaderCodec::Stats*) { 1836 } onConnectionEnd()1837 virtual void onConnectionEnd() { 1838 } 1839 1840 HQSession& session_; 1841 }; 1842 1843 class H1QFBV1VersionUtils : public VersionUtils { 1844 public: H1QFBV1VersionUtils(HQSession & session)1845 explicit H1QFBV1VersionUtils(HQSession& session) : VersionUtils(session) { 1846 } 1847 getCodecProtocol()1848 CodecProtocol getCodecProtocol() const override { 1849 return CodecProtocol::HTTP_1_1; 1850 } 1851 1852 bool checkNewStream(quic::StreamId id) override; 1853 1854 std::unique_ptr<HTTPCodec> createCodec(quic::StreamId id) override; 1855 createControlCodec(hq::UnidirectionalStreamType,HQControlStream &)1856 std::unique_ptr<hq::HQUnidirectionalCodec> createControlCodec( 1857 hq::UnidirectionalStreamType, HQControlStream&) override { 1858 return nullptr; // no control streams 1859 } 1860 parseStreamPreface(uint64_t)1861 folly::Optional<hq::UnidirectionalStreamType> parseStreamPreface( 1862 uint64_t /*preface*/) override { 1863 LOG(FATAL) << "H1Q does not use stream preface"; 1864 folly::assume_unreachable(); 1865 } 1866 1867 void sendGoaway() override; 1868 void sendGoawayOnRequestStream( 1869 HQSession::HQStreamTransport& stream) override; 1870 void headersComplete(HTTPMessage* msg) override; 1871 void checkSendingGoaway(const HTTPMessage& msg) override; 1872 sendSettings()1873 size_t sendSettings() override { 1874 return 0; 1875 } 1876 createEgressControlStreams()1877 bool createEgressControlStreams() override { 1878 return true; 1879 } 1880 applySettings(const SettingsList &)1881 void applySettings(const SettingsList& /*settings*/) override { 1882 } 1883 onSettings(const SettingsList &)1884 void onSettings(const SettingsList& /*settings*/) override { 1885 CHECK(false) << "SETTINGS frame received for h1q-fb-v1 protocol"; 1886 } 1887 readDataProcessed()1888 void readDataProcessed() override { 1889 } 1890 abortStream(quic::StreamId)1891 void abortStream(quic::StreamId /*id*/) override { 1892 } 1893 }; 1894 1895 class GoawayUtils { 1896 public: 1897 static bool checkNewStream(HQSession& session, quic::StreamId id); 1898 static void sendGoaway(HQSession& session); 1899 }; 1900 1901 class HQVersionUtils : public VersionUtils { 1902 public: HQVersionUtils(HQSession & session)1903 explicit HQVersionUtils(HQSession& session) : VersionUtils(session) { 1904 } 1905 getCodecProtocol()1906 CodecProtocol getCodecProtocol() const override { 1907 return CodecProtocol::HQ; 1908 } 1909 1910 std::unique_ptr<HTTPCodec> createCodec(quic::StreamId id) override; 1911 1912 std::unique_ptr<hq::HQUnidirectionalCodec> createControlCodec( 1913 hq::UnidirectionalStreamType type, 1914 HQControlStream& controlStream) override; 1915 checkNewStream(quic::StreamId id)1916 bool checkNewStream(quic::StreamId id) override { 1917 return GoawayUtils::checkNewStream(session_, id); 1918 } 1919 folly::Optional<hq::UnidirectionalStreamType> parseStreamPreface( 1920 uint64_t preface) override; 1921 sendGoaway()1922 void sendGoaway() override { 1923 GoawayUtils::sendGoaway(session_); 1924 } 1925 sendGoawayOnRequestStream(HQSession::HQStreamTransport &)1926 void sendGoawayOnRequestStream( 1927 HQSession::HQStreamTransport& /*stream*/) override { 1928 } 1929 1930 void headersComplete(HTTPMessage* /*msg*/) override; 1931 checkSendingGoaway(const HTTPMessage &)1932 void checkSendingGoaway(const HTTPMessage& /*msg*/) override { 1933 } 1934 1935 size_t sendSettings() override; 1936 1937 bool createEgressControlStreams() override; 1938 1939 void applySettings(const SettingsList& settings) override; 1940 1941 void onSettings(const SettingsList& settings) override; 1942 1943 void readDataProcessed() override; 1944 1945 void abortStream(quic::StreamId /*id*/) override; 1946 setMaxUncompressed(uint64_t value)1947 void setMaxUncompressed(uint64_t value) override { 1948 qpackCodec_.setMaxUncompressed(value); 1949 } 1950 setHeaderCodecStats(HeaderCodec::Stats * stats)1951 void setHeaderCodecStats(HeaderCodec::Stats* stats) override { 1952 qpackCodec_.setStats(stats); 1953 } 1954 void onConnectionEnd() override; 1955 1956 private: 1957 QPACKCodec qpackCodec_; 1958 hq::HQStreamCodec* hqStreamCodecPtr_{nullptr}; 1959 }; 1960 1961 class H1QFBV2VersionUtils : public H1QFBV1VersionUtils { 1962 public: H1QFBV2VersionUtils(HQSession & session)1963 explicit H1QFBV2VersionUtils(HQSession& session) 1964 : H1QFBV1VersionUtils(session) { 1965 } 1966 checkNewStream(quic::StreamId id)1967 bool checkNewStream(quic::StreamId id) override { 1968 return GoawayUtils::checkNewStream(session_, id); 1969 } 1970 std::unique_ptr<hq::HQUnidirectionalCodec> createControlCodec( 1971 hq::UnidirectionalStreamType, HQControlStream&) override; 1972 1973 folly::Optional<hq::UnidirectionalStreamType> parseStreamPreface( 1974 uint64_t preface) override; 1975 1976 bool createEgressControlStreams() override; 1977 onSettings(const SettingsList &)1978 void onSettings(const SettingsList& /*settings*/) override { 1979 session_.handleSessionError( 1980 CHECK_NOTNULL(session_.findControlStream( 1981 hq::UnidirectionalStreamType::H1Q_CONTROL)), 1982 hq::StreamDirection::INGRESS, 1983 HTTP3::ErrorCode::HTTP_GENERAL_PROTOCOL_ERROR, 1984 kErrorConnection); 1985 } 1986 sendGoaway()1987 void sendGoaway() override { 1988 GoawayUtils::sendGoaway(session_); 1989 } 1990 sendGoawayOnRequestStream(HQSession::HQStreamTransport &)1991 void sendGoawayOnRequestStream( 1992 HQSession::HQStreamTransport& /*stream*/) override { 1993 } 1994 headersComplete(HTTPMessage *)1995 void headersComplete(HTTPMessage* /*msg*/) override { 1996 } 1997 checkSendingGoaway(const HTTPMessage &)1998 void checkSendingGoaway(const HTTPMessage& /*msg*/) override { 1999 } 2000 }; 2001 getMaxConcurrentOutgoingStreamsRemote()2002 uint32_t getMaxConcurrentOutgoingStreamsRemote() const override { 2003 // need transport API 2004 return 100; 2005 } 2006 2007 using HTTPCodecPtr = std::unique_ptr<HTTPCodec>; 2008 struct CodecStackEntry { 2009 HTTPCodecPtr* codecPtr; 2010 HTTPCodecPtr codec; 2011 HTTPCodec::Callback* callback; CodecStackEntryCodecStackEntry2012 CodecStackEntry(HTTPCodecPtr* p, HTTPCodecPtr c, HTTPCodec::Callback* cb) 2013 : codecPtr(p), codec(std::move(c)), callback(cb) { 2014 } 2015 }; 2016 std::vector<CodecStackEntry> codecStack_; 2017 2018 /** 2019 * Container to hold the results of HTTP2PriorityQueue::nextEgress 2020 */ 2021 HTTP2PriorityQueue::NextEgressResult nextEgressResults_; 2022 2023 // Cleanup all pending streams. Invoked in session timeout 2024 size_t cleanupPendingStreams(); 2025 2026 // Remove all callbacks from a stream during cleanup 2027 void clearStreamCallbacks(quic::StreamId /* id */); 2028 2029 using ControlStreamsKey = std::pair<quic::StreamId, hq::StreamDirection>; 2030 std::unordered_map<hq::UnidirectionalStreamType, HQControlStream> 2031 controlStreams_; 2032 HQUnidirStreamDispatcher unidirectionalReadDispatcher_; 2033 2034 // Min Stream ID we haven't seen so far 2035 quic::StreamId minUnseenIncomingStreamId_{0}; 2036 // Maximum Stream ID that we are allowed to open, according to the remote 2037 quic::StreamId minPeerUnseenId_{hq::kMaxClientBidiStreamId}; 2038 // Whether SETTINGS have been received 2039 bool receivedSettings_{false}; 2040 2041 /** 2042 * The maximum number of concurrent transactions that this session's peer 2043 * may create. 2044 */ 2045 uint32_t maxConcurrentIncomingStreams_{100}; 2046 folly::Optional<uint32_t> receiveStreamWindowSize_; 2047 2048 uint64_t maxToSend_{0}; 2049 bool scheduledWrite_{false}; 2050 2051 bool forceUpstream1_1_{true}; 2052 // Default to false for now to match existing behavior 2053 bool strictValidation_{false}; 2054 bool datagramEnabled_{false}; 2055 2056 /** Reads in the current loop iteration */ 2057 uint16_t readsPerLoop_{0}; 2058 std::unordered_set<quic::StreamId> pendingProcessReadSet_; 2059 std::shared_ptr<QuicProtocolInfo> quicInfo_; 2060 folly::Optional<HQVersion> version_; 2061 std::string alpn_; 2062 2063 protected: 2064 HTTPSettings egressSettings_{ 2065 {SettingsId::HEADER_TABLE_SIZE, hq::kDefaultEgressHeaderTableSize}, 2066 {SettingsId::MAX_HEADER_LIST_SIZE, hq::kDefaultEgressMaxHeaderListSize}, 2067 {SettingsId::_HQ_QPACK_BLOCKED_STREAMS, 2068 hq::kDefaultEgressQpackBlockedStream}, 2069 }; 2070 HTTPSettings ingressSettings_; 2071 uint64_t minUnseenIncomingPushId_{0}; 2072 2073 std::unique_ptr<VersionUtils> versionUtils_; 2074 ReadyGate versionUtilsReady_; 2075 2076 // NOTE: introduce better decoupling between the streams 2077 // and the containing session, then remove the friendship. 2078 friend class HQStreamBase; 2079 2080 // To let the operator<< access DrainState which is private 2081 friend std::ostream& operator<<(std::ostream&, DrainState); 2082 2083 // Bidirectional transport streams 2084 std::unordered_map<quic::StreamId, HQStreamTransport> streams_; 2085 2086 // Buffer for datagrams waiting for a stream to be assigned to 2087 folly::EvictingCacheMap< 2088 quic::StreamId, 2089 folly::small_vector<std::unique_ptr<folly::IOBuf>, 2090 kDefaultMaxBufferedDatagrams, 2091 folly::small_vector_policy::NoHeap>> 2092 datagramsBuffer_{kMaxStreamsWithBufferedDatagrams}; 2093 2094 // Buffer for priority updates without an active stream 2095 folly::EvictingCacheMap<quic::StreamId, HTTPPriority> priorityUpdatesBuffer_{ 2096 kMaxBufferedPriorityUpdates}; 2097 2098 // Creation time (for handshake time tracking) 2099 std::chrono::steady_clock::time_point createTime_; 2100 2101 // Lookup maps for matching PushIds to StreamIds 2102 folly::F14FastMap<hq::PushId, quic::StreamId> pushIdToStreamId_; 2103 // Lookup maps for matching ingress push streams to push ids 2104 folly::F14FastMap<quic::StreamId, hq::PushId> streamIdToPushId_; 2105 std::string userAgent_; 2106 }; // HQSession 2107 2108 std::ostream& operator<<(std::ostream& os, HQSession::DrainState drainState); 2109 2110 } // namespace proxygen 2111