1package quic 2 3import ( 4 "bytes" 5 "context" 6 "crypto/tls" 7 "errors" 8 "fmt" 9 "io" 10 "net" 11 "reflect" 12 "sync" 13 "time" 14 15 "github.com/lucas-clemente/quic-go/internal/ackhandler" 16 "github.com/lucas-clemente/quic-go/internal/flowcontrol" 17 "github.com/lucas-clemente/quic-go/internal/handshake" 18 "github.com/lucas-clemente/quic-go/internal/logutils" 19 "github.com/lucas-clemente/quic-go/internal/protocol" 20 "github.com/lucas-clemente/quic-go/internal/qerr" 21 "github.com/lucas-clemente/quic-go/internal/utils" 22 "github.com/lucas-clemente/quic-go/internal/wire" 23 "github.com/lucas-clemente/quic-go/logging" 24 "github.com/lucas-clemente/quic-go/quictrace" 25) 26 27type unpacker interface { 28 Unpack(hdr *wire.Header, rcvTime time.Time, data []byte) (*unpackedPacket, error) 29} 30 31type streamGetter interface { 32 GetOrOpenReceiveStream(protocol.StreamID) (receiveStreamI, error) 33 GetOrOpenSendStream(protocol.StreamID) (sendStreamI, error) 34} 35 36type streamManager interface { 37 GetOrOpenSendStream(protocol.StreamID) (sendStreamI, error) 38 GetOrOpenReceiveStream(protocol.StreamID) (receiveStreamI, error) 39 OpenStream() (Stream, error) 40 OpenUniStream() (SendStream, error) 41 OpenStreamSync(context.Context) (Stream, error) 42 OpenUniStreamSync(context.Context) (SendStream, error) 43 AcceptStream(context.Context) (Stream, error) 44 AcceptUniStream(context.Context) (ReceiveStream, error) 45 DeleteStream(protocol.StreamID) error 46 UpdateLimits(*wire.TransportParameters) error 47 HandleMaxStreamsFrame(*wire.MaxStreamsFrame) error 48 CloseWithError(error) 49} 50 51type cryptoStreamHandler interface { 52 RunHandshake() 53 ChangeConnectionID(protocol.ConnectionID) 54 SetLargest1RTTAcked(protocol.PacketNumber) error 55 SetHandshakeConfirmed() 56 GetSessionTicket() ([]byte, error) 57 io.Closer 58 ConnectionState() handshake.ConnectionState 59} 60 61type receivedPacket struct { 62 buffer *packetBuffer 63 64 remoteAddr net.Addr 65 rcvTime time.Time 66 data []byte 67 68 ecn protocol.ECN 69} 70 71func (p *receivedPacket) Size() protocol.ByteCount { return protocol.ByteCount(len(p.data)) } 72 73func (p *receivedPacket) Clone() *receivedPacket { 74 return &receivedPacket{ 75 remoteAddr: p.remoteAddr, 76 rcvTime: p.rcvTime, 77 data: p.data, 78 buffer: p.buffer, 79 ecn: p.ecn, 80 } 81} 82 83type sessionRunner interface { 84 Add(protocol.ConnectionID, packetHandler) bool 85 GetStatelessResetToken(protocol.ConnectionID) protocol.StatelessResetToken 86 Retire(protocol.ConnectionID) 87 Remove(protocol.ConnectionID) 88 ReplaceWithClosed(protocol.ConnectionID, packetHandler) 89 AddResetToken(protocol.StatelessResetToken, packetHandler) 90 RemoveResetToken(protocol.StatelessResetToken) 91} 92 93type handshakeRunner struct { 94 onReceivedParams func(*wire.TransportParameters) 95 onError func(error) 96 dropKeys func(protocol.EncryptionLevel) 97 onHandshakeComplete func() 98} 99 100func (r *handshakeRunner) OnReceivedParams(tp *wire.TransportParameters) { r.onReceivedParams(tp) } 101func (r *handshakeRunner) OnError(e error) { r.onError(e) } 102func (r *handshakeRunner) DropKeys(el protocol.EncryptionLevel) { r.dropKeys(el) } 103func (r *handshakeRunner) OnHandshakeComplete() { r.onHandshakeComplete() } 104 105type closeError struct { 106 err error 107 remote bool 108 immediate bool 109} 110 111type errCloseForRecreating struct { 112 nextPacketNumber protocol.PacketNumber 113 nextVersion protocol.VersionNumber 114} 115 116func (errCloseForRecreating) Error() string { 117 return "closing session in order to recreate it" 118} 119 120func (errCloseForRecreating) Is(target error) bool { 121 _, ok := target.(errCloseForRecreating) 122 return ok 123} 124 125// A Session is a QUIC session 126type session struct { 127 // Destination connection ID used during the handshake. 128 // Used to check source connection ID on incoming packets. 129 handshakeDestConnID protocol.ConnectionID 130 // Set for the client. Destination connection ID used on the first Initial sent. 131 origDestConnID protocol.ConnectionID 132 retrySrcConnID *protocol.ConnectionID // only set for the client (and if a Retry was performed) 133 134 srcConnIDLen int 135 136 perspective protocol.Perspective 137 initialVersion protocol.VersionNumber // if version negotiation is performed, this is the version we initially tried 138 version protocol.VersionNumber 139 config *Config 140 141 conn sendConn 142 sendQueue *sendQueue 143 144 streamsMap streamManager 145 connIDManager *connIDManager 146 connIDGenerator *connIDGenerator 147 148 rttStats *utils.RTTStats 149 150 cryptoStreamManager *cryptoStreamManager 151 sentPacketHandler ackhandler.SentPacketHandler 152 receivedPacketHandler ackhandler.ReceivedPacketHandler 153 retransmissionQueue *retransmissionQueue 154 framer framer 155 windowUpdateQueue *windowUpdateQueue 156 connFlowController flowcontrol.ConnectionFlowController 157 tokenStoreKey string // only set for the client 158 tokenGenerator *handshake.TokenGenerator // only set for the server 159 160 unpacker unpacker 161 frameParser wire.FrameParser 162 packer packer 163 164 oneRTTStream cryptoStream // only set for the server 165 cryptoStreamHandler cryptoStreamHandler 166 167 receivedPackets chan *receivedPacket 168 sendingScheduled chan struct{} 169 170 closeOnce sync.Once 171 // closeChan is used to notify the run loop that it should terminate 172 closeChan chan closeError 173 174 ctx context.Context 175 ctxCancel context.CancelFunc 176 handshakeCtx context.Context 177 handshakeCtxCancel context.CancelFunc 178 179 undecryptablePackets []*receivedPacket 180 181 clientHelloWritten <-chan *wire.TransportParameters 182 earlySessionReadyChan chan struct{} 183 handshakeCompleteChan chan struct{} // is closed when the handshake completes 184 handshakeComplete bool 185 handshakeConfirmed bool 186 187 receivedRetry bool 188 versionNegotiated bool 189 receivedFirstPacket bool 190 191 idleTimeout time.Duration 192 sessionCreationTime time.Time 193 // The idle timeout is set based on the max of the time we received the last packet... 194 lastPacketReceivedTime time.Time 195 // ... and the time we sent a new ack-eliciting packet after receiving a packet. 196 firstAckElicitingPacketAfterIdleSentTime time.Time 197 // pacingDeadline is the time when the next packet should be sent 198 pacingDeadline time.Time 199 200 peerParams *wire.TransportParameters 201 202 timer *utils.Timer 203 // keepAlivePingSent stores whether a keep alive PING is in flight. 204 // It is reset as soon as we receive a packet from the peer. 205 keepAlivePingSent bool 206 keepAliveInterval time.Duration 207 208 traceCallback func(quictrace.Event) 209 210 logID string 211 tracer logging.ConnectionTracer 212 logger utils.Logger 213} 214 215var ( 216 _ Session = &session{} 217 _ EarlySession = &session{} 218 _ streamSender = &session{} 219) 220 221var newSession = func( 222 conn sendConn, 223 runner sessionRunner, 224 origDestConnID protocol.ConnectionID, 225 retrySrcConnID *protocol.ConnectionID, 226 clientDestConnID protocol.ConnectionID, 227 destConnID protocol.ConnectionID, 228 srcConnID protocol.ConnectionID, 229 statelessResetToken protocol.StatelessResetToken, 230 conf *Config, 231 tlsConf *tls.Config, 232 tokenGenerator *handshake.TokenGenerator, 233 enable0RTT bool, 234 tracer logging.ConnectionTracer, 235 logger utils.Logger, 236 v protocol.VersionNumber, 237) quicSession { 238 s := &session{ 239 conn: conn, 240 config: conf, 241 handshakeDestConnID: destConnID, 242 srcConnIDLen: srcConnID.Len(), 243 tokenGenerator: tokenGenerator, 244 oneRTTStream: newCryptoStream(), 245 perspective: protocol.PerspectiveServer, 246 handshakeCompleteChan: make(chan struct{}), 247 tracer: tracer, 248 logger: logger, 249 version: v, 250 } 251 if origDestConnID != nil { 252 s.logID = origDestConnID.String() 253 } else { 254 s.logID = destConnID.String() 255 } 256 s.connIDManager = newConnIDManager( 257 destConnID, 258 func(token protocol.StatelessResetToken) { runner.AddResetToken(token, s) }, 259 runner.RemoveResetToken, 260 s.queueControlFrame, 261 ) 262 s.connIDGenerator = newConnIDGenerator( 263 srcConnID, 264 clientDestConnID, 265 func(connID protocol.ConnectionID) { runner.Add(connID, s) }, 266 runner.GetStatelessResetToken, 267 runner.Remove, 268 runner.Retire, 269 runner.ReplaceWithClosed, 270 s.queueControlFrame, 271 s.version, 272 ) 273 s.preSetup() 274 s.sentPacketHandler, s.receivedPacketHandler = ackhandler.NewAckHandler( 275 0, 276 s.rttStats, 277 s.perspective, 278 s.traceCallback, 279 s.tracer, 280 s.logger, 281 s.version, 282 ) 283 initialStream := newCryptoStream() 284 handshakeStream := newCryptoStream() 285 params := &wire.TransportParameters{ 286 InitialMaxStreamDataBidiLocal: protocol.InitialMaxStreamData, 287 InitialMaxStreamDataBidiRemote: protocol.InitialMaxStreamData, 288 InitialMaxStreamDataUni: protocol.InitialMaxStreamData, 289 InitialMaxData: protocol.InitialMaxData, 290 MaxIdleTimeout: s.config.MaxIdleTimeout, 291 MaxBidiStreamNum: protocol.StreamNum(s.config.MaxIncomingStreams), 292 MaxUniStreamNum: protocol.StreamNum(s.config.MaxIncomingUniStreams), 293 MaxAckDelay: protocol.MaxAckDelayInclGranularity, 294 AckDelayExponent: protocol.AckDelayExponent, 295 DisableActiveMigration: true, 296 StatelessResetToken: &statelessResetToken, 297 OriginalDestinationConnectionID: origDestConnID, 298 ActiveConnectionIDLimit: protocol.MaxActiveConnectionIDs, 299 InitialSourceConnectionID: srcConnID, 300 RetrySourceConnectionID: retrySrcConnID, 301 } 302 if s.tracer != nil { 303 s.tracer.SentTransportParameters(params) 304 } 305 cs := handshake.NewCryptoSetupServer( 306 initialStream, 307 handshakeStream, 308 clientDestConnID, 309 conn.LocalAddr(), 310 conn.RemoteAddr(), 311 params, 312 &handshakeRunner{ 313 onReceivedParams: s.processTransportParameters, 314 onError: s.closeLocal, 315 dropKeys: s.dropEncryptionLevel, 316 onHandshakeComplete: func() { 317 runner.Retire(clientDestConnID) 318 close(s.handshakeCompleteChan) 319 }, 320 }, 321 tlsConf, 322 enable0RTT, 323 s.rttStats, 324 tracer, 325 logger, 326 s.version, 327 ) 328 s.cryptoStreamHandler = cs 329 s.packer = newPacketPacker( 330 srcConnID, 331 s.connIDManager.Get, 332 initialStream, 333 handshakeStream, 334 s.sentPacketHandler, 335 s.retransmissionQueue, 336 s.RemoteAddr(), 337 cs, 338 s.framer, 339 s.receivedPacketHandler, 340 s.perspective, 341 s.version, 342 ) 343 s.unpacker = newPacketUnpacker(cs, s.version) 344 s.cryptoStreamManager = newCryptoStreamManager(cs, initialStream, handshakeStream, s.oneRTTStream) 345 return s 346} 347 348// declare this as a variable, such that we can it mock it in the tests 349var newClientSession = func( 350 conn sendConn, 351 runner sessionRunner, 352 destConnID protocol.ConnectionID, 353 srcConnID protocol.ConnectionID, 354 conf *Config, 355 tlsConf *tls.Config, 356 initialPacketNumber protocol.PacketNumber, 357 initialVersion protocol.VersionNumber, 358 enable0RTT bool, 359 hasNegotiatedVersion bool, 360 tracer logging.ConnectionTracer, 361 logger utils.Logger, 362 v protocol.VersionNumber, 363) quicSession { 364 s := &session{ 365 conn: conn, 366 config: conf, 367 origDestConnID: destConnID, 368 handshakeDestConnID: destConnID, 369 srcConnIDLen: srcConnID.Len(), 370 perspective: protocol.PerspectiveClient, 371 handshakeCompleteChan: make(chan struct{}), 372 logID: destConnID.String(), 373 logger: logger, 374 tracer: tracer, 375 initialVersion: initialVersion, 376 versionNegotiated: hasNegotiatedVersion, 377 version: v, 378 } 379 s.connIDManager = newConnIDManager( 380 destConnID, 381 func(token protocol.StatelessResetToken) { runner.AddResetToken(token, s) }, 382 runner.RemoveResetToken, 383 s.queueControlFrame, 384 ) 385 s.connIDGenerator = newConnIDGenerator( 386 srcConnID, 387 nil, 388 func(connID protocol.ConnectionID) { runner.Add(connID, s) }, 389 runner.GetStatelessResetToken, 390 runner.Remove, 391 runner.Retire, 392 runner.ReplaceWithClosed, 393 s.queueControlFrame, 394 s.version, 395 ) 396 s.preSetup() 397 s.sentPacketHandler, s.receivedPacketHandler = ackhandler.NewAckHandler( 398 initialPacketNumber, 399 s.rttStats, 400 s.perspective, 401 s.traceCallback, 402 s.tracer, 403 s.logger, 404 s.version, 405 ) 406 initialStream := newCryptoStream() 407 handshakeStream := newCryptoStream() 408 params := &wire.TransportParameters{ 409 InitialMaxStreamDataBidiRemote: protocol.InitialMaxStreamData, 410 InitialMaxStreamDataBidiLocal: protocol.InitialMaxStreamData, 411 InitialMaxStreamDataUni: protocol.InitialMaxStreamData, 412 InitialMaxData: protocol.InitialMaxData, 413 MaxIdleTimeout: s.config.MaxIdleTimeout, 414 MaxBidiStreamNum: protocol.StreamNum(s.config.MaxIncomingStreams), 415 MaxUniStreamNum: protocol.StreamNum(s.config.MaxIncomingUniStreams), 416 MaxAckDelay: protocol.MaxAckDelayInclGranularity, 417 AckDelayExponent: protocol.AckDelayExponent, 418 DisableActiveMigration: true, 419 ActiveConnectionIDLimit: protocol.MaxActiveConnectionIDs, 420 InitialSourceConnectionID: srcConnID, 421 } 422 if s.tracer != nil { 423 s.tracer.SentTransportParameters(params) 424 } 425 cs, clientHelloWritten := handshake.NewCryptoSetupClient( 426 initialStream, 427 handshakeStream, 428 destConnID, 429 conn.LocalAddr(), 430 conn.RemoteAddr(), 431 params, 432 &handshakeRunner{ 433 onReceivedParams: s.processTransportParameters, 434 onError: s.closeLocal, 435 dropKeys: s.dropEncryptionLevel, 436 onHandshakeComplete: func() { close(s.handshakeCompleteChan) }, 437 }, 438 tlsConf, 439 enable0RTT, 440 s.rttStats, 441 tracer, 442 logger, 443 s.version, 444 ) 445 s.clientHelloWritten = clientHelloWritten 446 s.cryptoStreamHandler = cs 447 s.cryptoStreamManager = newCryptoStreamManager(cs, initialStream, handshakeStream, newCryptoStream()) 448 s.unpacker = newPacketUnpacker(cs, s.version) 449 s.packer = newPacketPacker( 450 srcConnID, 451 s.connIDManager.Get, 452 initialStream, 453 handshakeStream, 454 s.sentPacketHandler, 455 s.retransmissionQueue, 456 s.RemoteAddr(), 457 cs, 458 s.framer, 459 s.receivedPacketHandler, 460 s.perspective, 461 s.version, 462 ) 463 if len(tlsConf.ServerName) > 0 { 464 s.tokenStoreKey = tlsConf.ServerName 465 } else { 466 s.tokenStoreKey = conn.RemoteAddr().String() 467 } 468 if s.config.TokenStore != nil { 469 if token := s.config.TokenStore.Pop(s.tokenStoreKey); token != nil { 470 s.packer.SetToken(token.data) 471 } 472 } 473 return s 474} 475 476func (s *session) preSetup() { 477 s.sendQueue = newSendQueue(s.conn) 478 s.retransmissionQueue = newRetransmissionQueue(s.version) 479 s.frameParser = wire.NewFrameParser(s.version) 480 s.rttStats = &utils.RTTStats{} 481 s.connFlowController = flowcontrol.NewConnectionFlowController( 482 protocol.InitialMaxData, 483 protocol.ByteCount(s.config.MaxReceiveConnectionFlowControlWindow), 484 s.onHasConnectionWindowUpdate, 485 s.rttStats, 486 s.logger, 487 ) 488 s.earlySessionReadyChan = make(chan struct{}) 489 s.streamsMap = newStreamsMap( 490 s, 491 s.newFlowController, 492 uint64(s.config.MaxIncomingStreams), 493 uint64(s.config.MaxIncomingUniStreams), 494 s.perspective, 495 s.version, 496 ) 497 s.framer = newFramer(s.streamsMap, s.version) 498 s.receivedPackets = make(chan *receivedPacket, protocol.MaxSessionUnprocessedPackets) 499 s.closeChan = make(chan closeError, 1) 500 s.sendingScheduled = make(chan struct{}, 1) 501 s.undecryptablePackets = make([]*receivedPacket, 0, protocol.MaxUndecryptablePackets) 502 s.ctx, s.ctxCancel = context.WithCancel(context.Background()) 503 s.handshakeCtx, s.handshakeCtxCancel = context.WithCancel(context.Background()) 504 505 now := time.Now() 506 s.lastPacketReceivedTime = now 507 s.sessionCreationTime = now 508 509 s.windowUpdateQueue = newWindowUpdateQueue(s.streamsMap, s.connFlowController, s.framer.QueueControlFrame) 510 511 if s.config.QuicTracer != nil { 512 s.traceCallback = func(ev quictrace.Event) { 513 s.config.QuicTracer.Trace(s.origDestConnID, ev) 514 } 515 } 516} 517 518// run the session main loop 519func (s *session) run() error { 520 defer s.ctxCancel() 521 522 s.timer = utils.NewTimer() 523 524 go s.cryptoStreamHandler.RunHandshake() 525 go func() { 526 if err := s.sendQueue.Run(); err != nil { 527 s.destroyImpl(err) 528 } 529 }() 530 531 if s.perspective == protocol.PerspectiveClient { 532 select { 533 case zeroRTTParams := <-s.clientHelloWritten: 534 s.scheduleSending() 535 if zeroRTTParams != nil { 536 s.restoreTransportParameters(zeroRTTParams) 537 close(s.earlySessionReadyChan) 538 } 539 case closeErr := <-s.closeChan: 540 // put the close error back into the channel, so that the run loop can receive it 541 s.closeChan <- closeErr 542 } 543 } 544 545 var closeErr closeError 546 547runLoop: 548 for { 549 // Close immediately if requested 550 select { 551 case closeErr = <-s.closeChan: 552 break runLoop 553 case <-s.handshakeCompleteChan: 554 s.handleHandshakeComplete() 555 default: 556 } 557 558 s.maybeResetTimer() 559 560 select { 561 case closeErr = <-s.closeChan: 562 break runLoop 563 case <-s.timer.Chan(): 564 s.timer.SetRead() 565 // We do all the interesting stuff after the switch statement, so 566 // nothing to see here. 567 case <-s.sendingScheduled: 568 // We do all the interesting stuff after the switch statement, so 569 // nothing to see here. 570 case p := <-s.receivedPackets: 571 // Only reset the timers if this packet was actually processed. 572 // This avoids modifying any state when handling undecryptable packets, 573 // which could be injected by an attacker. 574 if wasProcessed := s.handlePacketImpl(p); !wasProcessed { 575 continue 576 } 577 // Don't set timers and send packets if the packet made us close the session. 578 select { 579 case closeErr = <-s.closeChan: 580 break runLoop 581 default: 582 } 583 case <-s.handshakeCompleteChan: 584 s.handleHandshakeComplete() 585 } 586 587 now := time.Now() 588 if timeout := s.sentPacketHandler.GetLossDetectionTimeout(); !timeout.IsZero() && timeout.Before(now) { 589 // This could cause packets to be retransmitted. 590 // Check it before trying to send packets. 591 if err := s.sentPacketHandler.OnLossDetectionTimeout(); err != nil { 592 s.closeLocal(err) 593 } 594 } 595 596 if keepAliveTime := s.nextKeepAliveTime(); !keepAliveTime.IsZero() && !now.Before(keepAliveTime) { 597 // send a PING frame since there is no activity in the session 598 s.logger.Debugf("Sending a keep-alive PING to keep the connection alive.") 599 s.framer.QueueControlFrame(&wire.PingFrame{}) 600 s.keepAlivePingSent = true 601 } else if !s.handshakeComplete && now.Sub(s.sessionCreationTime) >= s.config.HandshakeTimeout { 602 if s.tracer != nil { 603 s.tracer.ClosedConnection(logging.NewTimeoutCloseReason(logging.TimeoutReasonHandshake)) 604 } 605 s.destroyImpl(qerr.NewTimeoutError("Handshake did not complete in time")) 606 continue 607 } else if s.handshakeComplete && now.Sub(s.idleTimeoutStartTime()) >= s.idleTimeout { 608 if s.tracer != nil { 609 s.tracer.ClosedConnection(logging.NewTimeoutCloseReason(logging.TimeoutReasonIdle)) 610 } 611 s.destroyImpl(qerr.NewTimeoutError("No recent network activity")) 612 continue 613 } 614 615 if err := s.sendPackets(); err != nil { 616 s.closeLocal(err) 617 } 618 } 619 620 s.handleCloseError(closeErr) 621 if !errors.Is(closeErr.err, errCloseForRecreating{}) && s.tracer != nil { 622 s.tracer.Close() 623 } 624 s.logger.Infof("Connection %s closed.", s.logID) 625 s.cryptoStreamHandler.Close() 626 s.sendQueue.Close() 627 s.timer.Stop() 628 return closeErr.err 629} 630 631// blocks until the early session can be used 632func (s *session) earlySessionReady() <-chan struct{} { 633 return s.earlySessionReadyChan 634} 635 636func (s *session) HandshakeComplete() context.Context { 637 return s.handshakeCtx 638} 639 640func (s *session) Context() context.Context { 641 return s.ctx 642} 643 644func (s *session) ConnectionState() ConnectionState { 645 return s.cryptoStreamHandler.ConnectionState() 646} 647 648// Time when the next keep-alive packet should be sent. 649// It returns a zero time if no keep-alive should be sent. 650func (s *session) nextKeepAliveTime() time.Time { 651 if !s.config.KeepAlive || s.keepAlivePingSent || !s.firstAckElicitingPacketAfterIdleSentTime.IsZero() { 652 return time.Time{} 653 } 654 return s.lastPacketReceivedTime.Add(s.keepAliveInterval / 2) 655} 656 657func (s *session) maybeResetTimer() { 658 var deadline time.Time 659 if !s.handshakeComplete { 660 deadline = s.sessionCreationTime.Add(s.config.HandshakeTimeout) 661 } else { 662 if keepAliveTime := s.nextKeepAliveTime(); !keepAliveTime.IsZero() { 663 deadline = keepAliveTime 664 } else { 665 deadline = s.idleTimeoutStartTime().Add(s.idleTimeout) 666 } 667 } 668 669 if ackAlarm := s.receivedPacketHandler.GetAlarmTimeout(); !ackAlarm.IsZero() { 670 deadline = utils.MinTime(deadline, ackAlarm) 671 } 672 if lossTime := s.sentPacketHandler.GetLossDetectionTimeout(); !lossTime.IsZero() { 673 deadline = utils.MinTime(deadline, lossTime) 674 } 675 if !s.pacingDeadline.IsZero() { 676 deadline = utils.MinTime(deadline, s.pacingDeadline) 677 } 678 679 s.timer.Reset(deadline) 680} 681 682func (s *session) idleTimeoutStartTime() time.Time { 683 return utils.MaxTime(s.lastPacketReceivedTime, s.firstAckElicitingPacketAfterIdleSentTime) 684} 685 686func (s *session) handleHandshakeComplete() { 687 s.handshakeComplete = true 688 s.handshakeCompleteChan = nil // prevent this case from ever being selected again 689 s.handshakeCtxCancel() 690 691 s.connIDManager.SetHandshakeComplete() 692 s.connIDGenerator.SetHandshakeComplete() 693 694 if s.perspective == protocol.PerspectiveServer { 695 s.handshakeConfirmed = true 696 s.sentPacketHandler.SetHandshakeConfirmed() 697 ticket, err := s.cryptoStreamHandler.GetSessionTicket() 698 if err != nil { 699 s.closeLocal(err) 700 } 701 if ticket != nil { 702 s.oneRTTStream.Write(ticket) 703 for s.oneRTTStream.HasData() { 704 s.queueControlFrame(s.oneRTTStream.PopCryptoFrame(protocol.MaxPostHandshakeCryptoFrameSize)) 705 } 706 } 707 token, err := s.tokenGenerator.NewToken(s.conn.RemoteAddr()) 708 if err != nil { 709 s.closeLocal(err) 710 } 711 s.queueControlFrame(&wire.NewTokenFrame{Token: token}) 712 s.cryptoStreamHandler.SetHandshakeConfirmed() 713 s.queueControlFrame(&wire.HandshakeDoneFrame{}) 714 } 715} 716 717func (s *session) handlePacketImpl(rp *receivedPacket) bool { 718 if wire.IsVersionNegotiationPacket(rp.data) { 719 s.handleVersionNegotiationPacket(rp) 720 return false 721 } 722 723 var counter uint8 724 var lastConnID protocol.ConnectionID 725 var processed bool 726 data := rp.data 727 p := rp 728 s.sentPacketHandler.ReceivedBytes(protocol.ByteCount(len(data))) 729 for len(data) > 0 { 730 if counter > 0 { 731 p = p.Clone() 732 p.data = data 733 } 734 735 hdr, packetData, rest, err := wire.ParsePacket(p.data, s.srcConnIDLen) 736 if err != nil { 737 if s.tracer != nil { 738 dropReason := logging.PacketDropHeaderParseError 739 if err == wire.ErrUnsupportedVersion { 740 dropReason = logging.PacketDropUnsupportedVersion 741 } 742 s.tracer.DroppedPacket(logging.PacketTypeNotDetermined, protocol.ByteCount(len(data)), dropReason) 743 } 744 s.logger.Debugf("error parsing packet: %s", err) 745 break 746 } 747 748 if hdr.IsLongHeader && hdr.Version != s.version { 749 if s.tracer != nil { 750 s.tracer.DroppedPacket(logging.PacketTypeFromHeader(hdr), protocol.ByteCount(len(data)), logging.PacketDropUnexpectedVersion) 751 } 752 s.logger.Debugf("Dropping packet with version %x. Expected %x.", hdr.Version, s.version) 753 break 754 } 755 756 if counter > 0 && !hdr.DestConnectionID.Equal(lastConnID) { 757 if s.tracer != nil { 758 s.tracer.DroppedPacket(logging.PacketTypeFromHeader(hdr), protocol.ByteCount(len(data)), logging.PacketDropUnknownConnectionID) 759 } 760 s.logger.Debugf("coalesced packet has different destination connection ID: %s, expected %s", hdr.DestConnectionID, lastConnID) 761 break 762 } 763 lastConnID = hdr.DestConnectionID 764 765 if counter > 0 { 766 p.buffer.Split() 767 } 768 counter++ 769 770 // only log if this actually a coalesced packet 771 if s.logger.Debug() && (counter > 1 || len(rest) > 0) { 772 s.logger.Debugf("Parsed a coalesced packet. Part %d: %d bytes. Remaining: %d bytes.", counter, len(packetData), len(rest)) 773 } 774 p.data = packetData 775 if wasProcessed := s.handleSinglePacket(p, hdr); wasProcessed { 776 processed = true 777 } 778 data = rest 779 } 780 p.buffer.MaybeRelease() 781 return processed 782} 783 784func (s *session) handleSinglePacket(p *receivedPacket, hdr *wire.Header) bool /* was the packet successfully processed */ { 785 var wasQueued bool 786 787 defer func() { 788 // Put back the packet buffer if the packet wasn't queued for later decryption. 789 if !wasQueued { 790 p.buffer.Decrement() 791 } 792 }() 793 794 if hdr.Type == protocol.PacketTypeRetry { 795 return s.handleRetryPacket(hdr, p.data) 796 } 797 798 // The server can change the source connection ID with the first Handshake packet. 799 // After this, all packets with a different source connection have to be ignored. 800 if s.receivedFirstPacket && hdr.IsLongHeader && hdr.Type == protocol.PacketTypeInitial && !hdr.SrcConnectionID.Equal(s.handshakeDestConnID) { 801 if s.tracer != nil { 802 s.tracer.DroppedPacket(logging.PacketTypeInitial, p.Size(), logging.PacketDropUnknownConnectionID) 803 } 804 s.logger.Debugf("Dropping Initial packet (%d bytes) with unexpected source connection ID: %s (expected %s)", p.Size(), hdr.SrcConnectionID, s.handshakeDestConnID) 805 return false 806 } 807 // drop 0-RTT packets, if we are a client 808 if s.perspective == protocol.PerspectiveClient && hdr.Type == protocol.PacketType0RTT { 809 if s.tracer != nil { 810 s.tracer.DroppedPacket(logging.PacketType0RTT, p.Size(), logging.PacketDropKeyUnavailable) 811 } 812 return false 813 } 814 815 packet, err := s.unpacker.Unpack(hdr, p.rcvTime, p.data) 816 if err != nil { 817 switch err { 818 case handshake.ErrKeysDropped: 819 if s.tracer != nil { 820 s.tracer.DroppedPacket(logging.PacketTypeFromHeader(hdr), p.Size(), logging.PacketDropKeyUnavailable) 821 } 822 s.logger.Debugf("Dropping %s packet (%d bytes) because we already dropped the keys.", hdr.PacketType(), p.Size()) 823 case handshake.ErrKeysNotYetAvailable: 824 // Sealer for this encryption level not yet available. 825 // Try again later. 826 wasQueued = true 827 s.tryQueueingUndecryptablePacket(p, hdr) 828 case wire.ErrInvalidReservedBits: 829 s.closeLocal(qerr.NewError(qerr.ProtocolViolation, err.Error())) 830 case handshake.ErrDecryptionFailed: 831 // This might be a packet injected by an attacker. Drop it. 832 if s.tracer != nil { 833 s.tracer.DroppedPacket(logging.PacketTypeFromHeader(hdr), p.Size(), logging.PacketDropPayloadDecryptError) 834 } 835 s.logger.Debugf("Dropping %s packet (%d bytes) that could not be unpacked. Error: %s", hdr.PacketType(), p.Size(), err) 836 default: 837 var headerErr *headerParseError 838 if errors.As(err, &headerErr) { 839 // This might be a packet injected by an attacker. Drop it. 840 if s.tracer != nil { 841 s.tracer.DroppedPacket(logging.PacketTypeFromHeader(hdr), p.Size(), logging.PacketDropHeaderParseError) 842 } 843 s.logger.Debugf("Dropping %s packet (%d bytes) for which we couldn't unpack the header. Error: %s", hdr.PacketType(), p.Size(), err) 844 } else { 845 // This is an error returned by the AEAD (other than ErrDecryptionFailed). 846 // For example, a PROTOCOL_VIOLATION due to key updates. 847 s.closeLocal(err) 848 } 849 } 850 return false 851 } 852 853 if s.logger.Debug() { 854 s.logger.Debugf("<- Reading packet %d (%d bytes) for connection %s, %s", packet.packetNumber, p.Size(), hdr.DestConnectionID, packet.encryptionLevel) 855 packet.hdr.Log(s.logger) 856 } 857 858 if s.receivedPacketHandler.IsPotentiallyDuplicate(packet.packetNumber, packet.encryptionLevel) { 859 s.logger.Debugf("Dropping (potentially) duplicate packet.") 860 if s.tracer != nil { 861 s.tracer.DroppedPacket(logging.PacketTypeFromHeader(hdr), p.Size(), logging.PacketDropDuplicate) 862 } 863 return false 864 } 865 866 if err := s.handleUnpackedPacket(packet, p.ecn, p.rcvTime, p.Size()); err != nil { 867 s.closeLocal(err) 868 return false 869 } 870 return true 871} 872 873func (s *session) handleRetryPacket(hdr *wire.Header, data []byte) bool /* was this a valid Retry */ { 874 (&wire.ExtendedHeader{Header: *hdr}).Log(s.logger) 875 if s.perspective == protocol.PerspectiveServer { 876 if s.tracer != nil { 877 s.tracer.DroppedPacket(logging.PacketTypeRetry, protocol.ByteCount(len(data)), logging.PacketDropUnexpectedPacket) 878 } 879 s.logger.Debugf("Ignoring Retry.") 880 return false 881 } 882 if s.receivedFirstPacket { 883 if s.tracer != nil { 884 s.tracer.DroppedPacket(logging.PacketTypeRetry, protocol.ByteCount(len(data)), logging.PacketDropUnexpectedPacket) 885 } 886 s.logger.Debugf("Ignoring Retry, since we already received a packet.") 887 return false 888 } 889 destConnID := s.connIDManager.Get() 890 if hdr.SrcConnectionID.Equal(destConnID) { 891 if s.tracer != nil { 892 s.tracer.DroppedPacket(logging.PacketTypeRetry, protocol.ByteCount(len(data)), logging.PacketDropUnexpectedPacket) 893 } 894 s.logger.Debugf("Ignoring Retry, since the server didn't change the Source Connection ID.") 895 return false 896 } 897 // If a token is already set, this means that we already received a Retry from the server. 898 // Ignore this Retry packet. 899 if s.receivedRetry { 900 s.logger.Debugf("Ignoring Retry, since a Retry was already received.") 901 return false 902 } 903 904 tag := handshake.GetRetryIntegrityTag(data[:len(data)-16], destConnID) 905 if !bytes.Equal(data[len(data)-16:], tag[:]) { 906 if s.tracer != nil { 907 s.tracer.DroppedPacket(logging.PacketTypeRetry, protocol.ByteCount(len(data)), logging.PacketDropPayloadDecryptError) 908 } 909 s.logger.Debugf("Ignoring spoofed Retry. Integrity Tag doesn't match.") 910 return false 911 } 912 913 if s.logger.Debug() { 914 s.logger.Debugf("<- Received Retry:") 915 (&wire.ExtendedHeader{Header: *hdr}).Log(s.logger) 916 s.logger.Debugf("Switching destination connection ID to: %s", hdr.SrcConnectionID) 917 } 918 if s.tracer != nil { 919 s.tracer.ReceivedRetry(hdr) 920 } 921 newDestConnID := hdr.SrcConnectionID 922 s.receivedRetry = true 923 if err := s.sentPacketHandler.ResetForRetry(); err != nil { 924 s.closeLocal(err) 925 return false 926 } 927 s.handshakeDestConnID = newDestConnID 928 s.retrySrcConnID = &newDestConnID 929 s.cryptoStreamHandler.ChangeConnectionID(newDestConnID) 930 s.packer.SetToken(hdr.Token) 931 s.connIDManager.ChangeInitialConnID(newDestConnID) 932 s.scheduleSending() 933 return true 934} 935 936func (s *session) handleVersionNegotiationPacket(p *receivedPacket) { 937 if s.perspective == protocol.PerspectiveServer || // servers never receive version negotiation packets 938 s.receivedFirstPacket || s.versionNegotiated { // ignore delayed / duplicated version negotiation packets 939 if s.tracer != nil { 940 s.tracer.DroppedPacket(logging.PacketTypeVersionNegotiation, p.Size(), logging.PacketDropUnexpectedPacket) 941 } 942 return 943 } 944 945 hdr, supportedVersions, err := wire.ParseVersionNegotiationPacket(bytes.NewReader(p.data)) 946 if err != nil { 947 if s.tracer != nil { 948 s.tracer.DroppedPacket(logging.PacketTypeVersionNegotiation, p.Size(), logging.PacketDropHeaderParseError) 949 } 950 s.logger.Debugf("Error parsing Version Negotiation packet: %s", err) 951 return 952 } 953 954 for _, v := range supportedVersions { 955 if v == s.version { 956 if s.tracer != nil { 957 s.tracer.DroppedPacket(logging.PacketTypeVersionNegotiation, p.Size(), logging.PacketDropUnexpectedVersion) 958 } 959 // The Version Negotiation packet contains the version that we offered. 960 // This might be a packet sent by an attacker, or it was corrupted. 961 return 962 } 963 } 964 965 s.logger.Infof("Received a Version Negotiation packet. Supported Versions: %s", supportedVersions) 966 if s.tracer != nil { 967 s.tracer.ReceivedVersionNegotiationPacket(hdr, supportedVersions) 968 } 969 newVersion, ok := protocol.ChooseSupportedVersion(s.config.Versions, supportedVersions) 970 if !ok { 971 //nolint:stylecheck 972 s.destroyImpl(fmt.Errorf("No compatible QUIC version found. We support %s, server offered %s.", s.config.Versions, supportedVersions)) 973 s.logger.Infof("No compatible QUIC version found.") 974 return 975 } 976 977 s.logger.Infof("Switching to QUIC version %s.", newVersion) 978 nextPN, _ := s.sentPacketHandler.PeekPacketNumber(protocol.EncryptionInitial) 979 s.destroyImpl(&errCloseForRecreating{ 980 nextPacketNumber: nextPN, 981 nextVersion: newVersion, 982 }) 983} 984 985func (s *session) handleUnpackedPacket( 986 packet *unpackedPacket, 987 ecn protocol.ECN, 988 rcvTime time.Time, 989 packetSize protocol.ByteCount, // only for logging 990) error { 991 if len(packet.data) == 0 { 992 return qerr.NewError(qerr.ProtocolViolation, "empty packet") 993 } 994 995 if !s.receivedFirstPacket { 996 s.receivedFirstPacket = true 997 // The server can change the source connection ID with the first Handshake packet. 998 if s.perspective == protocol.PerspectiveClient && packet.hdr.IsLongHeader && !packet.hdr.SrcConnectionID.Equal(s.handshakeDestConnID) { 999 cid := packet.hdr.SrcConnectionID 1000 s.logger.Debugf("Received first packet. Switching destination connection ID to: %s", cid) 1001 s.handshakeDestConnID = cid 1002 s.connIDManager.ChangeInitialConnID(cid) 1003 } 1004 // We create the session as soon as we receive the first packet from the client. 1005 // We do that before authenticating the packet. 1006 // That means that if the source connection ID was corrupted, 1007 // we might have create a session with an incorrect source connection ID. 1008 // Once we authenticate the first packet, we need to update it. 1009 if s.perspective == protocol.PerspectiveServer { 1010 if !packet.hdr.SrcConnectionID.Equal(s.handshakeDestConnID) { 1011 s.handshakeDestConnID = packet.hdr.SrcConnectionID 1012 s.connIDManager.ChangeInitialConnID(packet.hdr.SrcConnectionID) 1013 } 1014 if s.tracer != nil { 1015 s.tracer.StartedConnection( 1016 s.conn.LocalAddr(), 1017 s.conn.RemoteAddr(), 1018 s.version, 1019 packet.hdr.SrcConnectionID, 1020 packet.hdr.DestConnectionID, 1021 ) 1022 } 1023 } 1024 } 1025 1026 s.lastPacketReceivedTime = rcvTime 1027 s.firstAckElicitingPacketAfterIdleSentTime = time.Time{} 1028 s.keepAlivePingSent = false 1029 1030 // Only used for tracing. 1031 // If we're not tracing, this slice will always remain empty. 1032 var frames []wire.Frame 1033 var transportState *quictrace.TransportState 1034 1035 r := bytes.NewReader(packet.data) 1036 var isAckEliciting bool 1037 for { 1038 frame, err := s.frameParser.ParseNext(r, packet.encryptionLevel) 1039 if err != nil { 1040 return err 1041 } 1042 if frame == nil { 1043 break 1044 } 1045 if ackhandler.IsFrameAckEliciting(frame) { 1046 isAckEliciting = true 1047 } 1048 if s.traceCallback != nil || s.tracer != nil { 1049 frames = append(frames, frame) 1050 } 1051 // Only process frames now if we're not logging. 1052 // If we're logging, we need to make sure that the packet_received event is logged first. 1053 if s.tracer == nil { 1054 if err := s.handleFrame(frame, packet.encryptionLevel, packet.hdr.DestConnectionID); err != nil { 1055 return err 1056 } 1057 } 1058 } 1059 1060 if s.traceCallback != nil { 1061 transportState = s.sentPacketHandler.GetStats() 1062 s.traceCallback(quictrace.Event{ 1063 Time: rcvTime, 1064 EventType: quictrace.PacketReceived, 1065 TransportState: transportState, 1066 EncryptionLevel: packet.encryptionLevel, 1067 PacketNumber: packet.packetNumber, 1068 PacketSize: protocol.ByteCount(len(packet.data)), 1069 Frames: frames, 1070 }) 1071 } 1072 if s.tracer != nil { 1073 fs := make([]logging.Frame, len(frames)) 1074 for i, frame := range frames { 1075 fs[i] = logutils.ConvertFrame(frame) 1076 } 1077 s.tracer.ReceivedPacket(packet.hdr, packetSize, fs) 1078 for _, frame := range frames { 1079 if err := s.handleFrame(frame, packet.encryptionLevel, packet.hdr.DestConnectionID); err != nil { 1080 return err 1081 } 1082 } 1083 } 1084 1085 return s.receivedPacketHandler.ReceivedPacket(packet.packetNumber, ecn, packet.encryptionLevel, rcvTime, isAckEliciting) 1086} 1087 1088func (s *session) handleFrame(f wire.Frame, encLevel protocol.EncryptionLevel, destConnID protocol.ConnectionID) error { 1089 var err error 1090 wire.LogFrame(s.logger, f, false) 1091 switch frame := f.(type) { 1092 case *wire.CryptoFrame: 1093 err = s.handleCryptoFrame(frame, encLevel) 1094 case *wire.StreamFrame: 1095 err = s.handleStreamFrame(frame) 1096 case *wire.AckFrame: 1097 err = s.handleAckFrame(frame, encLevel) 1098 case *wire.ConnectionCloseFrame: 1099 s.handleConnectionCloseFrame(frame) 1100 case *wire.ResetStreamFrame: 1101 err = s.handleResetStreamFrame(frame) 1102 case *wire.MaxDataFrame: 1103 s.handleMaxDataFrame(frame) 1104 case *wire.MaxStreamDataFrame: 1105 err = s.handleMaxStreamDataFrame(frame) 1106 case *wire.MaxStreamsFrame: 1107 err = s.handleMaxStreamsFrame(frame) 1108 case *wire.DataBlockedFrame: 1109 case *wire.StreamDataBlockedFrame: 1110 case *wire.StreamsBlockedFrame: 1111 case *wire.StopSendingFrame: 1112 err = s.handleStopSendingFrame(frame) 1113 case *wire.PingFrame: 1114 case *wire.PathChallengeFrame: 1115 s.handlePathChallengeFrame(frame) 1116 case *wire.PathResponseFrame: 1117 // since we don't send PATH_CHALLENGEs, we don't expect PATH_RESPONSEs 1118 err = errors.New("unexpected PATH_RESPONSE frame") 1119 case *wire.NewTokenFrame: 1120 err = s.handleNewTokenFrame(frame) 1121 case *wire.NewConnectionIDFrame: 1122 err = s.handleNewConnectionIDFrame(frame) 1123 case *wire.RetireConnectionIDFrame: 1124 err = s.handleRetireConnectionIDFrame(frame, destConnID) 1125 case *wire.HandshakeDoneFrame: 1126 err = s.handleHandshakeDoneFrame() 1127 default: 1128 err = fmt.Errorf("unexpected frame type: %s", reflect.ValueOf(&frame).Elem().Type().Name()) 1129 } 1130 return err 1131} 1132 1133// handlePacket is called by the server with a new packet 1134func (s *session) handlePacket(p *receivedPacket) { 1135 // Discard packets once the amount of queued packets is larger than 1136 // the channel size, protocol.MaxSessionUnprocessedPackets 1137 select { 1138 case s.receivedPackets <- p: 1139 default: 1140 } 1141} 1142 1143func (s *session) handleConnectionCloseFrame(frame *wire.ConnectionCloseFrame) { 1144 var e error 1145 if frame.IsApplicationError { 1146 e = qerr.NewApplicationError(frame.ErrorCode, frame.ReasonPhrase) 1147 } else { 1148 e = qerr.NewError(frame.ErrorCode, frame.ReasonPhrase) 1149 } 1150 s.closeRemote(e) 1151} 1152 1153func (s *session) handleCryptoFrame(frame *wire.CryptoFrame, encLevel protocol.EncryptionLevel) error { 1154 encLevelChanged, err := s.cryptoStreamManager.HandleCryptoFrame(frame, encLevel) 1155 if err != nil { 1156 return err 1157 } 1158 if encLevelChanged { 1159 s.tryDecryptingQueuedPackets() 1160 } 1161 return nil 1162} 1163 1164func (s *session) handleStreamFrame(frame *wire.StreamFrame) error { 1165 str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID) 1166 if err != nil { 1167 return err 1168 } 1169 if str == nil { 1170 // Stream is closed and already garbage collected 1171 // ignore this StreamFrame 1172 return nil 1173 } 1174 return str.handleStreamFrame(frame) 1175} 1176 1177func (s *session) handleMaxDataFrame(frame *wire.MaxDataFrame) { 1178 s.connFlowController.UpdateSendWindow(frame.MaximumData) 1179} 1180 1181func (s *session) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) error { 1182 str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID) 1183 if err != nil { 1184 return err 1185 } 1186 if str == nil { 1187 // stream is closed and already garbage collected 1188 return nil 1189 } 1190 str.handleMaxStreamDataFrame(frame) 1191 return nil 1192} 1193 1194func (s *session) handleMaxStreamsFrame(frame *wire.MaxStreamsFrame) error { 1195 return s.streamsMap.HandleMaxStreamsFrame(frame) 1196} 1197 1198func (s *session) handleResetStreamFrame(frame *wire.ResetStreamFrame) error { 1199 str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID) 1200 if err != nil { 1201 return err 1202 } 1203 if str == nil { 1204 // stream is closed and already garbage collected 1205 return nil 1206 } 1207 return str.handleResetStreamFrame(frame) 1208} 1209 1210func (s *session) handleStopSendingFrame(frame *wire.StopSendingFrame) error { 1211 str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID) 1212 if err != nil { 1213 return err 1214 } 1215 if str == nil { 1216 // stream is closed and already garbage collected 1217 return nil 1218 } 1219 str.handleStopSendingFrame(frame) 1220 return nil 1221} 1222 1223func (s *session) handlePathChallengeFrame(frame *wire.PathChallengeFrame) { 1224 s.queueControlFrame(&wire.PathResponseFrame{Data: frame.Data}) 1225} 1226 1227func (s *session) handleNewTokenFrame(frame *wire.NewTokenFrame) error { 1228 if s.perspective == protocol.PerspectiveServer { 1229 return qerr.NewError(qerr.ProtocolViolation, "Received NEW_TOKEN frame from the client.") 1230 } 1231 if s.config.TokenStore != nil { 1232 s.config.TokenStore.Put(s.tokenStoreKey, &ClientToken{data: frame.Token}) 1233 } 1234 return nil 1235} 1236 1237func (s *session) handleNewConnectionIDFrame(f *wire.NewConnectionIDFrame) error { 1238 return s.connIDManager.Add(f) 1239} 1240 1241func (s *session) handleRetireConnectionIDFrame(f *wire.RetireConnectionIDFrame, destConnID protocol.ConnectionID) error { 1242 return s.connIDGenerator.Retire(f.SequenceNumber, destConnID) 1243} 1244 1245func (s *session) handleHandshakeDoneFrame() error { 1246 if s.perspective == protocol.PerspectiveServer { 1247 return qerr.NewError(qerr.ProtocolViolation, "received a HANDSHAKE_DONE frame") 1248 } 1249 s.handshakeConfirmed = true 1250 s.sentPacketHandler.SetHandshakeConfirmed() 1251 s.cryptoStreamHandler.SetHandshakeConfirmed() 1252 return nil 1253} 1254 1255func (s *session) handleAckFrame(frame *wire.AckFrame, encLevel protocol.EncryptionLevel) error { 1256 if err := s.sentPacketHandler.ReceivedAck(frame, encLevel, s.lastPacketReceivedTime); err != nil { 1257 return err 1258 } 1259 if encLevel != protocol.Encryption1RTT { 1260 return nil 1261 } 1262 return s.cryptoStreamHandler.SetLargest1RTTAcked(frame.LargestAcked()) 1263} 1264 1265// closeLocal closes the session and send a CONNECTION_CLOSE containing the error 1266func (s *session) closeLocal(e error) { 1267 s.closeOnce.Do(func() { 1268 if e == nil { 1269 s.logger.Infof("Closing session.") 1270 } else { 1271 s.logger.Errorf("Closing session with error: %s", e) 1272 } 1273 s.closeChan <- closeError{err: e, immediate: false, remote: false} 1274 }) 1275} 1276 1277// destroy closes the session without sending the error on the wire 1278func (s *session) destroy(e error) { 1279 s.destroyImpl(e) 1280 <-s.ctx.Done() 1281} 1282 1283func (s *session) destroyImpl(e error) { 1284 s.closeOnce.Do(func() { 1285 if nerr, ok := e.(net.Error); ok && nerr.Timeout() { 1286 s.logger.Errorf("Destroying session: %s", e) 1287 } else { 1288 s.logger.Errorf("Destroying session with error: %s", e) 1289 } 1290 s.closeChan <- closeError{err: e, immediate: true, remote: false} 1291 }) 1292} 1293 1294func (s *session) closeRemote(e error) { 1295 s.closeOnce.Do(func() { 1296 s.logger.Errorf("Peer closed session with error: %s", e) 1297 s.closeChan <- closeError{err: e, immediate: true, remote: true} 1298 }) 1299} 1300 1301// Close the connection. It sends a NO_ERROR application error. 1302// It waits until the run loop has stopped before returning 1303func (s *session) shutdown() { 1304 s.closeLocal(nil) 1305 <-s.ctx.Done() 1306} 1307 1308func (s *session) CloseWithError(code protocol.ApplicationErrorCode, desc string) error { 1309 s.closeLocal(qerr.NewApplicationError(qerr.ErrorCode(code), desc)) 1310 <-s.ctx.Done() 1311 return nil 1312} 1313 1314func (s *session) handleCloseError(closeErr closeError) { 1315 if closeErr.err == nil { 1316 closeErr.err = qerr.NewApplicationError(0, "") 1317 } 1318 1319 var quicErr *qerr.QuicError 1320 var ok bool 1321 if quicErr, ok = closeErr.err.(*qerr.QuicError); !ok { 1322 quicErr = qerr.ToQuicError(closeErr.err) 1323 } 1324 1325 s.streamsMap.CloseWithError(quicErr) 1326 s.connIDManager.Close() 1327 1328 if s.tracer != nil { 1329 // timeout errors are logged as soon as they occur (to distinguish between handshake and idle timeouts) 1330 if nerr, ok := closeErr.err.(net.Error); !ok || !nerr.Timeout() { 1331 var resetErr statelessResetErr 1332 if errors.As(closeErr.err, &resetErr) { 1333 s.tracer.ClosedConnection(logging.NewStatelessResetCloseReason(resetErr.token)) 1334 } else if quicErr.IsApplicationError() { 1335 s.tracer.ClosedConnection(logging.NewApplicationCloseReason(quicErr.ErrorCode, closeErr.remote)) 1336 } else { 1337 s.tracer.ClosedConnection(logging.NewTransportCloseReason(quicErr.ErrorCode, closeErr.remote)) 1338 } 1339 } 1340 } 1341 1342 // If this is a remote close we're done here 1343 if closeErr.remote { 1344 s.connIDGenerator.ReplaceWithClosed(newClosedRemoteSession(s.perspective)) 1345 return 1346 } 1347 if closeErr.immediate { 1348 s.connIDGenerator.RemoveAll() 1349 return 1350 } 1351 connClosePacket, err := s.sendConnectionClose(quicErr) 1352 if err != nil { 1353 s.logger.Debugf("Error sending CONNECTION_CLOSE: %s", err) 1354 } 1355 cs := newClosedLocalSession(s.conn, connClosePacket, s.perspective, s.logger) 1356 s.connIDGenerator.ReplaceWithClosed(cs) 1357} 1358 1359func (s *session) dropEncryptionLevel(encLevel protocol.EncryptionLevel) { 1360 s.sentPacketHandler.DropPackets(encLevel) 1361 s.receivedPacketHandler.DropPackets(encLevel) 1362 if s.tracer != nil { 1363 s.tracer.DroppedEncryptionLevel(encLevel) 1364 } 1365} 1366 1367// is called for the client, when restoring transport parameters saved for 0-RTT 1368func (s *session) restoreTransportParameters(params *wire.TransportParameters) { 1369 if s.logger.Debug() { 1370 s.logger.Debugf("Restoring Transport Parameters: %s", params) 1371 } 1372 1373 s.peerParams = params 1374 s.connIDGenerator.SetMaxActiveConnIDs(params.ActiveConnectionIDLimit) 1375 s.connFlowController.UpdateSendWindow(params.InitialMaxData) 1376 if err := s.streamsMap.UpdateLimits(params); err != nil { 1377 s.closeLocal(err) 1378 return 1379 } 1380} 1381 1382func (s *session) processTransportParameters(params *wire.TransportParameters) { 1383 if err := s.processTransportParametersImpl(params); err != nil { 1384 s.closeLocal(err) 1385 } 1386} 1387 1388func (s *session) processTransportParametersImpl(params *wire.TransportParameters) error { 1389 if s.logger.Debug() { 1390 s.logger.Debugf("Processed Transport Parameters: %s", params) 1391 } 1392 if s.tracer != nil { 1393 s.tracer.ReceivedTransportParameters(params) 1394 } 1395 1396 // check the initial_source_connection_id 1397 if !params.InitialSourceConnectionID.Equal(s.handshakeDestConnID) { 1398 return qerr.NewError(qerr.TransportParameterError, fmt.Sprintf("expected initial_source_connection_id to equal %s, is %s", s.handshakeDestConnID, params.InitialSourceConnectionID)) 1399 } 1400 1401 if s.perspective == protocol.PerspectiveClient { 1402 // check the original_destination_connection_id 1403 if !params.OriginalDestinationConnectionID.Equal(s.origDestConnID) { 1404 return qerr.NewError(qerr.TransportParameterError, fmt.Sprintf("expected original_destination_connection_id to equal %s, is %s", s.origDestConnID, params.OriginalDestinationConnectionID)) 1405 } 1406 if s.retrySrcConnID != nil { // a Retry was performed 1407 if params.RetrySourceConnectionID == nil { 1408 return qerr.NewError(qerr.TransportParameterError, "missing retry_source_connection_id") 1409 } 1410 if !(*params.RetrySourceConnectionID).Equal(*s.retrySrcConnID) { 1411 return qerr.NewError(qerr.TransportParameterError, fmt.Sprintf("expected retry_source_connection_id to equal %s, is %s", s.retrySrcConnID, *params.RetrySourceConnectionID)) 1412 } 1413 } else if params.RetrySourceConnectionID != nil { 1414 return qerr.NewError(qerr.TransportParameterError, "received retry_source_connection_id, although no Retry was performed") 1415 } 1416 } 1417 1418 s.peerParams = params 1419 // Our local idle timeout will always be > 0. 1420 s.idleTimeout = utils.MinNonZeroDuration(s.config.MaxIdleTimeout, params.MaxIdleTimeout) 1421 s.keepAliveInterval = utils.MinDuration(s.idleTimeout/2, protocol.MaxKeepAliveInterval) 1422 if err := s.streamsMap.UpdateLimits(params); err != nil { 1423 return err 1424 } 1425 s.packer.HandleTransportParameters(params) 1426 s.frameParser.SetAckDelayExponent(params.AckDelayExponent) 1427 s.connFlowController.UpdateSendWindow(params.InitialMaxData) 1428 s.rttStats.SetMaxAckDelay(params.MaxAckDelay) 1429 s.connIDGenerator.SetMaxActiveConnIDs(params.ActiveConnectionIDLimit) 1430 if params.StatelessResetToken != nil { 1431 s.connIDManager.SetStatelessResetToken(*params.StatelessResetToken) 1432 } 1433 // We don't support connection migration yet, so we don't have any use for the preferred_address. 1434 if params.PreferredAddress != nil { 1435 // Retire the connection ID. 1436 s.connIDManager.AddFromPreferredAddress(params.PreferredAddress.ConnectionID, params.PreferredAddress.StatelessResetToken) 1437 } 1438 // On the server side, the early session is ready as soon as we processed 1439 // the client's transport parameters. 1440 if s.perspective == protocol.PerspectiveServer { 1441 close(s.earlySessionReadyChan) 1442 } 1443 return nil 1444} 1445 1446func (s *session) sendPackets() error { 1447 s.pacingDeadline = time.Time{} 1448 1449 var sentPacket bool // only used in for packets sent in send mode SendAny 1450 for { 1451 switch sendMode := s.sentPacketHandler.SendMode(); sendMode { 1452 case ackhandler.SendNone: 1453 return nil 1454 case ackhandler.SendAck: 1455 // If we already sent packets, and the send mode switches to SendAck, 1456 // as we've just become congestion limited. 1457 // There's no need to try to send an ACK at this moment. 1458 if sentPacket { 1459 return nil 1460 } 1461 // We can at most send a single ACK only packet. 1462 // There will only be a new ACK after receiving new packets. 1463 // SendAck is only returned when we're congestion limited, so we don't need to set the pacingt timer. 1464 return s.maybeSendAckOnlyPacket() 1465 case ackhandler.SendPTOInitial: 1466 if err := s.sendProbePacket(protocol.EncryptionInitial); err != nil { 1467 return err 1468 } 1469 case ackhandler.SendPTOHandshake: 1470 if err := s.sendProbePacket(protocol.EncryptionHandshake); err != nil { 1471 return err 1472 } 1473 case ackhandler.SendPTOAppData: 1474 if err := s.sendProbePacket(protocol.Encryption1RTT); err != nil { 1475 return err 1476 } 1477 case ackhandler.SendAny: 1478 if s.handshakeComplete && !s.sentPacketHandler.HasPacingBudget() { 1479 s.pacingDeadline = s.sentPacketHandler.TimeUntilSend() 1480 return nil 1481 } 1482 sent, err := s.sendPacket() 1483 if err != nil || !sent { 1484 return err 1485 } 1486 sentPacket = true 1487 default: 1488 return fmt.Errorf("BUG: invalid send mode %d", sendMode) 1489 } 1490 } 1491} 1492 1493func (s *session) maybeSendAckOnlyPacket() error { 1494 packet, err := s.packer.MaybePackAckPacket(s.handshakeConfirmed) 1495 if err != nil { 1496 return err 1497 } 1498 if packet == nil { 1499 return nil 1500 } 1501 s.sendPackedPacket(packet) 1502 return nil 1503} 1504 1505func (s *session) sendProbePacket(encLevel protocol.EncryptionLevel) error { 1506 // Queue probe packets until we actually send out a packet, 1507 // or until there are no more packets to queue. 1508 var packet *packedPacket 1509 for { 1510 if wasQueued := s.sentPacketHandler.QueueProbePacket(encLevel); !wasQueued { 1511 break 1512 } 1513 var err error 1514 packet, err = s.packer.MaybePackProbePacket(encLevel) 1515 if err != nil { 1516 return err 1517 } 1518 if packet != nil { 1519 break 1520 } 1521 } 1522 if packet == nil { 1523 //nolint:exhaustive // Cannot send probe packets for 0-RTT. 1524 switch encLevel { 1525 case protocol.EncryptionInitial: 1526 s.retransmissionQueue.AddInitial(&wire.PingFrame{}) 1527 case protocol.EncryptionHandshake: 1528 s.retransmissionQueue.AddHandshake(&wire.PingFrame{}) 1529 case protocol.Encryption1RTT: 1530 s.retransmissionQueue.AddAppData(&wire.PingFrame{}) 1531 default: 1532 panic("unexpected encryption level") 1533 } 1534 var err error 1535 packet, err = s.packer.MaybePackProbePacket(encLevel) 1536 if err != nil { 1537 return err 1538 } 1539 } 1540 if packet == nil || packet.packetContents == nil { 1541 return fmt.Errorf("session BUG: couldn't pack %s probe packet", encLevel) 1542 } 1543 s.sendPackedPacket(packet) 1544 return nil 1545} 1546 1547func (s *session) sendPacket() (bool, error) { 1548 if isBlocked, offset := s.connFlowController.IsNewlyBlocked(); isBlocked { 1549 s.framer.QueueControlFrame(&wire.DataBlockedFrame{MaximumData: offset}) 1550 } 1551 s.windowUpdateQueue.QueueAll() 1552 1553 if !s.handshakeConfirmed { 1554 now := time.Now() 1555 packet, err := s.packer.PackCoalescedPacket() 1556 if err != nil || packet == nil { 1557 return false, err 1558 } 1559 s.logCoalescedPacket(now, packet) 1560 for _, p := range packet.packets { 1561 if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && p.IsAckEliciting() { 1562 s.firstAckElicitingPacketAfterIdleSentTime = now 1563 } 1564 s.sentPacketHandler.SentPacket(p.ToAckHandlerPacket(now, s.retransmissionQueue)) 1565 } 1566 s.connIDManager.SentPacket() 1567 s.sendQueue.Send(packet.buffer) 1568 return true, nil 1569 } 1570 packet, err := s.packer.PackPacket() 1571 if err != nil || packet == nil { 1572 return false, err 1573 } 1574 s.sendPackedPacket(packet) 1575 return true, nil 1576} 1577 1578func (s *session) sendPackedPacket(packet *packedPacket) { 1579 now := time.Now() 1580 if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && packet.IsAckEliciting() { 1581 s.firstAckElicitingPacketAfterIdleSentTime = now 1582 } 1583 s.logPacket(now, packet) 1584 s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket(time.Now(), s.retransmissionQueue)) 1585 s.connIDManager.SentPacket() 1586 s.sendQueue.Send(packet.buffer) 1587} 1588 1589func (s *session) sendConnectionClose(quicErr *qerr.QuicError) ([]byte, error) { 1590 packet, err := s.packer.PackConnectionClose(quicErr) 1591 if err != nil { 1592 return nil, err 1593 } 1594 s.logCoalescedPacket(time.Now(), packet) 1595 return packet.buffer.Data, s.conn.Write(packet.buffer.Data) 1596} 1597 1598func (s *session) logPacketContents(now time.Time, p *packetContents) { 1599 // tracing 1600 if s.tracer != nil { 1601 frames := make([]logging.Frame, 0, len(p.frames)) 1602 for _, f := range p.frames { 1603 frames = append(frames, logutils.ConvertFrame(f.Frame)) 1604 } 1605 s.tracer.SentPacket(p.header, p.length, p.ack, frames) 1606 } 1607 1608 // quic-trace 1609 if s.traceCallback != nil { 1610 frames := make([]wire.Frame, 0, len(p.frames)) 1611 for _, f := range p.frames { 1612 frames = append(frames, f.Frame) 1613 } 1614 s.traceCallback(quictrace.Event{ 1615 Time: now, 1616 EventType: quictrace.PacketSent, 1617 TransportState: s.sentPacketHandler.GetStats(), 1618 EncryptionLevel: p.EncryptionLevel(), 1619 PacketNumber: p.header.PacketNumber, 1620 PacketSize: p.length, 1621 Frames: frames, 1622 }) 1623 } 1624 1625 // quic-go logging 1626 if !s.logger.Debug() { 1627 return 1628 } 1629 p.header.Log(s.logger) 1630 if p.ack != nil { 1631 wire.LogFrame(s.logger, p.ack, true) 1632 } 1633 for _, frame := range p.frames { 1634 wire.LogFrame(s.logger, frame.Frame, true) 1635 } 1636} 1637 1638func (s *session) logCoalescedPacket(now time.Time, packet *coalescedPacket) { 1639 if s.logger.Debug() { 1640 if len(packet.packets) > 1 { 1641 s.logger.Debugf("-> Sending coalesced packet (%d parts, %d bytes) for connection %s", len(packet.packets), packet.buffer.Len(), s.logID) 1642 } else { 1643 s.logger.Debugf("-> Sending packet %d (%d bytes) for connection %s, %s", packet.packets[0].header.PacketNumber, packet.buffer.Len(), s.logID, packet.packets[0].EncryptionLevel()) 1644 } 1645 } 1646 for _, p := range packet.packets { 1647 s.logPacketContents(now, p) 1648 } 1649} 1650 1651func (s *session) logPacket(now time.Time, packet *packedPacket) { 1652 if s.logger.Debug() { 1653 s.logger.Debugf("-> Sending packet %d (%d bytes) for connection %s, %s", packet.header.PacketNumber, packet.buffer.Len(), s.logID, packet.EncryptionLevel()) 1654 } 1655 s.logPacketContents(now, packet.packetContents) 1656} 1657 1658// AcceptStream returns the next stream openend by the peer 1659func (s *session) AcceptStream(ctx context.Context) (Stream, error) { 1660 return s.streamsMap.AcceptStream(ctx) 1661} 1662 1663func (s *session) AcceptUniStream(ctx context.Context) (ReceiveStream, error) { 1664 return s.streamsMap.AcceptUniStream(ctx) 1665} 1666 1667// OpenStream opens a stream 1668func (s *session) OpenStream() (Stream, error) { 1669 return s.streamsMap.OpenStream() 1670} 1671 1672func (s *session) OpenStreamSync(ctx context.Context) (Stream, error) { 1673 return s.streamsMap.OpenStreamSync(ctx) 1674} 1675 1676func (s *session) OpenUniStream() (SendStream, error) { 1677 return s.streamsMap.OpenUniStream() 1678} 1679 1680func (s *session) OpenUniStreamSync(ctx context.Context) (SendStream, error) { 1681 return s.streamsMap.OpenUniStreamSync(ctx) 1682} 1683 1684func (s *session) newFlowController(id protocol.StreamID) flowcontrol.StreamFlowController { 1685 var initialSendWindow protocol.ByteCount 1686 if s.peerParams != nil { 1687 if id.Type() == protocol.StreamTypeUni { 1688 initialSendWindow = s.peerParams.InitialMaxStreamDataUni 1689 } else { 1690 if id.InitiatedBy() == s.perspective { 1691 initialSendWindow = s.peerParams.InitialMaxStreamDataBidiRemote 1692 } else { 1693 initialSendWindow = s.peerParams.InitialMaxStreamDataBidiLocal 1694 } 1695 } 1696 } 1697 return flowcontrol.NewStreamFlowController( 1698 id, 1699 s.connFlowController, 1700 protocol.InitialMaxStreamData, 1701 protocol.ByteCount(s.config.MaxReceiveStreamFlowControlWindow), 1702 initialSendWindow, 1703 s.onHasStreamWindowUpdate, 1704 s.rttStats, 1705 s.logger, 1706 ) 1707} 1708 1709// scheduleSending signals that we have data for sending 1710func (s *session) scheduleSending() { 1711 select { 1712 case s.sendingScheduled <- struct{}{}: 1713 default: 1714 } 1715} 1716 1717func (s *session) tryQueueingUndecryptablePacket(p *receivedPacket, hdr *wire.Header) { 1718 if len(s.undecryptablePackets)+1 > protocol.MaxUndecryptablePackets { 1719 if s.tracer != nil { 1720 s.tracer.DroppedPacket(logging.PacketTypeFromHeader(hdr), p.Size(), logging.PacketDropDOSPrevention) 1721 } 1722 s.logger.Infof("Dropping undecryptable packet (%d bytes). Undecryptable packet queue full.", p.Size()) 1723 return 1724 } 1725 s.logger.Infof("Queueing packet (%d bytes) for later decryption", p.Size()) 1726 if s.tracer != nil { 1727 s.tracer.BufferedPacket(logging.PacketTypeFromHeader(hdr)) 1728 } 1729 s.undecryptablePackets = append(s.undecryptablePackets, p) 1730} 1731 1732func (s *session) tryDecryptingQueuedPackets() { 1733 for _, p := range s.undecryptablePackets { 1734 s.handlePacket(p) 1735 } 1736 s.undecryptablePackets = s.undecryptablePackets[:0] 1737} 1738 1739func (s *session) queueControlFrame(f wire.Frame) { 1740 s.framer.QueueControlFrame(f) 1741 s.scheduleSending() 1742} 1743 1744func (s *session) onHasStreamWindowUpdate(id protocol.StreamID) { 1745 s.windowUpdateQueue.AddStream(id) 1746 s.scheduleSending() 1747} 1748 1749func (s *session) onHasConnectionWindowUpdate() { 1750 s.windowUpdateQueue.AddConnection() 1751 s.scheduleSending() 1752} 1753 1754func (s *session) onHasStreamData(id protocol.StreamID) { 1755 s.framer.AddActiveStream(id) 1756 s.scheduleSending() 1757} 1758 1759func (s *session) onStreamCompleted(id protocol.StreamID) { 1760 if err := s.streamsMap.DeleteStream(id); err != nil { 1761 s.closeLocal(err) 1762 } 1763} 1764 1765func (s *session) LocalAddr() net.Addr { 1766 return s.conn.LocalAddr() 1767} 1768 1769func (s *session) RemoteAddr() net.Addr { 1770 return s.conn.RemoteAddr() 1771} 1772 1773func (s *session) getPerspective() protocol.Perspective { 1774 return s.perspective 1775} 1776 1777func (s *session) GetVersion() protocol.VersionNumber { 1778 return s.version 1779} 1780