1// Package buffer implements a buffer for serialization, consisting of a chain of []byte-s to 2// reduce copying and to allow reuse of individual chunks. 3package buffer 4 5import ( 6 "io" 7 "net" 8 "sync" 9) 10 11// PoolConfig contains configuration for the allocation and reuse strategy. 12type PoolConfig struct { 13 StartSize int // Minimum chunk size that is allocated. 14 PooledSize int // Minimum chunk size that is reused, reusing chunks too small will result in overhead. 15 MaxSize int // Maximum chunk size that will be allocated. 16} 17 18var config = PoolConfig{ 19 StartSize: 128, 20 PooledSize: 512, 21 MaxSize: 32768, 22} 23 24// Reuse pool: chunk size -> pool. 25var buffers = map[int]*sync.Pool{} 26 27func initBuffers() { 28 for l := config.PooledSize; l <= config.MaxSize; l *= 2 { 29 buffers[l] = new(sync.Pool) 30 } 31} 32 33func init() { 34 initBuffers() 35} 36 37// Init sets up a non-default pooling and allocation strategy. Should be run before serialization is done. 38func Init(cfg PoolConfig) { 39 config = cfg 40 initBuffers() 41} 42 43// putBuf puts a chunk to reuse pool if it can be reused. 44func putBuf(buf []byte) { 45 size := cap(buf) 46 if size < config.PooledSize { 47 return 48 } 49 if c := buffers[size]; c != nil { 50 c.Put(buf[:0]) 51 } 52} 53 54// getBuf gets a chunk from reuse pool or creates a new one if reuse failed. 55func getBuf(size int) []byte { 56 if size >= config.PooledSize { 57 if c := buffers[size]; c != nil { 58 v := c.Get() 59 if v != nil { 60 return v.([]byte) 61 } 62 } 63 } 64 return make([]byte, 0, size) 65} 66 67// Buffer is a buffer optimized for serialization without extra copying. 68type Buffer struct { 69 70 // Buf is the current chunk that can be used for serialization. 71 Buf []byte 72 73 toPool []byte 74 bufs [][]byte 75} 76 77// EnsureSpace makes sure that the current chunk contains at least s free bytes, 78// possibly creating a new chunk. 79func (b *Buffer) EnsureSpace(s int) { 80 if cap(b.Buf)-len(b.Buf) < s { 81 b.ensureSpaceSlow(s) 82 } 83} 84 85func (b *Buffer) ensureSpaceSlow(s int) { 86 l := len(b.Buf) 87 if l > 0 { 88 if cap(b.toPool) != cap(b.Buf) { 89 // Chunk was reallocated, toPool can be pooled. 90 putBuf(b.toPool) 91 } 92 if cap(b.bufs) == 0 { 93 b.bufs = make([][]byte, 0, 8) 94 } 95 b.bufs = append(b.bufs, b.Buf) 96 l = cap(b.toPool) * 2 97 } else { 98 l = config.StartSize 99 } 100 101 if l > config.MaxSize { 102 l = config.MaxSize 103 } 104 b.Buf = getBuf(l) 105 b.toPool = b.Buf 106} 107 108// AppendByte appends a single byte to buffer. 109func (b *Buffer) AppendByte(data byte) { 110 b.EnsureSpace(1) 111 b.Buf = append(b.Buf, data) 112} 113 114// AppendBytes appends a byte slice to buffer. 115func (b *Buffer) AppendBytes(data []byte) { 116 if len(data) <= cap(b.Buf)-len(b.Buf) { 117 b.Buf = append(b.Buf, data...) // fast path 118 } else { 119 b.appendBytesSlow(data) 120 } 121} 122 123func (b *Buffer) appendBytesSlow(data []byte) { 124 for len(data) > 0 { 125 b.EnsureSpace(1) 126 127 sz := cap(b.Buf) - len(b.Buf) 128 if sz > len(data) { 129 sz = len(data) 130 } 131 132 b.Buf = append(b.Buf, data[:sz]...) 133 data = data[sz:] 134 } 135} 136 137// AppendString appends a string to buffer. 138func (b *Buffer) AppendString(data string) { 139 if len(data) <= cap(b.Buf)-len(b.Buf) { 140 b.Buf = append(b.Buf, data...) // fast path 141 } else { 142 b.appendStringSlow(data) 143 } 144} 145 146func (b *Buffer) appendStringSlow(data string) { 147 for len(data) > 0 { 148 b.EnsureSpace(1) 149 150 sz := cap(b.Buf) - len(b.Buf) 151 if sz > len(data) { 152 sz = len(data) 153 } 154 155 b.Buf = append(b.Buf, data[:sz]...) 156 data = data[sz:] 157 } 158} 159 160// Size computes the size of a buffer by adding sizes of every chunk. 161func (b *Buffer) Size() int { 162 size := len(b.Buf) 163 for _, buf := range b.bufs { 164 size += len(buf) 165 } 166 return size 167} 168 169// DumpTo outputs the contents of a buffer to a writer and resets the buffer. 170func (b *Buffer) DumpTo(w io.Writer) (written int, err error) { 171 bufs := net.Buffers(b.bufs) 172 if len(b.Buf) > 0 { 173 bufs = append(bufs, b.Buf) 174 } 175 n, err := bufs.WriteTo(w) 176 177 for _, buf := range b.bufs { 178 putBuf(buf) 179 } 180 putBuf(b.toPool) 181 182 b.bufs = nil 183 b.Buf = nil 184 b.toPool = nil 185 186 return int(n), err 187} 188 189// BuildBytes creates a single byte slice with all the contents of the buffer. Data is 190// copied if it does not fit in a single chunk. You can optionally provide one byte 191// slice as argument that it will try to reuse. 192func (b *Buffer) BuildBytes(reuse ...[]byte) []byte { 193 if len(b.bufs) == 0 { 194 ret := b.Buf 195 b.toPool = nil 196 b.Buf = nil 197 return ret 198 } 199 200 var ret []byte 201 size := b.Size() 202 203 // If we got a buffer as argument and it is big enough, reuse it. 204 if len(reuse) == 1 && cap(reuse[0]) >= size { 205 ret = reuse[0][:0] 206 } else { 207 ret = make([]byte, 0, size) 208 } 209 for _, buf := range b.bufs { 210 ret = append(ret, buf...) 211 putBuf(buf) 212 } 213 214 ret = append(ret, b.Buf...) 215 putBuf(b.toPool) 216 217 b.bufs = nil 218 b.toPool = nil 219 b.Buf = nil 220 221 return ret 222} 223 224type readCloser struct { 225 offset int 226 bufs [][]byte 227} 228 229func (r *readCloser) Read(p []byte) (n int, err error) { 230 for _, buf := range r.bufs { 231 // Copy as much as we can. 232 x := copy(p[n:], buf[r.offset:]) 233 n += x // Increment how much we filled. 234 235 // Did we empty the whole buffer? 236 if r.offset+x == len(buf) { 237 // On to the next buffer. 238 r.offset = 0 239 r.bufs = r.bufs[1:] 240 241 // We can release this buffer. 242 putBuf(buf) 243 } else { 244 r.offset += x 245 } 246 247 if n == len(p) { 248 break 249 } 250 } 251 // No buffers left or nothing read? 252 if len(r.bufs) == 0 { 253 err = io.EOF 254 } 255 return 256} 257 258func (r *readCloser) Close() error { 259 // Release all remaining buffers. 260 for _, buf := range r.bufs { 261 putBuf(buf) 262 } 263 // In case Close gets called multiple times. 264 r.bufs = nil 265 266 return nil 267} 268 269// ReadCloser creates an io.ReadCloser with all the contents of the buffer. 270func (b *Buffer) ReadCloser() io.ReadCloser { 271 ret := &readCloser{0, append(b.bufs, b.Buf)} 272 273 b.bufs = nil 274 b.toPool = nil 275 b.Buf = nil 276 277 return ret 278} 279