1package quic
2
3import (
4	"net"
5	"os"
6	"sync"
7	"time"
8
9	"github.com/lucas-clemente/quic-go/internal/ackhandler"
10	"github.com/lucas-clemente/quic-go/internal/flowcontrol"
11	"github.com/lucas-clemente/quic-go/internal/protocol"
12	"github.com/lucas-clemente/quic-go/internal/wire"
13)
14
// deadlineError is an error with the message "deadline exceeded".
// It reports itself as both a timeout and a temporary condition, and it
// unwraps to os.ErrDeadlineExceeded, so errors.Is(err, os.ErrDeadlineExceeded)
// matches it.
type deadlineError struct{}

// Error returns the fixed error message.
func (deadlineError) Error() string {
	return "deadline exceeded"
}

// Temporary always reports true. It is required by the net.Error interface.
func (deadlineError) Temporary() bool {
	return true
}

// Timeout always reports true. It is required by the net.Error interface.
func (deadlineError) Timeout() bool {
	return true
}

// Unwrap exposes os.ErrDeadlineExceeded as the wrapped error.
func (deadlineError) Unwrap() error {
	return os.ErrDeadlineExceeded
}

// errDeadline is a package-level *deadlineError, typed as net.Error.
var errDeadline net.Error = new(deadlineError)
23
24// The streamSender is notified by the stream about various events.
25type streamSender interface {
26	queueControlFrame(wire.Frame)
27	onHasStreamData(protocol.StreamID)
28	// must be called without holding the mutex that is acquired by closeForShutdown
29	onStreamCompleted(protocol.StreamID)
30}
31
32// Each of the both stream halves gets its own uniStreamSender.
33// This is necessary in order to keep track when both halves have been completed.
34type uniStreamSender struct {
35	streamSender
36	onStreamCompletedImpl func()
37}
38
39func (s *uniStreamSender) queueControlFrame(f wire.Frame) {
40	s.streamSender.queueControlFrame(f)
41}
42
43func (s *uniStreamSender) onHasStreamData(id protocol.StreamID) {
44	s.streamSender.onHasStreamData(id)
45}
46
47func (s *uniStreamSender) onStreamCompleted(protocol.StreamID) {
48	s.onStreamCompletedImpl()
49}
50
51var _ streamSender = &uniStreamSender{}
52
53type streamI interface {
54	Stream
55	closeForShutdown(error)
56	// for receiving
57	handleStreamFrame(*wire.StreamFrame) error
58	handleResetStreamFrame(*wire.ResetStreamFrame) error
59	getWindowUpdate() protocol.ByteCount
60	// for sending
61	hasData() bool
62	handleStopSendingFrame(*wire.StopSendingFrame)
63	popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool)
64	updateSendWindow(protocol.ByteCount)
65}
66
67var (
68	_ receiveStreamI = (streamI)(nil)
69	_ sendStreamI    = (streamI)(nil)
70)
71
72// A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface
73//
74// Read() and Write() may be called concurrently, but multiple calls to Read() or Write() individually must be synchronized manually.
75type stream struct {
76	receiveStream
77	sendStream
78
79	completedMutex         sync.Mutex
80	sender                 streamSender
81	receiveStreamCompleted bool
82	sendStreamCompleted    bool
83
84	version protocol.VersionNumber
85}
86
87var _ Stream = &stream{}
88
89// newStream creates a new Stream
90func newStream(streamID protocol.StreamID,
91	sender streamSender,
92	flowController flowcontrol.StreamFlowController,
93	version protocol.VersionNumber,
94) *stream {
95	s := &stream{sender: sender, version: version}
96	senderForSendStream := &uniStreamSender{
97		streamSender: sender,
98		onStreamCompletedImpl: func() {
99			s.completedMutex.Lock()
100			s.sendStreamCompleted = true
101			s.checkIfCompleted()
102			s.completedMutex.Unlock()
103		},
104	}
105	s.sendStream = *newSendStream(streamID, senderForSendStream, flowController, version)
106	senderForReceiveStream := &uniStreamSender{
107		streamSender: sender,
108		onStreamCompletedImpl: func() {
109			s.completedMutex.Lock()
110			s.receiveStreamCompleted = true
111			s.checkIfCompleted()
112			s.completedMutex.Unlock()
113		},
114	}
115	s.receiveStream = *newReceiveStream(streamID, senderForReceiveStream, flowController, version)
116	return s
117}
118
119// need to define StreamID() here, since both receiveStream and readStream have a StreamID()
120func (s *stream) StreamID() protocol.StreamID {
121	// the result is same for receiveStream and sendStream
122	return s.sendStream.StreamID()
123}
124
125func (s *stream) Close() error {
126	return s.sendStream.Close()
127}
128
129func (s *stream) SetDeadline(t time.Time) error {
130	_ = s.SetReadDeadline(t)  // SetReadDeadline never errors
131	_ = s.SetWriteDeadline(t) // SetWriteDeadline never errors
132	return nil
133}
134
135// CloseForShutdown closes a stream abruptly.
136// It makes Read and Write unblock (and return the error) immediately.
137// The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
138func (s *stream) closeForShutdown(err error) {
139	s.sendStream.closeForShutdown(err)
140	s.receiveStream.closeForShutdown(err)
141}
142
143// checkIfCompleted is called from the uniStreamSender, when one of the stream halves is completed.
144// It makes sure that the onStreamCompleted callback is only called if both receive and send side have completed.
145func (s *stream) checkIfCompleted() {
146	if s.sendStreamCompleted && s.receiveStreamCompleted {
147		s.sender.onStreamCompleted(s.StreamID())
148	}
149}
150