// The `fwd` package provides a buffered reader
// and writer. Each has methods that help improve
// the encoding/decoding performance of some binary
// protocols.
//
// The `fwd.Writer` and `fwd.Reader` types provide similar
// functionality to their counterparts in `bufio`, plus
// a few extra utility methods that simplify read-ahead
// and write-ahead. I wrote this package to improve serialization
// performance for http://github.com/tinylib/msgp,
// where it provided about a 2x speedup over `bufio` for certain
// workloads. However, care must be taken to understand the semantics of the
// extra methods provided by this package, as they allow
// the user to access and manipulate the buffer memory
// directly.
//
// The extra methods for `fwd.Reader` are `Peek`, `Skip`
// and `Next`. `(*fwd.Reader).Peek`, unlike `(*bufio.Reader).Peek`,
// will re-allocate the read buffer in order to accommodate arbitrarily
// large read-ahead. `(*fwd.Reader).Skip` skips the next `n` bytes
// in the stream, and uses the `io.Seeker` interface if the underlying
// stream implements it. `(*fwd.Reader).Next` returns a slice pointing
// to the next `n` bytes in the read buffer (like `Peek`), but also
// increments the read position. This allows users to process streams
// in arbitrary block sizes without having to manage appropriately-sized
// slices. Additionally, obviating the need to copy the data from the
// buffer to another location in memory can improve performance dramatically
// in CPU-bound applications.
//
// `fwd.Writer` only has one extra method, which is `(*fwd.Writer).Next`, which
// returns a slice pointing to the next `n` bytes of the writer, and increments
// the write position by the length of the returned slice. This allows users
// to write directly to the end of the buffer.
34// 35package fwd 36 37import ( 38 "io" 39 "os" 40) 41 42const ( 43 // DefaultReaderSize is the default size of the read buffer 44 DefaultReaderSize = 2048 45 46 // minimum read buffer; straight from bufio 47 minReaderSize = 16 48) 49 50// NewReader returns a new *Reader that reads from 'r' 51func NewReader(r io.Reader) *Reader { 52 return NewReaderSize(r, DefaultReaderSize) 53} 54 55// NewReaderSize returns a new *Reader that 56// reads from 'r' and has a buffer size 'n'. 57func NewReaderSize(r io.Reader, n int) *Reader { 58 buf := make([]byte, 0, max(n, minReaderSize)) 59 return NewReaderBuf(r, buf) 60} 61 62// NewReaderBuf returns a new *Reader that 63// reads from 'r' and uses 'buf' as a buffer. 64// 'buf' is not used when has smaller capacity than 16, 65// custom buffer is allocated instead. 66func NewReaderBuf(r io.Reader, buf []byte) *Reader { 67 if cap(buf) < minReaderSize { 68 buf = make([]byte, 0, minReaderSize) 69 } 70 buf = buf[:0] 71 rd := &Reader{ 72 r: r, 73 data: buf, 74 } 75 if s, ok := r.(io.Seeker); ok { 76 rd.rs = s 77 } 78 return rd 79} 80 81// Reader is a buffered look-ahead reader 82type Reader struct { 83 r io.Reader // underlying reader 84 85 // data[n:len(data)] is buffered data; data[len(data):cap(data)] is free buffer space 86 data []byte // data 87 n int // read offset 88 state error // last read error 89 90 // if the reader past to NewReader was 91 // also an io.Seeker, this is non-nil 92 rs io.Seeker 93} 94 95// Reset resets the underlying reader 96// and the read buffer. 
97func (r *Reader) Reset(rd io.Reader) { 98 r.r = rd 99 r.data = r.data[0:0] 100 r.n = 0 101 r.state = nil 102 if s, ok := rd.(io.Seeker); ok { 103 r.rs = s 104 } else { 105 r.rs = nil 106 } 107} 108 109// more() does one read on the underlying reader 110func (r *Reader) more() { 111 // move data backwards so that 112 // the read offset is 0; this way 113 // we can supply the maximum number of 114 // bytes to the reader 115 if r.n != 0 { 116 if r.n < len(r.data) { 117 r.data = r.data[:copy(r.data[0:], r.data[r.n:])] 118 } else { 119 r.data = r.data[:0] 120 } 121 r.n = 0 122 } 123 var a int 124 a, r.state = r.r.Read(r.data[len(r.data):cap(r.data)]) 125 if a == 0 && r.state == nil { 126 r.state = io.ErrNoProgress 127 return 128 } else if a > 0 && r.state == io.EOF { 129 // discard the io.EOF if we read more than 0 bytes. 130 // the next call to Read should return io.EOF again. 131 r.state = nil 132 } 133 r.data = r.data[:len(r.data)+a] 134} 135 136// pop error 137func (r *Reader) err() (e error) { 138 e, r.state = r.state, nil 139 return 140} 141 142// pop error; EOF -> io.ErrUnexpectedEOF 143func (r *Reader) noEOF() (e error) { 144 e, r.state = r.state, nil 145 if e == io.EOF { 146 e = io.ErrUnexpectedEOF 147 } 148 return 149} 150 151// buffered bytes 152func (r *Reader) buffered() int { return len(r.data) - r.n } 153 154// Buffered returns the number of bytes currently in the buffer 155func (r *Reader) Buffered() int { return len(r.data) - r.n } 156 157// BufferSize returns the total size of the buffer 158func (r *Reader) BufferSize() int { return cap(r.data) } 159 160// Peek returns the next 'n' buffered bytes, 161// reading from the underlying reader if necessary. 162// It will only return a slice shorter than 'n' bytes 163// if it also returns an error. Peek does not advance 164// the reader. EOF errors are *not* returned as 165// io.ErrUnexpectedEOF. 
166func (r *Reader) Peek(n int) ([]byte, error) { 167 // in the degenerate case, 168 // we may need to realloc 169 // (the caller asked for more 170 // bytes than the size of the buffer) 171 if cap(r.data) < n { 172 old := r.data[r.n:] 173 r.data = make([]byte, n+r.buffered()) 174 r.data = r.data[:copy(r.data, old)] 175 r.n = 0 176 } 177 178 // keep filling until 179 // we hit an error or 180 // read enough bytes 181 for r.buffered() < n && r.state == nil { 182 r.more() 183 } 184 185 // we must have hit an error 186 if r.buffered() < n { 187 return r.data[r.n:], r.err() 188 } 189 190 return r.data[r.n : r.n+n], nil 191} 192 193// discard(n) discards up to 'n' buffered bytes, and 194// and returns the number of bytes discarded 195func (r *Reader) discard(n int) int { 196 inbuf := r.buffered() 197 if inbuf <= n { 198 r.n = 0 199 r.data = r.data[:0] 200 return inbuf 201 } 202 r.n += n 203 return n 204} 205 206// Skip moves the reader forward 'n' bytes. 207// Returns the number of bytes skipped and any 208// errors encountered. It is analogous to Seek(n, 1). 209// If the underlying reader implements io.Seeker, then 210// that method will be used to skip forward. 211// 212// If the reader encounters 213// an EOF before skipping 'n' bytes, it 214// returns io.ErrUnexpectedEOF. If the 215// underlying reader implements io.Seeker, then 216// those rules apply instead. (Many implementations 217// will not return `io.EOF` until the next call 218// to Read.) 
219func (r *Reader) Skip(n int) (int, error) { 220 if n < 0 { 221 return 0, os.ErrInvalid 222 } 223 224 // discard some or all of the current buffer 225 skipped := r.discard(n) 226 227 // if we can Seek() through the remaining bytes, do that 228 if n > skipped && r.rs != nil { 229 nn, err := r.rs.Seek(int64(n-skipped), 1) 230 return int(nn) + skipped, err 231 } 232 // otherwise, keep filling the buffer 233 // and discarding it up to 'n' 234 for skipped < n && r.state == nil { 235 r.more() 236 skipped += r.discard(n - skipped) 237 } 238 return skipped, r.noEOF() 239} 240 241// Next returns the next 'n' bytes in the stream. 242// Unlike Peek, Next advances the reader position. 243// The returned bytes point to the same 244// data as the buffer, so the slice is 245// only valid until the next reader method call. 246// An EOF is considered an unexpected error. 247// If an the returned slice is less than the 248// length asked for, an error will be returned, 249// and the reader position will not be incremented. 250func (r *Reader) Next(n int) ([]byte, error) { 251 252 // in case the buffer is too small 253 if cap(r.data) < n { 254 old := r.data[r.n:] 255 r.data = make([]byte, n+r.buffered()) 256 r.data = r.data[:copy(r.data, old)] 257 r.n = 0 258 } 259 260 // fill at least 'n' bytes 261 for r.buffered() < n && r.state == nil { 262 r.more() 263 } 264 265 if r.buffered() < n { 266 return r.data[r.n:], r.noEOF() 267 } 268 out := r.data[r.n : r.n+n] 269 r.n += n 270 return out, nil 271} 272 273// Read implements `io.Reader` 274func (r *Reader) Read(b []byte) (int, error) { 275 // if we have data in the buffer, just 276 // return that. 
277 if r.buffered() != 0 { 278 x := copy(b, r.data[r.n:]) 279 r.n += x 280 return x, nil 281 } 282 var n int 283 // we have no buffered data; determine 284 // whether or not to buffer or call 285 // the underlying reader directly 286 if len(b) >= cap(r.data) { 287 n, r.state = r.r.Read(b) 288 } else { 289 r.more() 290 n = copy(b, r.data) 291 r.n = n 292 } 293 if n == 0 { 294 return 0, r.err() 295 } 296 return n, nil 297} 298 299// ReadFull attempts to read len(b) bytes into 300// 'b'. It returns the number of bytes read into 301// 'b', and an error if it does not return len(b). 302// EOF is considered an unexpected error. 303func (r *Reader) ReadFull(b []byte) (int, error) { 304 var n int // read into b 305 var nn int // scratch 306 l := len(b) 307 // either read buffered data, 308 // or read directly for the underlying 309 // buffer, or fetch more buffered data. 310 for n < l && r.state == nil { 311 if r.buffered() != 0 { 312 nn = copy(b[n:], r.data[r.n:]) 313 n += nn 314 r.n += nn 315 } else if l-n > cap(r.data) { 316 nn, r.state = r.r.Read(b[n:]) 317 n += nn 318 } else { 319 r.more() 320 } 321 } 322 if n < l { 323 return n, r.noEOF() 324 } 325 return n, nil 326} 327 328// ReadByte implements `io.ByteReader` 329func (r *Reader) ReadByte() (byte, error) { 330 for r.buffered() < 1 && r.state == nil { 331 r.more() 332 } 333 if r.buffered() < 1 { 334 return 0, r.err() 335 } 336 b := r.data[r.n] 337 r.n++ 338 return b, nil 339} 340 341// WriteTo implements `io.WriterTo` 342func (r *Reader) WriteTo(w io.Writer) (int64, error) { 343 var ( 344 i int64 345 ii int 346 err error 347 ) 348 // first, clear buffer 349 if r.buffered() > 0 { 350 ii, err = w.Write(r.data[r.n:]) 351 i += int64(ii) 352 if err != nil { 353 return i, err 354 } 355 r.data = r.data[0:0] 356 r.n = 0 357 } 358 for r.state == nil { 359 // here we just do 360 // 1:1 reads and writes 361 r.more() 362 if r.buffered() > 0 { 363 ii, err = w.Write(r.data) 364 i += int64(ii) 365 if err != nil { 366 return i, err 
367 } 368 r.data = r.data[0:0] 369 r.n = 0 370 } 371 } 372 if r.state != io.EOF { 373 return i, r.err() 374 } 375 return i, nil 376} 377 378func max(a int, b int) int { 379 if a < b { 380 return b 381 } 382 return a 383} 384