1// Copyright (c) 2017 Couchbase, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package zap 16 17import ( 18 "bytes" 19 "encoding/binary" 20 "io" 21) 22 23type chunkedIntCoder struct { 24 final []byte 25 chunkSize uint64 26 chunkBuf bytes.Buffer 27 chunkLens []uint64 28 currChunk uint64 29 30 buf []byte 31} 32 33// newChunkedIntCoder returns a new chunk int coder which packs data into 34// chunks based on the provided chunkSize and supports up to the specified 35// maxDocNum 36func newChunkedIntCoder(chunkSize uint64, maxDocNum uint64) *chunkedIntCoder { 37 total := maxDocNum/chunkSize + 1 38 rv := &chunkedIntCoder{ 39 chunkSize: chunkSize, 40 chunkLens: make([]uint64, total), 41 final: make([]byte, 0, 64), 42 } 43 44 return rv 45} 46 47// Reset lets you reuse this chunked int coder. buffers are reset and reused 48// from previous use. you cannot change the chunk size or max doc num. 49func (c *chunkedIntCoder) Reset() { 50 c.final = c.final[:0] 51 c.chunkBuf.Reset() 52 c.currChunk = 0 53 for i := range c.chunkLens { 54 c.chunkLens[i] = 0 55 } 56} 57 58// Add encodes the provided integers into the correct chunk for the provided 59// doc num. You MUST call Add() with increasing docNums. 60func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error { 61 chunk := docNum / c.chunkSize 62 if chunk != c.currChunk { 63 // starting a new chunk 64 c.Close() 65 c.chunkBuf.Reset() 66 c.currChunk = chunk 67 } 68 69 if len(c.buf) < binary.MaxVarintLen64 { 70 c.buf = make([]byte, binary.MaxVarintLen64) 71 } 72 73 for _, val := range vals { 74 wb := binary.PutUvarint(c.buf, val) 75 _, err := c.chunkBuf.Write(c.buf[:wb]) 76 if err != nil { 77 return err 78 } 79 } 80 81 return nil 82} 83 84func (c *chunkedIntCoder) AddBytes(docNum uint64, buf []byte) error { 85 chunk := docNum / c.chunkSize 86 if chunk != c.currChunk { 87 // starting a new chunk 88 c.Close() 89 c.chunkBuf.Reset() 90 c.currChunk = chunk 91 } 92 93 _, err := c.chunkBuf.Write(buf) 94 return err 95} 96 97// Close indicates you are done calling Add() this allows the final chunk 98// to be encoded. 99func (c *chunkedIntCoder) Close() { 100 encodingBytes := c.chunkBuf.Bytes() 101 c.chunkLens[c.currChunk] = uint64(len(encodingBytes)) 102 c.final = append(c.final, encodingBytes...) 103 c.currChunk = uint64(cap(c.chunkLens)) // sentinel to detect double close 104} 105 106// Write commits all the encoded chunked integers to the provided writer. 107func (c *chunkedIntCoder) Write(w io.Writer) (int, error) { 108 bufNeeded := binary.MaxVarintLen64 * (1 + len(c.chunkLens)) 109 if len(c.buf) < bufNeeded { 110 c.buf = make([]byte, bufNeeded) 111 } 112 buf := c.buf 113 114 // convert the chunk lengths into chunk offsets 115 chunkOffsets := modifyLengthsToEndOffsets(c.chunkLens) 116 117 // write out the number of chunks & each chunk offsets 118 n := binary.PutUvarint(buf, uint64(len(chunkOffsets))) 119 for _, chunkOffset := range chunkOffsets { 120 n += binary.PutUvarint(buf[n:], chunkOffset) 121 } 122 123 tw, err := w.Write(buf[:n]) 124 if err != nil { 125 return tw, err 126 } 127 128 // write out the data 129 nw, err := w.Write(c.final) 130 tw += nw 131 if err != nil { 132 return tw, err 133 } 134 return tw, nil 135} 136 137func (c *chunkedIntCoder) FinalSize() int { 138 return len(c.final) 139} 140 141// modifyLengthsToEndOffsets converts the chunk length array 142// to a chunk offset array. The readChunkBoundary 143// will figure out the start and end of every chunk from 144// these offsets. Starting offset of i'th index is stored 145// in i-1'th position except for 0'th index and ending offset 146// is stored at i'th index position. 147// For 0'th element, starting position is always zero. 148// eg: 149// Lens -> 5 5 5 5 => 5 10 15 20 150// Lens -> 0 5 0 5 => 0 5 5 10 151// Lens -> 0 0 0 5 => 0 0 0 5 152// Lens -> 5 0 0 0 => 5 5 5 5 153// Lens -> 0 5 0 0 => 0 5 5 5 154// Lens -> 0 0 5 0 => 0 0 5 5 155func modifyLengthsToEndOffsets(lengths []uint64) []uint64 { 156 var runningOffset uint64 157 var index, i int 158 for i = 1; i <= len(lengths); i++ { 159 runningOffset += lengths[i-1] 160 lengths[index] = runningOffset 161 index++ 162 } 163 return lengths 164} 165 166func readChunkBoundary(chunk int, offsets []uint64) (uint64, uint64) { 167 var start uint64 168 if chunk > 0 { 169 start = offsets[chunk-1] 170 } 171 return start, offsets[chunk] 172} 173