1// Copyright 2009 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// Pipe adapter to connect code expecting an io.Reader
6// with code expecting an io.Writer.
7
8package io
9
10import (
11	"errors"
12	"sync"
13	"sync/atomic"
14)
15
// atomicError holds an error that may be stored and loaded concurrently.
//
// Only the first non-nil error stored is retained. Without this, a second
// close of the same pipe end (for example a deferred Close running after
// CloseWithError(err)) would replace the error that readers or writers
// have already been promised.
//
// The error is wrapped in struct{ error } because atomic.Value requires
// every stored value to have the same concrete type.
type atomicError struct {
	mu sync.Mutex   // serializes Store so its check-then-set cannot interleave
	v  atomic.Value // holds a struct{ error }; read lock-free by Load
}

// Store records err unless a non-nil error has already been stored,
// in which case the call is a no-op.
func (a *atomicError) Store(err error) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.Load() != nil {
		// Keep the first error; later stores must not overwrite it.
		return
	}
	a.v.Store(struct{ error }{err})
}

// Load returns the stored error, or nil if nothing has been stored yet.
// It does not take the mutex: the atomic.Value read is safe on its own.
func (a *atomicError) Load() error {
	// The type assertion fails on the zero Value (nil interface), leaving
	// err as the zero struct whose embedded error is nil.
	err, _ := a.v.Load().(struct{ error })
	return err.error
}
27
// ErrClosedPipe is the error reported by read and write operations
// performed on a pipe that has been closed.
var ErrClosedPipe error = errors.New("io: read/write on closed pipe")
30
31// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
32type pipe struct {
33	wrMu sync.Mutex // Serializes Write operations
34	wrCh chan []byte
35	rdCh chan int
36
37	once sync.Once // Protects closing done
38	done chan struct{}
39	rerr atomicError
40	werr atomicError
41}
42
43func (p *pipe) Read(b []byte) (n int, err error) {
44	select {
45	case <-p.done:
46		return 0, p.readCloseError()
47	default:
48	}
49
50	select {
51	case bw := <-p.wrCh:
52		nr := copy(b, bw)
53		p.rdCh <- nr
54		return nr, nil
55	case <-p.done:
56		return 0, p.readCloseError()
57	}
58}
59
60func (p *pipe) readCloseError() error {
61	rerr := p.rerr.Load()
62	if werr := p.werr.Load(); rerr == nil && werr != nil {
63		return werr
64	}
65	return ErrClosedPipe
66}
67
68func (p *pipe) CloseRead(err error) error {
69	if err == nil {
70		err = ErrClosedPipe
71	}
72	p.rerr.Store(err)
73	p.once.Do(func() { close(p.done) })
74	return nil
75}
76
77func (p *pipe) Write(b []byte) (n int, err error) {
78	select {
79	case <-p.done:
80		return 0, p.writeCloseError()
81	default:
82		p.wrMu.Lock()
83		defer p.wrMu.Unlock()
84	}
85
86	for once := true; once || len(b) > 0; once = false {
87		select {
88		case p.wrCh <- b:
89			nw := <-p.rdCh
90			b = b[nw:]
91			n += nw
92		case <-p.done:
93			return n, p.writeCloseError()
94		}
95	}
96	return n, nil
97}
98
99func (p *pipe) writeCloseError() error {
100	werr := p.werr.Load()
101	if rerr := p.rerr.Load(); werr == nil && rerr != nil {
102		return rerr
103	}
104	return ErrClosedPipe
105}
106
107func (p *pipe) CloseWrite(err error) error {
108	if err == nil {
109		err = EOF
110	}
111	p.werr.Store(err)
112	p.once.Do(func() { close(p.done) })
113	return nil
114}
115
116// A PipeReader is the read half of a pipe.
117type PipeReader struct {
118	p *pipe
119}
120
121// Read implements the standard Read interface:
122// it reads data from the pipe, blocking until a writer
123// arrives or the write end is closed.
124// If the write end is closed with an error, that error is
125// returned as err; otherwise err is EOF.
126func (r *PipeReader) Read(data []byte) (n int, err error) {
127	return r.p.Read(data)
128}
129
130// Close closes the reader; subsequent writes to the
131// write half of the pipe will return the error ErrClosedPipe.
132func (r *PipeReader) Close() error {
133	return r.CloseWithError(nil)
134}
135
136// CloseWithError closes the reader; subsequent writes
137// to the write half of the pipe will return the error err.
138func (r *PipeReader) CloseWithError(err error) error {
139	return r.p.CloseRead(err)
140}
141
142// A PipeWriter is the write half of a pipe.
143type PipeWriter struct {
144	p *pipe
145}
146
147// Write implements the standard Write interface:
148// it writes data to the pipe, blocking until one or more readers
149// have consumed all the data or the read end is closed.
150// If the read end is closed with an error, that err is
151// returned as err; otherwise err is ErrClosedPipe.
152func (w *PipeWriter) Write(data []byte) (n int, err error) {
153	return w.p.Write(data)
154}
155
156// Close closes the writer; subsequent reads from the
157// read half of the pipe will return no bytes and EOF.
158func (w *PipeWriter) Close() error {
159	return w.CloseWithError(nil)
160}
161
162// CloseWithError closes the writer; subsequent reads from the
163// read half of the pipe will return no bytes and the error err,
164// or EOF if err is nil.
165//
166// CloseWithError always returns nil.
167func (w *PipeWriter) CloseWithError(err error) error {
168	return w.p.CloseWrite(err)
169}
170
171// Pipe creates a synchronous in-memory pipe.
172// It can be used to connect code expecting an io.Reader
173// with code expecting an io.Writer.
174//
175// Reads and Writes on the pipe are matched one to one
176// except when multiple Reads are needed to consume a single Write.
177// That is, each Write to the PipeWriter blocks until it has satisfied
178// one or more Reads from the PipeReader that fully consume
179// the written data.
180// The data is copied directly from the Write to the corresponding
181// Read (or Reads); there is no internal buffering.
182//
183// It is safe to call Read and Write in parallel with each other or with Close.
184// Parallel calls to Read and parallel calls to Write are also safe:
185// the individual calls will be gated sequentially.
186func Pipe() (*PipeReader, *PipeWriter) {
187	p := &pipe{
188		wrCh: make(chan []byte),
189		rdCh: make(chan int),
190		done: make(chan struct{}),
191	}
192	return &PipeReader{p}, &PipeWriter{p}
193}
194