// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zap

import (
	"bytes"
	"encoding/binary"
	"io"
	"reflect"

	"github.com/golang/snappy"
)

var reflectStaticSizeMetaData int

func init() {
	var md MetaData
	reflectStaticSizeMetaData = int(reflect.TypeOf(md).Size())
}

var termSeparator byte = 0xff
var termSeparatorSplitSlice = []byte{termSeparator}

type chunkedContentCoder struct {
	final     []byte
	chunkSize uint64
	currChunk uint64
	chunkLens []uint64

	w                io.Writer
	progressiveWrite bool

	chunkMetaBuf bytes.Buffer
	chunkBuf     bytes.Buffer

	chunkMeta []MetaData

	compressed []byte // temp buf for snappy compression
}

// MetaData represents the metadata for a piece of data
// inside a chunk.
type MetaData struct {
	DocNum      uint64 // docNum of the data inside the chunk
	DocDvOffset uint64 // end offset of the data inside the chunk for the given docNum
}

// newChunkedContentCoder returns a new chunked content coder which
// packs data into chunks based on the provided chunkSize
func newChunkedContentCoder(chunkSize uint64, maxDocNum uint64,
	w io.Writer, progressiveWrite bool) *chunkedContentCoder {
	total := maxDocNum/chunkSize + 1
	rv := &chunkedContentCoder{
		chunkSize:        chunkSize,
		chunkLens:        make([]uint64, total),
		chunkMeta:        make([]MetaData, 0, total),
		w:                w,
		progressiveWrite: progressiveWrite,
	}

	return rv
}

// Reset lets you reuse this chunked content coder. Buffers are reset
// and reused. You cannot change the chunk size.
func (c *chunkedContentCoder) Reset() {
	c.currChunk = 0
	c.final = c.final[:0]
	c.chunkBuf.Reset()
	c.chunkMetaBuf.Reset()
	for i := range c.chunkLens {
		c.chunkLens[i] = 0
	}
	c.chunkMeta = c.chunkMeta[:0]
}

func (c *chunkedContentCoder) SetChunkSize(chunkSize uint64, maxDocNum uint64) {
	total := int(maxDocNum/chunkSize + 1)
	c.chunkSize = chunkSize
	if cap(c.chunkLens) < total {
		c.chunkLens = make([]uint64, total)
	} else {
		c.chunkLens = c.chunkLens[:total]
	}
	if cap(c.chunkMeta) < total {
		c.chunkMeta = make([]MetaData, 0, total)
	}
}
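// Each chunk assembled by flushContents (below) is laid out as:
//
//	uvarint count of metadata entries
//	uvarint (DocNum, DocDvOffset) pairs, one per entry
//	snappy-compressed concatenation of the values Add()ed to the chunk
//
// chunkLens[i] records the total byte length of chunk i as assembled.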
// Close indicates you are done calling Add(); this allows
// the final chunk to be encoded.
func (c *chunkedContentCoder) Close() error {
	return c.flushContents()
}

func (c *chunkedContentCoder) flushContents() error {
	// flush the contents, with meta information at first
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, uint64(len(c.chunkMeta)))
	_, err := c.chunkMetaBuf.Write(buf[:n])
	if err != nil {
		return err
	}

	// write out the metaData slice
	for _, meta := range c.chunkMeta {
		_, err := writeUvarints(&c.chunkMetaBuf, meta.DocNum, meta.DocDvOffset)
		if err != nil {
			return err
		}
	}

	// write the metadata to final data
	metaData := c.chunkMetaBuf.Bytes()
	c.final = append(c.final, metaData...)
	// write the compressed data to the final data
	c.compressed = snappy.Encode(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes())
	c.final = append(c.final, c.compressed...)

	c.chunkLens[c.currChunk] = uint64(len(c.compressed) + len(metaData))

	if c.progressiveWrite {
		_, err := c.w.Write(c.final)
		if err != nil {
			return err
		}
		c.final = c.final[:0]
	}

	return nil
}

// Add encodes the provided byte slice into the correct chunk for the provided
// doc num. You MUST call Add() with increasing docNums.
func (c *chunkedContentCoder) Add(docNum uint64, vals []byte) error {
	chunk := docNum / c.chunkSize
	if chunk != c.currChunk {
		// flush out the previous chunk details
		err := c.flushContents()
		if err != nil {
			return err
		}
		// clearing the chunk specific meta for next chunk
		c.chunkBuf.Reset()
		c.chunkMetaBuf.Reset()
		c.chunkMeta = c.chunkMeta[:0]
		c.currChunk = chunk
	}

	// get the starting offset for this doc
	dvOffset := c.chunkBuf.Len()
	dvSize, err := c.chunkBuf.Write(vals)
	if err != nil {
		return err
	}

	c.chunkMeta = append(c.chunkMeta, MetaData{
		DocNum:      docNum,
		DocDvOffset: uint64(dvOffset + dvSize),
	})
	return nil
}
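// Write (below) converts the per-chunk byte lengths in chunkLens into
// cumulative end offsets via modifyLengthsToEndOffsets (defined elsewhere
// in this package), which, as its name suggests, is assumed to turn
// per-chunk lengths into running end offsets so a reader can recover
// chunk i's byte range as [offsets[i-1], offsets[i]). For example,
// lengths [10, 4, 7] would become end offsets [10, 14, 21].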
// Write commits all the encoded chunked contents to the provided writer.
//
//	| ..... data ..... | chunk offsets (varints)
//	| length of chunk offsets (uint64) | number of offsets (uint64) |
func (c *chunkedContentCoder) Write() (int, error) {
	var tw int

	if c.final != nil {
		// write out the data section first
		nw, err := c.w.Write(c.final)
		tw += nw
		if err != nil {
			return tw, err
		}
	}

	chunkOffsetsStart := uint64(tw)

	if cap(c.final) < binary.MaxVarintLen64 {
		c.final = make([]byte, binary.MaxVarintLen64)
	} else {
		c.final = c.final[0:binary.MaxVarintLen64]
	}
	chunkOffsets := modifyLengthsToEndOffsets(c.chunkLens)
	// write out the chunk offsets
	for _, chunkOffset := range chunkOffsets {
		n := binary.PutUvarint(c.final, chunkOffset)
		nw, err := c.w.Write(c.final[:n])
		tw += nw
		if err != nil {
			return tw, err
		}
	}

	chunkOffsetsLen := uint64(tw) - chunkOffsetsStart

	c.final = c.final[0:8]
	// write out the length of chunk offsets
	binary.BigEndian.PutUint64(c.final, chunkOffsetsLen)
	nw, err := c.w.Write(c.final)
	tw += nw
	if err != nil {
		return tw, err
	}

	// write out the number of chunks
	binary.BigEndian.PutUint64(c.final, uint64(len(c.chunkLens)))
	nw, err = c.w.Write(c.final)
	tw += nw
	if err != nil {
		return tw, err
	}

	c.final = c.final[:0]

	return tw, nil
}

// ReadDocValueBoundary returns the start and end offsets of the data for
// the entry at the given position in a metaData header slice.
func ReadDocValueBoundary(chunk int, metaHeaders []MetaData) (uint64, uint64) {
	var start uint64
	if chunk > 0 {
		start = metaHeaders[chunk-1].DocDvOffset
	}
	return start, metaHeaders[chunk].DocDvOffset
}
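
// readChunkFooter is an illustrative sketch, not part of the original file
// and not used by this package's readers (which have their own decoding
// paths). It shows how the footer written by Write can be decoded: the last
// 8 bytes hold the number of chunks, the preceding 8 bytes hold the byte
// length of the varint-encoded chunk end offsets that sit just before them.
func readChunkFooter(data []byte) ([]uint64, error) {
	if len(data) < 16 {
		return nil, io.ErrUnexpectedEOF
	}
	numChunks := binary.BigEndian.Uint64(data[len(data)-8:])
	offsetsLen := binary.BigEndian.Uint64(data[len(data)-16 : len(data)-8])
	footerStart := uint64(len(data) - 16)
	if offsetsLen > footerStart {
		return nil, io.ErrUnexpectedEOF
	}
	// slice out the varint-encoded chunk end offsets and decode them
	buf := data[footerStart-offsetsLen : footerStart]
	chunkOffsets := make([]uint64, 0, numChunks)
	for i := uint64(0); i < numChunks; i++ {
		v, n := binary.Uvarint(buf)
		if n <= 0 {
			return nil, io.ErrUnexpectedEOF
		}
		chunkOffsets = append(chunkOffsets, v)
		buf = buf[n:]
	}
	return chunkOffsets, nil
}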