// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4
5package http2
6
7import (
8	"errors"
9	"io"
10	"sync"
11)
12
13// pipe is a goroutine-safe io.Reader/io.Writer pair.  It's like
14// io.Pipe except there are no PipeReader/PipeWriter halves, and the
15// underlying buffer is an interface. (io.Pipe is always unbuffered)
16type pipe struct {
17	mu       sync.Mutex
18	c        sync.Cond // c.L lazily initialized to &p.mu
19	b        pipeBuffer
20	err      error         // read error once empty. non-nil means closed.
21	breakErr error         // immediate read error (caller doesn't see rest of b)
22	donec    chan struct{} // closed on error
23	readFn   func()        // optional code to run in Read before error
24}
25
// pipeBuffer is the storage behind a pipe: something that can be
// written to, read from, and asked how much it currently holds.
type pipeBuffer interface {
	io.Reader
	io.Writer

	// Len is consulted by pipe.Read: a positive value means there is
	// data to hand to the reader.
	Len() int
}
31
32// Read waits until data is available and copies bytes
33// from the buffer into p.
34func (p *pipe) Read(d []byte) (n int, err error) {
35	p.mu.Lock()
36	defer p.mu.Unlock()
37	if p.c.L == nil {
38		p.c.L = &p.mu
39	}
40	for {
41		if p.breakErr != nil {
42			return 0, p.breakErr
43		}
44		if p.b.Len() > 0 {
45			return p.b.Read(d)
46		}
47		if p.err != nil {
48			if p.readFn != nil {
49				p.readFn()     // e.g. copy trailers
50				p.readFn = nil // not sticky like p.err
51			}
52			return 0, p.err
53		}
54		p.c.Wait()
55	}
56}
57
// errClosedPipeWrite is what Write returns once the pipe's close error
// (p.err) has been set.
var errClosedPipeWrite error = errors.New("write on closed buffer")
59
60// Write copies bytes from p into the buffer and wakes a reader.
61// It is an error to write more data than the buffer can hold.
62func (p *pipe) Write(d []byte) (n int, err error) {
63	p.mu.Lock()
64	defer p.mu.Unlock()
65	if p.c.L == nil {
66		p.c.L = &p.mu
67	}
68	defer p.c.Signal()
69	if p.err != nil {
70		return 0, errClosedPipeWrite
71	}
72	return p.b.Write(d)
73}
74
75// CloseWithError causes the next Read (waking up a current blocked
76// Read if needed) to return the provided err after all data has been
77// read.
78//
79// The error must be non-nil.
80func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
81
82// BreakWithError causes the next Read (waking up a current blocked
83// Read if needed) to return the provided err immediately, without
84// waiting for unread data.
85func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
86
87// closeWithErrorAndCode is like CloseWithError but also sets some code to run
88// in the caller's goroutine before returning the error.
89func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
90
91func (p *pipe) closeWithError(dst *error, err error, fn func()) {
92	if err == nil {
93		panic("err must be non-nil")
94	}
95	p.mu.Lock()
96	defer p.mu.Unlock()
97	if p.c.L == nil {
98		p.c.L = &p.mu
99	}
100	defer p.c.Signal()
101	if *dst != nil {
102		// Already been done.
103		return
104	}
105	p.readFn = fn
106	*dst = err
107	p.closeDoneLocked()
108}
109
110// requires p.mu be held.
111func (p *pipe) closeDoneLocked() {
112	if p.donec == nil {
113		return
114	}
115	// Close if unclosed. This isn't racy since we always
116	// hold p.mu while closing.
117	select {
118	case <-p.donec:
119	default:
120		close(p.donec)
121	}
122}
123
124// Err returns the error (if any) first set by BreakWithError or CloseWithError.
125func (p *pipe) Err() error {
126	p.mu.Lock()
127	defer p.mu.Unlock()
128	if p.breakErr != nil {
129		return p.breakErr
130	}
131	return p.err
132}
133
134// Done returns a channel which is closed if and when this pipe is closed
135// with CloseWithError.
136func (p *pipe) Done() <-chan struct{} {
137	p.mu.Lock()
138	defer p.mu.Unlock()
139	if p.donec == nil {
140		p.donec = make(chan struct{})
141		if p.err != nil || p.breakErr != nil {
142			// Already hit an error.
143			p.closeDoneLocked()
144		}
145	}
146	return p.donec
147}
148