1// Copyright 2010 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package net
6
7import (
8	"io"
9	"sync"
10	"time"
11)
12
13// pipeDeadline is an abstraction for handling timeouts.
14type pipeDeadline struct {
15	mu     sync.Mutex // Guards timer and cancel
16	timer  *time.Timer
17	cancel chan struct{} // Must be non-nil
18}
19
20func makePipeDeadline() pipeDeadline {
21	return pipeDeadline{cancel: make(chan struct{})}
22}
23
24// set sets the point in time when the deadline will time out.
25// A timeout event is signaled by closing the channel returned by waiter.
26// Once a timeout has occurred, the deadline can be refreshed by specifying a
27// t value in the future.
28//
29// A zero value for t prevents timeout.
30func (d *pipeDeadline) set(t time.Time) {
31	d.mu.Lock()
32	defer d.mu.Unlock()
33
34	if d.timer != nil && !d.timer.Stop() {
35		<-d.cancel // Wait for the timer callback to finish and close cancel
36	}
37	d.timer = nil
38
39	// Time is zero, then there is no deadline.
40	closed := isClosedChan(d.cancel)
41	if t.IsZero() {
42		if closed {
43			d.cancel = make(chan struct{})
44		}
45		return
46	}
47
48	// Time in the future, setup a timer to cancel in the future.
49	if dur := time.Until(t); dur > 0 {
50		if closed {
51			d.cancel = make(chan struct{})
52		}
53		d.timer = time.AfterFunc(dur, func() {
54			close(d.cancel)
55		})
56		return
57	}
58
59	// Time in the past, so close immediately.
60	if !closed {
61		close(d.cancel)
62	}
63}
64
65// wait returns a channel that is closed when the deadline is exceeded.
66func (d *pipeDeadline) wait() chan struct{} {
67	d.mu.Lock()
68	defer d.mu.Unlock()
69	return d.cancel
70}
71
72func isClosedChan(c <-chan struct{}) bool {
73	select {
74	case <-c:
75		return true
76	default:
77		return false
78	}
79}
80
81type timeoutError struct{}
82
83func (timeoutError) Error() string   { return "deadline exceeded" }
84func (timeoutError) Timeout() bool   { return true }
85func (timeoutError) Temporary() bool { return true }
86
87type pipeAddr struct{}
88
89func (pipeAddr) Network() string { return "pipe" }
90func (pipeAddr) String() string  { return "pipe" }
91
92type pipe struct {
93	wrMu sync.Mutex // Serialize Write operations
94
95	// Used by local Read to interact with remote Write.
96	// Successful receive on rdRx is always followed by send on rdTx.
97	rdRx <-chan []byte
98	rdTx chan<- int
99
100	// Used by local Write to interact with remote Read.
101	// Successful send on wrTx is always followed by receive on wrRx.
102	wrTx chan<- []byte
103	wrRx <-chan int
104
105	once       sync.Once // Protects closing localDone
106	localDone  chan struct{}
107	remoteDone <-chan struct{}
108
109	readDeadline  pipeDeadline
110	writeDeadline pipeDeadline
111}
112
113// Pipe creates a synchronous, in-memory, full duplex
114// network connection; both ends implement the Conn interface.
115// Reads on one end are matched with writes on the other,
116// copying data directly between the two; there is no internal
117// buffering.
118func Pipe() (Conn, Conn) {
119	cb1 := make(chan []byte)
120	cb2 := make(chan []byte)
121	cn1 := make(chan int)
122	cn2 := make(chan int)
123	done1 := make(chan struct{})
124	done2 := make(chan struct{})
125
126	p1 := &pipe{
127		rdRx: cb1, rdTx: cn1,
128		wrTx: cb2, wrRx: cn2,
129		localDone: done1, remoteDone: done2,
130		readDeadline:  makePipeDeadline(),
131		writeDeadline: makePipeDeadline(),
132	}
133	p2 := &pipe{
134		rdRx: cb2, rdTx: cn2,
135		wrTx: cb1, wrRx: cn1,
136		localDone: done2, remoteDone: done1,
137		readDeadline:  makePipeDeadline(),
138		writeDeadline: makePipeDeadline(),
139	}
140	return p1, p2
141}
142
143func (*pipe) LocalAddr() Addr  { return pipeAddr{} }
144func (*pipe) RemoteAddr() Addr { return pipeAddr{} }
145
146func (p *pipe) Read(b []byte) (int, error) {
147	n, err := p.read(b)
148	if err != nil && err != io.EOF && err != io.ErrClosedPipe {
149		err = &OpError{Op: "read", Net: "pipe", Err: err}
150	}
151	return n, err
152}
153
154func (p *pipe) read(b []byte) (n int, err error) {
155	switch {
156	case isClosedChan(p.localDone):
157		return 0, io.ErrClosedPipe
158	case isClosedChan(p.remoteDone):
159		return 0, io.EOF
160	case isClosedChan(p.readDeadline.wait()):
161		return 0, timeoutError{}
162	}
163
164	select {
165	case bw := <-p.rdRx:
166		nr := copy(b, bw)
167		p.rdTx <- nr
168		return nr, nil
169	case <-p.localDone:
170		return 0, io.ErrClosedPipe
171	case <-p.remoteDone:
172		return 0, io.EOF
173	case <-p.readDeadline.wait():
174		return 0, timeoutError{}
175	}
176}
177
178func (p *pipe) Write(b []byte) (int, error) {
179	n, err := p.write(b)
180	if err != nil && err != io.ErrClosedPipe {
181		err = &OpError{Op: "write", Net: "pipe", Err: err}
182	}
183	return n, err
184}
185
186func (p *pipe) write(b []byte) (n int, err error) {
187	switch {
188	case isClosedChan(p.localDone):
189		return 0, io.ErrClosedPipe
190	case isClosedChan(p.remoteDone):
191		return 0, io.ErrClosedPipe
192	case isClosedChan(p.writeDeadline.wait()):
193		return 0, timeoutError{}
194	}
195
196	p.wrMu.Lock() // Ensure entirety of b is written together
197	defer p.wrMu.Unlock()
198	for once := true; once || len(b) > 0; once = false {
199		select {
200		case p.wrTx <- b:
201			nw := <-p.wrRx
202			b = b[nw:]
203			n += nw
204		case <-p.localDone:
205			return n, io.ErrClosedPipe
206		case <-p.remoteDone:
207			return n, io.ErrClosedPipe
208		case <-p.writeDeadline.wait():
209			return n, timeoutError{}
210		}
211	}
212	return n, nil
213}
214
215func (p *pipe) SetDeadline(t time.Time) error {
216	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
217		return io.ErrClosedPipe
218	}
219	p.readDeadline.set(t)
220	p.writeDeadline.set(t)
221	return nil
222}
223
224func (p *pipe) SetReadDeadline(t time.Time) error {
225	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
226		return io.ErrClosedPipe
227	}
228	p.readDeadline.set(t)
229	return nil
230}
231
232func (p *pipe) SetWriteDeadline(t time.Time) error {
233	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
234		return io.ErrClosedPipe
235	}
236	p.writeDeadline.set(t)
237	return nil
238}
239
240func (p *pipe) Close() error {
241	p.once.Do(func() { close(p.localDone) })
242	return nil
243}
244