// Package fwd provides a buffered reader
2// and writer. Each has methods that help improve
3// the encoding/decoding performance of some binary
4// protocols.
5//
// The `fwd.Writer` and `fwd.Reader` types provide similar
// functionality to their counterparts in `bufio`, plus
8// a few extra utility methods that simplify read-ahead
9// and write-ahead. I wrote this package to improve serialization
10// performance for http://github.com/tinylib/msgp,
11// where it provided about a 2x speedup over `bufio` for certain
12// workloads. However, care must be taken to understand the semantics of the
13// extra methods provided by this package, as they allow
14// the user to access and manipulate the buffer memory
15// directly.
16//
17// The extra methods for `fwd.Reader` are `Peek`, `Skip`
18// and `Next`. `(*fwd.Reader).Peek`, unlike `(*bufio.Reader).Peek`,
19// will re-allocate the read buffer in order to accommodate arbitrarily
20// large read-ahead. `(*fwd.Reader).Skip` skips the next `n` bytes
21// in the stream, and uses the `io.Seeker` interface if the underlying
22// stream implements it. `(*fwd.Reader).Next` returns a slice pointing
23// to the next `n` bytes in the read buffer (like `Peek`), but also
24// increments the read position. This allows users to process streams
25// in arbitrary block sizes without having to manage appropriately-sized
26// slices. Additionally, obviating the need to copy the data from the
27// buffer to another location in memory can improve performance dramatically
28// in CPU-bound applications.
29//
// The only extra method on `fwd.Writer` is `(*fwd.Writer).Next`, which
// returns a slice pointing to the next `n` bytes of the writer, and increments
32// the write position by the length of the returned slice. This allows users
33// to write directly to the end of the buffer.
34//
35package fwd
36
37import (
38	"io"
39	"os"
40)
41
// Buffer sizing for Reader.
const (
	// DefaultReaderSize is the read buffer capacity, in bytes, used by
	// NewReader.
	DefaultReaderSize = 2 * 1024

	// minReaderSize is the smallest buffer capacity a Reader will accept;
	// the value is taken from bufio.
	minReaderSize = 16
)
49
50// NewReader returns a new *Reader that reads from 'r'
51func NewReader(r io.Reader) *Reader {
52	return NewReaderSize(r, DefaultReaderSize)
53}
54
55// NewReaderSize returns a new *Reader that
56// reads from 'r' and has a buffer size 'n'.
57func NewReaderSize(r io.Reader, n int) *Reader {
58	buf := make([]byte, 0, max(n, minReaderSize))
59	return NewReaderBuf(r, buf)
60}
61
62// NewReaderBuf returns a new *Reader that
63// reads from 'r' and uses 'buf' as a buffer.
64// 'buf' is not used when has smaller capacity than 16,
65// custom buffer is allocated instead.
66func NewReaderBuf(r io.Reader, buf []byte) *Reader {
67	if cap(buf) < minReaderSize {
68		buf = make([]byte, 0, minReaderSize)
69	}
70	buf = buf[:0]
71	rd := &Reader{
72		r:    r,
73		data: buf,
74	}
75	if s, ok := r.(io.Seeker); ok {
76		rd.rs = s
77	}
78	return rd
79}
80
// Reader is a buffered look-ahead reader. Readers are created with
// NewReader, NewReaderSize or NewReaderBuf, and can be pointed at a new
// source with Reset.
type Reader struct {
	r io.Reader // underlying reader

	// data[n:len(data)] is buffered data; data[len(data):cap(data)] is free buffer space
	data  []byte // buffer storage
	n     int    // read offset into data
	state error  // pending error from the last underlying read; reset to nil by err() and noEOF()

	// rs holds the underlying reader as an io.Seeker when the reader
	// passed to NewReaderBuf or Reset implements that interface, and is
	// nil otherwise. Skip uses it to seek forward instead of reading.
	rs io.Seeker
}
94
95// Reset resets the underlying reader
96// and the read buffer.
97func (r *Reader) Reset(rd io.Reader) {
98	r.r = rd
99	r.data = r.data[0:0]
100	r.n = 0
101	r.state = nil
102	if s, ok := rd.(io.Seeker); ok {
103		r.rs = s
104	} else {
105		r.rs = nil
106	}
107}
108
109// more() does one read on the underlying reader
110func (r *Reader) more() {
111	// move data backwards so that
112	// the read offset is 0; this way
113	// we can supply the maximum number of
114	// bytes to the reader
115	if r.n != 0 {
116		if r.n < len(r.data) {
117			r.data = r.data[:copy(r.data[0:], r.data[r.n:])]
118		} else {
119			r.data = r.data[:0]
120		}
121		r.n = 0
122	}
123	var a int
124	a, r.state = r.r.Read(r.data[len(r.data):cap(r.data)])
125	if a == 0 && r.state == nil {
126		r.state = io.ErrNoProgress
127		return
128	} else if a > 0 && r.state == io.EOF {
129		// discard the io.EOF if we read more than 0 bytes.
130		// the next call to Read should return io.EOF again.
131		r.state = nil
132	}
133	r.data = r.data[:len(r.data)+a]
134}
135
136// pop error
137func (r *Reader) err() (e error) {
138	e, r.state = r.state, nil
139	return
140}
141
142// pop error; EOF -> io.ErrUnexpectedEOF
143func (r *Reader) noEOF() (e error) {
144	e, r.state = r.state, nil
145	if e == io.EOF {
146		e = io.ErrUnexpectedEOF
147	}
148	return
149}
150
151// buffered bytes
152func (r *Reader) buffered() int { return len(r.data) - r.n }
153
154// Buffered returns the number of bytes currently in the buffer
155func (r *Reader) Buffered() int { return len(r.data) - r.n }
156
157// BufferSize returns the total size of the buffer
158func (r *Reader) BufferSize() int { return cap(r.data) }
159
160// Peek returns the next 'n' buffered bytes,
161// reading from the underlying reader if necessary.
162// It will only return a slice shorter than 'n' bytes
163// if it also returns an error. Peek does not advance
164// the reader. EOF errors are *not* returned as
165// io.ErrUnexpectedEOF.
166func (r *Reader) Peek(n int) ([]byte, error) {
167	// in the degenerate case,
168	// we may need to realloc
169	// (the caller asked for more
170	// bytes than the size of the buffer)
171	if cap(r.data) < n {
172		old := r.data[r.n:]
173		r.data = make([]byte, n+r.buffered())
174		r.data = r.data[:copy(r.data, old)]
175		r.n = 0
176	}
177
178	// keep filling until
179	// we hit an error or
180	// read enough bytes
181	for r.buffered() < n && r.state == nil {
182		r.more()
183	}
184
185	// we must have hit an error
186	if r.buffered() < n {
187		return r.data[r.n:], r.err()
188	}
189
190	return r.data[r.n : r.n+n], nil
191}
192
193// discard(n) discards up to 'n' buffered bytes, and
194// and returns the number of bytes discarded
195func (r *Reader) discard(n int) int {
196	inbuf := r.buffered()
197	if inbuf <= n {
198		r.n = 0
199		r.data = r.data[:0]
200		return inbuf
201	}
202	r.n += n
203	return n
204}
205
206// Skip moves the reader forward 'n' bytes.
207// Returns the number of bytes skipped and any
208// errors encountered. It is analogous to Seek(n, 1).
209// If the underlying reader implements io.Seeker, then
210// that method will be used to skip forward.
211//
212// If the reader encounters
213// an EOF before skipping 'n' bytes, it
214// returns io.ErrUnexpectedEOF. If the
215// underlying reader implements io.Seeker, then
216// those rules apply instead. (Many implementations
217// will not return `io.EOF` until the next call
218// to Read.)
219func (r *Reader) Skip(n int) (int, error) {
220	if n < 0 {
221		return 0, os.ErrInvalid
222	}
223
224	// discard some or all of the current buffer
225	skipped := r.discard(n)
226
227	// if we can Seek() through the remaining bytes, do that
228	if n > skipped && r.rs != nil {
229		nn, err := r.rs.Seek(int64(n-skipped), 1)
230		return int(nn) + skipped, err
231	}
232	// otherwise, keep filling the buffer
233	// and discarding it up to 'n'
234	for skipped < n && r.state == nil {
235		r.more()
236		skipped += r.discard(n - skipped)
237	}
238	return skipped, r.noEOF()
239}
240
241// Next returns the next 'n' bytes in the stream.
242// Unlike Peek, Next advances the reader position.
243// The returned bytes point to the same
244// data as the buffer, so the slice is
245// only valid until the next reader method call.
246// An EOF is considered an unexpected error.
247// If an the returned slice is less than the
248// length asked for, an error will be returned,
249// and the reader position will not be incremented.
250func (r *Reader) Next(n int) ([]byte, error) {
251
252	// in case the buffer is too small
253	if cap(r.data) < n {
254		old := r.data[r.n:]
255		r.data = make([]byte, n+r.buffered())
256		r.data = r.data[:copy(r.data, old)]
257		r.n = 0
258	}
259
260	// fill at least 'n' bytes
261	for r.buffered() < n && r.state == nil {
262		r.more()
263	}
264
265	if r.buffered() < n {
266		return r.data[r.n:], r.noEOF()
267	}
268	out := r.data[r.n : r.n+n]
269	r.n += n
270	return out, nil
271}
272
273// Read implements `io.Reader`
274func (r *Reader) Read(b []byte) (int, error) {
275	// if we have data in the buffer, just
276	// return that.
277	if r.buffered() != 0 {
278		x := copy(b, r.data[r.n:])
279		r.n += x
280		return x, nil
281	}
282	var n int
283	// we have no buffered data; determine
284	// whether or not to buffer or call
285	// the underlying reader directly
286	if len(b) >= cap(r.data) {
287		n, r.state = r.r.Read(b)
288	} else {
289		r.more()
290		n = copy(b, r.data)
291		r.n = n
292	}
293	if n == 0 {
294		return 0, r.err()
295	}
296	return n, nil
297}
298
299// ReadFull attempts to read len(b) bytes into
300// 'b'. It returns the number of bytes read into
301// 'b', and an error if it does not return len(b).
302// EOF is considered an unexpected error.
303func (r *Reader) ReadFull(b []byte) (int, error) {
304	var n int  // read into b
305	var nn int // scratch
306	l := len(b)
307	// either read buffered data,
308	// or read directly for the underlying
309	// buffer, or fetch more buffered data.
310	for n < l && r.state == nil {
311		if r.buffered() != 0 {
312			nn = copy(b[n:], r.data[r.n:])
313			n += nn
314			r.n += nn
315		} else if l-n > cap(r.data) {
316			nn, r.state = r.r.Read(b[n:])
317			n += nn
318		} else {
319			r.more()
320		}
321	}
322	if n < l {
323		return n, r.noEOF()
324	}
325	return n, nil
326}
327
328// ReadByte implements `io.ByteReader`
329func (r *Reader) ReadByte() (byte, error) {
330	for r.buffered() < 1 && r.state == nil {
331		r.more()
332	}
333	if r.buffered() < 1 {
334		return 0, r.err()
335	}
336	b := r.data[r.n]
337	r.n++
338	return b, nil
339}
340
341// WriteTo implements `io.WriterTo`
342func (r *Reader) WriteTo(w io.Writer) (int64, error) {
343	var (
344		i   int64
345		ii  int
346		err error
347	)
348	// first, clear buffer
349	if r.buffered() > 0 {
350		ii, err = w.Write(r.data[r.n:])
351		i += int64(ii)
352		if err != nil {
353			return i, err
354		}
355		r.data = r.data[0:0]
356		r.n = 0
357	}
358	for r.state == nil {
359		// here we just do
360		// 1:1 reads and writes
361		r.more()
362		if r.buffered() > 0 {
363			ii, err = w.Write(r.data)
364			i += int64(ii)
365			if err != nil {
366				return i, err
367			}
368			r.data = r.data[0:0]
369			r.n = 0
370		}
371	}
372	if r.state != io.EOF {
373		return i, r.err()
374	}
375	return i, nil
376}
377
// max returns the larger of a and b.
func max(a, b int) int {
	if a > b {
		return a
	}
	return b
}
384