1package ioutils // import "github.com/ory/dockertest/v3/docker/pkg/ioutils"
2
3import (
4	"errors"
5	"io"
6	"sync"
7)
8
// Sizing limits for the byte slices that back a BytesPipe.
const (
	// maxCap is the largest capacity a single buffering slice may have.
	maxCap = 1e6

	// minCap is the smallest capacity a buffering slice is created with.
	minCap = 64

	// blockThreshold is the number of buffered bytes at (or above) which a
	// write to BytesPipe blocks instead of allocating another slice.
	blockThreshold = 1e6
)
18
// ErrClosed is the error Write reports once the BytesPipe has been closed.
var ErrClosed = errors.New("write to closed BytesPipe")

// Package-wide registry of buffer pools, one pool per buffer capacity.
var (
	// bufPoolsLock serializes every access to bufPools.
	bufPoolsLock sync.Mutex
	// bufPools maps a buffer capacity to the pool holding buffers of that size.
	bufPools = map[int]*sync.Pool{}
)
26
// BytesPipe is an io.ReadWriteCloser that works like an in-memory pipe (a
// FIFO queue of bytes). Written data can be read at most once. BytesPipe
// takes byte slices from a pool as data arrives and hands them back as they
// are drained, so the buffer does not stay overgrown after peak loads.
type BytesPipe struct {
	// mu guards every field below; wait is a condition variable built on mu
	// that Read, Write and CloseWithError use to signal state changes.
	// buf is the chain of buffers holding unread data: Write fills the last
	// element, Read drains the first. bufLen is the number of unread bytes.
	mu       sync.Mutex
	wait     *sync.Cond
	buf      []*fixedBuffer
	bufLen   int
	closeErr error // nil while open; once set, Write fails and Read returns it after the buffered data is drained.
}
38
39// NewBytesPipe creates new BytesPipe, initialized by specified slice.
40// If buf is nil, then it will be initialized with slice which cap is 64.
41// buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf).
42func NewBytesPipe() *BytesPipe {
43	bp := &BytesPipe{}
44	bp.buf = append(bp.buf, getBuffer(minCap))
45	bp.wait = sync.NewCond(&bp.mu)
46	return bp
47}
48
49// Write writes p to BytesPipe.
50// It can allocate new []byte slices in a process of writing.
51func (bp *BytesPipe) Write(p []byte) (int, error) {
52	bp.mu.Lock()
53
54	written := 0
55loop0:
56	for {
57		if bp.closeErr != nil {
58			bp.mu.Unlock()
59			return written, ErrClosed
60		}
61
62		if len(bp.buf) == 0 {
63			bp.buf = append(bp.buf, getBuffer(64))
64		}
65		// get the last buffer
66		b := bp.buf[len(bp.buf)-1]
67
68		n, err := b.Write(p)
69		written += n
70		bp.bufLen += n
71
72		// errBufferFull is an error we expect to get if the buffer is full
73		if err != nil && err != errBufferFull {
74			bp.wait.Broadcast()
75			bp.mu.Unlock()
76			return written, err
77		}
78
79		// if there was enough room to write all then break
80		if len(p) == n {
81			break
82		}
83
84		// more data: write to the next slice
85		p = p[n:]
86
87		// make sure the buffer doesn't grow too big from this write
88		for bp.bufLen >= blockThreshold {
89			bp.wait.Wait()
90			if bp.closeErr != nil {
91				continue loop0
92			}
93		}
94
95		// add new byte slice to the buffers slice and continue writing
96		nextCap := b.Cap() * 2
97		if nextCap > maxCap {
98			nextCap = maxCap
99		}
100		bp.buf = append(bp.buf, getBuffer(nextCap))
101	}
102	bp.wait.Broadcast()
103	bp.mu.Unlock()
104	return written, nil
105}
106
107// CloseWithError causes further reads from a BytesPipe to return immediately.
108func (bp *BytesPipe) CloseWithError(err error) error {
109	bp.mu.Lock()
110	if err != nil {
111		bp.closeErr = err
112	} else {
113		bp.closeErr = io.EOF
114	}
115	bp.wait.Broadcast()
116	bp.mu.Unlock()
117	return nil
118}
119
120// Close causes further reads from a BytesPipe to return immediately.
121func (bp *BytesPipe) Close() error {
122	return bp.CloseWithError(nil)
123}
124
125// Read reads bytes from BytesPipe.
126// Data could be read only once.
127func (bp *BytesPipe) Read(p []byte) (n int, err error) {
128	bp.mu.Lock()
129	if bp.bufLen == 0 {
130		if bp.closeErr != nil {
131			bp.mu.Unlock()
132			return 0, bp.closeErr
133		}
134		bp.wait.Wait()
135		if bp.bufLen == 0 && bp.closeErr != nil {
136			err := bp.closeErr
137			bp.mu.Unlock()
138			return 0, err
139		}
140	}
141
142	for bp.bufLen > 0 {
143		b := bp.buf[0]
144		read, _ := b.Read(p) // ignore error since fixedBuffer doesn't really return an error
145		n += read
146		bp.bufLen -= read
147
148		if b.Len() == 0 {
149			// it's empty so return it to the pool and move to the next one
150			returnBuffer(b)
151			bp.buf[0] = nil
152			bp.buf = bp.buf[1:]
153		}
154
155		if len(p) == read {
156			break
157		}
158
159		p = p[read:]
160	}
161
162	bp.wait.Broadcast()
163	bp.mu.Unlock()
164	return
165}
166
167func returnBuffer(b *fixedBuffer) {
168	b.Reset()
169	bufPoolsLock.Lock()
170	pool := bufPools[b.Cap()]
171	bufPoolsLock.Unlock()
172	if pool != nil {
173		pool.Put(b)
174	}
175}
176
177func getBuffer(size int) *fixedBuffer {
178	bufPoolsLock.Lock()
179	pool, ok := bufPools[size]
180	if !ok {
181		pool = &sync.Pool{New: func() interface{} { return &fixedBuffer{buf: make([]byte, 0, size)} }}
182		bufPools[size] = pool
183	}
184	bufPoolsLock.Unlock()
185	return pool.Get().(*fixedBuffer)
186}
187