1// Copyright 2009 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// Pipe adapter to connect code expecting an io.Reader
6// with code expecting an io.Writer.
7
8package io
9
10import (
11	"errors"
12	"sync"
13)
14
15// onceError is an object that will only store an error once.
16type onceError struct {
17	sync.Mutex // guards following
18	err        error
19}
20
21func (a *onceError) Store(err error) {
22	a.Lock()
23	defer a.Unlock()
24	if a.err != nil {
25		return
26	}
27	a.err = err
28}
29func (a *onceError) Load() error {
30	a.Lock()
31	defer a.Unlock()
32	return a.err
33}
34
35// ErrClosedPipe is the error used for read or write operations on a closed pipe.
36var ErrClosedPipe = errors.New("io: read/write on closed pipe")
37
38// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
39type pipe struct {
40	wrMu sync.Mutex // Serializes Write operations
41	wrCh chan []byte
42	rdCh chan int
43
44	once sync.Once // Protects closing done
45	done chan struct{}
46	rerr onceError
47	werr onceError
48}
49
50func (p *pipe) Read(b []byte) (n int, err error) {
51	select {
52	case <-p.done:
53		return 0, p.readCloseError()
54	default:
55	}
56
57	select {
58	case bw := <-p.wrCh:
59		nr := copy(b, bw)
60		p.rdCh <- nr
61		return nr, nil
62	case <-p.done:
63		return 0, p.readCloseError()
64	}
65}
66
67func (p *pipe) readCloseError() error {
68	rerr := p.rerr.Load()
69	if werr := p.werr.Load(); rerr == nil && werr != nil {
70		return werr
71	}
72	return ErrClosedPipe
73}
74
75func (p *pipe) CloseRead(err error) error {
76	if err == nil {
77		err = ErrClosedPipe
78	}
79	p.rerr.Store(err)
80	p.once.Do(func() { close(p.done) })
81	return nil
82}
83
84func (p *pipe) Write(b []byte) (n int, err error) {
85	select {
86	case <-p.done:
87		return 0, p.writeCloseError()
88	default:
89		p.wrMu.Lock()
90		defer p.wrMu.Unlock()
91	}
92
93	for once := true; once || len(b) > 0; once = false {
94		select {
95		case p.wrCh <- b:
96			nw := <-p.rdCh
97			b = b[nw:]
98			n += nw
99		case <-p.done:
100			return n, p.writeCloseError()
101		}
102	}
103	return n, nil
104}
105
106func (p *pipe) writeCloseError() error {
107	werr := p.werr.Load()
108	if rerr := p.rerr.Load(); werr == nil && rerr != nil {
109		return rerr
110	}
111	return ErrClosedPipe
112}
113
114func (p *pipe) CloseWrite(err error) error {
115	if err == nil {
116		err = EOF
117	}
118	p.werr.Store(err)
119	p.once.Do(func() { close(p.done) })
120	return nil
121}
122
123// A PipeReader is the read half of a pipe.
124type PipeReader struct {
125	p *pipe
126}
127
128// Read implements the standard Read interface:
129// it reads data from the pipe, blocking until a writer
130// arrives or the write end is closed.
131// If the write end is closed with an error, that error is
132// returned as err; otherwise err is EOF.
133func (r *PipeReader) Read(data []byte) (n int, err error) {
134	return r.p.Read(data)
135}
136
137// Close closes the reader; subsequent writes to the
138// write half of the pipe will return the error ErrClosedPipe.
139func (r *PipeReader) Close() error {
140	return r.CloseWithError(nil)
141}
142
143// CloseWithError closes the reader; subsequent writes
144// to the write half of the pipe will return the error err.
145//
146// CloseWithError never overwrites the previous error if it exists
147// and always returns nil.
148func (r *PipeReader) CloseWithError(err error) error {
149	return r.p.CloseRead(err)
150}
151
152// A PipeWriter is the write half of a pipe.
153type PipeWriter struct {
154	p *pipe
155}
156
157// Write implements the standard Write interface:
158// it writes data to the pipe, blocking until one or more readers
159// have consumed all the data or the read end is closed.
160// If the read end is closed with an error, that err is
161// returned as err; otherwise err is ErrClosedPipe.
162func (w *PipeWriter) Write(data []byte) (n int, err error) {
163	return w.p.Write(data)
164}
165
166// Close closes the writer; subsequent reads from the
167// read half of the pipe will return no bytes and EOF.
168func (w *PipeWriter) Close() error {
169	return w.CloseWithError(nil)
170}
171
172// CloseWithError closes the writer; subsequent reads from the
173// read half of the pipe will return no bytes and the error err,
174// or EOF if err is nil.
175//
176// CloseWithError never overwrites the previous error if it exists
177// and always returns nil.
178func (w *PipeWriter) CloseWithError(err error) error {
179	return w.p.CloseWrite(err)
180}
181
182// Pipe creates a synchronous in-memory pipe.
183// It can be used to connect code expecting an io.Reader
184// with code expecting an io.Writer.
185//
186// Reads and Writes on the pipe are matched one to one
187// except when multiple Reads are needed to consume a single Write.
188// That is, each Write to the PipeWriter blocks until it has satisfied
189// one or more Reads from the PipeReader that fully consume
190// the written data.
191// The data is copied directly from the Write to the corresponding
192// Read (or Reads); there is no internal buffering.
193//
194// It is safe to call Read and Write in parallel with each other or with Close.
195// Parallel calls to Read and parallel calls to Write are also safe:
196// the individual calls will be gated sequentially.
197func Pipe() (*PipeReader, *PipeWriter) {
198	p := &pipe{
199		wrCh: make(chan []byte),
200		rdCh: make(chan int),
201		done: make(chan struct{}),
202	}
203	return &PipeReader{p}, &PipeWriter{p}
204}
205