1// Copyright 2009 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// Pipe adapter to connect code expecting an io.Reader
6// with code expecting an io.Writer.
7
8package io
9
10import (
11	"errors"
12	"sync"
13)
14
15// ErrClosedPipe is the error used for read or write operations on a closed pipe.
16var ErrClosedPipe = errors.New("io: read/write on closed pipe")
17
18type pipeResult struct {
19	n   int
20	err error
21}
22
23// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
24type pipe struct {
25	rl    sync.Mutex // gates readers one at a time
26	wl    sync.Mutex // gates writers one at a time
27	l     sync.Mutex // protects remaining fields
28	data  []byte     // data remaining in pending write
29	rwait sync.Cond  // waiting reader
30	wwait sync.Cond  // waiting writer
31	rerr  error      // if reader closed, error to give writes
32	werr  error      // if writer closed, error to give reads
33}
34
35func (p *pipe) read(b []byte) (n int, err error) {
36	// One reader at a time.
37	p.rl.Lock()
38	defer p.rl.Unlock()
39
40	p.l.Lock()
41	defer p.l.Unlock()
42	for {
43		if p.rerr != nil {
44			return 0, ErrClosedPipe
45		}
46		if p.data != nil {
47			break
48		}
49		if p.werr != nil {
50			return 0, p.werr
51		}
52		p.rwait.Wait()
53	}
54	n = copy(b, p.data)
55	p.data = p.data[n:]
56	if len(p.data) == 0 {
57		p.data = nil
58		p.wwait.Signal()
59	}
60	return
61}
62
63var zero [0]byte
64
65func (p *pipe) write(b []byte) (n int, err error) {
66	// pipe uses nil to mean not available
67	if b == nil {
68		b = zero[:]
69	}
70
71	// One writer at a time.
72	p.wl.Lock()
73	defer p.wl.Unlock()
74
75	p.l.Lock()
76	defer p.l.Unlock()
77	if p.werr != nil {
78		err = ErrClosedPipe
79		return
80	}
81	p.data = b
82	p.rwait.Signal()
83	for {
84		if p.data == nil {
85			break
86		}
87		if p.rerr != nil {
88			err = p.rerr
89			break
90		}
91		if p.werr != nil {
92			err = ErrClosedPipe
93		}
94		p.wwait.Wait()
95	}
96	n = len(b) - len(p.data)
97	p.data = nil // in case of rerr or werr
98	return
99}
100
101func (p *pipe) rclose(err error) {
102	if err == nil {
103		err = ErrClosedPipe
104	}
105	p.l.Lock()
106	defer p.l.Unlock()
107	p.rerr = err
108	p.rwait.Signal()
109	p.wwait.Signal()
110}
111
112func (p *pipe) wclose(err error) {
113	if err == nil {
114		err = EOF
115	}
116	p.l.Lock()
117	defer p.l.Unlock()
118	p.werr = err
119	p.rwait.Signal()
120	p.wwait.Signal()
121}
122
123// A PipeReader is the read half of a pipe.
124type PipeReader struct {
125	p *pipe
126}
127
128// Read implements the standard Read interface:
129// it reads data from the pipe, blocking until a writer
130// arrives or the write end is closed.
131// If the write end is closed with an error, that error is
132// returned as err; otherwise err is EOF.
133func (r *PipeReader) Read(data []byte) (n int, err error) {
134	return r.p.read(data)
135}
136
137// Close closes the reader; subsequent writes to the
138// write half of the pipe will return the error ErrClosedPipe.
139func (r *PipeReader) Close() error {
140	return r.CloseWithError(nil)
141}
142
143// CloseWithError closes the reader; subsequent writes
144// to the write half of the pipe will return the error err.
145func (r *PipeReader) CloseWithError(err error) error {
146	r.p.rclose(err)
147	return nil
148}
149
150// A PipeWriter is the write half of a pipe.
151type PipeWriter struct {
152	p *pipe
153}
154
155// Write implements the standard Write interface:
156// it writes data to the pipe, blocking until readers
157// have consumed all the data or the read end is closed.
158// If the read end is closed with an error, that err is
159// returned as err; otherwise err is ErrClosedPipe.
160func (w *PipeWriter) Write(data []byte) (n int, err error) {
161	return w.p.write(data)
162}
163
164// Close closes the writer; subsequent reads from the
165// read half of the pipe will return no bytes and EOF.
166func (w *PipeWriter) Close() error {
167	return w.CloseWithError(nil)
168}
169
170// CloseWithError closes the writer; subsequent reads from the
171// read half of the pipe will return no bytes and the error err.
172func (w *PipeWriter) CloseWithError(err error) error {
173	w.p.wclose(err)
174	return nil
175}
176
177// Pipe creates a synchronous in-memory pipe.
178// It can be used to connect code expecting an io.Reader
179// with code expecting an io.Writer.
180// Reads on one end are matched with writes on the other,
181// copying data directly between the two; there is no internal buffering.
182// It is safe to call Read and Write in parallel with each other or with
183// Close. Close will complete once pending I/O is done. Parallel calls to
184// Read, and parallel calls to Write, are also safe:
185// the individual calls will be gated sequentially.
186func Pipe() (*PipeReader, *PipeWriter) {
187	p := new(pipe)
188	p.rwait.L = &p.l
189	p.wwait.L = &p.l
190	r := &PipeReader{p}
191	w := &PipeWriter{p}
192	return r, w
193}
194