1// Copyright 2014 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package http2 6 7import ( 8 "errors" 9 "io" 10 "sync" 11) 12 13// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like 14// io.Pipe except there are no PipeReader/PipeWriter halves, and the 15// underlying buffer is an interface. (io.Pipe is always unbuffered) 16type pipe struct { 17 mu sync.Mutex 18 c sync.Cond // c.L lazily initialized to &p.mu 19 b pipeBuffer 20 err error // read error once empty. non-nil means closed. 21 breakErr error // immediate read error (caller doesn't see rest of b) 22 donec chan struct{} // closed on error 23 readFn func() // optional code to run in Read before error 24} 25 26type pipeBuffer interface { 27 Len() int 28 io.Writer 29 io.Reader 30} 31 32func (p *pipe) Len() int { 33 p.mu.Lock() 34 defer p.mu.Unlock() 35 return p.b.Len() 36} 37 38// Read waits until data is available and copies bytes 39// from the buffer into p. 40func (p *pipe) Read(d []byte) (n int, err error) { 41 p.mu.Lock() 42 defer p.mu.Unlock() 43 if p.c.L == nil { 44 p.c.L = &p.mu 45 } 46 for { 47 if p.breakErr != nil { 48 return 0, p.breakErr 49 } 50 if p.b.Len() > 0 { 51 return p.b.Read(d) 52 } 53 if p.err != nil { 54 if p.readFn != nil { 55 p.readFn() // e.g. copy trailers 56 p.readFn = nil // not sticky like p.err 57 } 58 return 0, p.err 59 } 60 p.c.Wait() 61 } 62} 63 64var errClosedPipeWrite = errors.New("write on closed buffer") 65 66// Write copies bytes from p into the buffer and wakes a reader. 67// It is an error to write more data than the buffer can hold. 68func (p *pipe) Write(d []byte) (n int, err error) { 69 p.mu.Lock() 70 defer p.mu.Unlock() 71 if p.c.L == nil { 72 p.c.L = &p.mu 73 } 74 defer p.c.Signal() 75 if p.err != nil { 76 return 0, errClosedPipeWrite 77 } 78 return p.b.Write(d) 79} 80 81// CloseWithError causes the next Read (waking up a current blocked 82// Read if needed) to return the provided err after all data has been 83// read. 84// 85// The error must be non-nil. 86func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) } 87 88// BreakWithError causes the next Read (waking up a current blocked 89// Read if needed) to return the provided err immediately, without 90// waiting for unread data. 91func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) } 92 93// closeWithErrorAndCode is like CloseWithError but also sets some code to run 94// in the caller's goroutine before returning the error. 95func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) } 96 97func (p *pipe) closeWithError(dst *error, err error, fn func()) { 98 if err == nil { 99 panic("err must be non-nil") 100 } 101 p.mu.Lock() 102 defer p.mu.Unlock() 103 if p.c.L == nil { 104 p.c.L = &p.mu 105 } 106 defer p.c.Signal() 107 if *dst != nil { 108 // Already been done. 109 return 110 } 111 p.readFn = fn 112 *dst = err 113 p.closeDoneLocked() 114} 115 116// requires p.mu be held. 117func (p *pipe) closeDoneLocked() { 118 if p.donec == nil { 119 return 120 } 121 // Close if unclosed. This isn't racy since we always 122 // hold p.mu while closing. 123 select { 124 case <-p.donec: 125 default: 126 close(p.donec) 127 } 128} 129 130// Err returns the error (if any) first set by BreakWithError or CloseWithError. 131func (p *pipe) Err() error { 132 p.mu.Lock() 133 defer p.mu.Unlock() 134 if p.breakErr != nil { 135 return p.breakErr 136 } 137 return p.err 138} 139 140// Done returns a channel which is closed if and when this pipe is closed 141// with CloseWithError. 142func (p *pipe) Done() <-chan struct{} { 143 p.mu.Lock() 144 defer p.mu.Unlock() 145 if p.donec == nil { 146 p.donec = make(chan struct{}) 147 if p.err != nil || p.breakErr != nil { 148 // Already hit an error. 149 p.closeDoneLocked() 150 } 151 } 152 return p.donec 153} 154