1// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package http2
6
7import (
8	"errors"
9	"io"
10	"sync"
11)
12
13// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
14// io.Pipe except there are no PipeReader/PipeWriter halves, and the
15// underlying buffer is an interface. (io.Pipe is always unbuffered)
16type pipe struct {
17	mu       sync.Mutex
18	c        sync.Cond     // c.L lazily initialized to &p.mu
19	b        pipeBuffer    // nil when done reading
20	unread   int           // bytes unread when done
21	err      error         // read error once empty. non-nil means closed.
22	breakErr error         // immediate read error (caller doesn't see rest of b)
23	donec    chan struct{} // closed on error
24	readFn   func()        // optional code to run in Read before error
25}
26
27type pipeBuffer interface {
28	Len() int
29	io.Writer
30	io.Reader
31}
32
33// setBuffer initializes the pipe buffer.
34// It has no effect if the pipe is already closed.
35func (p *pipe) setBuffer(b pipeBuffer) {
36	p.mu.Lock()
37	defer p.mu.Unlock()
38	if p.err != nil || p.breakErr != nil {
39		return
40	}
41	p.b = b
42}
43
44func (p *pipe) Len() int {
45	p.mu.Lock()
46	defer p.mu.Unlock()
47	if p.b == nil {
48		return p.unread
49	}
50	return p.b.Len()
51}
52
53// Read waits until data is available and copies bytes
54// from the buffer into p.
55func (p *pipe) Read(d []byte) (n int, err error) {
56	p.mu.Lock()
57	defer p.mu.Unlock()
58	if p.c.L == nil {
59		p.c.L = &p.mu
60	}
61	for {
62		if p.breakErr != nil {
63			return 0, p.breakErr
64		}
65		if p.b != nil && p.b.Len() > 0 {
66			return p.b.Read(d)
67		}
68		if p.err != nil {
69			if p.readFn != nil {
70				p.readFn()     // e.g. copy trailers
71				p.readFn = nil // not sticky like p.err
72			}
73			p.b = nil
74			return 0, p.err
75		}
76		p.c.Wait()
77	}
78}
79
80var errClosedPipeWrite = errors.New("write on closed buffer")
81
82// Write copies bytes from p into the buffer and wakes a reader.
83// It is an error to write more data than the buffer can hold.
84func (p *pipe) Write(d []byte) (n int, err error) {
85	p.mu.Lock()
86	defer p.mu.Unlock()
87	if p.c.L == nil {
88		p.c.L = &p.mu
89	}
90	defer p.c.Signal()
91	if p.err != nil {
92		return 0, errClosedPipeWrite
93	}
94	if p.breakErr != nil {
95		p.unread += len(d)
96		return len(d), nil // discard when there is no reader
97	}
98	return p.b.Write(d)
99}
100
101// CloseWithError causes the next Read (waking up a current blocked
102// Read if needed) to return the provided err after all data has been
103// read.
104//
105// The error must be non-nil.
106func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
107
108// BreakWithError causes the next Read (waking up a current blocked
109// Read if needed) to return the provided err immediately, without
110// waiting for unread data.
111func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
112
113// closeWithErrorAndCode is like CloseWithError but also sets some code to run
114// in the caller's goroutine before returning the error.
115func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
116
117func (p *pipe) closeWithError(dst *error, err error, fn func()) {
118	if err == nil {
119		panic("err must be non-nil")
120	}
121	p.mu.Lock()
122	defer p.mu.Unlock()
123	if p.c.L == nil {
124		p.c.L = &p.mu
125	}
126	defer p.c.Signal()
127	if *dst != nil {
128		// Already been done.
129		return
130	}
131	p.readFn = fn
132	if dst == &p.breakErr {
133		if p.b != nil {
134			p.unread += p.b.Len()
135		}
136		p.b = nil
137	}
138	*dst = err
139	p.closeDoneLocked()
140}
141
142// requires p.mu be held.
143func (p *pipe) closeDoneLocked() {
144	if p.donec == nil {
145		return
146	}
147	// Close if unclosed. This isn't racy since we always
148	// hold p.mu while closing.
149	select {
150	case <-p.donec:
151	default:
152		close(p.donec)
153	}
154}
155
156// Err returns the error (if any) first set by BreakWithError or CloseWithError.
157func (p *pipe) Err() error {
158	p.mu.Lock()
159	defer p.mu.Unlock()
160	if p.breakErr != nil {
161		return p.breakErr
162	}
163	return p.err
164}
165
166// Done returns a channel which is closed if and when this pipe is closed
167// with CloseWithError.
168func (p *pipe) Done() <-chan struct{} {
169	p.mu.Lock()
170	defer p.mu.Unlock()
171	if p.donec == nil {
172		p.donec = make(chan struct{})
173		if p.err != nil || p.breakErr != nil {
174			// Already hit an error.
175			p.closeDoneLocked()
176		}
177	}
178	return p.donec
179}
180