1// Copyright (c) 2017 Couchbase, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package zap 16 17import ( 18 "bytes" 19 "encoding/binary" 20 "fmt" 21 "io" 22 "os" 23 "sync" 24 25 "github.com/RoaringBitmap/roaring" 26 "github.com/Smerity/govarint" 27 "github.com/blevesearch/bleve/index/scorch/segment" 28 "github.com/couchbase/vellum" 29 mmap "github.com/edsrzf/mmap-go" 30 "github.com/golang/snappy" 31) 32 33// Open returns a zap impl of a segment 34func Open(path string) (segment.Segment, error) { 35 f, err := os.Open(path) 36 if err != nil { 37 return nil, err 38 } 39 mm, err := mmap.Map(f, mmap.RDONLY, 0) 40 if err != nil { 41 // mmap failed, try to close the file 42 _ = f.Close() 43 return nil, err 44 } 45 46 rv := &Segment{ 47 SegmentBase: SegmentBase{ 48 mem: mm[0 : len(mm)-FooterSize], 49 fieldsMap: make(map[string]uint16), 50 fieldDvIterMap: make(map[uint16]*docValueIterator), 51 }, 52 f: f, 53 mm: mm, 54 path: path, 55 refs: 1, 56 } 57 58 err = rv.loadConfig() 59 if err != nil { 60 _ = rv.Close() 61 return nil, err 62 } 63 64 err = rv.loadFields() 65 if err != nil { 66 _ = rv.Close() 67 return nil, err 68 } 69 70 err = rv.loadDvIterators() 71 if err != nil { 72 _ = rv.Close() 73 return nil, err 74 } 75 76 return rv, nil 77} 78 79// SegmentBase is a memory only, read-only implementation of the 80// segment.Segment interface, using zap's data representation. 81type SegmentBase struct { 82 mem []byte 83 memCRC uint32 84 chunkFactor uint32 85 fieldsMap map[string]uint16 // fieldName -> fieldID+1 86 fieldsInv []string // fieldID -> fieldName 87 numDocs uint64 88 storedIndexOffset uint64 89 fieldsIndexOffset uint64 90 docValueOffset uint64 91 dictLocs []uint64 92 fieldDvIterMap map[uint16]*docValueIterator // naive chunk cache per field 93} 94 95func (sb *SegmentBase) AddRef() {} 96func (sb *SegmentBase) DecRef() (err error) { return nil } 97func (sb *SegmentBase) Close() (err error) { return nil } 98 99// Segment implements a persisted segment.Segment interface, by 100// embedding an mmap()'ed SegmentBase. 101type Segment struct { 102 SegmentBase 103 104 f *os.File 105 mm mmap.MMap 106 path string 107 version uint32 108 crc uint32 109 110 m sync.Mutex // Protects the fields that follow. 111 refs int64 112} 113 114func (s *Segment) SizeInBytes() uint64 { 115 // 8 /* size of file pointer */ 116 // 4 /* size of version -> uint32 */ 117 // 4 /* size of crc -> uint32 */ 118 sizeOfUints := 16 119 120 sizeInBytes := (len(s.path) + int(segment.SizeOfString)) + sizeOfUints 121 122 // mutex, refs -> int64 123 sizeInBytes += 16 124 125 // do not include the mmap'ed part 126 return uint64(sizeInBytes) + s.SegmentBase.SizeInBytes() - uint64(len(s.mem)) 127} 128 129func (s *SegmentBase) SizeInBytes() uint64 { 130 // 4 /* size of memCRC -> uint32 */ 131 // 4 /* size of chunkFactor -> uint32 */ 132 // 8 /* size of numDocs -> uint64 */ 133 // 8 /* size of storedIndexOffset -> uint64 */ 134 // 8 /* size of fieldsIndexOffset -> uint64 */ 135 // 8 /* size of docValueOffset -> uint64 */ 136 sizeInBytes := 40 137 138 sizeInBytes += len(s.mem) + int(segment.SizeOfSlice) 139 140 // fieldsMap 141 for k, _ := range s.fieldsMap { 142 sizeInBytes += (len(k) + int(segment.SizeOfString)) + 2 /* size of uint16 */ 143 } 144 sizeInBytes += int(segment.SizeOfMap) /* overhead from map */ 145 146 // fieldsInv, dictLocs 147 for _, entry := range s.fieldsInv { 148 sizeInBytes += (len(entry) + int(segment.SizeOfString)) 149 } 150 sizeInBytes += len(s.dictLocs) * 8 /* size of uint64 */ 151 sizeInBytes += int(segment.SizeOfSlice) * 3 /* overhead from slices */ 152 153 // fieldDvIterMap 154 sizeInBytes += len(s.fieldDvIterMap) * 155 int(segment.SizeOfPointer+2 /* size of uint16 */) 156 for _, entry := range s.fieldDvIterMap { 157 if entry != nil { 158 sizeInBytes += int(entry.sizeInBytes()) 159 } 160 } 161 sizeInBytes += int(segment.SizeOfMap) 162 163 return uint64(sizeInBytes) 164} 165 166func (s *Segment) AddRef() { 167 s.m.Lock() 168 s.refs++ 169 s.m.Unlock() 170} 171 172func (s *Segment) DecRef() (err error) { 173 s.m.Lock() 174 s.refs-- 175 if s.refs == 0 { 176 err = s.closeActual() 177 } 178 s.m.Unlock() 179 return err 180} 181 182func (s *Segment) loadConfig() error { 183 crcOffset := len(s.mm) - 4 184 s.crc = binary.BigEndian.Uint32(s.mm[crcOffset : crcOffset+4]) 185 186 verOffset := crcOffset - 4 187 s.version = binary.BigEndian.Uint32(s.mm[verOffset : verOffset+4]) 188 if s.version != version { 189 return fmt.Errorf("unsupported version %d", s.version) 190 } 191 192 chunkOffset := verOffset - 4 193 s.chunkFactor = binary.BigEndian.Uint32(s.mm[chunkOffset : chunkOffset+4]) 194 195 docValueOffset := chunkOffset - 8 196 s.docValueOffset = binary.BigEndian.Uint64(s.mm[docValueOffset : docValueOffset+8]) 197 198 fieldsIndexOffset := docValueOffset - 8 199 s.fieldsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsIndexOffset : fieldsIndexOffset+8]) 200 201 storedIndexOffset := fieldsIndexOffset - 8 202 s.storedIndexOffset = binary.BigEndian.Uint64(s.mm[storedIndexOffset : storedIndexOffset+8]) 203 204 numDocsOffset := storedIndexOffset - 8 205 s.numDocs = binary.BigEndian.Uint64(s.mm[numDocsOffset : numDocsOffset+8]) 206 return nil 207} 208 209func (s *SegmentBase) loadFields() error { 210 // NOTE for now we assume the fields index immediately preceeds 211 // the footer, and if this changes, need to adjust accordingly (or 212 // store explicit length), where s.mem was sliced from s.mm in Open(). 213 fieldsIndexEnd := uint64(len(s.mem)) 214 215 // iterate through fields index 216 var fieldID uint64 217 for s.fieldsIndexOffset+(8*fieldID) < fieldsIndexEnd { 218 addr := binary.BigEndian.Uint64(s.mem[s.fieldsIndexOffset+(8*fieldID) : s.fieldsIndexOffset+(8*fieldID)+8]) 219 220 dictLoc, read := binary.Uvarint(s.mem[addr:fieldsIndexEnd]) 221 n := uint64(read) 222 s.dictLocs = append(s.dictLocs, dictLoc) 223 224 var nameLen uint64 225 nameLen, read = binary.Uvarint(s.mem[addr+n : fieldsIndexEnd]) 226 n += uint64(read) 227 228 name := string(s.mem[addr+n : addr+n+nameLen]) 229 s.fieldsInv = append(s.fieldsInv, name) 230 s.fieldsMap[name] = uint16(fieldID + 1) 231 232 fieldID++ 233 } 234 return nil 235} 236 237// Dictionary returns the term dictionary for the specified field 238func (s *SegmentBase) Dictionary(field string) (segment.TermDictionary, error) { 239 dict, err := s.dictionary(field) 240 if err == nil && dict == nil { 241 return &segment.EmptyDictionary{}, nil 242 } 243 return dict, err 244} 245 246func (sb *SegmentBase) dictionary(field string) (rv *Dictionary, err error) { 247 fieldIDPlus1 := sb.fieldsMap[field] 248 if fieldIDPlus1 > 0 { 249 rv = &Dictionary{ 250 sb: sb, 251 field: field, 252 fieldID: fieldIDPlus1 - 1, 253 } 254 255 dictStart := sb.dictLocs[rv.fieldID] 256 if dictStart > 0 { 257 // read the length of the vellum data 258 vellumLen, read := binary.Uvarint(sb.mem[dictStart : dictStart+binary.MaxVarintLen64]) 259 fstBytes := sb.mem[dictStart+uint64(read) : dictStart+uint64(read)+vellumLen] 260 if fstBytes != nil { 261 rv.fst, err = vellum.Load(fstBytes) 262 if err != nil { 263 return nil, fmt.Errorf("dictionary field %s vellum err: %v", field, err) 264 } 265 } 266 } 267 } 268 269 return rv, nil 270} 271 272// VisitDocument invokes the DocFieldValueVistor for each stored field 273// for the specified doc number 274func (s *SegmentBase) VisitDocument(num uint64, visitor segment.DocumentFieldValueVisitor) error { 275 // first make sure this is a valid number in this segment 276 if num < s.numDocs { 277 meta, compressed := s.getDocStoredMetaAndCompressed(num) 278 uncompressed, err := snappy.Decode(nil, compressed) 279 if err != nil { 280 return err 281 } 282 // now decode meta and process 283 reader := bytes.NewReader(meta) 284 decoder := govarint.NewU64Base128Decoder(reader) 285 286 keepGoing := true 287 for keepGoing { 288 field, err := decoder.GetU64() 289 if err == io.EOF { 290 break 291 } 292 if err != nil { 293 return err 294 } 295 typ, err := decoder.GetU64() 296 if err != nil { 297 return err 298 } 299 offset, err := decoder.GetU64() 300 if err != nil { 301 return err 302 } 303 l, err := decoder.GetU64() 304 if err != nil { 305 return err 306 } 307 numap, err := decoder.GetU64() 308 if err != nil { 309 return err 310 } 311 var arrayPos []uint64 312 if numap > 0 { 313 arrayPos = make([]uint64, numap) 314 for i := 0; i < int(numap); i++ { 315 ap, err := decoder.GetU64() 316 if err != nil { 317 return err 318 } 319 arrayPos[i] = ap 320 } 321 } 322 323 value := uncompressed[offset : offset+l] 324 keepGoing = visitor(s.fieldsInv[field], byte(typ), value, arrayPos) 325 } 326 } 327 return nil 328} 329 330// Count returns the number of documents in this segment. 331func (s *SegmentBase) Count() uint64 { 332 return s.numDocs 333} 334 335// DocNumbers returns a bitset corresponding to the doc numbers of all the 336// provided _id strings 337func (s *SegmentBase) DocNumbers(ids []string) (*roaring.Bitmap, error) { 338 rv := roaring.New() 339 340 if len(s.fieldsMap) > 0 { 341 idDict, err := s.dictionary("_id") 342 if err != nil { 343 return nil, err 344 } 345 346 var postings *PostingsList 347 for _, id := range ids { 348 postings, err = idDict.postingsList([]byte(id), nil, postings) 349 if err != nil { 350 return nil, err 351 } 352 if postings.postings != nil { 353 rv.Or(postings.postings) 354 } 355 } 356 } 357 358 return rv, nil 359} 360 361// Fields returns the field names used in this segment 362func (s *SegmentBase) Fields() []string { 363 return s.fieldsInv 364} 365 366// Path returns the path of this segment on disk 367func (s *Segment) Path() string { 368 return s.path 369} 370 371// Close releases all resources associated with this segment 372func (s *Segment) Close() (err error) { 373 return s.DecRef() 374} 375 376func (s *Segment) closeActual() (err error) { 377 if s.mm != nil { 378 err = s.mm.Unmap() 379 } 380 // try to close file even if unmap failed 381 if s.f != nil { 382 err2 := s.f.Close() 383 if err == nil { 384 // try to return first error 385 err = err2 386 } 387 } 388 return 389} 390 391// some helpers i started adding for the command-line utility 392 393// Data returns the underlying mmaped data slice 394func (s *Segment) Data() []byte { 395 return s.mm 396} 397 398// CRC returns the CRC value stored in the file footer 399func (s *Segment) CRC() uint32 { 400 return s.crc 401} 402 403// Version returns the file version in the file footer 404func (s *Segment) Version() uint32 { 405 return s.version 406} 407 408// ChunkFactor returns the chunk factor in the file footer 409func (s *Segment) ChunkFactor() uint32 { 410 return s.chunkFactor 411} 412 413// FieldsIndexOffset returns the fields index offset in the file footer 414func (s *Segment) FieldsIndexOffset() uint64 { 415 return s.fieldsIndexOffset 416} 417 418// StoredIndexOffset returns the stored value index offset in the file footer 419func (s *Segment) StoredIndexOffset() uint64 { 420 return s.storedIndexOffset 421} 422 423// DocValueOffset returns the docValue offset in the file footer 424func (s *Segment) DocValueOffset() uint64 { 425 return s.docValueOffset 426} 427 428// NumDocs returns the number of documents in the file footer 429func (s *Segment) NumDocs() uint64 { 430 return s.numDocs 431} 432 433// DictAddr is a helper function to compute the file offset where the 434// dictionary is stored for the specified field. 435func (s *Segment) DictAddr(field string) (uint64, error) { 436 fieldIDPlus1, ok := s.fieldsMap[field] 437 if !ok { 438 return 0, fmt.Errorf("no such field '%s'", field) 439 } 440 441 return s.dictLocs[fieldIDPlus1-1], nil 442} 443 444func (s *SegmentBase) loadDvIterators() error { 445 if s.docValueOffset == fieldNotUninverted { 446 return nil 447 } 448 449 var read uint64 450 for fieldID, field := range s.fieldsInv { 451 fieldLoc, n := binary.Uvarint(s.mem[s.docValueOffset+read : s.docValueOffset+read+binary.MaxVarintLen64]) 452 if n <= 0 { 453 return fmt.Errorf("loadDvIterators: failed to read the docvalue offsets for field %d", fieldID) 454 } 455 s.fieldDvIterMap[uint16(fieldID)], _ = s.loadFieldDocValueIterator(field, fieldLoc) 456 read += uint64(n) 457 } 458 return nil 459} 460