1// Copyright 2009 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// Pipe adapter to connect code expecting an io.Reader
6// with code expecting an io.Writer.
7
8package io
9
10import (
11	"errors"
12	"sync"
13)
14
15// onceError is an object that will only store an error once.
16type onceError struct {
17	sync.Mutex // guards following
18	err        error
19}
20
21func (a *onceError) Store(err error) {
22	a.Lock()
23	defer a.Unlock()
24	if a.err != nil {
25		return
26	}
27	a.err = err
28}
29func (a *onceError) Load() error {
30	a.Lock()
31	defer a.Unlock()
32	return a.err
33}
34
35// ErrClosedPipe is the error used for read or write operations on a closed pipe.
36var ErrClosedPipe = errors.New("io: read/write on closed pipe")
37
38// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
39type pipe struct {
40	wrMu sync.Mutex // Serializes Write operations
41	wrCh chan []byte
42	rdCh chan int
43
44	once sync.Once // Protects closing done
45	done chan struct{}
46	rerr onceError
47	werr onceError
48}
49
50func (p *pipe) read(b []byte) (n int, err error) {
51	select {
52	case <-p.done:
53		return 0, p.readCloseError()
54	default:
55	}
56
57	select {
58	case bw := <-p.wrCh:
59		nr := copy(b, bw)
60		p.rdCh <- nr
61		return nr, nil
62	case <-p.done:
63		return 0, p.readCloseError()
64	}
65}
66
67func (p *pipe) closeRead(err error) error {
68	if err == nil {
69		err = ErrClosedPipe
70	}
71	p.rerr.Store(err)
72	p.once.Do(func() { close(p.done) })
73	return nil
74}
75
76func (p *pipe) write(b []byte) (n int, err error) {
77	select {
78	case <-p.done:
79		return 0, p.writeCloseError()
80	default:
81		p.wrMu.Lock()
82		defer p.wrMu.Unlock()
83	}
84
85	for once := true; once || len(b) > 0; once = false {
86		select {
87		case p.wrCh <- b:
88			nw := <-p.rdCh
89			b = b[nw:]
90			n += nw
91		case <-p.done:
92			return n, p.writeCloseError()
93		}
94	}
95	return n, nil
96}
97
98func (p *pipe) closeWrite(err error) error {
99	if err == nil {
100		err = EOF
101	}
102	p.werr.Store(err)
103	p.once.Do(func() { close(p.done) })
104	return nil
105}
106
107// readCloseError is considered internal to the pipe type.
108func (p *pipe) readCloseError() error {
109	rerr := p.rerr.Load()
110	if werr := p.werr.Load(); rerr == nil && werr != nil {
111		return werr
112	}
113	return ErrClosedPipe
114}
115
116// writeCloseError is considered internal to the pipe type.
117func (p *pipe) writeCloseError() error {
118	werr := p.werr.Load()
119	if rerr := p.rerr.Load(); werr == nil && rerr != nil {
120		return rerr
121	}
122	return ErrClosedPipe
123}
124
125// A PipeReader is the read half of a pipe.
126type PipeReader struct {
127	p *pipe
128}
129
130// Read implements the standard Read interface:
131// it reads data from the pipe, blocking until a writer
132// arrives or the write end is closed.
133// If the write end is closed with an error, that error is
134// returned as err; otherwise err is EOF.
135func (r *PipeReader) Read(data []byte) (n int, err error) {
136	return r.p.read(data)
137}
138
139// Close closes the reader; subsequent writes to the
140// write half of the pipe will return the error ErrClosedPipe.
141func (r *PipeReader) Close() error {
142	return r.CloseWithError(nil)
143}
144
145// CloseWithError closes the reader; subsequent writes
146// to the write half of the pipe will return the error err.
147//
148// CloseWithError never overwrites the previous error if it exists
149// and always returns nil.
150func (r *PipeReader) CloseWithError(err error) error {
151	return r.p.closeRead(err)
152}
153
154// A PipeWriter is the write half of a pipe.
155type PipeWriter struct {
156	p *pipe
157}
158
159// Write implements the standard Write interface:
160// it writes data to the pipe, blocking until one or more readers
161// have consumed all the data or the read end is closed.
162// If the read end is closed with an error, that err is
163// returned as err; otherwise err is ErrClosedPipe.
164func (w *PipeWriter) Write(data []byte) (n int, err error) {
165	return w.p.write(data)
166}
167
168// Close closes the writer; subsequent reads from the
169// read half of the pipe will return no bytes and EOF.
170func (w *PipeWriter) Close() error {
171	return w.CloseWithError(nil)
172}
173
174// CloseWithError closes the writer; subsequent reads from the
175// read half of the pipe will return no bytes and the error err,
176// or EOF if err is nil.
177//
178// CloseWithError never overwrites the previous error if it exists
179// and always returns nil.
180func (w *PipeWriter) CloseWithError(err error) error {
181	return w.p.closeWrite(err)
182}
183
184// Pipe creates a synchronous in-memory pipe.
185// It can be used to connect code expecting an io.Reader
186// with code expecting an io.Writer.
187//
188// Reads and Writes on the pipe are matched one to one
189// except when multiple Reads are needed to consume a single Write.
190// That is, each Write to the PipeWriter blocks until it has satisfied
191// one or more Reads from the PipeReader that fully consume
192// the written data.
193// The data is copied directly from the Write to the corresponding
194// Read (or Reads); there is no internal buffering.
195//
196// It is safe to call Read and Write in parallel with each other or with Close.
197// Parallel calls to Read and parallel calls to Write are also safe:
198// the individual calls will be gated sequentially.
199func Pipe() (*PipeReader, *PipeWriter) {
200	p := &pipe{
201		wrCh: make(chan []byte),
202		rdCh: make(chan int),
203		done: make(chan struct{}),
204	}
205	return &PipeReader{p}, &PipeWriter{p}
206}
207