1package quic 2 3import ( 4 "net" 5 "os" 6 "sync" 7 "time" 8 9 "github.com/lucas-clemente/quic-go/internal/ackhandler" 10 "github.com/lucas-clemente/quic-go/internal/flowcontrol" 11 "github.com/lucas-clemente/quic-go/internal/protocol" 12 "github.com/lucas-clemente/quic-go/internal/wire" 13) 14 15type deadlineError struct{} 16 17func (deadlineError) Error() string { return "deadline exceeded" } 18func (deadlineError) Temporary() bool { return true } 19func (deadlineError) Timeout() bool { return true } 20func (deadlineError) Unwrap() error { return os.ErrDeadlineExceeded } 21 22var errDeadline net.Error = &deadlineError{} 23 24// The streamSender is notified by the stream about various events. 25type streamSender interface { 26 queueControlFrame(wire.Frame) 27 onHasStreamData(protocol.StreamID) 28 // must be called without holding the mutex that is acquired by closeForShutdown 29 onStreamCompleted(protocol.StreamID) 30} 31 32// Each of the both stream halves gets its own uniStreamSender. 33// This is necessary in order to keep track when both halves have been completed. 34type uniStreamSender struct { 35 streamSender 36 onStreamCompletedImpl func() 37} 38 39func (s *uniStreamSender) queueControlFrame(f wire.Frame) { 40 s.streamSender.queueControlFrame(f) 41} 42 43func (s *uniStreamSender) onHasStreamData(id protocol.StreamID) { 44 s.streamSender.onHasStreamData(id) 45} 46 47func (s *uniStreamSender) onStreamCompleted(protocol.StreamID) { 48 s.onStreamCompletedImpl() 49} 50 51var _ streamSender = &uniStreamSender{} 52 53type streamI interface { 54 Stream 55 closeForShutdown(error) 56 // for receiving 57 handleStreamFrame(*wire.StreamFrame) error 58 handleResetStreamFrame(*wire.ResetStreamFrame) error 59 getWindowUpdate() protocol.ByteCount 60 // for sending 61 hasData() bool 62 handleStopSendingFrame(*wire.StopSendingFrame) 63 popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool) 64 updateSendWindow(protocol.ByteCount) 65} 66 67var ( 68 _ receiveStreamI = (streamI)(nil) 69 _ sendStreamI = (streamI)(nil) 70) 71 72// A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface 73// 74// Read() and Write() may be called concurrently, but multiple calls to Read() or Write() individually must be synchronized manually. 75type stream struct { 76 receiveStream 77 sendStream 78 79 completedMutex sync.Mutex 80 sender streamSender 81 receiveStreamCompleted bool 82 sendStreamCompleted bool 83 84 version protocol.VersionNumber 85} 86 87var _ Stream = &stream{} 88 89// newStream creates a new Stream 90func newStream(streamID protocol.StreamID, 91 sender streamSender, 92 flowController flowcontrol.StreamFlowController, 93 version protocol.VersionNumber, 94) *stream { 95 s := &stream{sender: sender, version: version} 96 senderForSendStream := &uniStreamSender{ 97 streamSender: sender, 98 onStreamCompletedImpl: func() { 99 s.completedMutex.Lock() 100 s.sendStreamCompleted = true 101 s.checkIfCompleted() 102 s.completedMutex.Unlock() 103 }, 104 } 105 s.sendStream = *newSendStream(streamID, senderForSendStream, flowController, version) 106 senderForReceiveStream := &uniStreamSender{ 107 streamSender: sender, 108 onStreamCompletedImpl: func() { 109 s.completedMutex.Lock() 110 s.receiveStreamCompleted = true 111 s.checkIfCompleted() 112 s.completedMutex.Unlock() 113 }, 114 } 115 s.receiveStream = *newReceiveStream(streamID, senderForReceiveStream, flowController, version) 116 return s 117} 118 119// need to define StreamID() here, since both receiveStream and readStream have a StreamID() 120func (s *stream) StreamID() protocol.StreamID { 121 // the result is same for receiveStream and sendStream 122 return s.sendStream.StreamID() 123} 124 125func (s *stream) Close() error { 126 return s.sendStream.Close() 127} 128 129func (s *stream) SetDeadline(t time.Time) error { 130 _ = s.SetReadDeadline(t) // SetReadDeadline never errors 131 _ = s.SetWriteDeadline(t) // SetWriteDeadline never errors 132 return nil 133} 134 135// CloseForShutdown closes a stream abruptly. 136// It makes Read and Write unblock (and return the error) immediately. 137// The peer will NOT be informed about this: the stream is closed without sending a FIN or RST. 138func (s *stream) closeForShutdown(err error) { 139 s.sendStream.closeForShutdown(err) 140 s.receiveStream.closeForShutdown(err) 141} 142 143// checkIfCompleted is called from the uniStreamSender, when one of the stream halves is completed. 144// It makes sure that the onStreamCompleted callback is only called if both receive and send side have completed. 145func (s *stream) checkIfCompleted() { 146 if s.sendStreamCompleted && s.receiveStreamCompleted { 147 s.sender.onStreamCompleted(s.StreamID()) 148 } 149} 150