1// Copyright 2014 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package http2 6 7import ( 8 "errors" 9 "io" 10 "sync" 11) 12 13// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like 14// io.Pipe except there are no PipeReader/PipeWriter halves, and the 15// underlying buffer is an interface. (io.Pipe is always unbuffered) 16type pipe struct { 17 mu sync.Mutex 18 c sync.Cond // c.L lazily initialized to &p.mu 19 b pipeBuffer // nil when done reading 20 unread int // bytes unread when done 21 err error // read error once empty. non-nil means closed. 22 breakErr error // immediate read error (caller doesn't see rest of b) 23 donec chan struct{} // closed on error 24 readFn func() // optional code to run in Read before error 25} 26 27type pipeBuffer interface { 28 Len() int 29 io.Writer 30 io.Reader 31} 32 33func (p *pipe) Len() int { 34 p.mu.Lock() 35 defer p.mu.Unlock() 36 if p.b == nil { 37 return p.unread 38 } 39 return p.b.Len() 40} 41 42// Read waits until data is available and copies bytes 43// from the buffer into p. 44func (p *pipe) Read(d []byte) (n int, err error) { 45 p.mu.Lock() 46 defer p.mu.Unlock() 47 if p.c.L == nil { 48 p.c.L = &p.mu 49 } 50 for { 51 if p.breakErr != nil { 52 return 0, p.breakErr 53 } 54 if p.b != nil && p.b.Len() > 0 { 55 return p.b.Read(d) 56 } 57 if p.err != nil { 58 if p.readFn != nil { 59 p.readFn() // e.g. copy trailers 60 p.readFn = nil // not sticky like p.err 61 } 62 p.b = nil 63 return 0, p.err 64 } 65 p.c.Wait() 66 } 67} 68 69var errClosedPipeWrite = errors.New("write on closed buffer") 70 71// Write copies bytes from p into the buffer and wakes a reader. 72// It is an error to write more data than the buffer can hold. 73func (p *pipe) Write(d []byte) (n int, err error) { 74 p.mu.Lock() 75 defer p.mu.Unlock() 76 if p.c.L == nil { 77 p.c.L = &p.mu 78 } 79 defer p.c.Signal() 80 if p.err != nil { 81 return 0, errClosedPipeWrite 82 } 83 if p.breakErr != nil { 84 p.unread += len(d) 85 return len(d), nil // discard when there is no reader 86 } 87 return p.b.Write(d) 88} 89 90// CloseWithError causes the next Read (waking up a current blocked 91// Read if needed) to return the provided err after all data has been 92// read. 93// 94// The error must be non-nil. 95func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) } 96 97// BreakWithError causes the next Read (waking up a current blocked 98// Read if needed) to return the provided err immediately, without 99// waiting for unread data. 100func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) } 101 102// closeWithErrorAndCode is like CloseWithError but also sets some code to run 103// in the caller's goroutine before returning the error. 104func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) } 105 106func (p *pipe) closeWithError(dst *error, err error, fn func()) { 107 if err == nil { 108 panic("err must be non-nil") 109 } 110 p.mu.Lock() 111 defer p.mu.Unlock() 112 if p.c.L == nil { 113 p.c.L = &p.mu 114 } 115 defer p.c.Signal() 116 if *dst != nil { 117 // Already been done. 118 return 119 } 120 p.readFn = fn 121 if dst == &p.breakErr { 122 if p.b != nil { 123 p.unread += p.b.Len() 124 } 125 p.b = nil 126 } 127 *dst = err 128 p.closeDoneLocked() 129} 130 131// requires p.mu be held. 132func (p *pipe) closeDoneLocked() { 133 if p.donec == nil { 134 return 135 } 136 // Close if unclosed. This isn't racy since we always 137 // hold p.mu while closing. 138 select { 139 case <-p.donec: 140 default: 141 close(p.donec) 142 } 143} 144 145// Err returns the error (if any) first set by BreakWithError or CloseWithError. 146func (p *pipe) Err() error { 147 p.mu.Lock() 148 defer p.mu.Unlock() 149 if p.breakErr != nil { 150 return p.breakErr 151 } 152 return p.err 153} 154 155// Done returns a channel which is closed if and when this pipe is closed 156// with CloseWithError. 157func (p *pipe) Done() <-chan struct{} { 158 p.mu.Lock() 159 defer p.mu.Unlock() 160 if p.donec == nil { 161 p.donec = make(chan struct{}) 162 if p.err != nil || p.breakErr != nil { 163 // Already hit an error. 164 p.closeDoneLocked() 165 } 166 } 167 return p.donec 168} 169