1package quic 2 3import ( 4 "bytes" 5 "context" 6 "crypto/tls" 7 "errors" 8 "fmt" 9 "io" 10 "net" 11 "reflect" 12 "sync" 13 "sync/atomic" 14 "time" 15 16 "github.com/lucas-clemente/quic-go/internal/ackhandler" 17 "github.com/lucas-clemente/quic-go/internal/flowcontrol" 18 "github.com/lucas-clemente/quic-go/internal/handshake" 19 "github.com/lucas-clemente/quic-go/internal/logutils" 20 "github.com/lucas-clemente/quic-go/internal/protocol" 21 "github.com/lucas-clemente/quic-go/internal/qerr" 22 "github.com/lucas-clemente/quic-go/internal/utils" 23 "github.com/lucas-clemente/quic-go/internal/wire" 24 "github.com/lucas-clemente/quic-go/logging" 25) 26 27type unpacker interface { 28 Unpack(hdr *wire.Header, rcvTime time.Time, data []byte) (*unpackedPacket, error) 29} 30 31type streamGetter interface { 32 GetOrOpenReceiveStream(protocol.StreamID) (receiveStreamI, error) 33 GetOrOpenSendStream(protocol.StreamID) (sendStreamI, error) 34} 35 36type streamManager interface { 37 GetOrOpenSendStream(protocol.StreamID) (sendStreamI, error) 38 GetOrOpenReceiveStream(protocol.StreamID) (receiveStreamI, error) 39 OpenStream() (Stream, error) 40 OpenUniStream() (SendStream, error) 41 OpenStreamSync(context.Context) (Stream, error) 42 OpenUniStreamSync(context.Context) (SendStream, error) 43 AcceptStream(context.Context) (Stream, error) 44 AcceptUniStream(context.Context) (ReceiveStream, error) 45 DeleteStream(protocol.StreamID) error 46 UpdateLimits(*wire.TransportParameters) 47 HandleMaxStreamsFrame(*wire.MaxStreamsFrame) 48 CloseWithError(error) 49 ResetFor0RTT() 50 UseResetMaps() 51} 52 53type cryptoStreamHandler interface { 54 RunHandshake() 55 ChangeConnectionID(protocol.ConnectionID) 56 SetLargest1RTTAcked(protocol.PacketNumber) error 57 SetHandshakeConfirmed() 58 GetSessionTicket() ([]byte, error) 59 io.Closer 60 ConnectionState() handshake.ConnectionState 61} 62 63type packetInfo struct { 64 addr net.IP 65 ifIndex uint32 66} 67 68type receivedPacket struct { 69 buffer *packetBuffer 70 71 remoteAddr net.Addr 72 rcvTime time.Time 73 data []byte 74 75 ecn protocol.ECN 76 77 info *packetInfo 78} 79 80func (p *receivedPacket) Size() protocol.ByteCount { return protocol.ByteCount(len(p.data)) } 81 82func (p *receivedPacket) Clone() *receivedPacket { 83 return &receivedPacket{ 84 remoteAddr: p.remoteAddr, 85 rcvTime: p.rcvTime, 86 data: p.data, 87 buffer: p.buffer, 88 ecn: p.ecn, 89 info: p.info, 90 } 91} 92 93type sessionRunner interface { 94 Add(protocol.ConnectionID, packetHandler) bool 95 GetStatelessResetToken(protocol.ConnectionID) protocol.StatelessResetToken 96 Retire(protocol.ConnectionID) 97 Remove(protocol.ConnectionID) 98 ReplaceWithClosed(protocol.ConnectionID, packetHandler) 99 AddResetToken(protocol.StatelessResetToken, packetHandler) 100 RemoveResetToken(protocol.StatelessResetToken) 101} 102 103type handshakeRunner struct { 104 onReceivedParams func(*wire.TransportParameters) 105 onError func(error) 106 dropKeys func(protocol.EncryptionLevel) 107 onHandshakeComplete func() 108} 109 110func (r *handshakeRunner) OnReceivedParams(tp *wire.TransportParameters) { r.onReceivedParams(tp) } 111func (r *handshakeRunner) OnError(e error) { r.onError(e) } 112func (r *handshakeRunner) DropKeys(el protocol.EncryptionLevel) { r.dropKeys(el) } 113func (r *handshakeRunner) OnHandshakeComplete() { r.onHandshakeComplete() } 114 115type closeError struct { 116 err error 117 remote bool 118 immediate bool 119} 120 121type errCloseForRecreating struct { 122 nextPacketNumber protocol.PacketNumber 123 nextVersion protocol.VersionNumber 124} 125 126func (e *errCloseForRecreating) Error() string { 127 return "closing session in order to recreate it" 128} 129 130var sessionTracingID uint64 // to be accessed atomically 131func nextSessionTracingID() uint64 { return atomic.AddUint64(&sessionTracingID, 1) } 132 133// A Session is a QUIC session 134type session struct { 135 // Destination connection ID used during the handshake. 136 // Used to check source connection ID on incoming packets. 137 handshakeDestConnID protocol.ConnectionID 138 // Set for the client. Destination connection ID used on the first Initial sent. 139 origDestConnID protocol.ConnectionID 140 retrySrcConnID *protocol.ConnectionID // only set for the client (and if a Retry was performed) 141 142 srcConnIDLen int 143 144 perspective protocol.Perspective 145 version protocol.VersionNumber 146 config *Config 147 148 conn sendConn 149 sendQueue sender 150 151 streamsMap streamManager 152 connIDManager *connIDManager 153 connIDGenerator *connIDGenerator 154 155 rttStats *utils.RTTStats 156 157 cryptoStreamManager *cryptoStreamManager 158 sentPacketHandler ackhandler.SentPacketHandler 159 receivedPacketHandler ackhandler.ReceivedPacketHandler 160 retransmissionQueue *retransmissionQueue 161 framer framer 162 windowUpdateQueue *windowUpdateQueue 163 connFlowController flowcontrol.ConnectionFlowController 164 tokenStoreKey string // only set for the client 165 tokenGenerator *handshake.TokenGenerator // only set for the server 166 167 unpacker unpacker 168 frameParser wire.FrameParser 169 packer packer 170 mtuDiscoverer mtuDiscoverer // initialized when the handshake completes 171 172 oneRTTStream cryptoStream // only set for the server 173 cryptoStreamHandler cryptoStreamHandler 174 175 receivedPackets chan *receivedPacket 176 sendingScheduled chan struct{} 177 178 closeOnce sync.Once 179 // closeChan is used to notify the run loop that it should terminate 180 closeChan chan closeError 181 182 ctx context.Context 183 ctxCancel context.CancelFunc 184 handshakeCtx context.Context 185 handshakeCtxCancel context.CancelFunc 186 187 undecryptablePackets []*receivedPacket // undecryptable packets, waiting for a change in encryption level 188 undecryptablePacketsToProcess []*receivedPacket 189 190 clientHelloWritten <-chan *wire.TransportParameters 191 earlySessionReadyChan chan struct{} 192 handshakeCompleteChan chan struct{} // is closed when the handshake completes 193 handshakeComplete bool 194 handshakeConfirmed bool 195 196 receivedRetry bool 197 versionNegotiated bool 198 receivedFirstPacket bool 199 200 idleTimeout time.Duration 201 sessionCreationTime time.Time 202 // The idle timeout is set based on the max of the time we received the last packet... 203 lastPacketReceivedTime time.Time 204 // ... and the time we sent a new ack-eliciting packet after receiving a packet. 205 firstAckElicitingPacketAfterIdleSentTime time.Time 206 // pacingDeadline is the time when the next packet should be sent 207 pacingDeadline time.Time 208 209 peerParams *wire.TransportParameters 210 211 timer *utils.Timer 212 // keepAlivePingSent stores whether a keep alive PING is in flight. 213 // It is reset as soon as we receive a packet from the peer. 214 keepAlivePingSent bool 215 keepAliveInterval time.Duration 216 217 datagramQueue *datagramQueue 218 219 logID string 220 tracer logging.ConnectionTracer 221 logger utils.Logger 222} 223 224var ( 225 _ Session = &session{} 226 _ EarlySession = &session{} 227 _ streamSender = &session{} 228 deadlineSendImmediately = time.Time{}.Add(42 * time.Millisecond) // any value > time.Time{} and before time.Now() is fine 229) 230 231var newSession = func( 232 conn sendConn, 233 runner sessionRunner, 234 origDestConnID protocol.ConnectionID, 235 retrySrcConnID *protocol.ConnectionID, 236 clientDestConnID protocol.ConnectionID, 237 destConnID protocol.ConnectionID, 238 srcConnID protocol.ConnectionID, 239 statelessResetToken protocol.StatelessResetToken, 240 conf *Config, 241 tlsConf *tls.Config, 242 tokenGenerator *handshake.TokenGenerator, 243 enable0RTT bool, 244 tracer logging.ConnectionTracer, 245 tracingID uint64, 246 logger utils.Logger, 247 v protocol.VersionNumber, 248) quicSession { 249 s := &session{ 250 conn: conn, 251 config: conf, 252 handshakeDestConnID: destConnID, 253 srcConnIDLen: srcConnID.Len(), 254 tokenGenerator: tokenGenerator, 255 oneRTTStream: newCryptoStream(), 256 perspective: protocol.PerspectiveServer, 257 handshakeCompleteChan: make(chan struct{}), 258 tracer: tracer, 259 logger: logger, 260 version: v, 261 } 262 if origDestConnID != nil { 263 s.logID = origDestConnID.String() 264 } else { 265 s.logID = destConnID.String() 266 } 267 s.connIDManager = newConnIDManager( 268 destConnID, 269 func(token protocol.StatelessResetToken) { runner.AddResetToken(token, s) }, 270 runner.RemoveResetToken, 271 s.queueControlFrame, 272 ) 273 s.connIDGenerator = newConnIDGenerator( 274 srcConnID, 275 clientDestConnID, 276 func(connID protocol.ConnectionID) { runner.Add(connID, s) }, 277 runner.GetStatelessResetToken, 278 runner.Remove, 279 runner.Retire, 280 runner.ReplaceWithClosed, 281 s.queueControlFrame, 282 s.version, 283 ) 284 s.preSetup() 285 s.ctx, s.ctxCancel = context.WithCancel(context.WithValue(context.Background(), SessionTracingKey, tracingID)) 286 s.sentPacketHandler, s.receivedPacketHandler = ackhandler.NewAckHandler( 287 0, 288 getMaxPacketSize(s.conn.RemoteAddr()), 289 s.rttStats, 290 s.perspective, 291 s.tracer, 292 s.logger, 293 s.version, 294 ) 295 initialStream := newCryptoStream() 296 handshakeStream := newCryptoStream() 297 params := &wire.TransportParameters{ 298 InitialMaxStreamDataBidiLocal: protocol.ByteCount(s.config.InitialStreamReceiveWindow), 299 InitialMaxStreamDataBidiRemote: protocol.ByteCount(s.config.InitialStreamReceiveWindow), 300 InitialMaxStreamDataUni: protocol.ByteCount(s.config.InitialStreamReceiveWindow), 301 InitialMaxData: protocol.ByteCount(s.config.InitialConnectionReceiveWindow), 302 MaxIdleTimeout: s.config.MaxIdleTimeout, 303 MaxBidiStreamNum: protocol.StreamNum(s.config.MaxIncomingStreams), 304 MaxUniStreamNum: protocol.StreamNum(s.config.MaxIncomingUniStreams), 305 MaxAckDelay: protocol.MaxAckDelayInclGranularity, 306 AckDelayExponent: protocol.AckDelayExponent, 307 DisableActiveMigration: true, 308 StatelessResetToken: &statelessResetToken, 309 OriginalDestinationConnectionID: origDestConnID, 310 ActiveConnectionIDLimit: protocol.MaxActiveConnectionIDs, 311 InitialSourceConnectionID: srcConnID, 312 RetrySourceConnectionID: retrySrcConnID, 313 } 314 if s.config.EnableDatagrams { 315 params.MaxDatagramFrameSize = protocol.MaxDatagramFrameSize 316 } 317 if s.tracer != nil { 318 s.tracer.SentTransportParameters(params) 319 } 320 cs := handshake.NewCryptoSetupServer( 321 initialStream, 322 handshakeStream, 323 clientDestConnID, 324 conn.LocalAddr(), 325 conn.RemoteAddr(), 326 params, 327 &handshakeRunner{ 328 onReceivedParams: s.handleTransportParameters, 329 onError: s.closeLocal, 330 dropKeys: s.dropEncryptionLevel, 331 onHandshakeComplete: func() { 332 runner.Retire(clientDestConnID) 333 close(s.handshakeCompleteChan) 334 }, 335 }, 336 tlsConf, 337 enable0RTT, 338 s.rttStats, 339 tracer, 340 logger, 341 s.version, 342 ) 343 s.cryptoStreamHandler = cs 344 s.packer = newPacketPacker( 345 srcConnID, 346 s.connIDManager.Get, 347 initialStream, 348 handshakeStream, 349 s.sentPacketHandler, 350 s.retransmissionQueue, 351 s.RemoteAddr(), 352 cs, 353 s.framer, 354 s.receivedPacketHandler, 355 s.datagramQueue, 356 s.perspective, 357 s.version, 358 ) 359 s.unpacker = newPacketUnpacker(cs, s.version) 360 s.cryptoStreamManager = newCryptoStreamManager(cs, initialStream, handshakeStream, s.oneRTTStream) 361 return s 362} 363 364// declare this as a variable, such that we can it mock it in the tests 365var newClientSession = func( 366 conn sendConn, 367 runner sessionRunner, 368 destConnID protocol.ConnectionID, 369 srcConnID protocol.ConnectionID, 370 conf *Config, 371 tlsConf *tls.Config, 372 initialPacketNumber protocol.PacketNumber, 373 enable0RTT bool, 374 hasNegotiatedVersion bool, 375 tracer logging.ConnectionTracer, 376 tracingID uint64, 377 logger utils.Logger, 378 v protocol.VersionNumber, 379) quicSession { 380 s := &session{ 381 conn: conn, 382 config: conf, 383 origDestConnID: destConnID, 384 handshakeDestConnID: destConnID, 385 srcConnIDLen: srcConnID.Len(), 386 perspective: protocol.PerspectiveClient, 387 handshakeCompleteChan: make(chan struct{}), 388 logID: destConnID.String(), 389 logger: logger, 390 tracer: tracer, 391 versionNegotiated: hasNegotiatedVersion, 392 version: v, 393 } 394 s.connIDManager = newConnIDManager( 395 destConnID, 396 func(token protocol.StatelessResetToken) { runner.AddResetToken(token, s) }, 397 runner.RemoveResetToken, 398 s.queueControlFrame, 399 ) 400 s.connIDGenerator = newConnIDGenerator( 401 srcConnID, 402 nil, 403 func(connID protocol.ConnectionID) { runner.Add(connID, s) }, 404 runner.GetStatelessResetToken, 405 runner.Remove, 406 runner.Retire, 407 runner.ReplaceWithClosed, 408 s.queueControlFrame, 409 s.version, 410 ) 411 s.preSetup() 412 s.ctx, s.ctxCancel = context.WithCancel(context.WithValue(context.Background(), SessionTracingKey, tracingID)) 413 s.sentPacketHandler, s.receivedPacketHandler = ackhandler.NewAckHandler( 414 initialPacketNumber, 415 getMaxPacketSize(s.conn.RemoteAddr()), 416 s.rttStats, 417 s.perspective, 418 s.tracer, 419 s.logger, 420 s.version, 421 ) 422 initialStream := newCryptoStream() 423 handshakeStream := newCryptoStream() 424 params := &wire.TransportParameters{ 425 InitialMaxStreamDataBidiRemote: protocol.ByteCount(s.config.InitialStreamReceiveWindow), 426 InitialMaxStreamDataBidiLocal: protocol.ByteCount(s.config.InitialStreamReceiveWindow), 427 InitialMaxStreamDataUni: protocol.ByteCount(s.config.InitialStreamReceiveWindow), 428 InitialMaxData: protocol.ByteCount(s.config.InitialConnectionReceiveWindow), 429 MaxIdleTimeout: s.config.MaxIdleTimeout, 430 MaxBidiStreamNum: protocol.StreamNum(s.config.MaxIncomingStreams), 431 MaxUniStreamNum: protocol.StreamNum(s.config.MaxIncomingUniStreams), 432 MaxAckDelay: protocol.MaxAckDelayInclGranularity, 433 AckDelayExponent: protocol.AckDelayExponent, 434 DisableActiveMigration: true, 435 ActiveConnectionIDLimit: protocol.MaxActiveConnectionIDs, 436 InitialSourceConnectionID: srcConnID, 437 } 438 if s.config.EnableDatagrams { 439 params.MaxDatagramFrameSize = protocol.MaxDatagramFrameSize 440 } 441 if s.tracer != nil { 442 s.tracer.SentTransportParameters(params) 443 } 444 cs, clientHelloWritten := handshake.NewCryptoSetupClient( 445 initialStream, 446 handshakeStream, 447 destConnID, 448 conn.LocalAddr(), 449 conn.RemoteAddr(), 450 params, 451 &handshakeRunner{ 452 onReceivedParams: s.handleTransportParameters, 453 onError: s.closeLocal, 454 dropKeys: s.dropEncryptionLevel, 455 onHandshakeComplete: func() { close(s.handshakeCompleteChan) }, 456 }, 457 tlsConf, 458 enable0RTT, 459 s.rttStats, 460 tracer, 461 logger, 462 s.version, 463 ) 464 s.clientHelloWritten = clientHelloWritten 465 s.cryptoStreamHandler = cs 466 s.cryptoStreamManager = newCryptoStreamManager(cs, initialStream, handshakeStream, newCryptoStream()) 467 s.unpacker = newPacketUnpacker(cs, s.version) 468 s.packer = newPacketPacker( 469 srcConnID, 470 s.connIDManager.Get, 471 initialStream, 472 handshakeStream, 473 s.sentPacketHandler, 474 s.retransmissionQueue, 475 s.RemoteAddr(), 476 cs, 477 s.framer, 478 s.receivedPacketHandler, 479 s.datagramQueue, 480 s.perspective, 481 s.version, 482 ) 483 if len(tlsConf.ServerName) > 0 { 484 s.tokenStoreKey = tlsConf.ServerName 485 } else { 486 s.tokenStoreKey = conn.RemoteAddr().String() 487 } 488 if s.config.TokenStore != nil { 489 if token := s.config.TokenStore.Pop(s.tokenStoreKey); token != nil { 490 s.packer.SetToken(token.data) 491 } 492 } 493 return s 494} 495 496func (s *session) preSetup() { 497 s.sendQueue = newSendQueue(s.conn) 498 s.retransmissionQueue = newRetransmissionQueue(s.version) 499 s.frameParser = wire.NewFrameParser(s.config.EnableDatagrams, s.version) 500 s.rttStats = &utils.RTTStats{} 501 s.connFlowController = flowcontrol.NewConnectionFlowController( 502 protocol.ByteCount(s.config.InitialConnectionReceiveWindow), 503 protocol.ByteCount(s.config.MaxConnectionReceiveWindow), 504 s.onHasConnectionWindowUpdate, 505 s.rttStats, 506 s.logger, 507 ) 508 s.earlySessionReadyChan = make(chan struct{}) 509 s.streamsMap = newStreamsMap( 510 s, 511 s.newFlowController, 512 uint64(s.config.MaxIncomingStreams), 513 uint64(s.config.MaxIncomingUniStreams), 514 s.perspective, 515 s.version, 516 ) 517 s.framer = newFramer(s.streamsMap, s.version) 518 s.receivedPackets = make(chan *receivedPacket, protocol.MaxSessionUnprocessedPackets) 519 s.closeChan = make(chan closeError, 1) 520 s.sendingScheduled = make(chan struct{}, 1) 521 s.handshakeCtx, s.handshakeCtxCancel = context.WithCancel(context.Background()) 522 523 now := time.Now() 524 s.lastPacketReceivedTime = now 525 s.sessionCreationTime = now 526 527 s.windowUpdateQueue = newWindowUpdateQueue(s.streamsMap, s.connFlowController, s.framer.QueueControlFrame) 528 if s.config.EnableDatagrams { 529 s.datagramQueue = newDatagramQueue(s.scheduleSending, s.logger) 530 } 531} 532 533// run the session main loop 534func (s *session) run() error { 535 defer s.ctxCancel() 536 537 s.timer = utils.NewTimer() 538 539 go s.cryptoStreamHandler.RunHandshake() 540 go func() { 541 if err := s.sendQueue.Run(); err != nil { 542 s.destroyImpl(err) 543 } 544 }() 545 546 if s.perspective == protocol.PerspectiveClient { 547 select { 548 case zeroRTTParams := <-s.clientHelloWritten: 549 s.scheduleSending() 550 if zeroRTTParams != nil { 551 s.restoreTransportParameters(zeroRTTParams) 552 close(s.earlySessionReadyChan) 553 } 554 case closeErr := <-s.closeChan: 555 // put the close error back into the channel, so that the run loop can receive it 556 s.closeChan <- closeErr 557 } 558 } 559 560 var ( 561 closeErr closeError 562 sendQueueAvailable <-chan struct{} 563 ) 564 565runLoop: 566 for { 567 // Close immediately if requested 568 select { 569 case closeErr = <-s.closeChan: 570 break runLoop 571 case <-s.handshakeCompleteChan: 572 s.handleHandshakeComplete() 573 default: 574 } 575 576 s.maybeResetTimer() 577 578 var processedUndecryptablePacket bool 579 if len(s.undecryptablePacketsToProcess) > 0 { 580 queue := s.undecryptablePacketsToProcess 581 s.undecryptablePacketsToProcess = nil 582 for _, p := range queue { 583 if processed := s.handlePacketImpl(p); processed { 584 processedUndecryptablePacket = true 585 } 586 // Don't set timers and send packets if the packet made us close the session. 587 select { 588 case closeErr = <-s.closeChan: 589 break runLoop 590 default: 591 } 592 } 593 } else if !processedUndecryptablePacket { 594 select { 595 case closeErr = <-s.closeChan: 596 break runLoop 597 case <-s.timer.Chan(): 598 s.timer.SetRead() 599 // We do all the interesting stuff after the switch statement, so 600 // nothing to see here. 601 case <-s.sendingScheduled: 602 // We do all the interesting stuff after the switch statement, so 603 // nothing to see here. 604 case <-sendQueueAvailable: 605 case firstPacket := <-s.receivedPackets: 606 wasProcessed := s.handlePacketImpl(firstPacket) 607 // Don't set timers and send packets if the packet made us close the session. 608 select { 609 case closeErr = <-s.closeChan: 610 break runLoop 611 default: 612 } 613 if s.handshakeComplete { 614 // Now process all packets in the receivedPackets channel. 615 // Limit the number of packets to the length of the receivedPackets channel, 616 // so we eventually get a chance to send out an ACK when receiving a lot of packets. 617 numPackets := len(s.receivedPackets) 618 receiveLoop: 619 for i := 0; i < numPackets; i++ { 620 select { 621 case p := <-s.receivedPackets: 622 if processed := s.handlePacketImpl(p); processed { 623 wasProcessed = true 624 } 625 select { 626 case closeErr = <-s.closeChan: 627 break runLoop 628 default: 629 } 630 default: 631 break receiveLoop 632 } 633 } 634 } 635 // Only reset the timers if this packet was actually processed. 636 // This avoids modifying any state when handling undecryptable packets, 637 // which could be injected by an attacker. 638 if !wasProcessed { 639 continue 640 } 641 case <-s.handshakeCompleteChan: 642 s.handleHandshakeComplete() 643 } 644 } 645 646 now := time.Now() 647 if timeout := s.sentPacketHandler.GetLossDetectionTimeout(); !timeout.IsZero() && timeout.Before(now) { 648 // This could cause packets to be retransmitted. 649 // Check it before trying to send packets. 650 if err := s.sentPacketHandler.OnLossDetectionTimeout(); err != nil { 651 s.closeLocal(err) 652 } 653 } 654 655 if keepAliveTime := s.nextKeepAliveTime(); !keepAliveTime.IsZero() && !now.Before(keepAliveTime) { 656 // send a PING frame since there is no activity in the session 657 s.logger.Debugf("Sending a keep-alive PING to keep the connection alive.") 658 s.framer.QueueControlFrame(&wire.PingFrame{}) 659 s.keepAlivePingSent = true 660 } else if !s.handshakeComplete && now.Sub(s.sessionCreationTime) >= s.config.handshakeTimeout() { 661 s.destroyImpl(qerr.ErrHandshakeTimeout) 662 continue 663 } else { 664 idleTimeoutStartTime := s.idleTimeoutStartTime() 665 if (!s.handshakeComplete && now.Sub(idleTimeoutStartTime) >= s.config.HandshakeIdleTimeout) || 666 (s.handshakeComplete && now.Sub(idleTimeoutStartTime) >= s.idleTimeout) { 667 s.destroyImpl(qerr.ErrIdleTimeout) 668 continue 669 } 670 } 671 672 if s.sendQueue.WouldBlock() { 673 // The send queue is still busy sending out packets. 674 // Wait until there's space to enqueue new packets. 675 sendQueueAvailable = s.sendQueue.Available() 676 continue 677 } 678 if err := s.sendPackets(); err != nil { 679 s.closeLocal(err) 680 } 681 if s.sendQueue.WouldBlock() { 682 sendQueueAvailable = s.sendQueue.Available() 683 } else { 684 sendQueueAvailable = nil 685 } 686 } 687 688 s.handleCloseError(&closeErr) 689 if e := (&errCloseForRecreating{}); !errors.As(closeErr.err, &e) && s.tracer != nil { 690 s.tracer.Close() 691 } 692 s.logger.Infof("Connection %s closed.", s.logID) 693 s.cryptoStreamHandler.Close() 694 s.sendQueue.Close() 695 s.timer.Stop() 696 return closeErr.err 697} 698 699// blocks until the early session can be used 700func (s *session) earlySessionReady() <-chan struct{} { 701 return s.earlySessionReadyChan 702} 703 704func (s *session) HandshakeComplete() context.Context { 705 return s.handshakeCtx 706} 707 708func (s *session) Context() context.Context { 709 return s.ctx 710} 711 712func (s *session) supportsDatagrams() bool { 713 return s.peerParams.MaxDatagramFrameSize != protocol.InvalidByteCount 714} 715 716func (s *session) ConnectionState() ConnectionState { 717 return ConnectionState{ 718 TLS: s.cryptoStreamHandler.ConnectionState(), 719 SupportsDatagrams: s.supportsDatagrams(), 720 } 721} 722 723// Time when the next keep-alive packet should be sent. 724// It returns a zero time if no keep-alive should be sent. 725func (s *session) nextKeepAliveTime() time.Time { 726 if !s.config.KeepAlive || s.keepAlivePingSent || !s.firstAckElicitingPacketAfterIdleSentTime.IsZero() { 727 return time.Time{} 728 } 729 return s.lastPacketReceivedTime.Add(s.keepAliveInterval) 730} 731 732func (s *session) maybeResetTimer() { 733 var deadline time.Time 734 if !s.handshakeComplete { 735 deadline = utils.MinTime( 736 s.sessionCreationTime.Add(s.config.handshakeTimeout()), 737 s.idleTimeoutStartTime().Add(s.config.HandshakeIdleTimeout), 738 ) 739 } else { 740 if keepAliveTime := s.nextKeepAliveTime(); !keepAliveTime.IsZero() { 741 deadline = keepAliveTime 742 } else { 743 deadline = s.idleTimeoutStartTime().Add(s.idleTimeout) 744 } 745 } 746 if s.handshakeConfirmed && !s.config.DisablePathMTUDiscovery { 747 if probeTime := s.mtuDiscoverer.NextProbeTime(); !probeTime.IsZero() { 748 deadline = utils.MinTime(deadline, probeTime) 749 } 750 } 751 752 if ackAlarm := s.receivedPacketHandler.GetAlarmTimeout(); !ackAlarm.IsZero() { 753 deadline = utils.MinTime(deadline, ackAlarm) 754 } 755 if lossTime := s.sentPacketHandler.GetLossDetectionTimeout(); !lossTime.IsZero() { 756 deadline = utils.MinTime(deadline, lossTime) 757 } 758 if !s.pacingDeadline.IsZero() { 759 deadline = utils.MinTime(deadline, s.pacingDeadline) 760 } 761 762 s.timer.Reset(deadline) 763} 764 765func (s *session) idleTimeoutStartTime() time.Time { 766 return utils.MaxTime(s.lastPacketReceivedTime, s.firstAckElicitingPacketAfterIdleSentTime) 767} 768 769func (s *session) handleHandshakeComplete() { 770 s.handshakeComplete = true 771 s.handshakeCompleteChan = nil // prevent this case from ever being selected again 772 defer s.handshakeCtxCancel() 773 // Once the handshake completes, we have derived 1-RTT keys. 774 // There's no point in queueing undecryptable packets for later decryption any more. 775 s.undecryptablePackets = nil 776 777 s.connIDManager.SetHandshakeComplete() 778 s.connIDGenerator.SetHandshakeComplete() 779 780 if s.perspective == protocol.PerspectiveClient { 781 s.applyTransportParameters() 782 return 783 } 784 785 s.handleHandshakeConfirmed() 786 787 ticket, err := s.cryptoStreamHandler.GetSessionTicket() 788 if err != nil { 789 s.closeLocal(err) 790 } 791 if ticket != nil { 792 s.oneRTTStream.Write(ticket) 793 for s.oneRTTStream.HasData() { 794 s.queueControlFrame(s.oneRTTStream.PopCryptoFrame(protocol.MaxPostHandshakeCryptoFrameSize)) 795 } 796 } 797 token, err := s.tokenGenerator.NewToken(s.conn.RemoteAddr()) 798 if err != nil { 799 s.closeLocal(err) 800 } 801 s.queueControlFrame(&wire.NewTokenFrame{Token: token}) 802 s.queueControlFrame(&wire.HandshakeDoneFrame{}) 803} 804 805func (s *session) handleHandshakeConfirmed() { 806 s.handshakeConfirmed = true 807 s.sentPacketHandler.SetHandshakeConfirmed() 808 s.cryptoStreamHandler.SetHandshakeConfirmed() 809 810 if !s.config.DisablePathMTUDiscovery { 811 maxPacketSize := s.peerParams.MaxUDPPayloadSize 812 if maxPacketSize == 0 { 813 maxPacketSize = protocol.MaxByteCount 814 } 815 maxPacketSize = utils.MinByteCount(maxPacketSize, protocol.MaxPacketBufferSize) 816 s.mtuDiscoverer = newMTUDiscoverer( 817 s.rttStats, 818 getMaxPacketSize(s.conn.RemoteAddr()), 819 maxPacketSize, 820 func(size protocol.ByteCount) { 821 s.sentPacketHandler.SetMaxDatagramSize(size) 822 s.packer.SetMaxPacketSize(size) 823 }, 824 ) 825 } 826} 827 828func (s *session) handlePacketImpl(rp *receivedPacket) bool { 829 s.sentPacketHandler.ReceivedBytes(rp.Size()) 830 831 if wire.IsVersionNegotiationPacket(rp.data) { 832 s.handleVersionNegotiationPacket(rp) 833 return false 834 } 835 836 var counter uint8 837 var lastConnID protocol.ConnectionID 838 var processed bool 839 data := rp.data 840 p := rp 841 for len(data) > 0 { 842 if counter > 0 { 843 p = p.Clone() 844 p.data = data 845 } 846 847 hdr, packetData, rest, err := wire.ParsePacket(p.data, s.srcConnIDLen) 848 if err != nil { 849 if s.tracer != nil { 850 dropReason := logging.PacketDropHeaderParseError 851 if err == wire.ErrUnsupportedVersion { 852 dropReason = logging.PacketDropUnsupportedVersion 853 } 854 s.tracer.DroppedPacket(logging.PacketTypeNotDetermined, protocol.ByteCount(len(data)), dropReason) 855 } 856 s.logger.Debugf("error parsing packet: %s", err) 857 break 858 } 859 860 if hdr.IsLongHeader && hdr.Version != s.version { 861 if s.tracer != nil { 862 s.tracer.DroppedPacket(logging.PacketTypeFromHeader(hdr), protocol.ByteCount(len(data)), logging.PacketDropUnexpectedVersion) 863 } 864 s.logger.Debugf("Dropping packet with version %x. Expected %x.", hdr.Version, s.version) 865 break 866 } 867 868 if counter > 0 && !hdr.DestConnectionID.Equal(lastConnID) { 869 if s.tracer != nil { 870 s.tracer.DroppedPacket(logging.PacketTypeFromHeader(hdr), protocol.ByteCount(len(data)), logging.PacketDropUnknownConnectionID) 871 } 872 s.logger.Debugf("coalesced packet has different destination connection ID: %s, expected %s", hdr.DestConnectionID, lastConnID) 873 break 874 } 875 lastConnID = hdr.DestConnectionID 876 877 if counter > 0 { 878 p.buffer.Split() 879 } 880 counter++ 881 882 // only log if this actually a coalesced packet 883 if s.logger.Debug() && (counter > 1 || len(rest) > 0) { 884 s.logger.Debugf("Parsed a coalesced packet. Part %d: %d bytes. Remaining: %d bytes.", counter, len(packetData), len(rest)) 885 } 886 p.data = packetData 887 if wasProcessed := s.handleSinglePacket(p, hdr); wasProcessed { 888 processed = true 889 } 890 data = rest 891 } 892 p.buffer.MaybeRelease() 893 return processed 894} 895 896func (s *session) handleSinglePacket(p *receivedPacket, hdr *wire.Header) bool /* was the packet successfully processed */ { 897 var wasQueued bool 898 899 defer func() { 900 // Put back the packet buffer if the packet wasn't queued for later decryption. 901 if !wasQueued { 902 p.buffer.Decrement() 903 } 904 }() 905 906 if hdr.Type == protocol.PacketTypeRetry { 907 return s.handleRetryPacket(hdr, p.data) 908 } 909 910 // The server can change the source connection ID with the first Handshake packet. 911 // After this, all packets with a different source connection have to be ignored. 912 if s.receivedFirstPacket && hdr.IsLongHeader && hdr.Type == protocol.PacketTypeInitial && !hdr.SrcConnectionID.Equal(s.handshakeDestConnID) { 913 if s.tracer != nil { 914 s.tracer.DroppedPacket(logging.PacketTypeInitial, p.Size(), logging.PacketDropUnknownConnectionID) 915 } 916 s.logger.Debugf("Dropping Initial packet (%d bytes) with unexpected source connection ID: %s (expected %s)", p.Size(), hdr.SrcConnectionID, s.handshakeDestConnID) 917 return false 918 } 919 // drop 0-RTT packets, if we are a client 920 if s.perspective == protocol.PerspectiveClient && hdr.Type == protocol.PacketType0RTT { 921 if s.tracer != nil { 922 s.tracer.DroppedPacket(logging.PacketType0RTT, p.Size(), logging.PacketDropKeyUnavailable) 923 } 924 return false 925 } 926 927 packet, err := s.unpacker.Unpack(hdr, p.rcvTime, p.data) 928 if err != nil { 929 switch err { 930 case handshake.ErrKeysDropped: 931 if s.tracer != nil { 932 s.tracer.DroppedPacket(logging.PacketTypeFromHeader(hdr), p.Size(), logging.PacketDropKeyUnavailable) 933 } 934 s.logger.Debugf("Dropping %s packet (%d bytes) because we already dropped the keys.", hdr.PacketType(), p.Size()) 935 case handshake.ErrKeysNotYetAvailable: 936 // Sealer for this encryption level not yet available. 937 // Try again later. 938 wasQueued = true 939 s.tryQueueingUndecryptablePacket(p, hdr) 940 case wire.ErrInvalidReservedBits: 941 s.closeLocal(&qerr.TransportError{ 942 ErrorCode: qerr.ProtocolViolation, 943 ErrorMessage: err.Error(), 944 }) 945 case handshake.ErrDecryptionFailed: 946 // This might be a packet injected by an attacker. Drop it. 947 if s.tracer != nil { 948 s.tracer.DroppedPacket(logging.PacketTypeFromHeader(hdr), p.Size(), logging.PacketDropPayloadDecryptError) 949 } 950 s.logger.Debugf("Dropping %s packet (%d bytes) that could not be unpacked. Error: %s", hdr.PacketType(), p.Size(), err) 951 default: 952 var headerErr *headerParseError 953 if errors.As(err, &headerErr) { 954 // This might be a packet injected by an attacker. Drop it. 955 if s.tracer != nil { 956 s.tracer.DroppedPacket(logging.PacketTypeFromHeader(hdr), p.Size(), logging.PacketDropHeaderParseError) 957 } 958 s.logger.Debugf("Dropping %s packet (%d bytes) for which we couldn't unpack the header. Error: %s", hdr.PacketType(), p.Size(), err) 959 } else { 960 // This is an error returned by the AEAD (other than ErrDecryptionFailed). 961 // For example, a PROTOCOL_VIOLATION due to key updates. 962 s.closeLocal(err) 963 } 964 } 965 return false 966 } 967 968 if s.logger.Debug() { 969 s.logger.Debugf("<- Reading packet %d (%d bytes) for connection %s, %s", packet.packetNumber, p.Size(), hdr.DestConnectionID, packet.encryptionLevel) 970 packet.hdr.Log(s.logger) 971 } 972 973 if s.receivedPacketHandler.IsPotentiallyDuplicate(packet.packetNumber, packet.encryptionLevel) { 974 s.logger.Debugf("Dropping (potentially) duplicate packet.") 975 if s.tracer != nil { 976 s.tracer.DroppedPacket(logging.PacketTypeFromHeader(hdr), p.Size(), logging.PacketDropDuplicate) 977 } 978 return false 979 } 980 981 if err := s.handleUnpackedPacket(packet, p.ecn, p.rcvTime, p.Size()); err != nil { 982 s.closeLocal(err) 983 return false 984 } 985 return true 986} 987 988func (s *session) handleRetryPacket(hdr *wire.Header, data []byte) bool /* was this a valid Retry */ { 989 if s.perspective == protocol.PerspectiveServer { 990 if s.tracer != nil { 991 s.tracer.DroppedPacket(logging.PacketTypeRetry, protocol.ByteCount(len(data)), logging.PacketDropUnexpectedPacket) 992 } 993 s.logger.Debugf("Ignoring Retry.") 994 return false 995 } 996 if s.receivedFirstPacket { 997 if s.tracer != nil { 998 s.tracer.DroppedPacket(logging.PacketTypeRetry, protocol.ByteCount(len(data)), logging.PacketDropUnexpectedPacket) 999 } 1000 s.logger.Debugf("Ignoring Retry, since we already received a packet.") 1001 return false 1002 } 1003 destConnID := s.connIDManager.Get() 1004 if hdr.SrcConnectionID.Equal(destConnID) { 1005 if s.tracer != nil { 1006 s.tracer.DroppedPacket(logging.PacketTypeRetry, protocol.ByteCount(len(data)), logging.PacketDropUnexpectedPacket) 1007 } 1008 s.logger.Debugf("Ignoring Retry, since the server didn't change the Source Connection ID.") 1009 return false 1010 } 1011 // If a token is already set, this means that we already received a Retry from the server. 1012 // Ignore this Retry packet. 1013 if s.receivedRetry { 1014 s.logger.Debugf("Ignoring Retry, since a Retry was already received.") 1015 return false 1016 } 1017 1018 tag := handshake.GetRetryIntegrityTag(data[:len(data)-16], destConnID, hdr.Version) 1019 if !bytes.Equal(data[len(data)-16:], tag[:]) { 1020 if s.tracer != nil { 1021 s.tracer.DroppedPacket(logging.PacketTypeRetry, protocol.ByteCount(len(data)), logging.PacketDropPayloadDecryptError) 1022 } 1023 s.logger.Debugf("Ignoring spoofed Retry. Integrity Tag doesn't match.") 1024 return false 1025 } 1026 1027 if s.logger.Debug() { 1028 s.logger.Debugf("<- Received Retry:") 1029 (&wire.ExtendedHeader{Header: *hdr}).Log(s.logger) 1030 s.logger.Debugf("Switching destination connection ID to: %s", hdr.SrcConnectionID) 1031 } 1032 if s.tracer != nil { 1033 s.tracer.ReceivedRetry(hdr) 1034 } 1035 newDestConnID := hdr.SrcConnectionID 1036 s.receivedRetry = true 1037 if err := s.sentPacketHandler.ResetForRetry(); err != nil { 1038 s.closeLocal(err) 1039 return false 1040 } 1041 s.handshakeDestConnID = newDestConnID 1042 s.retrySrcConnID = &newDestConnID 1043 s.cryptoStreamHandler.ChangeConnectionID(newDestConnID) 1044 s.packer.SetToken(hdr.Token) 1045 s.connIDManager.ChangeInitialConnID(newDestConnID) 1046 s.scheduleSending() 1047 return true 1048} 1049 1050func (s *session) handleVersionNegotiationPacket(p *receivedPacket) { 1051 if s.perspective == protocol.PerspectiveServer || // servers never receive version negotiation packets 1052 s.receivedFirstPacket || s.versionNegotiated { // ignore delayed / duplicated version negotiation packets 1053 if s.tracer != nil { 1054 s.tracer.DroppedPacket(logging.PacketTypeVersionNegotiation, p.Size(), logging.PacketDropUnexpectedPacket) 1055 } 1056 return 1057 } 1058 1059 hdr, supportedVersions, err := wire.ParseVersionNegotiationPacket(bytes.NewReader(p.data)) 1060 if err != nil { 1061 if s.tracer != nil { 1062 s.tracer.DroppedPacket(logging.PacketTypeVersionNegotiation, p.Size(), logging.PacketDropHeaderParseError) 1063 } 1064 s.logger.Debugf("Error parsing Version Negotiation packet: %s", err) 1065 return 1066 } 1067 1068 for _, v := range supportedVersions { 1069 if v == s.version { 1070 if s.tracer != nil { 1071 s.tracer.DroppedPacket(logging.PacketTypeVersionNegotiation, p.Size(), logging.PacketDropUnexpectedVersion) 1072 } 1073 // The Version Negotiation packet contains the version that we offered. 1074 // This might be a packet sent by an attacker, or it was corrupted. 1075 return 1076 } 1077 } 1078 1079 s.logger.Infof("Received a Version Negotiation packet. Supported Versions: %s", supportedVersions) 1080 if s.tracer != nil { 1081 s.tracer.ReceivedVersionNegotiationPacket(hdr, supportedVersions) 1082 } 1083 newVersion, ok := protocol.ChooseSupportedVersion(s.config.Versions, supportedVersions) 1084 if !ok { 1085 s.destroyImpl(&VersionNegotiationError{ 1086 Ours: s.config.Versions, 1087 Theirs: supportedVersions, 1088 }) 1089 s.logger.Infof("No compatible QUIC version found.") 1090 return 1091 } 1092 if s.tracer != nil { 1093 s.tracer.NegotiatedVersion(newVersion, s.config.Versions, supportedVersions) 1094 } 1095 1096 s.logger.Infof("Switching to QUIC version %s.", newVersion) 1097 nextPN, _ := s.sentPacketHandler.PeekPacketNumber(protocol.EncryptionInitial) 1098 s.destroyImpl(&errCloseForRecreating{ 1099 nextPacketNumber: nextPN, 1100 nextVersion: newVersion, 1101 }) 1102} 1103 1104func (s *session) handleUnpackedPacket( 1105 packet *unpackedPacket, 1106 ecn protocol.ECN, 1107 rcvTime time.Time, 1108 packetSize protocol.ByteCount, // only for logging 1109) error { 1110 if len(packet.data) == 0 { 1111 return &qerr.TransportError{ 1112 ErrorCode: qerr.ProtocolViolation, 1113 ErrorMessage: "empty packet", 1114 } 1115 } 1116 1117 if !s.receivedFirstPacket { 1118 s.receivedFirstPacket = true 1119 if !s.versionNegotiated && s.tracer != nil { 1120 var clientVersions, serverVersions []protocol.VersionNumber 1121 switch s.perspective { 1122 case protocol.PerspectiveClient: 1123 clientVersions = s.config.Versions 1124 case protocol.PerspectiveServer: 1125 serverVersions = s.config.Versions 1126 } 1127 s.tracer.NegotiatedVersion(s.version, clientVersions, serverVersions) 1128 } 1129 // The server can change the source connection ID with the first Handshake packet. 1130 if s.perspective == protocol.PerspectiveClient && packet.hdr.IsLongHeader && !packet.hdr.SrcConnectionID.Equal(s.handshakeDestConnID) { 1131 cid := packet.hdr.SrcConnectionID 1132 s.logger.Debugf("Received first packet. Switching destination connection ID to: %s", cid) 1133 s.handshakeDestConnID = cid 1134 s.connIDManager.ChangeInitialConnID(cid) 1135 } 1136 // We create the session as soon as we receive the first packet from the client. 1137 // We do that before authenticating the packet. 1138 // That means that if the source connection ID was corrupted, 1139 // we might have create a session with an incorrect source connection ID. 1140 // Once we authenticate the first packet, we need to update it. 1141 if s.perspective == protocol.PerspectiveServer { 1142 if !packet.hdr.SrcConnectionID.Equal(s.handshakeDestConnID) { 1143 s.handshakeDestConnID = packet.hdr.SrcConnectionID 1144 s.connIDManager.ChangeInitialConnID(packet.hdr.SrcConnectionID) 1145 } 1146 if s.tracer != nil { 1147 s.tracer.StartedConnection( 1148 s.conn.LocalAddr(), 1149 s.conn.RemoteAddr(), 1150 packet.hdr.SrcConnectionID, 1151 packet.hdr.DestConnectionID, 1152 ) 1153 } 1154 } 1155 } 1156 1157 s.lastPacketReceivedTime = rcvTime 1158 s.firstAckElicitingPacketAfterIdleSentTime = time.Time{} 1159 s.keepAlivePingSent = false 1160 1161 // Only used for tracing. 1162 // If we're not tracing, this slice will always remain empty. 1163 var frames []wire.Frame 1164 r := bytes.NewReader(packet.data) 1165 var isAckEliciting bool 1166 for { 1167 frame, err := s.frameParser.ParseNext(r, packet.encryptionLevel) 1168 if err != nil { 1169 return err 1170 } 1171 if frame == nil { 1172 break 1173 } 1174 if ackhandler.IsFrameAckEliciting(frame) { 1175 isAckEliciting = true 1176 } 1177 // Only process frames now if we're not logging. 1178 // If we're logging, we need to make sure that the packet_received event is logged first. 1179 if s.tracer == nil { 1180 if err := s.handleFrame(frame, packet.encryptionLevel, packet.hdr.DestConnectionID); err != nil { 1181 return err 1182 } 1183 } else { 1184 frames = append(frames, frame) 1185 } 1186 } 1187 1188 if s.tracer != nil { 1189 fs := make([]logging.Frame, len(frames)) 1190 for i, frame := range frames { 1191 fs[i] = logutils.ConvertFrame(frame) 1192 } 1193 s.tracer.ReceivedPacket(packet.hdr, packetSize, fs) 1194 for _, frame := range frames { 1195 if err := s.handleFrame(frame, packet.encryptionLevel, packet.hdr.DestConnectionID); err != nil { 1196 return err 1197 } 1198 } 1199 } 1200 1201 return s.receivedPacketHandler.ReceivedPacket(packet.packetNumber, ecn, packet.encryptionLevel, rcvTime, isAckEliciting) 1202} 1203 1204func (s *session) handleFrame(f wire.Frame, encLevel protocol.EncryptionLevel, destConnID protocol.ConnectionID) error { 1205 var err error 1206 wire.LogFrame(s.logger, f, false) 1207 switch frame := f.(type) { 1208 case *wire.CryptoFrame: 1209 err = s.handleCryptoFrame(frame, encLevel) 1210 case *wire.StreamFrame: 1211 err = s.handleStreamFrame(frame) 1212 case *wire.AckFrame: 1213 err = s.handleAckFrame(frame, encLevel) 1214 case *wire.ConnectionCloseFrame: 1215 s.handleConnectionCloseFrame(frame) 1216 case *wire.ResetStreamFrame: 1217 err = s.handleResetStreamFrame(frame) 1218 case *wire.MaxDataFrame: 1219 s.handleMaxDataFrame(frame) 1220 case *wire.MaxStreamDataFrame: 1221 err = s.handleMaxStreamDataFrame(frame) 1222 case *wire.MaxStreamsFrame: 1223 s.handleMaxStreamsFrame(frame) 1224 case *wire.DataBlockedFrame: 1225 case *wire.StreamDataBlockedFrame: 1226 case *wire.StreamsBlockedFrame: 1227 case *wire.StopSendingFrame: 1228 err = s.handleStopSendingFrame(frame) 1229 case *wire.PingFrame: 1230 case *wire.PathChallengeFrame: 1231 s.handlePathChallengeFrame(frame) 1232 case *wire.PathResponseFrame: 1233 // since we don't send PATH_CHALLENGEs, we don't expect PATH_RESPONSEs 1234 err = errors.New("unexpected PATH_RESPONSE frame") 1235 case *wire.NewTokenFrame: 1236 err = s.handleNewTokenFrame(frame) 1237 case *wire.NewConnectionIDFrame: 1238 err = s.handleNewConnectionIDFrame(frame) 1239 case *wire.RetireConnectionIDFrame: 1240 err = s.handleRetireConnectionIDFrame(frame, destConnID) 1241 case *wire.HandshakeDoneFrame: 1242 err = s.handleHandshakeDoneFrame() 1243 case *wire.DatagramFrame: 1244 err = s.handleDatagramFrame(frame) 1245 default: 1246 err = fmt.Errorf("unexpected frame type: %s", reflect.ValueOf(&frame).Elem().Type().Name()) 1247 } 1248 return err 1249} 1250 1251// handlePacket is called by the server with a new packet 1252func (s *session) handlePacket(p *receivedPacket) { 1253 // Discard packets once the amount of queued packets is larger than 1254 // the channel size, protocol.MaxSessionUnprocessedPackets 1255 select { 1256 case s.receivedPackets <- p: 1257 default: 1258 if s.tracer != nil { 1259 s.tracer.DroppedPacket(logging.PacketTypeNotDetermined, p.Size(), logging.PacketDropDOSPrevention) 1260 } 1261 } 1262} 1263 1264func (s *session) handleConnectionCloseFrame(frame *wire.ConnectionCloseFrame) { 1265 if frame.IsApplicationError { 1266 s.closeRemote(&qerr.ApplicationError{ 1267 Remote: true, 1268 ErrorCode: qerr.ApplicationErrorCode(frame.ErrorCode), 1269 ErrorMessage: frame.ReasonPhrase, 1270 }) 1271 return 1272 } 1273 s.closeRemote(&qerr.TransportError{ 1274 Remote: true, 1275 ErrorCode: qerr.TransportErrorCode(frame.ErrorCode), 1276 FrameType: frame.FrameType, 1277 ErrorMessage: frame.ReasonPhrase, 1278 }) 1279} 1280 1281func (s *session) handleCryptoFrame(frame *wire.CryptoFrame, encLevel protocol.EncryptionLevel) error { 1282 encLevelChanged, err := s.cryptoStreamManager.HandleCryptoFrame(frame, encLevel) 1283 if err != nil { 1284 return err 1285 } 1286 if encLevelChanged { 1287 // Queue all packets for decryption that have been undecryptable so far. 1288 s.undecryptablePacketsToProcess = s.undecryptablePackets 1289 s.undecryptablePackets = nil 1290 } 1291 return nil 1292} 1293 1294func (s *session) handleStreamFrame(frame *wire.StreamFrame) error { 1295 str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID) 1296 if err != nil { 1297 return err 1298 } 1299 if str == nil { 1300 // Stream is closed and already garbage collected 1301 // ignore this StreamFrame 1302 return nil 1303 } 1304 return str.handleStreamFrame(frame) 1305} 1306 1307func (s *session) handleMaxDataFrame(frame *wire.MaxDataFrame) { 1308 s.connFlowController.UpdateSendWindow(frame.MaximumData) 1309} 1310 1311func (s *session) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) error { 1312 str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID) 1313 if err != nil { 1314 return err 1315 } 1316 if str == nil { 1317 // stream is closed and already garbage collected 1318 return nil 1319 } 1320 str.updateSendWindow(frame.MaximumStreamData) 1321 return nil 1322} 1323 1324func (s *session) handleMaxStreamsFrame(frame *wire.MaxStreamsFrame) { 1325 s.streamsMap.HandleMaxStreamsFrame(frame) 1326} 1327 1328func (s *session) handleResetStreamFrame(frame *wire.ResetStreamFrame) error { 1329 str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID) 1330 if err != nil { 1331 return err 1332 } 1333 if str == nil { 1334 // stream is closed and already garbage collected 1335 return nil 1336 } 1337 return str.handleResetStreamFrame(frame) 1338} 1339 1340func (s *session) handleStopSendingFrame(frame *wire.StopSendingFrame) error { 1341 str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID) 1342 if err != nil { 1343 return err 1344 } 1345 if str == nil { 1346 // stream is closed and already garbage collected 1347 return nil 1348 } 1349 str.handleStopSendingFrame(frame) 1350 return nil 1351} 1352 1353func (s *session) handlePathChallengeFrame(frame *wire.PathChallengeFrame) { 1354 s.queueControlFrame(&wire.PathResponseFrame{Data: frame.Data}) 1355} 1356 1357func (s *session) handleNewTokenFrame(frame *wire.NewTokenFrame) error { 1358 if s.perspective == protocol.PerspectiveServer { 1359 return &qerr.TransportError{ 1360 ErrorCode: qerr.ProtocolViolation, 1361 ErrorMessage: "received NEW_TOKEN frame from the client", 1362 } 1363 } 1364 if s.config.TokenStore != nil { 1365 s.config.TokenStore.Put(s.tokenStoreKey, &ClientToken{data: frame.Token}) 1366 } 1367 return nil 1368} 1369 1370func (s *session) handleNewConnectionIDFrame(f *wire.NewConnectionIDFrame) error { 1371 return s.connIDManager.Add(f) 1372} 1373 1374func (s *session) handleRetireConnectionIDFrame(f *wire.RetireConnectionIDFrame, destConnID protocol.ConnectionID) error { 1375 return s.connIDGenerator.Retire(f.SequenceNumber, destConnID) 1376} 1377 1378func (s *session) handleHandshakeDoneFrame() error { 1379 if s.perspective == protocol.PerspectiveServer { 1380 return &qerr.TransportError{ 1381 ErrorCode: qerr.ProtocolViolation, 1382 ErrorMessage: "received a HANDSHAKE_DONE frame", 1383 } 1384 } 1385 if !s.handshakeConfirmed { 1386 s.handleHandshakeConfirmed() 1387 } 1388 return nil 1389} 1390 1391func (s *session) handleAckFrame(frame *wire.AckFrame, encLevel protocol.EncryptionLevel) error { 1392 acked1RTTPacket, err := s.sentPacketHandler.ReceivedAck(frame, encLevel, s.lastPacketReceivedTime) 1393 if err != nil { 1394 return err 1395 } 1396 if !acked1RTTPacket { 1397 return nil 1398 } 1399 if s.perspective == protocol.PerspectiveClient && !s.handshakeConfirmed { 1400 s.handleHandshakeConfirmed() 1401 } 1402 return s.cryptoStreamHandler.SetLargest1RTTAcked(frame.LargestAcked()) 1403} 1404 1405func (s *session) handleDatagramFrame(f *wire.DatagramFrame) error { 1406 if f.Length(s.version) > protocol.MaxDatagramFrameSize { 1407 return &qerr.TransportError{ 1408 ErrorCode: qerr.ProtocolViolation, 1409 ErrorMessage: "DATAGRAM frame too large", 1410 } 1411 } 1412 s.datagramQueue.HandleDatagramFrame(f) 1413 return nil 1414} 1415 1416// closeLocal closes the session and send a CONNECTION_CLOSE containing the error 1417func (s *session) closeLocal(e error) { 1418 s.closeOnce.Do(func() { 1419 if e == nil { 1420 s.logger.Infof("Closing session.") 1421 } else { 1422 s.logger.Errorf("Closing session with error: %s", e) 1423 } 1424 s.closeChan <- closeError{err: e, immediate: false, remote: false} 1425 }) 1426} 1427 1428// destroy closes the session without sending the error on the wire 1429func (s *session) destroy(e error) { 1430 s.destroyImpl(e) 1431 <-s.ctx.Done() 1432} 1433 1434func (s *session) destroyImpl(e error) { 1435 s.closeOnce.Do(func() { 1436 if nerr, ok := e.(net.Error); ok && nerr.Timeout() { 1437 s.logger.Errorf("Destroying session: %s", e) 1438 } else { 1439 s.logger.Errorf("Destroying session with error: %s", e) 1440 } 1441 s.closeChan <- closeError{err: e, immediate: true, remote: false} 1442 }) 1443} 1444 1445func (s *session) closeRemote(e error) { 1446 s.closeOnce.Do(func() { 1447 s.logger.Errorf("Peer closed session with error: %s", e) 1448 s.closeChan <- closeError{err: e, immediate: true, remote: true} 1449 }) 1450} 1451 1452// Close the connection. It sends a NO_ERROR application error. 1453// It waits until the run loop has stopped before returning 1454func (s *session) shutdown() { 1455 s.closeLocal(nil) 1456 <-s.ctx.Done() 1457} 1458 1459func (s *session) CloseWithError(code ApplicationErrorCode, desc string) error { 1460 s.closeLocal(&qerr.ApplicationError{ 1461 ErrorCode: code, 1462 ErrorMessage: desc, 1463 }) 1464 <-s.ctx.Done() 1465 return nil 1466} 1467 1468func (s *session) handleCloseError(closeErr *closeError) { 1469 e := closeErr.err 1470 if e == nil { 1471 e = &qerr.ApplicationError{} 1472 } else { 1473 defer func() { 1474 closeErr.err = e 1475 }() 1476 } 1477 1478 var ( 1479 statelessResetErr *StatelessResetError 1480 versionNegotiationErr *VersionNegotiationError 1481 recreateErr *errCloseForRecreating 1482 applicationErr *ApplicationError 1483 transportErr *TransportError 1484 ) 1485 switch { 1486 case errors.Is(e, qerr.ErrIdleTimeout), 1487 errors.Is(e, qerr.ErrHandshakeTimeout), 1488 errors.As(e, &statelessResetErr), 1489 errors.As(e, &versionNegotiationErr), 1490 errors.As(e, &recreateErr), 1491 errors.As(e, &applicationErr), 1492 errors.As(e, &transportErr): 1493 default: 1494 e = &qerr.TransportError{ 1495 ErrorCode: qerr.InternalError, 1496 ErrorMessage: e.Error(), 1497 } 1498 } 1499 1500 s.streamsMap.CloseWithError(e) 1501 s.connIDManager.Close() 1502 if s.datagramQueue != nil { 1503 s.datagramQueue.CloseWithError(e) 1504 } 1505 1506 if s.tracer != nil && !errors.As(e, &recreateErr) { 1507 s.tracer.ClosedConnection(e) 1508 } 1509 1510 // If this is a remote close we're done here 1511 if closeErr.remote { 1512 s.connIDGenerator.ReplaceWithClosed(newClosedRemoteSession(s.perspective)) 1513 return 1514 } 1515 if closeErr.immediate { 1516 s.connIDGenerator.RemoveAll() 1517 return 1518 } 1519 connClosePacket, err := s.sendConnectionClose(e) 1520 if err != nil { 1521 s.logger.Debugf("Error sending CONNECTION_CLOSE: %s", err) 1522 } 1523 cs := newClosedLocalSession(s.conn, connClosePacket, s.perspective, s.logger) 1524 s.connIDGenerator.ReplaceWithClosed(cs) 1525} 1526 1527func (s *session) dropEncryptionLevel(encLevel protocol.EncryptionLevel) { 1528 s.sentPacketHandler.DropPackets(encLevel) 1529 s.receivedPacketHandler.DropPackets(encLevel) 1530 if s.tracer != nil { 1531 s.tracer.DroppedEncryptionLevel(encLevel) 1532 } 1533 if encLevel == protocol.Encryption0RTT { 1534 s.streamsMap.ResetFor0RTT() 1535 if err := s.connFlowController.Reset(); err != nil { 1536 s.closeLocal(err) 1537 } 1538 if err := s.framer.Handle0RTTRejection(); err != nil { 1539 s.closeLocal(err) 1540 } 1541 } 1542} 1543 1544// is called for the client, when restoring transport parameters saved for 0-RTT 1545func (s *session) restoreTransportParameters(params *wire.TransportParameters) { 1546 if s.logger.Debug() { 1547 s.logger.Debugf("Restoring Transport Parameters: %s", params) 1548 } 1549 1550 s.peerParams = params 1551 s.connIDGenerator.SetMaxActiveConnIDs(params.ActiveConnectionIDLimit) 1552 s.connFlowController.UpdateSendWindow(params.InitialMaxData) 1553 s.streamsMap.UpdateLimits(params) 1554} 1555 1556func (s *session) handleTransportParameters(params *wire.TransportParameters) { 1557 if err := s.checkTransportParameters(params); err != nil { 1558 s.closeLocal(&qerr.TransportError{ 1559 ErrorCode: qerr.TransportParameterError, 1560 ErrorMessage: err.Error(), 1561 }) 1562 } 1563 s.peerParams = params 1564 // On the client side we have to wait for handshake completion. 1565 // During a 0-RTT connection, we are only allowed to use the new transport parameters for 1-RTT packets. 1566 if s.perspective == protocol.PerspectiveServer { 1567 s.applyTransportParameters() 1568 // On the server side, the early session is ready as soon as we processed 1569 // the client's transport parameters. 1570 close(s.earlySessionReadyChan) 1571 } 1572} 1573 1574func (s *session) checkTransportParameters(params *wire.TransportParameters) error { 1575 if s.logger.Debug() { 1576 s.logger.Debugf("Processed Transport Parameters: %s", params) 1577 } 1578 if s.tracer != nil { 1579 s.tracer.ReceivedTransportParameters(params) 1580 } 1581 1582 // check the initial_source_connection_id 1583 if !params.InitialSourceConnectionID.Equal(s.handshakeDestConnID) { 1584 return fmt.Errorf("expected initial_source_connection_id to equal %s, is %s", s.handshakeDestConnID, params.InitialSourceConnectionID) 1585 } 1586 1587 if s.perspective == protocol.PerspectiveServer { 1588 return nil 1589 } 1590 // check the original_destination_connection_id 1591 if !params.OriginalDestinationConnectionID.Equal(s.origDestConnID) { 1592 return fmt.Errorf("expected original_destination_connection_id to equal %s, is %s", s.origDestConnID, params.OriginalDestinationConnectionID) 1593 } 1594 if s.retrySrcConnID != nil { // a Retry was performed 1595 if params.RetrySourceConnectionID == nil { 1596 return errors.New("missing retry_source_connection_id") 1597 } 1598 if !(*params.RetrySourceConnectionID).Equal(*s.retrySrcConnID) { 1599 return fmt.Errorf("expected retry_source_connection_id to equal %s, is %s", s.retrySrcConnID, *params.RetrySourceConnectionID) 1600 } 1601 } else if params.RetrySourceConnectionID != nil { 1602 return errors.New("received retry_source_connection_id, although no Retry was performed") 1603 } 1604 return nil 1605} 1606 1607func (s *session) applyTransportParameters() { 1608 params := s.peerParams 1609 // Our local idle timeout will always be > 0. 1610 s.idleTimeout = utils.MinNonZeroDuration(s.config.MaxIdleTimeout, params.MaxIdleTimeout) 1611 s.keepAliveInterval = utils.MinDuration(s.idleTimeout/2, protocol.MaxKeepAliveInterval) 1612 s.streamsMap.UpdateLimits(params) 1613 s.packer.HandleTransportParameters(params) 1614 s.frameParser.SetAckDelayExponent(params.AckDelayExponent) 1615 s.connFlowController.UpdateSendWindow(params.InitialMaxData) 1616 s.rttStats.SetMaxAckDelay(params.MaxAckDelay) 1617 s.connIDGenerator.SetMaxActiveConnIDs(params.ActiveConnectionIDLimit) 1618 if params.StatelessResetToken != nil { 1619 s.connIDManager.SetStatelessResetToken(*params.StatelessResetToken) 1620 } 1621 // We don't support connection migration yet, so we don't have any use for the preferred_address. 1622 if params.PreferredAddress != nil { 1623 // Retire the connection ID. 1624 s.connIDManager.AddFromPreferredAddress(params.PreferredAddress.ConnectionID, params.PreferredAddress.StatelessResetToken) 1625 } 1626} 1627 1628func (s *session) sendPackets() error { 1629 s.pacingDeadline = time.Time{} 1630 1631 var sentPacket bool // only used in for packets sent in send mode SendAny 1632 for { 1633 sendMode := s.sentPacketHandler.SendMode() 1634 if sendMode == ackhandler.SendAny && s.handshakeComplete && !s.sentPacketHandler.HasPacingBudget() { 1635 deadline := s.sentPacketHandler.TimeUntilSend() 1636 if deadline.IsZero() { 1637 deadline = deadlineSendImmediately 1638 } 1639 s.pacingDeadline = deadline 1640 // Allow sending of an ACK if we're pacing limit (if we haven't sent out a packet yet). 1641 // This makes sure that a peer that is mostly receiving data (and thus has an inaccurate cwnd estimate) 1642 // sends enough ACKs to allow its peer to utilize the bandwidth. 1643 if sentPacket { 1644 return nil 1645 } 1646 sendMode = ackhandler.SendAck 1647 } 1648 switch sendMode { 1649 case ackhandler.SendNone: 1650 return nil 1651 case ackhandler.SendAck: 1652 // If we already sent packets, and the send mode switches to SendAck, 1653 // as we've just become congestion limited. 1654 // There's no need to try to send an ACK at this moment. 1655 if sentPacket { 1656 return nil 1657 } 1658 // We can at most send a single ACK only packet. 1659 // There will only be a new ACK after receiving new packets. 1660 // SendAck is only returned when we're congestion limited, so we don't need to set the pacingt timer. 1661 return s.maybeSendAckOnlyPacket() 1662 case ackhandler.SendPTOInitial: 1663 if err := s.sendProbePacket(protocol.EncryptionInitial); err != nil { 1664 return err 1665 } 1666 case ackhandler.SendPTOHandshake: 1667 if err := s.sendProbePacket(protocol.EncryptionHandshake); err != nil { 1668 return err 1669 } 1670 case ackhandler.SendPTOAppData: 1671 if err := s.sendProbePacket(protocol.Encryption1RTT); err != nil { 1672 return err 1673 } 1674 case ackhandler.SendAny: 1675 sent, err := s.sendPacket() 1676 if err != nil || !sent { 1677 return err 1678 } 1679 sentPacket = true 1680 default: 1681 return fmt.Errorf("BUG: invalid send mode %d", sendMode) 1682 } 1683 // Prioritize receiving of packets over sending out more packets. 1684 if len(s.receivedPackets) > 0 { 1685 s.pacingDeadline = deadlineSendImmediately 1686 return nil 1687 } 1688 if s.sendQueue.WouldBlock() { 1689 return nil 1690 } 1691 } 1692} 1693 1694func (s *session) maybeSendAckOnlyPacket() error { 1695 packet, err := s.packer.MaybePackAckPacket(s.handshakeConfirmed) 1696 if err != nil { 1697 return err 1698 } 1699 if packet == nil { 1700 return nil 1701 } 1702 s.sendPackedPacket(packet, time.Now()) 1703 return nil 1704} 1705 1706func (s *session) sendProbePacket(encLevel protocol.EncryptionLevel) error { 1707 // Queue probe packets until we actually send out a packet, 1708 // or until there are no more packets to queue. 1709 var packet *packedPacket 1710 for { 1711 if wasQueued := s.sentPacketHandler.QueueProbePacket(encLevel); !wasQueued { 1712 break 1713 } 1714 var err error 1715 packet, err = s.packer.MaybePackProbePacket(encLevel) 1716 if err != nil { 1717 return err 1718 } 1719 if packet != nil { 1720 break 1721 } 1722 } 1723 if packet == nil { 1724 //nolint:exhaustive // Cannot send probe packets for 0-RTT. 1725 switch encLevel { 1726 case protocol.EncryptionInitial: 1727 s.retransmissionQueue.AddInitial(&wire.PingFrame{}) 1728 case protocol.EncryptionHandshake: 1729 s.retransmissionQueue.AddHandshake(&wire.PingFrame{}) 1730 case protocol.Encryption1RTT: 1731 s.retransmissionQueue.AddAppData(&wire.PingFrame{}) 1732 default: 1733 panic("unexpected encryption level") 1734 } 1735 var err error 1736 packet, err = s.packer.MaybePackProbePacket(encLevel) 1737 if err != nil { 1738 return err 1739 } 1740 } 1741 if packet == nil || packet.packetContents == nil { 1742 return fmt.Errorf("session BUG: couldn't pack %s probe packet", encLevel) 1743 } 1744 s.sendPackedPacket(packet, time.Now()) 1745 return nil 1746} 1747 1748func (s *session) sendPacket() (bool, error) { 1749 if isBlocked, offset := s.connFlowController.IsNewlyBlocked(); isBlocked { 1750 s.framer.QueueControlFrame(&wire.DataBlockedFrame{MaximumData: offset}) 1751 } 1752 s.windowUpdateQueue.QueueAll() 1753 1754 now := time.Now() 1755 if !s.handshakeConfirmed { 1756 packet, err := s.packer.PackCoalescedPacket() 1757 if err != nil || packet == nil { 1758 return false, err 1759 } 1760 s.logCoalescedPacket(packet) 1761 for _, p := range packet.packets { 1762 if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && p.IsAckEliciting() { 1763 s.firstAckElicitingPacketAfterIdleSentTime = now 1764 } 1765 s.sentPacketHandler.SentPacket(p.ToAckHandlerPacket(now, s.retransmissionQueue)) 1766 } 1767 s.connIDManager.SentPacket() 1768 s.sendQueue.Send(packet.buffer) 1769 return true, nil 1770 } 1771 if !s.config.DisablePathMTUDiscovery && s.mtuDiscoverer.ShouldSendProbe(now) { 1772 packet, err := s.packer.PackMTUProbePacket(s.mtuDiscoverer.GetPing()) 1773 if err != nil { 1774 return false, err 1775 } 1776 s.sendPackedPacket(packet, now) 1777 return true, nil 1778 } 1779 packet, err := s.packer.PackPacket() 1780 if err != nil || packet == nil { 1781 return false, err 1782 } 1783 s.sendPackedPacket(packet, now) 1784 return true, nil 1785} 1786 1787func (s *session) sendPackedPacket(packet *packedPacket, now time.Time) { 1788 if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && packet.IsAckEliciting() { 1789 s.firstAckElicitingPacketAfterIdleSentTime = now 1790 } 1791 s.logPacket(packet) 1792 s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket(now, s.retransmissionQueue)) 1793 s.connIDManager.SentPacket() 1794 s.sendQueue.Send(packet.buffer) 1795} 1796 1797func (s *session) sendConnectionClose(e error) ([]byte, error) { 1798 var packet *coalescedPacket 1799 var err error 1800 var transportErr *qerr.TransportError 1801 var applicationErr *qerr.ApplicationError 1802 if errors.As(e, &transportErr) { 1803 packet, err = s.packer.PackConnectionClose(transportErr) 1804 } else if errors.As(e, &applicationErr) { 1805 packet, err = s.packer.PackApplicationClose(applicationErr) 1806 } else { 1807 packet, err = s.packer.PackConnectionClose(&qerr.TransportError{ 1808 ErrorCode: qerr.InternalError, 1809 ErrorMessage: fmt.Sprintf("session BUG: unspecified error type (msg: %s)", e.Error()), 1810 }) 1811 } 1812 if err != nil { 1813 return nil, err 1814 } 1815 s.logCoalescedPacket(packet) 1816 return packet.buffer.Data, s.conn.Write(packet.buffer.Data) 1817} 1818 1819func (s *session) logPacketContents(p *packetContents) { 1820 // tracing 1821 if s.tracer != nil { 1822 frames := make([]logging.Frame, 0, len(p.frames)) 1823 for _, f := range p.frames { 1824 frames = append(frames, logutils.ConvertFrame(f.Frame)) 1825 } 1826 s.tracer.SentPacket(p.header, p.length, p.ack, frames) 1827 } 1828 1829 // quic-go logging 1830 if !s.logger.Debug() { 1831 return 1832 } 1833 p.header.Log(s.logger) 1834 if p.ack != nil { 1835 wire.LogFrame(s.logger, p.ack, true) 1836 } 1837 for _, frame := range p.frames { 1838 wire.LogFrame(s.logger, frame.Frame, true) 1839 } 1840} 1841 1842func (s *session) logCoalescedPacket(packet *coalescedPacket) { 1843 if s.logger.Debug() { 1844 if len(packet.packets) > 1 { 1845 s.logger.Debugf("-> Sending coalesced packet (%d parts, %d bytes) for connection %s", len(packet.packets), packet.buffer.Len(), s.logID) 1846 } else { 1847 s.logger.Debugf("-> Sending packet %d (%d bytes) for connection %s, %s", packet.packets[0].header.PacketNumber, packet.buffer.Len(), s.logID, packet.packets[0].EncryptionLevel()) 1848 } 1849 } 1850 for _, p := range packet.packets { 1851 s.logPacketContents(p) 1852 } 1853} 1854 1855func (s *session) logPacket(packet *packedPacket) { 1856 if s.logger.Debug() { 1857 s.logger.Debugf("-> Sending packet %d (%d bytes) for connection %s, %s", packet.header.PacketNumber, packet.buffer.Len(), s.logID, packet.EncryptionLevel()) 1858 } 1859 s.logPacketContents(packet.packetContents) 1860} 1861 1862// AcceptStream returns the next stream openend by the peer 1863func (s *session) AcceptStream(ctx context.Context) (Stream, error) { 1864 return s.streamsMap.AcceptStream(ctx) 1865} 1866 1867func (s *session) AcceptUniStream(ctx context.Context) (ReceiveStream, error) { 1868 return s.streamsMap.AcceptUniStream(ctx) 1869} 1870 1871// OpenStream opens a stream 1872func (s *session) OpenStream() (Stream, error) { 1873 return s.streamsMap.OpenStream() 1874} 1875 1876func (s *session) OpenStreamSync(ctx context.Context) (Stream, error) { 1877 return s.streamsMap.OpenStreamSync(ctx) 1878} 1879 1880func (s *session) OpenUniStream() (SendStream, error) { 1881 return s.streamsMap.OpenUniStream() 1882} 1883 1884func (s *session) OpenUniStreamSync(ctx context.Context) (SendStream, error) { 1885 return s.streamsMap.OpenUniStreamSync(ctx) 1886} 1887 1888func (s *session) newFlowController(id protocol.StreamID) flowcontrol.StreamFlowController { 1889 initialSendWindow := s.peerParams.InitialMaxStreamDataUni 1890 if id.Type() == protocol.StreamTypeBidi { 1891 if id.InitiatedBy() == s.perspective { 1892 initialSendWindow = s.peerParams.InitialMaxStreamDataBidiRemote 1893 } else { 1894 initialSendWindow = s.peerParams.InitialMaxStreamDataBidiLocal 1895 } 1896 } 1897 return flowcontrol.NewStreamFlowController( 1898 id, 1899 s.connFlowController, 1900 protocol.ByteCount(s.config.InitialStreamReceiveWindow), 1901 protocol.ByteCount(s.config.MaxStreamReceiveWindow), 1902 initialSendWindow, 1903 s.onHasStreamWindowUpdate, 1904 s.rttStats, 1905 s.logger, 1906 ) 1907} 1908 1909// scheduleSending signals that we have data for sending 1910func (s *session) scheduleSending() { 1911 select { 1912 case s.sendingScheduled <- struct{}{}: 1913 default: 1914 } 1915} 1916 1917func (s *session) tryQueueingUndecryptablePacket(p *receivedPacket, hdr *wire.Header) { 1918 if s.handshakeComplete { 1919 panic("shouldn't queue undecryptable packets after handshake completion") 1920 } 1921 if len(s.undecryptablePackets)+1 > protocol.MaxUndecryptablePackets { 1922 if s.tracer != nil { 1923 s.tracer.DroppedPacket(logging.PacketTypeFromHeader(hdr), p.Size(), logging.PacketDropDOSPrevention) 1924 } 1925 s.logger.Infof("Dropping undecryptable packet (%d bytes). Undecryptable packet queue full.", p.Size()) 1926 return 1927 } 1928 s.logger.Infof("Queueing packet (%d bytes) for later decryption", p.Size()) 1929 if s.tracer != nil { 1930 s.tracer.BufferedPacket(logging.PacketTypeFromHeader(hdr)) 1931 } 1932 s.undecryptablePackets = append(s.undecryptablePackets, p) 1933} 1934 1935func (s *session) queueControlFrame(f wire.Frame) { 1936 s.framer.QueueControlFrame(f) 1937 s.scheduleSending() 1938} 1939 1940func (s *session) onHasStreamWindowUpdate(id protocol.StreamID) { 1941 s.windowUpdateQueue.AddStream(id) 1942 s.scheduleSending() 1943} 1944 1945func (s *session) onHasConnectionWindowUpdate() { 1946 s.windowUpdateQueue.AddConnection() 1947 s.scheduleSending() 1948} 1949 1950func (s *session) onHasStreamData(id protocol.StreamID) { 1951 s.framer.AddActiveStream(id) 1952 s.scheduleSending() 1953} 1954 1955func (s *session) onStreamCompleted(id protocol.StreamID) { 1956 if err := s.streamsMap.DeleteStream(id); err != nil { 1957 s.closeLocal(err) 1958 } 1959} 1960 1961func (s *session) SendMessage(p []byte) error { 1962 f := &wire.DatagramFrame{DataLenPresent: true} 1963 if protocol.ByteCount(len(p)) > f.MaxDataLen(s.peerParams.MaxDatagramFrameSize, s.version) { 1964 return errors.New("message too large") 1965 } 1966 f.Data = make([]byte, len(p)) 1967 copy(f.Data, p) 1968 return s.datagramQueue.AddAndWait(f) 1969} 1970 1971func (s *session) ReceiveMessage() ([]byte, error) { 1972 return s.datagramQueue.Receive() 1973} 1974 1975func (s *session) LocalAddr() net.Addr { 1976 return s.conn.LocalAddr() 1977} 1978 1979func (s *session) RemoteAddr() net.Addr { 1980 return s.conn.RemoteAddr() 1981} 1982 1983func (s *session) getPerspective() protocol.Perspective { 1984 return s.perspective 1985} 1986 1987func (s *session) GetVersion() protocol.VersionNumber { 1988 return s.version 1989} 1990 1991func (s *session) NextSession() Session { 1992 <-s.HandshakeComplete().Done() 1993 s.streamsMap.UseResetMaps() 1994 return s 1995} 1996