1// Copyright (c) 2017 Couchbase, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package zap 16 17import ( 18 "bytes" 19 "encoding/binary" 20 "io" 21 "math" 22) 23 24const termNotEncoded = math.MaxUint64 25 26type chunkedIntCoder struct { 27 final []byte 28 chunkSize uint64 29 chunkBuf bytes.Buffer 30 chunkLens []uint64 31 currChunk uint64 32 33 buf []byte 34} 35 36// newChunkedIntCoder returns a new chunk int coder which packs data into 37// chunks based on the provided chunkSize and supports up to the specified 38// maxDocNum 39func newChunkedIntCoder(chunkSize uint64, maxDocNum uint64) *chunkedIntCoder { 40 total := maxDocNum/chunkSize + 1 41 rv := &chunkedIntCoder{ 42 chunkSize: chunkSize, 43 chunkLens: make([]uint64, total), 44 final: make([]byte, 0, 64), 45 } 46 47 return rv 48} 49 50// Reset lets you reuse this chunked int coder. buffers are reset and reused 51// from previous use. you cannot change the chunk size or max doc num. 52func (c *chunkedIntCoder) Reset() { 53 c.final = c.final[:0] 54 c.chunkBuf.Reset() 55 c.currChunk = 0 56 for i := range c.chunkLens { 57 c.chunkLens[i] = 0 58 } 59} 60 61// SetChunkSize changes the chunk size. 
It is only valid to do so 62// with a new chunkedIntCoder, or immediately after calling Reset() 63func (c *chunkedIntCoder) SetChunkSize(chunkSize uint64, maxDocNum uint64) { 64 total := int(maxDocNum/chunkSize + 1) 65 c.chunkSize = chunkSize 66 if cap(c.chunkLens) < total { 67 c.chunkLens = make([]uint64, total) 68 } else { 69 c.chunkLens = c.chunkLens[:total] 70 } 71} 72 73// Add encodes the provided integers into the correct chunk for the provided 74// doc num. You MUST call Add() with increasing docNums. 75func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error { 76 chunk := docNum / c.chunkSize 77 if chunk != c.currChunk { 78 // starting a new chunk 79 c.Close() 80 c.chunkBuf.Reset() 81 c.currChunk = chunk 82 } 83 84 if len(c.buf) < binary.MaxVarintLen64 { 85 c.buf = make([]byte, binary.MaxVarintLen64) 86 } 87 88 for _, val := range vals { 89 wb := binary.PutUvarint(c.buf, val) 90 _, err := c.chunkBuf.Write(c.buf[:wb]) 91 if err != nil { 92 return err 93 } 94 } 95 96 return nil 97} 98 99func (c *chunkedIntCoder) AddBytes(docNum uint64, buf []byte) error { 100 chunk := docNum / c.chunkSize 101 if chunk != c.currChunk { 102 // starting a new chunk 103 c.Close() 104 c.chunkBuf.Reset() 105 c.currChunk = chunk 106 } 107 108 _, err := c.chunkBuf.Write(buf) 109 return err 110} 111 112// Close indicates you are done calling Add() this allows the final chunk 113// to be encoded. 114func (c *chunkedIntCoder) Close() { 115 encodingBytes := c.chunkBuf.Bytes() 116 c.chunkLens[c.currChunk] = uint64(len(encodingBytes)) 117 c.final = append(c.final, encodingBytes...) 118 c.currChunk = uint64(cap(c.chunkLens)) // sentinel to detect double close 119} 120 121// Write commits all the encoded chunked integers to the provided writer. 
122func (c *chunkedIntCoder) Write(w io.Writer) (int, error) { 123 bufNeeded := binary.MaxVarintLen64 * (1 + len(c.chunkLens)) 124 if len(c.buf) < bufNeeded { 125 c.buf = make([]byte, bufNeeded) 126 } 127 buf := c.buf 128 129 // convert the chunk lengths into chunk offsets 130 chunkOffsets := modifyLengthsToEndOffsets(c.chunkLens) 131 132 // write out the number of chunks & each chunk offsets 133 n := binary.PutUvarint(buf, uint64(len(chunkOffsets))) 134 for _, chunkOffset := range chunkOffsets { 135 n += binary.PutUvarint(buf[n:], chunkOffset) 136 } 137 138 tw, err := w.Write(buf[:n]) 139 if err != nil { 140 return tw, err 141 } 142 143 // write out the data 144 nw, err := w.Write(c.final) 145 tw += nw 146 if err != nil { 147 return tw, err 148 } 149 return tw, nil 150} 151 152// writeAt commits all the encoded chunked integers to the provided writer 153// and returns the starting offset, total bytes written and an error 154func (c *chunkedIntCoder) writeAt(w io.Writer) (uint64, int, error) { 155 startOffset := uint64(termNotEncoded) 156 if len(c.final) <= 0 { 157 return startOffset, 0, nil 158 } 159 160 if chw := w.(*CountHashWriter); chw != nil { 161 startOffset = uint64(chw.Count()) 162 } 163 164 tw, err := c.Write(w) 165 return startOffset, tw, err 166} 167 168func (c *chunkedIntCoder) FinalSize() int { 169 return len(c.final) 170} 171 172// modifyLengthsToEndOffsets converts the chunk length array 173// to a chunk offset array. The readChunkBoundary 174// will figure out the start and end of every chunk from 175// these offsets. Starting offset of i'th index is stored 176// in i-1'th position except for 0'th index and ending offset 177// is stored at i'th index position. 178// For 0'th element, starting position is always zero. 
// modifyLengthsToEndOffsets converts, in place, an array of chunk lengths
// into an array of cumulative end offsets. readChunkBoundary recovers the
// start and end of any chunk from these offsets: chunk i ends at
// offsets[i] and starts at offsets[i-1], except chunk 0, which always
// starts at zero.
// eg:
// Lens -> 5 5 5 5 => 5 10 15 20
// Lens -> 0 5 0 5 => 0 5 5 10
// Lens -> 0 0 0 5 => 0 0 0 5
// Lens -> 5 0 0 0 => 5 5 5 5
// Lens -> 0 5 0 0 => 0 5 5 5
// Lens -> 0 0 5 0 => 0 0 5 5
func modifyLengthsToEndOffsets(lengths []uint64) []uint64 {
	var total uint64
	for i, length := range lengths {
		total += length
		lengths[i] = total
	}
	return lengths
}

// readChunkBoundary returns the [start, end) byte positions of the given
// chunk within the encoded data, derived from the end-offsets array.
func readChunkBoundary(chunk int, offsets []uint64) (uint64, uint64) {
	if chunk == 0 {
		return 0, offsets[0]
	}
	return offsets[chunk-1], offsets[chunk]
}