// The `fwd` package provides a buffered reader
// and writer. Each has methods that help improve
// the encoding/decoding performance of some binary
// protocols.
//
// The `fwd.Writer` and `fwd.Reader` types provide similar
// functionality to their counterparts in `bufio`, plus
// a few extra utility methods that simplify read-ahead
// and write-ahead. I wrote this package to improve serialization
// performance for http://github.com/tinylib/msgp,
// where it provided about a 2x speedup over `bufio` for certain
// workloads. However, care must be taken to understand the semantics of the
// extra methods provided by this package, as they allow
// the user to access and manipulate the buffer memory
// directly.
//
// The extra methods for `fwd.Reader` are `Peek`, `Skip`
// and `Next`. `(*fwd.Reader).Peek`, unlike `(*bufio.Reader).Peek`,
// will re-allocate the read buffer in order to accommodate arbitrarily
// large read-ahead. `(*fwd.Reader).Skip` skips the next `n` bytes
// in the stream, and uses the `io.Seeker` interface if the underlying
// stream implements it. `(*fwd.Reader).Next` returns a slice pointing
// to the next `n` bytes in the read buffer (like `Peek`), but also
// increments the read position. This allows users to process streams
// in arbitrary block sizes without having to manage appropriately-sized
// slices. Additionally, obviating the need to copy the data from the
// buffer to another location in memory can improve performance dramatically
// in CPU-bound applications.
//
// `fwd.Writer` only has one extra method, which is `(*fwd.Writer).Next`, which
// returns a slice pointing to the next `n` bytes of the writer, and increments
// the write position by the length of the returned slice. This allows users
// to write directly to the end of the buffer.
34// 35package fwd 36 37import "io" 38 39const ( 40 // DefaultReaderSize is the default size of the read buffer 41 DefaultReaderSize = 2048 42 43 // minimum read buffer; straight from bufio 44 minReaderSize = 16 45) 46 47// NewReader returns a new *Reader that reads from 'r' 48func NewReader(r io.Reader) *Reader { 49 return NewReaderSize(r, DefaultReaderSize) 50} 51 52// NewReaderSize returns a new *Reader that 53// reads from 'r' and has a buffer size 'n' 54func NewReaderSize(r io.Reader, n int) *Reader { 55 rd := &Reader{ 56 r: r, 57 data: make([]byte, 0, max(minReaderSize, n)), 58 } 59 if s, ok := r.(io.Seeker); ok { 60 rd.rs = s 61 } 62 return rd 63} 64 65// Reader is a buffered look-ahead reader 66type Reader struct { 67 r io.Reader // underlying reader 68 69 // data[n:len(data)] is buffered data; data[len(data):cap(data)] is free buffer space 70 data []byte // data 71 n int // read offset 72 state error // last read error 73 74 // if the reader past to NewReader was 75 // also an io.Seeker, this is non-nil 76 rs io.Seeker 77} 78 79// Reset resets the underlying reader 80// and the read buffer. 81func (r *Reader) Reset(rd io.Reader) { 82 r.r = rd 83 r.data = r.data[0:0] 84 r.n = 0 85 r.state = nil 86 if s, ok := rd.(io.Seeker); ok { 87 r.rs = s 88 } else { 89 r.rs = nil 90 } 91} 92 93// more() does one read on the underlying reader 94func (r *Reader) more() { 95 // move data backwards so that 96 // the read offset is 0; this way 97 // we can supply the maximum number of 98 // bytes to the reader 99 if r.n != 0 { 100 if r.n < len(r.data) { 101 r.data = r.data[:copy(r.data[0:], r.data[r.n:])] 102 } else { 103 r.data = r.data[:0] 104 } 105 r.n = 0 106 } 107 var a int 108 a, r.state = r.r.Read(r.data[len(r.data):cap(r.data)]) 109 if a == 0 && r.state == nil { 110 r.state = io.ErrNoProgress 111 return 112 } else if a > 0 && r.state == io.EOF { 113 // discard the io.EOF if we read more than 0 bytes. 114 // the next call to Read should return io.EOF again. 
115 r.state = nil 116 } 117 r.data = r.data[:len(r.data)+a] 118} 119 120// pop error 121func (r *Reader) err() (e error) { 122 e, r.state = r.state, nil 123 return 124} 125 126// pop error; EOF -> io.ErrUnexpectedEOF 127func (r *Reader) noEOF() (e error) { 128 e, r.state = r.state, nil 129 if e == io.EOF { 130 e = io.ErrUnexpectedEOF 131 } 132 return 133} 134 135// buffered bytes 136func (r *Reader) buffered() int { return len(r.data) - r.n } 137 138// Buffered returns the number of bytes currently in the buffer 139func (r *Reader) Buffered() int { return len(r.data) - r.n } 140 141// BufferSize returns the total size of the buffer 142func (r *Reader) BufferSize() int { return cap(r.data) } 143 144// Peek returns the next 'n' buffered bytes, 145// reading from the underlying reader if necessary. 146// It will only return a slice shorter than 'n' bytes 147// if it also returns an error. Peek does not advance 148// the reader. EOF errors are *not* returned as 149// io.ErrUnexpectedEOF. 150func (r *Reader) Peek(n int) ([]byte, error) { 151 // in the degenerate case, 152 // we may need to realloc 153 // (the caller asked for more 154 // bytes than the size of the buffer) 155 if cap(r.data) < n { 156 old := r.data[r.n:] 157 r.data = make([]byte, n+r.buffered()) 158 r.data = r.data[:copy(r.data, old)] 159 r.n = 0 160 } 161 162 // keep filling until 163 // we hit an error or 164 // read enough bytes 165 for r.buffered() < n && r.state == nil { 166 r.more() 167 } 168 169 // we must have hit an error 170 if r.buffered() < n { 171 return r.data[r.n:], r.err() 172 } 173 174 return r.data[r.n : r.n+n], nil 175} 176 177// Skip moves the reader forward 'n' bytes. 178// Returns the number of bytes skipped and any 179// errors encountered. It is analogous to Seek(n, 1). 180// If the underlying reader implements io.Seeker, then 181// that method will be used to skip forward. 
182// 183// If the reader encounters 184// an EOF before skipping 'n' bytes, it 185// returns io.ErrUnexpectedEOF. If the 186// underlying reader implements io.Seeker, then 187// those rules apply instead. (Many implementations 188// will not return `io.EOF` until the next call 189// to Read.) 190func (r *Reader) Skip(n int) (int, error) { 191 192 // fast path 193 if r.buffered() >= n { 194 r.n += n 195 return n, nil 196 } 197 198 // use seeker implementation 199 // if we can 200 if r.rs != nil { 201 return r.skipSeek(n) 202 } 203 204 // loop on filling 205 // and then erasing 206 o := n 207 for r.buffered() < n && r.state == nil { 208 r.more() 209 // we can skip forward 210 // up to r.buffered() bytes 211 step := min(r.buffered(), n) 212 r.n += step 213 n -= step 214 } 215 // at this point, n should be 216 // 0 if everything went smoothly 217 return o - n, r.noEOF() 218} 219 220// Next returns the next 'n' bytes in the stream. 221// Unlike Peek, Next advances the reader position. 222// The returned bytes point to the same 223// data as the buffer, so the slice is 224// only valid until the next reader method call. 225// An EOF is considered an unexpected error. 226// If an the returned slice is less than the 227// length asked for, an error will be returned, 228// and the reader position will not be incremented. 229func (r *Reader) Next(n int) ([]byte, error) { 230 231 // in case the buffer is too small 232 if cap(r.data) < n { 233 old := r.data[r.n:] 234 r.data = make([]byte, n+r.buffered()) 235 r.data = r.data[:copy(r.data, old)] 236 r.n = 0 237 } 238 239 // fill at least 'n' bytes 240 for r.buffered() < n && r.state == nil { 241 r.more() 242 } 243 244 if r.buffered() < n { 245 return r.data[r.n:], r.noEOF() 246 } 247 out := r.data[r.n : r.n+n] 248 r.n += n 249 return out, nil 250} 251 252// skipSeek uses the io.Seeker to seek forward. 
253// only call this function when n > r.buffered() 254func (r *Reader) skipSeek(n int) (int, error) { 255 o := r.buffered() 256 // first, clear buffer 257 n -= o 258 r.n = 0 259 r.data = r.data[:0] 260 261 // then seek forward remaning bytes 262 i, err := r.rs.Seek(int64(n), 1) 263 return int(i) + o, err 264} 265 266// Read implements `io.Reader` 267func (r *Reader) Read(b []byte) (int, error) { 268 // if we have data in the buffer, just 269 // return that. 270 if r.buffered() != 0 { 271 x := copy(b, r.data[r.n:]) 272 r.n += x 273 return x, nil 274 } 275 var n int 276 // we have no buffered data; determine 277 // whether or not to buffer or call 278 // the underlying reader directly 279 if len(b) >= cap(r.data) { 280 n, r.state = r.r.Read(b) 281 } else { 282 r.more() 283 n = copy(b, r.data) 284 r.n = n 285 } 286 if n == 0 { 287 return 0, r.err() 288 } 289 return n, nil 290} 291 292// ReadFull attempts to read len(b) bytes into 293// 'b'. It returns the number of bytes read into 294// 'b', and an error if it does not return len(b). 295// EOF is considered an unexpected error. 296func (r *Reader) ReadFull(b []byte) (int, error) { 297 var n int // read into b 298 var nn int // scratch 299 l := len(b) 300 // either read buffered data, 301 // or read directly for the underlying 302 // buffer, or fetch more buffered data. 
303 for n < l && r.state == nil { 304 if r.buffered() != 0 { 305 nn = copy(b[n:], r.data[r.n:]) 306 n += nn 307 r.n += nn 308 } else if l-n > cap(r.data) { 309 nn, r.state = r.r.Read(b[n:]) 310 n += nn 311 } else { 312 r.more() 313 } 314 } 315 if n < l { 316 return n, r.noEOF() 317 } 318 return n, nil 319} 320 321// ReadByte implements `io.ByteReader` 322func (r *Reader) ReadByte() (byte, error) { 323 for r.buffered() < 1 && r.state == nil { 324 r.more() 325 } 326 if r.buffered() < 1 { 327 return 0, r.err() 328 } 329 b := r.data[r.n] 330 r.n++ 331 return b, nil 332} 333 334// WriteTo implements `io.WriterTo` 335func (r *Reader) WriteTo(w io.Writer) (int64, error) { 336 var ( 337 i int64 338 ii int 339 err error 340 ) 341 // first, clear buffer 342 if r.buffered() > 0 { 343 ii, err = w.Write(r.data[r.n:]) 344 i += int64(ii) 345 if err != nil { 346 return i, err 347 } 348 r.data = r.data[0:0] 349 r.n = 0 350 } 351 for r.state == nil { 352 // here we just do 353 // 1:1 reads and writes 354 r.more() 355 if r.buffered() > 0 { 356 ii, err = w.Write(r.data) 357 i += int64(ii) 358 if err != nil { 359 return i, err 360 } 361 r.data = r.data[0:0] 362 r.n = 0 363 } 364 } 365 if r.state != io.EOF { 366 return i, r.err() 367 } 368 return i, nil 369} 370 371func min(a int, b int) int { 372 if a < b { 373 return a 374 } 375 return b 376} 377 378func max(a int, b int) int { 379 if a < b { 380 return b 381 } 382 return a 383} 384