1// Copyright 2010 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package net
6
7import (
8	"io"
9	"os"
10	"sync"
11	"time"
12)
13
// pipeDeadline tracks a single deadline and reports its expiry by closing
// a channel.
type pipeDeadline struct {
	// mu guards timer and cancel.
	mu sync.Mutex

	// timer is the timer armed for a deadline in the future, if any.
	timer *time.Timer

	// cancel is closed when the deadline expires. It must be non-nil.
	cancel chan struct{}
}
20
21func makePipeDeadline() pipeDeadline {
22	return pipeDeadline{cancel: make(chan struct{})}
23}
24
25// set sets the point in time when the deadline will time out.
26// A timeout event is signaled by closing the channel returned by waiter.
27// Once a timeout has occurred, the deadline can be refreshed by specifying a
28// t value in the future.
29//
30// A zero value for t prevents timeout.
31func (d *pipeDeadline) set(t time.Time) {
32	d.mu.Lock()
33	defer d.mu.Unlock()
34
35	if d.timer != nil && !d.timer.Stop() {
36		<-d.cancel // Wait for the timer callback to finish and close cancel
37	}
38	d.timer = nil
39
40	// Time is zero, then there is no deadline.
41	closed := isClosedChan(d.cancel)
42	if t.IsZero() {
43		if closed {
44			d.cancel = make(chan struct{})
45		}
46		return
47	}
48
49	// Time in the future, setup a timer to cancel in the future.
50	if dur := time.Until(t); dur > 0 {
51		if closed {
52			d.cancel = make(chan struct{})
53		}
54		d.timer = time.AfterFunc(dur, func() {
55			close(d.cancel)
56		})
57		return
58	}
59
60	// Time in the past, so close immediately.
61	if !closed {
62		close(d.cancel)
63	}
64}
65
66// wait returns a channel that is closed when the deadline is exceeded.
67func (d *pipeDeadline) wait() chan struct{} {
68	d.mu.Lock()
69	defer d.mu.Unlock()
70	return d.cancel
71}
72
// isClosedChan reports whether c is closed. It uses a non-blocking receive,
// so it returns true exactly when a receive on c can proceed immediately and
// false otherwise (including for a nil channel).
func isClosedChan(c <-chan struct{}) bool {
	closed := true
	select {
	case <-c:
	default:
		closed = false
	}
	return closed
}
81
// pipeAddr is the address type returned by the LocalAddr and RemoteAddr
// methods of pipe.
type pipeAddr struct{}

// Network returns the network name, "pipe".
func (pipeAddr) Network() string {
	return "pipe"
}

// String returns the string form of the address, "pipe".
func (pipeAddr) String() string {
	return "pipe"
}
86
87type pipe struct {
88	wrMu sync.Mutex // Serialize Write operations
89
90	// Used by local Read to interact with remote Write.
91	// Successful receive on rdRx is always followed by send on rdTx.
92	rdRx <-chan []byte
93	rdTx chan<- int
94
95	// Used by local Write to interact with remote Read.
96	// Successful send on wrTx is always followed by receive on wrRx.
97	wrTx chan<- []byte
98	wrRx <-chan int
99
100	once       sync.Once // Protects closing localDone
101	localDone  chan struct{}
102	remoteDone <-chan struct{}
103
104	readDeadline  pipeDeadline
105	writeDeadline pipeDeadline
106}
107
108// Pipe creates a synchronous, in-memory, full duplex
109// network connection; both ends implement the Conn interface.
110// Reads on one end are matched with writes on the other,
111// copying data directly between the two; there is no internal
112// buffering.
113func Pipe() (Conn, Conn) {
114	cb1 := make(chan []byte)
115	cb2 := make(chan []byte)
116	cn1 := make(chan int)
117	cn2 := make(chan int)
118	done1 := make(chan struct{})
119	done2 := make(chan struct{})
120
121	p1 := &pipe{
122		rdRx: cb1, rdTx: cn1,
123		wrTx: cb2, wrRx: cn2,
124		localDone: done1, remoteDone: done2,
125		readDeadline:  makePipeDeadline(),
126		writeDeadline: makePipeDeadline(),
127	}
128	p2 := &pipe{
129		rdRx: cb2, rdTx: cn2,
130		wrTx: cb1, wrRx: cn1,
131		localDone: done2, remoteDone: done1,
132		readDeadline:  makePipeDeadline(),
133		writeDeadline: makePipeDeadline(),
134	}
135	return p1, p2
136}
137
138func (*pipe) LocalAddr() Addr  { return pipeAddr{} }
139func (*pipe) RemoteAddr() Addr { return pipeAddr{} }
140
141func (p *pipe) Read(b []byte) (int, error) {
142	n, err := p.read(b)
143	if err != nil && err != io.EOF && err != io.ErrClosedPipe {
144		err = &OpError{Op: "read", Net: "pipe", Err: err}
145	}
146	return n, err
147}
148
149func (p *pipe) read(b []byte) (n int, err error) {
150	switch {
151	case isClosedChan(p.localDone):
152		return 0, io.ErrClosedPipe
153	case isClosedChan(p.remoteDone):
154		return 0, io.EOF
155	case isClosedChan(p.readDeadline.wait()):
156		return 0, os.ErrDeadlineExceeded
157	}
158
159	select {
160	case bw := <-p.rdRx:
161		nr := copy(b, bw)
162		p.rdTx <- nr
163		return nr, nil
164	case <-p.localDone:
165		return 0, io.ErrClosedPipe
166	case <-p.remoteDone:
167		return 0, io.EOF
168	case <-p.readDeadline.wait():
169		return 0, os.ErrDeadlineExceeded
170	}
171}
172
173func (p *pipe) Write(b []byte) (int, error) {
174	n, err := p.write(b)
175	if err != nil && err != io.ErrClosedPipe {
176		err = &OpError{Op: "write", Net: "pipe", Err: err}
177	}
178	return n, err
179}
180
181func (p *pipe) write(b []byte) (n int, err error) {
182	switch {
183	case isClosedChan(p.localDone):
184		return 0, io.ErrClosedPipe
185	case isClosedChan(p.remoteDone):
186		return 0, io.ErrClosedPipe
187	case isClosedChan(p.writeDeadline.wait()):
188		return 0, os.ErrDeadlineExceeded
189	}
190
191	p.wrMu.Lock() // Ensure entirety of b is written together
192	defer p.wrMu.Unlock()
193	for once := true; once || len(b) > 0; once = false {
194		select {
195		case p.wrTx <- b:
196			nw := <-p.wrRx
197			b = b[nw:]
198			n += nw
199		case <-p.localDone:
200			return n, io.ErrClosedPipe
201		case <-p.remoteDone:
202			return n, io.ErrClosedPipe
203		case <-p.writeDeadline.wait():
204			return n, os.ErrDeadlineExceeded
205		}
206	}
207	return n, nil
208}
209
210func (p *pipe) SetDeadline(t time.Time) error {
211	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
212		return io.ErrClosedPipe
213	}
214	p.readDeadline.set(t)
215	p.writeDeadline.set(t)
216	return nil
217}
218
219func (p *pipe) SetReadDeadline(t time.Time) error {
220	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
221		return io.ErrClosedPipe
222	}
223	p.readDeadline.set(t)
224	return nil
225}
226
227func (p *pipe) SetWriteDeadline(t time.Time) error {
228	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
229		return io.ErrClosedPipe
230	}
231	p.writeDeadline.set(t)
232	return nil
233}
234
235func (p *pipe) Close() error {
236	p.once.Do(func() { close(p.localDone) })
237	return nil
238}
239