1package bufpipe 2 3import ( 4 "bytes" 5 "errors" 6 "io" 7 "sync" 8) 9 10// ErrClosedPipe is the error used for read or write operations on a closed pipe. 11var ErrClosedPipe = errors.New("bufpipe: read/write on closed pipe") 12 13type pipe struct { 14 cond *sync.Cond 15 buf *bytes.Buffer 16 rerr, werr error 17} 18 19// A PipeReader is the read half of a pipe. 20type PipeReader struct { 21 *pipe 22} 23 24// A PipeWriter is the write half of a pipe. 25type PipeWriter struct { 26 *pipe 27} 28 29// New creates a synchronous pipe using buf as its initial contents. It can be 30// used to connect code expecting an io.Reader with code expecting an io.Writer. 31// 32// Unlike io.Pipe, writes never block because the internal buffer has variable 33// size. Reads block only when the buffer is empty. 34// 35// It is safe to call Read and Write in parallel with each other or with Close. 36// Parallel calls to Read and parallel calls to Write are also safe: the 37// individual calls will be gated sequentially. 38// 39// The new pipe takes ownership of buf, and the caller should not use buf after 40// this call. New is intended to prepare a PipeReader to read existing data. It 41// can also be used to set the initial size of the internal buffer for writing. 42// To do that, buf should have the desired capacity but a length of zero. 43func New(buf []byte) (*PipeReader, *PipeWriter) { 44 p := &pipe{ 45 buf: bytes.NewBuffer(buf), 46 cond: sync.NewCond(new(sync.Mutex)), 47 } 48 return &PipeReader{ 49 pipe: p, 50 }, &PipeWriter{ 51 pipe: p, 52 } 53} 54 55// Read implements the standard Read interface: it reads data from the pipe, 56// reading from the internal buffer, otherwise blocking until a writer arrives 57// or the write end is closed. If the write end is closed with an error, that 58// error is returned as err; otherwise err is io.EOF. 59func (r *PipeReader) Read(data []byte) (int, error) { 60 r.cond.L.Lock() 61 defer r.cond.L.Unlock() 62 63RETRY: 64 n, err := r.buf.Read(data) 65 // If not closed and no read, wait for writing. 66 if err == io.EOF && r.rerr == nil && n == 0 { 67 r.cond.Wait() 68 goto RETRY 69 } 70 if err == io.EOF { 71 return n, r.rerr 72 } 73 return n, err 74} 75 76// Close closes the reader; subsequent writes from the write half of the pipe 77// will return error ErrClosedPipe. 78func (r *PipeReader) Close() error { 79 return r.CloseWithError(nil) 80} 81 82// CloseWithError closes the reader; subsequent writes to the write half of the 83// pipe will return the error err. 84func (r *PipeReader) CloseWithError(err error) error { 85 r.cond.L.Lock() 86 defer r.cond.L.Unlock() 87 88 if err == nil { 89 err = ErrClosedPipe 90 } 91 r.werr = err 92 return nil 93} 94 95// Write implements the standard Write interface: it writes data to the internal 96// buffer. If the read end is closed with an error, that err is returned as err; 97// otherwise err is ErrClosedPipe. 98func (w *PipeWriter) Write(data []byte) (int, error) { 99 w.cond.L.Lock() 100 defer w.cond.L.Unlock() 101 102 if w.werr != nil { 103 return 0, w.werr 104 } 105 106 n, err := w.buf.Write(data) 107 w.cond.Signal() 108 return n, err 109} 110 111// Close closes the writer; subsequent reads from the read half of the pipe will 112// return io.EOF once the internal buffer get empty. 113func (w *PipeWriter) Close() error { 114 return w.CloseWithError(nil) 115} 116 117// Close closes the writer; subsequent reads from the read half of the pipe will 118// return err once the internal buffer get empty. 119func (w *PipeWriter) CloseWithError(err error) error { 120 w.cond.L.Lock() 121 defer w.cond.L.Unlock() 122 123 if err == nil { 124 err = io.EOF 125 } 126 w.rerr = err 127 return nil 128} 129