1// Package buffer implements a buffer for serialization, consisting of a chain of []byte-s to 2// reduce copying and to allow reuse of individual chunks. 3package buffer 4 5import ( 6 "io" 7 "sync" 8) 9 10// PoolConfig contains configuration for the allocation and reuse strategy. 11type PoolConfig struct { 12 StartSize int // Minimum chunk size that is allocated. 13 PooledSize int // Minimum chunk size that is reused, reusing chunks too small will result in overhead. 14 MaxSize int // Maximum chunk size that will be allocated. 15} 16 17var config = PoolConfig{ 18 StartSize: 128, 19 PooledSize: 512, 20 MaxSize: 32768, 21} 22 23// Reuse pool: chunk size -> pool. 24var buffers = map[int]*sync.Pool{} 25 26func initBuffers() { 27 for l := config.PooledSize; l <= config.MaxSize; l *= 2 { 28 buffers[l] = new(sync.Pool) 29 } 30} 31 32func init() { 33 initBuffers() 34} 35 36// Init sets up a non-default pooling and allocation strategy. Should be run before serialization is done. 37func Init(cfg PoolConfig) { 38 config = cfg 39 initBuffers() 40} 41 42// putBuf puts a chunk to reuse pool if it can be reused. 43func putBuf(buf []byte) { 44 size := cap(buf) 45 if size < config.PooledSize { 46 return 47 } 48 if c := buffers[size]; c != nil { 49 c.Put(buf[:0]) 50 } 51} 52 53// getBuf gets a chunk from reuse pool or creates a new one if reuse failed. 54func getBuf(size int) []byte { 55 if size < config.PooledSize { 56 return make([]byte, 0, size) 57 } 58 59 if c := buffers[size]; c != nil { 60 v := c.Get() 61 if v != nil { 62 return v.([]byte) 63 } 64 } 65 return make([]byte, 0, size) 66} 67 68// Buffer is a buffer optimized for serialization without extra copying. 69type Buffer struct { 70 71 // Buf is the current chunk that can be used for serialization. 72 Buf []byte 73 74 toPool []byte 75 bufs [][]byte 76} 77 78// EnsureSpace makes sure that the current chunk contains at least s free bytes, 79// possibly creating a new chunk. 
80func (b *Buffer) EnsureSpace(s int) { 81 if cap(b.Buf)-len(b.Buf) >= s { 82 return 83 } 84 l := len(b.Buf) 85 if l > 0 { 86 if cap(b.toPool) != cap(b.Buf) { 87 // Chunk was reallocated, toPool can be pooled. 88 putBuf(b.toPool) 89 } 90 if cap(b.bufs) == 0 { 91 b.bufs = make([][]byte, 0, 8) 92 } 93 b.bufs = append(b.bufs, b.Buf) 94 l = cap(b.toPool) * 2 95 } else { 96 l = config.StartSize 97 } 98 99 if l > config.MaxSize { 100 l = config.MaxSize 101 } 102 b.Buf = getBuf(l) 103 b.toPool = b.Buf 104} 105 106// AppendByte appends a single byte to buffer. 107func (b *Buffer) AppendByte(data byte) { 108 if cap(b.Buf) == len(b.Buf) { // EnsureSpace won't be inlined. 109 b.EnsureSpace(1) 110 } 111 b.Buf = append(b.Buf, data) 112} 113 114// AppendBytes appends a byte slice to buffer. 115func (b *Buffer) AppendBytes(data []byte) { 116 for len(data) > 0 { 117 if cap(b.Buf) == len(b.Buf) { // EnsureSpace won't be inlined. 118 b.EnsureSpace(1) 119 } 120 121 sz := cap(b.Buf) - len(b.Buf) 122 if sz > len(data) { 123 sz = len(data) 124 } 125 126 b.Buf = append(b.Buf, data[:sz]...) 127 data = data[sz:] 128 } 129} 130 131// AppendBytes appends a string to buffer. 132func (b *Buffer) AppendString(data string) { 133 for len(data) > 0 { 134 if cap(b.Buf) == len(b.Buf) { // EnsureSpace won't be inlined. 135 b.EnsureSpace(1) 136 } 137 138 sz := cap(b.Buf) - len(b.Buf) 139 if sz > len(data) { 140 sz = len(data) 141 } 142 143 b.Buf = append(b.Buf, data[:sz]...) 144 data = data[sz:] 145 } 146} 147 148// Size computes the size of a buffer by adding sizes of every chunk. 149func (b *Buffer) Size() int { 150 size := len(b.Buf) 151 for _, buf := range b.bufs { 152 size += len(buf) 153 } 154 return size 155} 156 157// DumpTo outputs the contents of a buffer to a writer and resets the buffer. 
func (b *Buffer) DumpTo(w io.Writer) (written int, err error) {
	var n int
	for _, buf := range b.bufs {
		if err == nil {
			n, err = w.Write(buf)
			written += n
		}
		// Pool every chunk even after a write error, so none is lost.
		putBuf(buf)
	}

	if err == nil {
		n, err = w.Write(b.Buf)
		written += n
	}
	// Pool toPool rather than Buf: Buf may have been reallocated by append,
	// while toPool is the chunk originally obtained from getBuf.
	putBuf(b.toPool)

	b.bufs = nil
	b.Buf = nil
	b.toPool = nil

	return
}

// BuildBytes creates a single byte slice with all the contents of the buffer. Data is
// copied if it does not fit in a single chunk. You can optionally provide one byte
// slice as argument that it will try to reuse.
func (b *Buffer) BuildBytes(reuse ...[]byte) []byte {
	if len(b.bufs) == 0 {
		// Single chunk: hand it to the caller without copying. Ownership
		// transfers to the caller, so it must not be returned to the pool.
		ret := b.Buf
		b.toPool = nil
		b.Buf = nil
		return ret
	}

	var ret []byte
	size := b.Size()

	// If we got a buffer as argument and it is big enough, reuse it.
	if len(reuse) == 1 && cap(reuse[0]) >= size {
		ret = reuse[0][:0]
	} else {
		ret = make([]byte, 0, size)
	}
	for _, buf := range b.bufs {
		ret = append(ret, buf...)
		// Chunk contents have been copied out; it can be pooled now.
		putBuf(buf)
	}

	ret = append(ret, b.Buf...)
	putBuf(b.toPool)

	b.bufs = nil
	b.toPool = nil
	b.Buf = nil

	return ret
}

// readCloser drains a chain of chunks, returning each chunk to the reuse
// pool as soon as it has been fully consumed.
type readCloser struct {
	offset int      // read position within bufs[0]
	bufs   [][]byte // remaining chunks; bufs[0] may be partially read
}

// Read copies data from the remaining chunks into p, pooling every chunk it
// empties. It returns io.EOF once no chunks remain (possibly alongside n > 0).
func (r *readCloser) Read(p []byte) (n int, err error) {
	for _, buf := range r.bufs {
		// Copy as much as we can.
		x := copy(p[n:], buf[r.offset:])
		n += x // Increment how much we filled.

		// Did we empty the whole buffer?
		if r.offset+x == len(buf) {
			// On to the next buffer.
			r.offset = 0
			r.bufs = r.bufs[1:]

			// We can release this buffer.
			putBuf(buf)
		} else {
			r.offset += x
		}

		if n == len(p) {
			break
		}
	}
	// No buffers left or nothing read?
	if len(r.bufs) == 0 {
		err = io.EOF
	}
	return
}

// Close releases any unread chunks back to the reuse pool. It is safe to call
// more than once.
func (r *readCloser) Close() error {
	// Release all remaining buffers.
	for _, buf := range r.bufs {
		putBuf(buf)
	}
	// In case Close gets called multiple times.
	r.bufs = nil

	return nil
}

// ReadCloser creates an io.ReadCloser with all the contents of the buffer.
// Ownership of every chunk transfers to the returned reader; the buffer is reset.
func (b *Buffer) ReadCloser() io.ReadCloser {
	ret := &readCloser{0, append(b.bufs, b.Buf)}

	b.bufs = nil
	b.toPool = nil
	b.Buf = nil

	return ret
}