// Package fwd provides a buffered reader
// and writer. Each has methods that help improve
// the encoding/decoding performance of some binary
// protocols.
5//
// The `fwd.Writer` and `fwd.Reader` types provide similar
// functionality to their counterparts in `bufio`, plus
8// a few extra utility methods that simplify read-ahead
9// and write-ahead. I wrote this package to improve serialization
10// performance for http://github.com/tinylib/msgp,
11// where it provided about a 2x speedup over `bufio` for certain
12// workloads. However, care must be taken to understand the semantics of the
13// extra methods provided by this package, as they allow
14// the user to access and manipulate the buffer memory
15// directly.
16//
17// The extra methods for `fwd.Reader` are `Peek`, `Skip`
18// and `Next`. `(*fwd.Reader).Peek`, unlike `(*bufio.Reader).Peek`,
19// will re-allocate the read buffer in order to accommodate arbitrarily
20// large read-ahead. `(*fwd.Reader).Skip` skips the next `n` bytes
21// in the stream, and uses the `io.Seeker` interface if the underlying
22// stream implements it. `(*fwd.Reader).Next` returns a slice pointing
23// to the next `n` bytes in the read buffer (like `Peek`), but also
24// increments the read position. This allows users to process streams
25// in arbitrary block sizes without having to manage appropriately-sized
26// slices. Additionally, obviating the need to copy the data from the
27// buffer to another location in memory can improve performance dramatically
28// in CPU-bound applications.
29//
// `fwd.Writer` has only one extra method, `(*fwd.Writer).Next`, which
// returns a slice pointing to the next `n` bytes of the writer and increments
// the write position by the length of the returned slice. This allows users
// to write directly to the end of the buffer.
34//
35package fwd
36
37import "io"
38
const (
	// minReaderSize is the smallest read buffer NewReaderSize will
	// allocate (16 bytes, the same minimum bufio uses).
	minReaderSize = 1 << 4

	// DefaultReaderSize is the default size, in bytes, of the read
	// buffer allocated by NewReader (2 KiB).
	DefaultReaderSize = 1 << 11
)
46
47// NewReader returns a new *Reader that reads from 'r'
48func NewReader(r io.Reader) *Reader {
49	return NewReaderSize(r, DefaultReaderSize)
50}
51
52// NewReaderSize returns a new *Reader that
53// reads from 'r' and has a buffer size 'n'
54func NewReaderSize(r io.Reader, n int) *Reader {
55	rd := &Reader{
56		r:    r,
57		data: make([]byte, 0, max(minReaderSize, n)),
58	}
59	if s, ok := r.(io.Seeker); ok {
60		rd.rs = s
61	}
62	return rd
63}
64
// Reader is a buffered look-ahead reader.
// Construct one with NewReader or NewReaderSize; Reset rebinds
// it to a new source while keeping the allocated buffer.
type Reader struct {
	r io.Reader // underlying reader

	// data[n:len(data)] is buffered data that has not been consumed yet;
	// data[len(data):cap(data)] is free buffer space for the next read
	data  []byte // data
	n     int    // read offset into data
	state error  // last read error; cleared when a method returns it

	// if the reader passed to NewReader/NewReaderSize (or Reset)
	// was also an io.Seeker, this is non-nil; Skip uses it to
	// seek past data instead of reading it
	rs io.Seeker
}
78
79// Reset resets the underlying reader
80// and the read buffer.
81func (r *Reader) Reset(rd io.Reader) {
82	r.r = rd
83	r.data = r.data[0:0]
84	r.n = 0
85	r.state = nil
86	if s, ok := rd.(io.Seeker); ok {
87		r.rs = s
88	} else {
89		r.rs = nil
90	}
91}
92
93// more() does one read on the underlying reader
94func (r *Reader) more() {
95	// move data backwards so that
96	// the read offset is 0; this way
97	// we can supply the maximum number of
98	// bytes to the reader
99	if r.n != 0 {
100		if r.n < len(r.data) {
101			r.data = r.data[:copy(r.data[0:], r.data[r.n:])]
102		} else {
103			r.data = r.data[:0]
104		}
105		r.n = 0
106	}
107	var a int
108	a, r.state = r.r.Read(r.data[len(r.data):cap(r.data)])
109	if a == 0 && r.state == nil {
110		r.state = io.ErrNoProgress
111		return
112	} else if a > 0 && r.state == io.EOF {
113		// discard the io.EOF if we read more than 0 bytes.
114		// the next call to Read should return io.EOF again.
115		r.state = nil
116	}
117	r.data = r.data[:len(r.data)+a]
118}
119
120// pop error
121func (r *Reader) err() (e error) {
122	e, r.state = r.state, nil
123	return
124}
125
126// pop error; EOF -> io.ErrUnexpectedEOF
127func (r *Reader) noEOF() (e error) {
128	e, r.state = r.state, nil
129	if e == io.EOF {
130		e = io.ErrUnexpectedEOF
131	}
132	return
133}
134
135// buffered bytes
136func (r *Reader) buffered() int { return len(r.data) - r.n }
137
138// Buffered returns the number of bytes currently in the buffer
139func (r *Reader) Buffered() int { return len(r.data) - r.n }
140
141// BufferSize returns the total size of the buffer
142func (r *Reader) BufferSize() int { return cap(r.data) }
143
144// Peek returns the next 'n' buffered bytes,
145// reading from the underlying reader if necessary.
146// It will only return a slice shorter than 'n' bytes
147// if it also returns an error. Peek does not advance
148// the reader. EOF errors are *not* returned as
149// io.ErrUnexpectedEOF.
150func (r *Reader) Peek(n int) ([]byte, error) {
151	// in the degenerate case,
152	// we may need to realloc
153	// (the caller asked for more
154	// bytes than the size of the buffer)
155	if cap(r.data) < n {
156		old := r.data[r.n:]
157		r.data = make([]byte, n+r.buffered())
158		r.data = r.data[:copy(r.data, old)]
159		r.n = 0
160	}
161
162	// keep filling until
163	// we hit an error or
164	// read enough bytes
165	for r.buffered() < n && r.state == nil {
166		r.more()
167	}
168
169	// we must have hit an error
170	if r.buffered() < n {
171		return r.data[r.n:], r.err()
172	}
173
174	return r.data[r.n : r.n+n], nil
175}
176
177// Skip moves the reader forward 'n' bytes.
178// Returns the number of bytes skipped and any
179// errors encountered. It is analogous to Seek(n, 1).
180// If the underlying reader implements io.Seeker, then
181// that method will be used to skip forward.
182//
183// If the reader encounters
184// an EOF before skipping 'n' bytes, it
185// returns io.ErrUnexpectedEOF. If the
186// underlying reader implements io.Seeker, then
187// those rules apply instead. (Many implementations
188// will not return `io.EOF` until the next call
189// to Read.)
190func (r *Reader) Skip(n int) (int, error) {
191
192	// fast path
193	if r.buffered() >= n {
194		r.n += n
195		return n, nil
196	}
197
198	// use seeker implementation
199	// if we can
200	if r.rs != nil {
201		return r.skipSeek(n)
202	}
203
204	// loop on filling
205	// and then erasing
206	o := n
207	for r.buffered() < n && r.state == nil {
208		r.more()
209		// we can skip forward
210		// up to r.buffered() bytes
211		step := min(r.buffered(), n)
212		r.n += step
213		n -= step
214	}
215	// at this point, n should be
216	// 0 if everything went smoothly
217	return o - n, r.noEOF()
218}
219
220// Next returns the next 'n' bytes in the stream.
221// Unlike Peek, Next advances the reader position.
222// The returned bytes point to the same
223// data as the buffer, so the slice is
224// only valid until the next reader method call.
225// An EOF is considered an unexpected error.
226// If an the returned slice is less than the
227// length asked for, an error will be returned,
228// and the reader position will not be incremented.
229func (r *Reader) Next(n int) ([]byte, error) {
230
231	// in case the buffer is too small
232	if cap(r.data) < n {
233		old := r.data[r.n:]
234		r.data = make([]byte, n+r.buffered())
235		r.data = r.data[:copy(r.data, old)]
236		r.n = 0
237	}
238
239	// fill at least 'n' bytes
240	for r.buffered() < n && r.state == nil {
241		r.more()
242	}
243
244	if r.buffered() < n {
245		return r.data[r.n:], r.noEOF()
246	}
247	out := r.data[r.n : r.n+n]
248	r.n += n
249	return out, nil
250}
251
252// skipSeek uses the io.Seeker to seek forward.
253// only call this function when n > r.buffered()
254func (r *Reader) skipSeek(n int) (int, error) {
255	o := r.buffered()
256	// first, clear buffer
257	n -= o
258	r.n = 0
259	r.data = r.data[:0]
260
261	// then seek forward remaning bytes
262	i, err := r.rs.Seek(int64(n), 1)
263	return int(i) + o, err
264}
265
266// Read implements `io.Reader`
267func (r *Reader) Read(b []byte) (int, error) {
268	// if we have data in the buffer, just
269	// return that.
270	if r.buffered() != 0 {
271		x := copy(b, r.data[r.n:])
272		r.n += x
273		return x, nil
274	}
275	var n int
276	// we have no buffered data; determine
277	// whether or not to buffer or call
278	// the underlying reader directly
279	if len(b) >= cap(r.data) {
280		n, r.state = r.r.Read(b)
281	} else {
282		r.more()
283		n = copy(b, r.data)
284		r.n = n
285	}
286	if n == 0 {
287		return 0, r.err()
288	}
289	return n, nil
290}
291
292// ReadFull attempts to read len(b) bytes into
293// 'b'. It returns the number of bytes read into
294// 'b', and an error if it does not return len(b).
295// EOF is considered an unexpected error.
296func (r *Reader) ReadFull(b []byte) (int, error) {
297	var n int  // read into b
298	var nn int // scratch
299	l := len(b)
300	// either read buffered data,
301	// or read directly for the underlying
302	// buffer, or fetch more buffered data.
303	for n < l && r.state == nil {
304		if r.buffered() != 0 {
305			nn = copy(b[n:], r.data[r.n:])
306			n += nn
307			r.n += nn
308		} else if l-n > cap(r.data) {
309			nn, r.state = r.r.Read(b[n:])
310			n += nn
311		} else {
312			r.more()
313		}
314	}
315	if n < l {
316		return n, r.noEOF()
317	}
318	return n, nil
319}
320
321// ReadByte implements `io.ByteReader`
322func (r *Reader) ReadByte() (byte, error) {
323	for r.buffered() < 1 && r.state == nil {
324		r.more()
325	}
326	if r.buffered() < 1 {
327		return 0, r.err()
328	}
329	b := r.data[r.n]
330	r.n++
331	return b, nil
332}
333
334// WriteTo implements `io.WriterTo`
335func (r *Reader) WriteTo(w io.Writer) (int64, error) {
336	var (
337		i   int64
338		ii  int
339		err error
340	)
341	// first, clear buffer
342	if r.buffered() > 0 {
343		ii, err = w.Write(r.data[r.n:])
344		i += int64(ii)
345		if err != nil {
346			return i, err
347		}
348		r.data = r.data[0:0]
349		r.n = 0
350	}
351	for r.state == nil {
352		// here we just do
353		// 1:1 reads and writes
354		r.more()
355		if r.buffered() > 0 {
356			ii, err = w.Write(r.data)
357			i += int64(ii)
358			if err != nil {
359				return i, err
360			}
361			r.data = r.data[0:0]
362			r.n = 0
363		}
364	}
365	if r.state != io.EOF {
366		return i, r.err()
367	}
368	return i, nil
369}
370
// min returns the smaller of a and b.
func min(a int, b int) int {
	if b < a {
		return b
	}
	return a
}
377
// max returns the larger of a and b.
func max(a int, b int) int {
	if a > b {
		return a
	}
	return b
}
384