// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 		http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zap

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"math"
	"reflect"
	"sort"

	"github.com/blevesearch/bleve/index"
	"github.com/blevesearch/bleve/index/scorch/segment"
	"github.com/blevesearch/bleve/size"
	"github.com/golang/snappy"
)

// reflectStaticSizedocValueReader is the fixed (static) in-memory size of a
// docValueReader struct, captured once at init time and used by size().
var reflectStaticSizedocValueReader int

func init() {
	var dvi docValueReader
	reflectStaticSizedocValueReader = int(reflect.TypeOf(dvi).Size())
}

// docNumTermsVisitor is the callback used by iterateAllDocValues; it receives
// each document number in a chunk together with that document's slice of the
// uncompressed doc-value bytes (terms plus their separators, as stored).
type docNumTermsVisitor func(docNum uint64, terms []byte) error

// docVisitState caches per-field docValueReaders across successive
// VisitDocumentFieldTerms calls so chunk data can be reused; the cache is
// discarded whenever the segment it was built for changes.
type docVisitState struct {
	dvrs    map[uint16]*docValueReader
	segment *SegmentBase
}

// docValueReader decodes the persisted doc values of a single field from the
// segment's memory (s.mem), one chunk at a time.
type docValueReader struct {
	field        string
	curChunkNum  uint64   // chunk currently loaded; math.MaxUint64 means "none loaded"
	chunkOffsets []uint64 // per-chunk boundary offsets relative to dvDataLoc (interpreted via readChunkBoundary)
	dvDataLoc    uint64   // start of this field's doc-value data within s.mem
	curChunkHeader []MetaData // per-document (DocNum, DocDvOffset) metadata of the loaded chunk
	curChunkData []byte // compressed data cache
	uncompressed []byte // temp buf for snappy decompression
}

// size reports an estimate of the reader's memory footprint in bytes.
// Note that curChunkData is a sub-slice of the segment's memory, but its
// length is still counted here.
func (di *docValueReader) size() int {
	return reflectStaticSizedocValueReader + size.SizeOfPtr +
		len(di.field) +
		len(di.chunkOffsets)*size.SizeOfUint64 +
		len(di.curChunkHeader)*reflectStaticSizeMetaData +
		len(di.curChunkData)
}

// cloneInto copies di's immutable field metadata into rv (allocating rv if
// nil) and resets rv's chunk state so the first access forces a chunk load.
// The clone shares chunkOffsets with di but reuses rv's own scratch buffers.
func (di *docValueReader) cloneInto(rv *docValueReader) *docValueReader {
	if rv == nil {
		rv = &docValueReader{}
	}

	rv.field = di.field
	rv.curChunkNum = math.MaxUint64
	rv.chunkOffsets = di.chunkOffsets // immutable, so it's sharable
	rv.dvDataLoc = di.dvDataLoc
	rv.curChunkHeader = rv.curChunkHeader[:0]
	rv.curChunkData = nil
	rv.uncompressed = rv.uncompressed[:0]

	return rv
}

// curChunkNumber returns the chunk currently cached by this reader
// (math.MaxUint64 when no chunk has been loaded yet).
func (di *docValueReader) curChunkNumber() uint64 {
	return di.curChunkNum
}

// loadFieldDocValueReader builds a docValueReader for one field from the
// field's doc-value region [fieldDvLocStart, fieldDvLocEnd) in s.mem.
//
// The region's footer layout (decoded here) is:
//   - last 8 bytes:        numChunks (big-endian uint64)
//   - preceding 8 bytes:   total byte length of the chunk-offsets section
//   - before that:         numChunks uvarint-encoded chunk offsets
//
// Returns (nil, nil) when the field was never uninverted (no doc values).
func (s *SegmentBase) loadFieldDocValueReader(field string,
	fieldDvLocStart, fieldDvLocEnd uint64) (*docValueReader, error) {
	// get the docValue offset for the given fields
	if fieldDvLocStart == fieldNotUninverted {
		// no docValues found, nothing to do
		return nil, nil
	}

	// read the number of chunks, and chunk offsets position
	var numChunks, chunkOffsetsPosition uint64

	if fieldDvLocEnd-fieldDvLocStart > 16 {
		numChunks = binary.BigEndian.Uint64(s.mem[fieldDvLocEnd-8 : fieldDvLocEnd])
		// read the length of chunk offsets
		chunkOffsetsLen := binary.BigEndian.Uint64(s.mem[fieldDvLocEnd-16 : fieldDvLocEnd-8])
		// acquire position of chunk offsets
		chunkOffsetsPosition = (fieldDvLocEnd - 16) - chunkOffsetsLen
	} else {
		// region too small to even hold the 16-byte footer => corrupt/invalid
		return nil, fmt.Errorf("loadFieldDocValueReader: fieldDvLoc too small: %d-%d", fieldDvLocEnd, fieldDvLocStart)
	}

	fdvIter := &docValueReader{
		curChunkNum:  math.MaxUint64,
		field:        field,
		chunkOffsets: make([]uint64, int(numChunks)),
	}

	// read the chunk offsets
	var offset uint64
	for i := 0; i < int(numChunks); i++ {
		loc, read := binary.Uvarint(s.mem[chunkOffsetsPosition+offset : chunkOffsetsPosition+offset+binary.MaxVarintLen64])
		if read <= 0 {
			return nil, fmt.Errorf("corrupted chunk offset during segment load")
		}
		fdvIter.chunkOffsets[i] = loc
		offset += uint64(read)
	}

	// set the data offset
	fdvIter.dvDataLoc = fieldDvLocStart

	return fdvIter, nil
}

// loadDvChunk decodes chunk chunkNumber's header into di.curChunkHeader and
// caches (without decompressing) the chunk's snappy-compressed payload.
//
// On-disk chunk layout (decoded here):
//   - uvarint numDocs
//   - numDocs pairs of uvarints: (DocNum, DocDvOffset)
//   - snappy-compressed doc-value bytes, up to the chunk end boundary
//
// An empty chunk (start >= end) is recorded with cleared state so callers see
// no data rather than an error.
func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error {
	// advance to the chunk where the docValues
	// reside for the given docNum
	destChunkDataLoc, curChunkEnd := di.dvDataLoc, di.dvDataLoc
	start, end := readChunkBoundary(int(chunkNumber), di.chunkOffsets)
	if start >= end {
		di.curChunkHeader = di.curChunkHeader[:0]
		di.curChunkData = nil
		di.curChunkNum = chunkNumber
		di.uncompressed = di.uncompressed[:0]
		return nil
	}

	destChunkDataLoc += start
	curChunkEnd += end

	// read the number of docs reside in the chunk
	numDocs, read := binary.Uvarint(s.mem[destChunkDataLoc : destChunkDataLoc+binary.MaxVarintLen64])
	if read <= 0 {
		return fmt.Errorf("failed to read the chunk")
	}
	chunkMetaLoc := destChunkDataLoc + uint64(read)

	offset := uint64(0)
	// reuse the metadata slice's capacity across chunk loads when possible
	if cap(di.curChunkHeader) < int(numDocs) {
		di.curChunkHeader = make([]MetaData, int(numDocs))
	} else {
		di.curChunkHeader = di.curChunkHeader[:int(numDocs)]
	}
	for i := 0; i < int(numDocs); i++ {
		di.curChunkHeader[i].DocNum, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
		offset += uint64(read)
		di.curChunkHeader[i].DocDvOffset, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
		offset += uint64(read)
	}

	// everything between the metadata and the chunk end is the compressed payload
	compressedDataLoc := chunkMetaLoc + offset
	dataLength := curChunkEnd - compressedDataLoc
	di.curChunkData = s.mem[compressedDataLoc : compressedDataLoc+dataLength]
	di.curChunkNum = chunkNumber
	// invalidate any previously decompressed data for the old chunk
	di.uncompressed = di.uncompressed[:0]
	return nil
}

// iterateAllDocValues walks every chunk of this field in order, decompresses
// each chunk once, and invokes visitor with each document's doc-value bytes.
// Iteration stops at the first error returned by visitor or by chunk loading.
func (di *docValueReader) iterateAllDocValues(s *SegmentBase, visitor docNumTermsVisitor) error {
	for i := 0; i < len(di.chunkOffsets); i++ {
		err := di.loadDvChunk(uint64(i), s)
		if err != nil {
			return err
		}
		if di.curChunkData == nil || len(di.curChunkHeader) == 0 {
			// empty chunk — nothing to visit
			continue
		}

		// uncompress the already loaded data
		uncompressed, err := snappy.Decode(di.uncompressed[:cap(di.uncompressed)], di.curChunkData)
		if err != nil {
			return err
		}
		di.uncompressed = uncompressed

		// DocDvOffset entries are cumulative end offsets into the
		// uncompressed buffer, so each doc's bytes run from the previous
		// doc's end to its own.
		start := uint64(0)
		for _, entry := range di.curChunkHeader {
			err = visitor(entry.DocNum, uncompressed[start:entry.DocDvOffset])
			if err != nil {
				return err
			}

			start = entry.DocDvOffset
		}
	}

	return nil
}

// visitDocValues calls visitor once per term stored for docNum within the
// currently loaded chunk. The caller must have loaded the correct chunk
// already (see VisitDocumentFieldTerms). Terms are split on
// termSeparatorSplitSlice; the writer is expected to terminate every term
// with a separator, since bytes after the last separator are not visited.
func (di *docValueReader) visitDocValues(docNum uint64,
	visitor index.DocumentFieldTermVisitor) error {
	// binary search the term locations for the docNum
	start, end := di.getDocValueLocs(docNum)
	if start == math.MaxUint64 || end == math.MaxUint64 || start == end {
		// docNum not present in this chunk, or it has no doc-value bytes
		return nil
	}

	var uncompressed []byte
	var err error
	// use the uncompressed copy if available
	if len(di.uncompressed) > 0 {
		uncompressed = di.uncompressed
	} else {
		// uncompress the already loaded data
		uncompressed, err = snappy.Decode(di.uncompressed[:cap(di.uncompressed)], di.curChunkData)
		if err != nil {
			return err
		}
		di.uncompressed = uncompressed
	}

	// pick the terms for the given docNum
	uncompressed = uncompressed[start:end]
	for {
		i := bytes.Index(uncompressed, termSeparatorSplitSlice)
		if i < 0 {
			break
		}

		visitor(di.field, uncompressed[0:i])
		uncompressed = uncompressed[i+1:]
	}

	return nil
}

// getDocValueLocs binary-searches the loaded chunk's metadata for docNum and
// returns that document's [start, end) range within the uncompressed chunk
// data, or (math.MaxUint64, math.MaxUint64) when docNum is not in the chunk.
func (di *docValueReader) getDocValueLocs(docNum uint64) (uint64, uint64) {
	i := sort.Search(len(di.curChunkHeader), func(i int) bool {
		return di.curChunkHeader[i].DocNum >= docNum
	})
	if i < len(di.curChunkHeader) && di.curChunkHeader[i].DocNum == docNum {
		return ReadDocValueBoundary(i, di.curChunkHeader)
	}
	return math.MaxUint64, math.MaxUint64
}

// VisitDocumentFieldTerms is an implementation of the
// DocumentFieldTermVisitable interface.
// It visits every persisted doc-value term of localDocNum for each requested
// field, reusing (and returning) a docVisitState so repeated calls against
// the same segment can share decoded chunks and per-field readers.
//
// NOTE(review): a freshly created docVisitState never records s in
// dvs.segment here, so on its first reuse the dvs.segment != s branch
// discards and rebuilds the dvrs map once — appears to cost only a redundant
// rebuild, not correctness; confirm intended.
func (s *SegmentBase) VisitDocumentFieldTerms(localDocNum uint64, fields []string,
	visitor index.DocumentFieldTermVisitor, dvsIn segment.DocVisitState) (
	segment.DocVisitState, error) {
	dvs, ok := dvsIn.(*docVisitState)
	if !ok || dvs == nil {
		dvs = &docVisitState{}
	} else {
		if dvs.segment != s {
			// state came from a different segment — its cached readers
			// are not valid here
			dvs.segment = s
			dvs.dvrs = nil
		}
	}

	var fieldIDPlus1 uint16
	if dvs.dvrs == nil {
		// build (or rebuild) the per-field reader cache by cloning the
		// segment's shared readers; clones keep their own chunk buffers
		dvs.dvrs = make(map[uint16]*docValueReader, len(fields))
		for _, field := range fields {
			if fieldIDPlus1, ok = s.fieldsMap[field]; !ok {
				continue
			}
			fieldID := fieldIDPlus1 - 1
			if dvIter, exists := s.fieldDvReaders[fieldID]; exists &&
				dvIter != nil {
				dvs.dvrs[fieldID] = dvIter.cloneInto(dvs.dvrs[fieldID])
			}
		}
	}

	// find the chunkNumber where the docValues are stored
	// NOTE: doc values continue to use legacy chunk mode
	chunkFactor, err := getChunkSize(LegacyChunkMode, 0, 0)
	if err != nil {
		return nil, err
	}
	docInChunk := localDocNum / chunkFactor
	var dvr *docValueReader
	for _, field := range fields {
		if fieldIDPlus1, ok = s.fieldsMap[field]; !ok {
			continue
		}
		fieldID := fieldIDPlus1 - 1
		if dvr, ok = dvs.dvrs[fieldID]; ok && dvr != nil {
			// check if the chunk is already loaded
			if docInChunk != dvr.curChunkNumber() {
				err := dvr.loadDvChunk(docInChunk, s)
				if err != nil {
					return dvs, err
				}
			}

			// NOTE(review): error from visitDocValues is deliberately
			// dropped so remaining fields are still visited — confirm
			// this best-effort behavior is intended
			_ = dvr.visitDocValues(localDocNum, visitor)
		}
	}
	return dvs, nil
}

// VisitableDocValueFields returns the list of fields with
// persisted doc value terms ready to be visitable using the
// VisitDocumentFieldTerms method.
func (s *SegmentBase) VisitableDocValueFields() ([]string, error) {
	return s.fieldDvNames, nil
}