1// Copyright 2014 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package http2 6 7import ( 8 "errors" 9 "io" 10 "sync" 11) 12 13// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like 14// io.Pipe except there are no PipeReader/PipeWriter halves, and the 15// underlying buffer is an interface. (io.Pipe is always unbuffered) 16type pipe struct { 17 mu sync.Mutex 18 c sync.Cond // c.L lazily initialized to &p.mu 19 b pipeBuffer 20 err error // read error once empty. non-nil means closed. 21 breakErr error // immediate read error (caller doesn't see rest of b) 22 donec chan struct{} // closed on error 23 readFn func() // optional code to run in Read before error 24} 25 26type pipeBuffer interface { 27 Len() int 28 io.Writer 29 io.Reader 30} 31 32// Read waits until data is available and copies bytes 33// from the buffer into p. 34func (p *pipe) Read(d []byte) (n int, err error) { 35 p.mu.Lock() 36 defer p.mu.Unlock() 37 if p.c.L == nil { 38 p.c.L = &p.mu 39 } 40 for { 41 if p.breakErr != nil { 42 return 0, p.breakErr 43 } 44 if p.b.Len() > 0 { 45 return p.b.Read(d) 46 } 47 if p.err != nil { 48 if p.readFn != nil { 49 p.readFn() // e.g. copy trailers 50 p.readFn = nil // not sticky like p.err 51 } 52 return 0, p.err 53 } 54 p.c.Wait() 55 } 56} 57 58var errClosedPipeWrite = errors.New("write on closed buffer") 59 60// Write copies bytes from p into the buffer and wakes a reader. 61// It is an error to write more data than the buffer can hold. 62func (p *pipe) Write(d []byte) (n int, err error) { 63 p.mu.Lock() 64 defer p.mu.Unlock() 65 if p.c.L == nil { 66 p.c.L = &p.mu 67 } 68 defer p.c.Signal() 69 if p.err != nil { 70 return 0, errClosedPipeWrite 71 } 72 return p.b.Write(d) 73} 74 75// CloseWithError causes the next Read (waking up a current blocked 76// Read if needed) to return the provided err after all data has been 77// read. 78// 79// The error must be non-nil. 80func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) } 81 82// BreakWithError causes the next Read (waking up a current blocked 83// Read if needed) to return the provided err immediately, without 84// waiting for unread data. 85func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) } 86 87// closeWithErrorAndCode is like CloseWithError but also sets some code to run 88// in the caller's goroutine before returning the error. 89func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) } 90 91func (p *pipe) closeWithError(dst *error, err error, fn func()) { 92 if err == nil { 93 panic("err must be non-nil") 94 } 95 p.mu.Lock() 96 defer p.mu.Unlock() 97 if p.c.L == nil { 98 p.c.L = &p.mu 99 } 100 defer p.c.Signal() 101 if *dst != nil { 102 // Already been done. 103 return 104 } 105 p.readFn = fn 106 *dst = err 107 p.closeDoneLocked() 108} 109 110// requires p.mu be held. 111func (p *pipe) closeDoneLocked() { 112 if p.donec == nil { 113 return 114 } 115 // Close if unclosed. This isn't racy since we always 116 // hold p.mu while closing. 117 select { 118 case <-p.donec: 119 default: 120 close(p.donec) 121 } 122} 123 124// Err returns the error (if any) first set by BreakWithError or CloseWithError. 125func (p *pipe) Err() error { 126 p.mu.Lock() 127 defer p.mu.Unlock() 128 if p.breakErr != nil { 129 return p.breakErr 130 } 131 return p.err 132} 133 134// Done returns a channel which is closed if and when this pipe is closed 135// with CloseWithError. 136func (p *pipe) Done() <-chan struct{} { 137 p.mu.Lock() 138 defer p.mu.Unlock() 139 if p.donec == nil { 140 p.donec = make(chan struct{}) 141 if p.err != nil || p.breakErr != nil { 142 // Already hit an error. 143 p.closeDoneLocked() 144 } 145 } 146 return p.donec 147} 148