1package bufpipe
2
3import (
4	"bytes"
5	"errors"
6	"io"
7	"sync"
8)
9
10// ErrClosedPipe is the error used for read or write operations on a closed pipe.
11var ErrClosedPipe = errors.New("bufpipe: read/write on closed pipe")
12
13type pipe struct {
14	cond       *sync.Cond
15	buf        *bytes.Buffer
16	rerr, werr error
17}
18
19// A PipeReader is the read half of a pipe.
20type PipeReader struct {
21	*pipe
22}
23
24// A PipeWriter is the write half of a pipe.
25type PipeWriter struct {
26	*pipe
27}
28
29// New creates a synchronous pipe using buf as its initial contents. It can be
30// used to connect code expecting an io.Reader with code expecting an io.Writer.
31//
32// Unlike io.Pipe, writes never block because the internal buffer has variable
33// size. Reads block only when the buffer is empty.
34//
35// It is safe to call Read and Write in parallel with each other or with Close.
36// Parallel calls to Read and parallel calls to Write are also safe: the
37// individual calls will be gated sequentially.
38//
39// The new pipe takes ownership of buf, and the caller should not use buf after
40// this call. New is intended to prepare a PipeReader to read existing data. It
41// can also be used to set the initial size of the internal buffer for writing.
42// To do that, buf should have the desired capacity but a length of zero.
43func New(buf []byte) (*PipeReader, *PipeWriter) {
44	p := &pipe{
45		buf:  bytes.NewBuffer(buf),
46		cond: sync.NewCond(new(sync.Mutex)),
47	}
48	return &PipeReader{
49			pipe: p,
50		}, &PipeWriter{
51			pipe: p,
52		}
53}
54
55// Read implements the standard Read interface: it reads data from the pipe,
56// reading from the internal buffer, otherwise blocking until a writer arrives
57// or the write end is closed. If the write end is closed with an error, that
58// error is returned as err; otherwise err is io.EOF.
59func (r *PipeReader) Read(data []byte) (int, error) {
60	r.cond.L.Lock()
61	defer r.cond.L.Unlock()
62
63RETRY:
64	n, err := r.buf.Read(data)
65	// If not closed and no read, wait for writing.
66	if err == io.EOF && r.rerr == nil && n == 0 {
67		r.cond.Wait()
68		goto RETRY
69	}
70	if err == io.EOF {
71		return n, r.rerr
72	}
73	return n, err
74}
75
76// Close closes the reader; subsequent writes from the write half of the pipe
77// will return error ErrClosedPipe.
78func (r *PipeReader) Close() error {
79	return r.CloseWithError(nil)
80}
81
82// CloseWithError closes the reader; subsequent writes to the write half of the
83// pipe will return the error err.
84func (r *PipeReader) CloseWithError(err error) error {
85	r.cond.L.Lock()
86	defer r.cond.L.Unlock()
87
88	if err == nil {
89		err = ErrClosedPipe
90	}
91	r.werr = err
92	return nil
93}
94
95// Write implements the standard Write interface: it writes data to the internal
96// buffer. If the read end is closed with an error, that err is returned as err;
97// otherwise err is ErrClosedPipe.
98func (w *PipeWriter) Write(data []byte) (int, error) {
99	w.cond.L.Lock()
100	defer w.cond.L.Unlock()
101
102	if w.werr != nil {
103		return 0, w.werr
104	}
105
106	n, err := w.buf.Write(data)
107	w.cond.Signal()
108	return n, err
109}
110
111// Close closes the writer; subsequent reads from the read half of the pipe will
112// return io.EOF once the internal buffer get empty.
113func (w *PipeWriter) Close() error {
114	return w.CloseWithError(nil)
115}
116
117// Close closes the writer; subsequent reads from the read half of the pipe will
118// return err once the internal buffer get empty.
119func (w *PipeWriter) CloseWithError(err error) error {
120	w.cond.L.Lock()
121	defer w.cond.L.Unlock()
122
123	if err == nil {
124		err = io.EOF
125	}
126	w.rerr = err
127	return nil
128}
129