1// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package http2
6
7import (
8	"errors"
9	"io"
10	"sync"
11)
12
// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
// io.Pipe except there are no PipeReader/PipeWriter halves, and the
// underlying buffer is an interface. (io.Pipe is always unbuffered.)
//
// Every method below takes mu before touching the other fields, except
// closeDoneLocked, which requires the caller to already hold it.
type pipe struct {
	mu       sync.Mutex
	c        sync.Cond     // c.L lazily initialized to &p.mu; Read waits on it, Write and close signal it
	b        pipeBuffer    // nil when done reading (after a break, or once Read has returned err)
	unread   int           // bytes discarded unread; reported by Len once b is nil
	err      error         // read error once empty. non-nil means closed.
	breakErr error         // immediate read error (caller doesn't see rest of b)
	donec    chan struct{} // created lazily by Done; closed once err or breakErr is set
	readFn   func()        // optional code to run in Read before error; cleared after it runs
}
26
// pipeBuffer is the storage behind a pipe. Besides being readable and
// writable, it must report its length: pipe uses Len to decide whether
// unread data remains.
type pipeBuffer interface {
	io.ReadWriter
	Len() int
}
32
33func (p *pipe) Len() int {
34	p.mu.Lock()
35	defer p.mu.Unlock()
36	if p.b == nil {
37		return p.unread
38	}
39	return p.b.Len()
40}
41
42// Read waits until data is available and copies bytes
43// from the buffer into p.
44func (p *pipe) Read(d []byte) (n int, err error) {
45	p.mu.Lock()
46	defer p.mu.Unlock()
47	if p.c.L == nil {
48		p.c.L = &p.mu
49	}
50	for {
51		if p.breakErr != nil {
52			return 0, p.breakErr
53		}
54		if p.b != nil && p.b.Len() > 0 {
55			return p.b.Read(d)
56		}
57		if p.err != nil {
58			if p.readFn != nil {
59				p.readFn()     // e.g. copy trailers
60				p.readFn = nil // not sticky like p.err
61			}
62			p.b = nil
63			return 0, p.err
64		}
65		p.c.Wait()
66	}
67}
68
69var errClosedPipeWrite = errors.New("write on closed buffer")
70
71// Write copies bytes from p into the buffer and wakes a reader.
72// It is an error to write more data than the buffer can hold.
73func (p *pipe) Write(d []byte) (n int, err error) {
74	p.mu.Lock()
75	defer p.mu.Unlock()
76	if p.c.L == nil {
77		p.c.L = &p.mu
78	}
79	defer p.c.Signal()
80	if p.err != nil {
81		return 0, errClosedPipeWrite
82	}
83	if p.breakErr != nil {
84		p.unread += len(d)
85		return len(d), nil // discard when there is no reader
86	}
87	return p.b.Write(d)
88}
89
90// CloseWithError causes the next Read (waking up a current blocked
91// Read if needed) to return the provided err after all data has been
92// read.
93//
94// The error must be non-nil.
95func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
96
97// BreakWithError causes the next Read (waking up a current blocked
98// Read if needed) to return the provided err immediately, without
99// waiting for unread data.
100func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
101
102// closeWithErrorAndCode is like CloseWithError but also sets some code to run
103// in the caller's goroutine before returning the error.
104func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
105
106func (p *pipe) closeWithError(dst *error, err error, fn func()) {
107	if err == nil {
108		panic("err must be non-nil")
109	}
110	p.mu.Lock()
111	defer p.mu.Unlock()
112	if p.c.L == nil {
113		p.c.L = &p.mu
114	}
115	defer p.c.Signal()
116	if *dst != nil {
117		// Already been done.
118		return
119	}
120	p.readFn = fn
121	if dst == &p.breakErr {
122		if p.b != nil {
123			p.unread += p.b.Len()
124		}
125		p.b = nil
126	}
127	*dst = err
128	p.closeDoneLocked()
129}
130
131// requires p.mu be held.
132func (p *pipe) closeDoneLocked() {
133	if p.donec == nil {
134		return
135	}
136	// Close if unclosed. This isn't racy since we always
137	// hold p.mu while closing.
138	select {
139	case <-p.donec:
140	default:
141		close(p.donec)
142	}
143}
144
145// Err returns the error (if any) first set by BreakWithError or CloseWithError.
146func (p *pipe) Err() error {
147	p.mu.Lock()
148	defer p.mu.Unlock()
149	if p.breakErr != nil {
150		return p.breakErr
151	}
152	return p.err
153}
154
155// Done returns a channel which is closed if and when this pipe is closed
156// with CloseWithError.
157func (p *pipe) Done() <-chan struct{} {
158	p.mu.Lock()
159	defer p.mu.Unlock()
160	if p.donec == nil {
161		p.donec = make(chan struct{})
162		if p.err != nil || p.breakErr != nil {
163			// Already hit an error.
164			p.closeDoneLocked()
165		}
166	}
167	return p.donec
168}
169