1// Copyright 2009 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5// Pipe adapter to connect code expecting an io.Reader 6// with code expecting an io.Writer. 7 8package io 9 10import ( 11 "errors" 12 "sync" 13 "sync/atomic" 14) 15 16// atomicError is a type-safe atomic value for errors. 17// We use a struct{ error } to ensure consistent use of a concrete type. 18type atomicError struct{ v atomic.Value } 19 20func (a *atomicError) Store(err error) { 21 a.v.Store(struct{ error }{err}) 22} 23func (a *atomicError) Load() error { 24 err, _ := a.v.Load().(struct{ error }) 25 return err.error 26} 27 28// ErrClosedPipe is the error used for read or write operations on a closed pipe. 29var ErrClosedPipe = errors.New("io: read/write on closed pipe") 30 31// A pipe is the shared pipe structure underlying PipeReader and PipeWriter. 32type pipe struct { 33 wrMu sync.Mutex // Serializes Write operations 34 wrCh chan []byte 35 rdCh chan int 36 37 once sync.Once // Protects closing done 38 done chan struct{} 39 rerr atomicError 40 werr atomicError 41} 42 43func (p *pipe) Read(b []byte) (n int, err error) { 44 select { 45 case <-p.done: 46 return 0, p.readCloseError() 47 default: 48 } 49 50 select { 51 case bw := <-p.wrCh: 52 nr := copy(b, bw) 53 p.rdCh <- nr 54 return nr, nil 55 case <-p.done: 56 return 0, p.readCloseError() 57 } 58} 59 60func (p *pipe) readCloseError() error { 61 rerr := p.rerr.Load() 62 if werr := p.werr.Load(); rerr == nil && werr != nil { 63 return werr 64 } 65 return ErrClosedPipe 66} 67 68func (p *pipe) CloseRead(err error) error { 69 if err == nil { 70 err = ErrClosedPipe 71 } 72 p.rerr.Store(err) 73 p.once.Do(func() { close(p.done) }) 74 return nil 75} 76 77func (p *pipe) Write(b []byte) (n int, err error) { 78 select { 79 case <-p.done: 80 return 0, p.writeCloseError() 81 default: 82 p.wrMu.Lock() 83 defer p.wrMu.Unlock() 84 } 85 86 for once := true; once || len(b) > 0; once = false { 87 select { 88 case p.wrCh <- b: 89 nw := <-p.rdCh 90 b = b[nw:] 91 n += nw 92 case <-p.done: 93 return n, p.writeCloseError() 94 } 95 } 96 return n, nil 97} 98 99func (p *pipe) writeCloseError() error { 100 werr := p.werr.Load() 101 if rerr := p.rerr.Load(); werr == nil && rerr != nil { 102 return rerr 103 } 104 return ErrClosedPipe 105} 106 107func (p *pipe) CloseWrite(err error) error { 108 if err == nil { 109 err = EOF 110 } 111 p.werr.Store(err) 112 p.once.Do(func() { close(p.done) }) 113 return nil 114} 115 116// A PipeReader is the read half of a pipe. 117type PipeReader struct { 118 p *pipe 119} 120 121// Read implements the standard Read interface: 122// it reads data from the pipe, blocking until a writer 123// arrives or the write end is closed. 124// If the write end is closed with an error, that error is 125// returned as err; otherwise err is EOF. 126func (r *PipeReader) Read(data []byte) (n int, err error) { 127 return r.p.Read(data) 128} 129 130// Close closes the reader; subsequent writes to the 131// write half of the pipe will return the error ErrClosedPipe. 132func (r *PipeReader) Close() error { 133 return r.CloseWithError(nil) 134} 135 136// CloseWithError closes the reader; subsequent writes 137// to the write half of the pipe will return the error err. 138func (r *PipeReader) CloseWithError(err error) error { 139 return r.p.CloseRead(err) 140} 141 142// A PipeWriter is the write half of a pipe. 143type PipeWriter struct { 144 p *pipe 145} 146 147// Write implements the standard Write interface: 148// it writes data to the pipe, blocking until one or more readers 149// have consumed all the data or the read end is closed. 150// If the read end is closed with an error, that err is 151// returned as err; otherwise err is ErrClosedPipe. 152func (w *PipeWriter) Write(data []byte) (n int, err error) { 153 return w.p.Write(data) 154} 155 156// Close closes the writer; subsequent reads from the 157// read half of the pipe will return no bytes and EOF. 158func (w *PipeWriter) Close() error { 159 return w.CloseWithError(nil) 160} 161 162// CloseWithError closes the writer; subsequent reads from the 163// read half of the pipe will return no bytes and the error err, 164// or EOF if err is nil. 165// 166// CloseWithError always returns nil. 167func (w *PipeWriter) CloseWithError(err error) error { 168 return w.p.CloseWrite(err) 169} 170 171// Pipe creates a synchronous in-memory pipe. 172// It can be used to connect code expecting an io.Reader 173// with code expecting an io.Writer. 174// 175// Reads and Writes on the pipe are matched one to one 176// except when multiple Reads are needed to consume a single Write. 177// That is, each Write to the PipeWriter blocks until it has satisfied 178// one or more Reads from the PipeReader that fully consume 179// the written data. 180// The data is copied directly from the Write to the corresponding 181// Read (or Reads); there is no internal buffering. 182// 183// It is safe to call Read and Write in parallel with each other or with Close. 184// Parallel calls to Read and parallel calls to Write are also safe: 185// the individual calls will be gated sequentially. 186func Pipe() (*PipeReader, *PipeWriter) { 187 p := &pipe{ 188 wrCh: make(chan []byte), 189 rdCh: make(chan int), 190 done: make(chan struct{}), 191 } 192 return &PipeReader{p}, &PipeWriter{p} 193} 194