1// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package http2
6
7import (
8	"errors"
9	"io"
10	"sync"
11)
12
13// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
14// io.Pipe except there are no PipeReader/PipeWriter halves, and the
15// underlying buffer is an interface. (io.Pipe is always unbuffered)
16type pipe struct {
17	mu       sync.Mutex
18	c        sync.Cond // c.L lazily initialized to &p.mu
19	b        pipeBuffer
20	err      error         // read error once empty. non-nil means closed.
21	breakErr error         // immediate read error (caller doesn't see rest of b)
22	donec    chan struct{} // closed on error
23	readFn   func()        // optional code to run in Read before error
24}
25
26type pipeBuffer interface {
27	Len() int
28	io.Writer
29	io.Reader
30}
31
32func (p *pipe) Len() int {
33	p.mu.Lock()
34	defer p.mu.Unlock()
35	return p.b.Len()
36}
37
38// Read waits until data is available and copies bytes
39// from the buffer into p.
40func (p *pipe) Read(d []byte) (n int, err error) {
41	p.mu.Lock()
42	defer p.mu.Unlock()
43	if p.c.L == nil {
44		p.c.L = &p.mu
45	}
46	for {
47		if p.breakErr != nil {
48			return 0, p.breakErr
49		}
50		if p.b.Len() > 0 {
51			return p.b.Read(d)
52		}
53		if p.err != nil {
54			if p.readFn != nil {
55				p.readFn()     // e.g. copy trailers
56				p.readFn = nil // not sticky like p.err
57			}
58			return 0, p.err
59		}
60		p.c.Wait()
61	}
62}
63
64var errClosedPipeWrite = errors.New("write on closed buffer")
65
66// Write copies bytes from p into the buffer and wakes a reader.
67// It is an error to write more data than the buffer can hold.
68func (p *pipe) Write(d []byte) (n int, err error) {
69	p.mu.Lock()
70	defer p.mu.Unlock()
71	if p.c.L == nil {
72		p.c.L = &p.mu
73	}
74	defer p.c.Signal()
75	if p.err != nil {
76		return 0, errClosedPipeWrite
77	}
78	return p.b.Write(d)
79}
80
81// CloseWithError causes the next Read (waking up a current blocked
82// Read if needed) to return the provided err after all data has been
83// read.
84//
85// The error must be non-nil.
86func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
87
88// BreakWithError causes the next Read (waking up a current blocked
89// Read if needed) to return the provided err immediately, without
90// waiting for unread data.
91func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
92
93// closeWithErrorAndCode is like CloseWithError but also sets some code to run
94// in the caller's goroutine before returning the error.
95func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
96
97func (p *pipe) closeWithError(dst *error, err error, fn func()) {
98	if err == nil {
99		panic("err must be non-nil")
100	}
101	p.mu.Lock()
102	defer p.mu.Unlock()
103	if p.c.L == nil {
104		p.c.L = &p.mu
105	}
106	defer p.c.Signal()
107	if *dst != nil {
108		// Already been done.
109		return
110	}
111	p.readFn = fn
112	*dst = err
113	p.closeDoneLocked()
114}
115
116// requires p.mu be held.
117func (p *pipe) closeDoneLocked() {
118	if p.donec == nil {
119		return
120	}
121	// Close if unclosed. This isn't racy since we always
122	// hold p.mu while closing.
123	select {
124	case <-p.donec:
125	default:
126		close(p.donec)
127	}
128}
129
130// Err returns the error (if any) first set by BreakWithError or CloseWithError.
131func (p *pipe) Err() error {
132	p.mu.Lock()
133	defer p.mu.Unlock()
134	if p.breakErr != nil {
135		return p.breakErr
136	}
137	return p.err
138}
139
140// Done returns a channel which is closed if and when this pipe is closed
141// with CloseWithError.
142func (p *pipe) Done() <-chan struct{} {
143	p.mu.Lock()
144	defer p.mu.Unlock()
145	if p.donec == nil {
146		p.donec = make(chan struct{})
147		if p.err != nil || p.breakErr != nil {
148			// Already hit an error.
149			p.closeDoneLocked()
150		}
151	}
152	return p.donec
153}
154