1// Copyright (c) 2017 Couchbase, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package zap 16 17import ( 18 "bytes" 19 "encoding/binary" 20 "io" 21) 22 23// We can safely use 0 to represent termNotEncoded since 0 24// could never be a valid address for term location information. 25// (stored field index is always non-empty and earlier in the 26// file) 27const termNotEncoded = 0 28 29type chunkedIntCoder struct { 30 final []byte 31 chunkSize uint64 32 chunkBuf bytes.Buffer 33 chunkLens []uint64 34 currChunk uint64 35 36 buf []byte 37} 38 39// newChunkedIntCoder returns a new chunk int coder which packs data into 40// chunks based on the provided chunkSize and supports up to the specified 41// maxDocNum 42func newChunkedIntCoder(chunkSize uint64, maxDocNum uint64) *chunkedIntCoder { 43 total := maxDocNum/chunkSize + 1 44 rv := &chunkedIntCoder{ 45 chunkSize: chunkSize, 46 chunkLens: make([]uint64, total), 47 final: make([]byte, 0, 64), 48 } 49 50 return rv 51} 52 53// Reset lets you reuse this chunked int coder. buffers are reset and reused 54// from previous use. you cannot change the chunk size or max doc num. 55func (c *chunkedIntCoder) Reset() { 56 c.final = c.final[:0] 57 c.chunkBuf.Reset() 58 c.currChunk = 0 59 for i := range c.chunkLens { 60 c.chunkLens[i] = 0 61 } 62} 63 64// SetChunkSize changes the chunk size. 
It is only valid to do so 65// with a new chunkedIntCoder, or immediately after calling Reset() 66func (c *chunkedIntCoder) SetChunkSize(chunkSize uint64, maxDocNum uint64) { 67 total := int(maxDocNum/chunkSize + 1) 68 c.chunkSize = chunkSize 69 if cap(c.chunkLens) < total { 70 c.chunkLens = make([]uint64, total) 71 } else { 72 c.chunkLens = c.chunkLens[:total] 73 } 74} 75 76// Add encodes the provided integers into the correct chunk for the provided 77// doc num. You MUST call Add() with increasing docNums. 78func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error { 79 chunk := docNum / c.chunkSize 80 if chunk != c.currChunk { 81 // starting a new chunk 82 c.Close() 83 c.chunkBuf.Reset() 84 c.currChunk = chunk 85 } 86 87 if len(c.buf) < binary.MaxVarintLen64 { 88 c.buf = make([]byte, binary.MaxVarintLen64) 89 } 90 91 for _, val := range vals { 92 wb := binary.PutUvarint(c.buf, val) 93 _, err := c.chunkBuf.Write(c.buf[:wb]) 94 if err != nil { 95 return err 96 } 97 } 98 99 return nil 100} 101 102func (c *chunkedIntCoder) AddBytes(docNum uint64, buf []byte) error { 103 chunk := docNum / c.chunkSize 104 if chunk != c.currChunk { 105 // starting a new chunk 106 c.Close() 107 c.chunkBuf.Reset() 108 c.currChunk = chunk 109 } 110 111 _, err := c.chunkBuf.Write(buf) 112 return err 113} 114 115// Close indicates you are done calling Add() this allows the final chunk 116// to be encoded. 117func (c *chunkedIntCoder) Close() { 118 encodingBytes := c.chunkBuf.Bytes() 119 c.chunkLens[c.currChunk] = uint64(len(encodingBytes)) 120 c.final = append(c.final, encodingBytes...) 121 c.currChunk = uint64(cap(c.chunkLens)) // sentinel to detect double close 122} 123 124// Write commits all the encoded chunked integers to the provided writer. 
125func (c *chunkedIntCoder) Write(w io.Writer) (int, error) { 126 bufNeeded := binary.MaxVarintLen64 * (1 + len(c.chunkLens)) 127 if len(c.buf) < bufNeeded { 128 c.buf = make([]byte, bufNeeded) 129 } 130 buf := c.buf 131 132 // convert the chunk lengths into chunk offsets 133 chunkOffsets := modifyLengthsToEndOffsets(c.chunkLens) 134 135 // write out the number of chunks & each chunk offsets 136 n := binary.PutUvarint(buf, uint64(len(chunkOffsets))) 137 for _, chunkOffset := range chunkOffsets { 138 n += binary.PutUvarint(buf[n:], chunkOffset) 139 } 140 141 tw, err := w.Write(buf[:n]) 142 if err != nil { 143 return tw, err 144 } 145 146 // write out the data 147 nw, err := w.Write(c.final) 148 tw += nw 149 if err != nil { 150 return tw, err 151 } 152 return tw, nil 153} 154 155// writeAt commits all the encoded chunked integers to the provided writer 156// and returns the starting offset, total bytes written and an error 157func (c *chunkedIntCoder) writeAt(w io.Writer) (uint64, int, error) { 158 startOffset := uint64(termNotEncoded) 159 if len(c.final) <= 0 { 160 return startOffset, 0, nil 161 } 162 163 if chw := w.(*CountHashWriter); chw != nil { 164 startOffset = uint64(chw.Count()) 165 } 166 167 tw, err := c.Write(w) 168 return startOffset, tw, err 169} 170 171func (c *chunkedIntCoder) FinalSize() int { 172 return len(c.final) 173} 174 175// modifyLengthsToEndOffsets converts the chunk length array 176// to a chunk offset array. The readChunkBoundary 177// will figure out the start and end of every chunk from 178// these offsets. Starting offset of i'th index is stored 179// in i-1'th position except for 0'th index and ending offset 180// is stored at i'th index position. 181// For 0'th element, starting position is always zero. 
// eg:
//	Lens -> 5 5 5 5 => 5 10 15 20
//	Lens -> 0 5 0 5 => 0 5 5 10
//	Lens -> 0 0 0 5 => 0 0 0 5
//	Lens -> 5 0 0 0 => 5 5 5 5
//	Lens -> 0 5 0 0 => 0 5 5 5
//	Lens -> 0 0 5 0 => 0 0 5 5
func modifyLengthsToEndOffsets(lengths []uint64) []uint64 {
	// running prefix sum, computed in place over the same slice
	var total uint64
	for i, l := range lengths {
		total += l
		lengths[i] = total
	}
	return lengths
}

// readChunkBoundary returns the start and end byte positions of the given
// chunk, derived from the end-offsets array produced by
// modifyLengthsToEndOffsets. Chunk 0 always starts at position zero.
func readChunkBoundary(chunk int, offsets []uint64) (uint64, uint64) {
	if chunk == 0 {
		return 0, offsets[0]
	}
	return offsets[chunk-1], offsets[chunk]
}