1// Copyright (c) 2017 Couchbase, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package scorch 16 17import ( 18 "container/heap" 19 "encoding/binary" 20 "fmt" 21 "reflect" 22 "sort" 23 "sync" 24 "sync/atomic" 25 26 "github.com/RoaringBitmap/roaring" 27 "github.com/blevesearch/bleve/document" 28 "github.com/blevesearch/bleve/index" 29 "github.com/blevesearch/bleve/index/scorch/segment" 30 "github.com/couchbase/vellum" 31 lev "github.com/couchbase/vellum/levenshtein" 32) 33 34// re usable, threadsafe levenshtein builders 35var lb1, lb2 *lev.LevenshteinAutomatonBuilder 36 37type asynchSegmentResult struct { 38 dict segment.TermDictionary 39 dictItr segment.DictionaryIterator 40 41 index int 42 docs *roaring.Bitmap 43 44 postings segment.PostingsList 45 46 err error 47} 48 49var reflectStaticSizeIndexSnapshot int 50 51func init() { 52 var is interface{} = IndexSnapshot{} 53 reflectStaticSizeIndexSnapshot = int(reflect.TypeOf(is).Size()) 54 var err error 55 lb1, err = lev.NewLevenshteinAutomatonBuilder(1, true) 56 if err != nil { 57 panic(fmt.Errorf("Levenshtein automaton ed1 builder err: %v", err)) 58 } 59 lb2, err = lev.NewLevenshteinAutomatonBuilder(2, true) 60 if err != nil { 61 panic(fmt.Errorf("Levenshtein automaton ed2 builder err: %v", err)) 62 } 63} 64 65type IndexSnapshot struct { 66 parent *Scorch 67 segment []*SegmentSnapshot 68 offsets []uint64 69 internal map[string][]byte 70 epoch uint64 71 size uint64 72 creator string 73 74 m 
sync.Mutex // Protects the fields that follow. 75 refs int64 76 77 m2 sync.Mutex // Protects the fields that follow. 78 fieldTFRs map[string][]*IndexSnapshotTermFieldReader // keyed by field, recycled TFR's 79} 80 81func (i *IndexSnapshot) Segments() []*SegmentSnapshot { 82 return i.segment 83} 84 85func (i *IndexSnapshot) Internal() map[string][]byte { 86 return i.internal 87} 88 89func (i *IndexSnapshot) AddRef() { 90 i.m.Lock() 91 i.refs++ 92 i.m.Unlock() 93} 94 95func (i *IndexSnapshot) DecRef() (err error) { 96 i.m.Lock() 97 i.refs-- 98 if i.refs == 0 { 99 for _, s := range i.segment { 100 if s != nil { 101 err2 := s.segment.DecRef() 102 if err == nil { 103 err = err2 104 } 105 } 106 } 107 if i.parent != nil { 108 go i.parent.AddEligibleForRemoval(i.epoch) 109 } 110 } 111 i.m.Unlock() 112 return err 113} 114 115func (i *IndexSnapshot) Close() error { 116 return i.DecRef() 117} 118 119func (i *IndexSnapshot) Size() int { 120 return int(i.size) 121} 122 123func (i *IndexSnapshot) updateSize() { 124 i.size += uint64(reflectStaticSizeIndexSnapshot) 125 for _, s := range i.segment { 126 i.size += uint64(s.Size()) 127 } 128} 129 130func (i *IndexSnapshot) newIndexSnapshotFieldDict(field string, 131 makeItr func(i segment.TermDictionary) segment.DictionaryIterator, 132 randomLookup bool) (*IndexSnapshotFieldDict, error) { 133 134 results := make(chan *asynchSegmentResult) 135 for index, segment := range i.segment { 136 go func(index int, segment *SegmentSnapshot) { 137 dict, err := segment.segment.Dictionary(field) 138 if err != nil { 139 results <- &asynchSegmentResult{err: err} 140 } else { 141 if randomLookup { 142 results <- &asynchSegmentResult{dict: dict} 143 } else { 144 results <- &asynchSegmentResult{dictItr: makeItr(dict)} 145 } 146 } 147 }(index, segment) 148 } 149 150 var err error 151 rv := &IndexSnapshotFieldDict{ 152 snapshot: i, 153 cursors: make([]*segmentDictCursor, 0, len(i.segment)), 154 } 155 for count := 0; count < len(i.segment); count++ { 156 
asr := <-results 157 if asr.err != nil && err == nil { 158 err = asr.err 159 } else { 160 if !randomLookup { 161 next, err2 := asr.dictItr.Next() 162 if err2 != nil && err == nil { 163 err = err2 164 } 165 if next != nil { 166 rv.cursors = append(rv.cursors, &segmentDictCursor{ 167 itr: asr.dictItr, 168 curr: *next, 169 }) 170 } 171 } else { 172 rv.cursors = append(rv.cursors, &segmentDictCursor{ 173 dict: asr.dict, 174 }) 175 } 176 } 177 } 178 // after ensuring we've read all items on channel 179 if err != nil { 180 return nil, err 181 } 182 183 if !randomLookup { 184 // prepare heap 185 heap.Init(rv) 186 } 187 188 return rv, nil 189} 190 191func (i *IndexSnapshot) FieldDict(field string) (index.FieldDict, error) { 192 return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator { 193 return i.Iterator() 194 }, false) 195} 196 197func (i *IndexSnapshot) FieldDictRange(field string, startTerm []byte, 198 endTerm []byte) (index.FieldDict, error) { 199 return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator { 200 return i.RangeIterator(string(startTerm), string(endTerm)) 201 }, false) 202} 203 204func (i *IndexSnapshot) FieldDictPrefix(field string, 205 termPrefix []byte) (index.FieldDict, error) { 206 return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator { 207 return i.PrefixIterator(string(termPrefix)) 208 }, false) 209} 210 211func (i *IndexSnapshot) FieldDictRegexp(field string, 212 termRegex string) (index.FieldDict, error) { 213 // TODO: potential optimization where the literal prefix represents the, 214 // entire regexp, allowing us to use PrefixIterator(prefixTerm)? 
215 216 a, prefixBeg, prefixEnd, err := segment.ParseRegexp(termRegex) 217 if err != nil { 218 return nil, err 219 } 220 221 return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator { 222 return i.AutomatonIterator(a, prefixBeg, prefixEnd) 223 }, false) 224} 225 226func (i *IndexSnapshot) getLevAutomaton(term string, 227 fuzziness uint8) (vellum.Automaton, error) { 228 if fuzziness == 1 { 229 return lb1.BuildDfa(term, fuzziness) 230 } else if fuzziness == 2 { 231 return lb2.BuildDfa(term, fuzziness) 232 } 233 return nil, fmt.Errorf("fuzziness exceeds the max limit") 234} 235 236func (i *IndexSnapshot) FieldDictFuzzy(field string, 237 term string, fuzziness int, prefix string) (index.FieldDict, error) { 238 a, err := i.getLevAutomaton(term, uint8(fuzziness)) 239 if err != nil { 240 return nil, err 241 } 242 243 var prefixBeg, prefixEnd []byte 244 if prefix != "" { 245 prefixBeg = []byte(prefix) 246 prefixEnd = segment.IncrementBytes(prefixBeg) 247 } 248 249 return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator { 250 return i.AutomatonIterator(a, prefixBeg, prefixEnd) 251 }, false) 252} 253 254func (i *IndexSnapshot) FieldDictOnly(field string, 255 onlyTerms [][]byte, includeCount bool) (index.FieldDict, error) { 256 return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator { 257 return i.OnlyIterator(onlyTerms, includeCount) 258 }, false) 259} 260 261func (i *IndexSnapshot) FieldDictContains(field string) (index.FieldDictContains, error) { 262 return i.newIndexSnapshotFieldDict(field, nil, true) 263} 264 265func (i *IndexSnapshot) DocIDReaderAll() (index.DocIDReader, error) { 266 results := make(chan *asynchSegmentResult) 267 for index, segment := range i.segment { 268 go func(index int, segment *SegmentSnapshot) { 269 results <- &asynchSegmentResult{ 270 index: index, 271 docs: segment.DocNumbersLive(), 272 } 273 }(index, segment) 274 } 
275 276 return i.newDocIDReader(results) 277} 278 279func (i *IndexSnapshot) DocIDReaderOnly(ids []string) (index.DocIDReader, error) { 280 results := make(chan *asynchSegmentResult) 281 for index, segment := range i.segment { 282 go func(index int, segment *SegmentSnapshot) { 283 docs, err := segment.DocNumbers(ids) 284 if err != nil { 285 results <- &asynchSegmentResult{err: err} 286 } else { 287 results <- &asynchSegmentResult{ 288 index: index, 289 docs: docs, 290 } 291 } 292 }(index, segment) 293 } 294 295 return i.newDocIDReader(results) 296} 297 298func (i *IndexSnapshot) newDocIDReader(results chan *asynchSegmentResult) (index.DocIDReader, error) { 299 rv := &IndexSnapshotDocIDReader{ 300 snapshot: i, 301 iterators: make([]roaring.IntIterable, len(i.segment)), 302 } 303 var err error 304 for count := 0; count < len(i.segment); count++ { 305 asr := <-results 306 if asr.err != nil { 307 if err == nil { 308 // returns the first error encountered 309 err = asr.err 310 } 311 } else if err == nil { 312 rv.iterators[asr.index] = asr.docs.Iterator() 313 } 314 } 315 316 if err != nil { 317 return nil, err 318 } 319 320 return rv, nil 321} 322 323func (i *IndexSnapshot) Fields() ([]string, error) { 324 // FIXME not making this concurrent for now as it's not used in hot path 325 // of any searches at the moment (just a debug aid) 326 fieldsMap := map[string]struct{}{} 327 for _, segment := range i.segment { 328 fields := segment.Fields() 329 for _, field := range fields { 330 fieldsMap[field] = struct{}{} 331 } 332 } 333 rv := make([]string, 0, len(fieldsMap)) 334 for k := range fieldsMap { 335 rv = append(rv, k) 336 } 337 return rv, nil 338} 339 340func (i *IndexSnapshot) GetInternal(key []byte) ([]byte, error) { 341 return i.internal[string(key)], nil 342} 343 344func (i *IndexSnapshot) DocCount() (uint64, error) { 345 var rv uint64 346 for _, segment := range i.segment { 347 rv += segment.Count() 348 } 349 return rv, nil 350} 351 352func (i *IndexSnapshot) 
Document(id string) (rv *document.Document, err error) { 353 // FIXME could be done more efficiently directly, but reusing for simplicity 354 tfr, err := i.TermFieldReader([]byte(id), "_id", false, false, false) 355 if err != nil { 356 return nil, err 357 } 358 defer func() { 359 if cerr := tfr.Close(); err == nil && cerr != nil { 360 err = cerr 361 } 362 }() 363 364 next, err := tfr.Next(nil) 365 if err != nil { 366 return nil, err 367 } 368 369 if next == nil { 370 // no such doc exists 371 return nil, nil 372 } 373 374 docNum, err := docInternalToNumber(next.ID) 375 if err != nil { 376 return nil, err 377 } 378 segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum) 379 380 rv = document.NewDocument(id) 381 err = i.segment[segmentIndex].VisitDocument(localDocNum, func(name string, typ byte, val []byte, pos []uint64) bool { 382 if name == "_id" { 383 return true 384 } 385 386 // copy value, array positions to preserve them beyond the scope of this callback 387 value := append([]byte(nil), val...) 388 arrayPos := append([]uint64(nil), pos...) 
389 390 switch typ { 391 case 't': 392 rv.AddField(document.NewTextField(name, arrayPos, value)) 393 case 'n': 394 rv.AddField(document.NewNumericFieldFromBytes(name, arrayPos, value)) 395 case 'd': 396 rv.AddField(document.NewDateTimeFieldFromBytes(name, arrayPos, value)) 397 case 'b': 398 rv.AddField(document.NewBooleanFieldFromBytes(name, arrayPos, value)) 399 case 'g': 400 rv.AddField(document.NewGeoPointFieldFromBytes(name, arrayPos, value)) 401 } 402 403 return true 404 }) 405 if err != nil { 406 return nil, err 407 } 408 409 return rv, nil 410} 411 412func (i *IndexSnapshot) segmentIndexAndLocalDocNumFromGlobal(docNum uint64) (int, uint64) { 413 segmentIndex := sort.Search(len(i.offsets), 414 func(x int) bool { 415 return i.offsets[x] > docNum 416 }) - 1 417 418 localDocNum := docNum - i.offsets[segmentIndex] 419 return int(segmentIndex), localDocNum 420} 421 422func (i *IndexSnapshot) ExternalID(id index.IndexInternalID) (string, error) { 423 docNum, err := docInternalToNumber(id) 424 if err != nil { 425 return "", err 426 } 427 segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum) 428 429 v, err := i.segment[segmentIndex].DocID(localDocNum) 430 if err != nil { 431 return "", err 432 } 433 if v == nil { 434 return "", fmt.Errorf("document number %d not found", docNum) 435 } 436 437 return string(v), nil 438} 439 440func (i *IndexSnapshot) InternalID(id string) (rv index.IndexInternalID, err error) { 441 // FIXME could be done more efficiently directly, but reusing for simplicity 442 tfr, err := i.TermFieldReader([]byte(id), "_id", false, false, false) 443 if err != nil { 444 return nil, err 445 } 446 defer func() { 447 if cerr := tfr.Close(); err == nil && cerr != nil { 448 err = cerr 449 } 450 }() 451 452 next, err := tfr.Next(nil) 453 if err != nil || next == nil { 454 return nil, err 455 } 456 457 return next.ID, nil 458} 459 460func (i *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq, 461 includeNorm, 
includeTermVectors bool) (index.TermFieldReader, error) { 462 rv := i.allocTermFieldReaderDicts(field) 463 464 rv.term = term 465 rv.field = field 466 rv.snapshot = i 467 if rv.postings == nil { 468 rv.postings = make([]segment.PostingsList, len(i.segment)) 469 } 470 if rv.iterators == nil { 471 rv.iterators = make([]segment.PostingsIterator, len(i.segment)) 472 } 473 rv.segmentOffset = 0 474 rv.includeFreq = includeFreq 475 rv.includeNorm = includeNorm 476 rv.includeTermVectors = includeTermVectors 477 rv.currPosting = nil 478 rv.currID = rv.currID[:0] 479 480 if rv.dicts == nil { 481 rv.dicts = make([]segment.TermDictionary, len(i.segment)) 482 for i, segment := range i.segment { 483 dict, err := segment.segment.Dictionary(field) 484 if err != nil { 485 return nil, err 486 } 487 rv.dicts[i] = dict 488 } 489 } 490 491 for i, segment := range i.segment { 492 pl, err := rv.dicts[i].PostingsList(term, segment.deleted, rv.postings[i]) 493 if err != nil { 494 return nil, err 495 } 496 rv.postings[i] = pl 497 rv.iterators[i] = pl.Iterator(includeFreq, includeNorm, includeTermVectors, rv.iterators[i]) 498 } 499 atomic.AddUint64(&i.parent.stats.TotTermSearchersStarted, uint64(1)) 500 return rv, nil 501} 502 503func (i *IndexSnapshot) allocTermFieldReaderDicts(field string) (tfr *IndexSnapshotTermFieldReader) { 504 i.m2.Lock() 505 if i.fieldTFRs != nil { 506 tfrs := i.fieldTFRs[field] 507 last := len(tfrs) - 1 508 if last >= 0 { 509 tfr = tfrs[last] 510 tfrs[last] = nil 511 i.fieldTFRs[field] = tfrs[:last] 512 i.m2.Unlock() 513 return 514 } 515 } 516 i.m2.Unlock() 517 return &IndexSnapshotTermFieldReader{ 518 recycle: true, 519 } 520} 521 522func (i *IndexSnapshot) recycleTermFieldReader(tfr *IndexSnapshotTermFieldReader) { 523 if !tfr.recycle { 524 // Do not recycle an optimized unadorned term field reader (used for 525 // ConjunctionUnadorned or DisjunctionUnadorned), during when a fresh 526 // roaring.Bitmap is built by AND-ing or OR-ing individual bitmaps, 527 // and 
we'll need to release them for GC. (See MB-40916) 528 return 529 } 530 531 i.parent.rootLock.RLock() 532 obsolete := i.parent.root != i 533 i.parent.rootLock.RUnlock() 534 if obsolete { 535 // if we're not the current root (mutations happened), don't bother recycling 536 return 537 } 538 539 i.m2.Lock() 540 if i.fieldTFRs == nil { 541 i.fieldTFRs = map[string][]*IndexSnapshotTermFieldReader{} 542 } 543 i.fieldTFRs[tfr.field] = append(i.fieldTFRs[tfr.field], tfr) 544 i.m2.Unlock() 545} 546 547func docNumberToBytes(buf []byte, in uint64) []byte { 548 if len(buf) != 8 { 549 if cap(buf) >= 8 { 550 buf = buf[0:8] 551 } else { 552 buf = make([]byte, 8) 553 } 554 } 555 binary.BigEndian.PutUint64(buf, in) 556 return buf 557} 558 559func docInternalToNumber(in index.IndexInternalID) (uint64, error) { 560 if len(in) != 8 { 561 return 0, fmt.Errorf("wrong len for IndexInternalID: %q", in) 562 } 563 return binary.BigEndian.Uint64(in), nil 564} 565 566func (i *IndexSnapshot) DocumentVisitFieldTerms(id index.IndexInternalID, 567 fields []string, visitor index.DocumentFieldTermVisitor) error { 568 _, err := i.documentVisitFieldTerms(id, fields, visitor, nil) 569 return err 570} 571 572func (i *IndexSnapshot) documentVisitFieldTerms(id index.IndexInternalID, 573 fields []string, visitor index.DocumentFieldTermVisitor, 574 dvs segment.DocVisitState) (segment.DocVisitState, error) { 575 docNum, err := docInternalToNumber(id) 576 if err != nil { 577 return nil, err 578 } 579 580 segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum) 581 if segmentIndex >= len(i.segment) { 582 return nil, nil 583 } 584 585 _, dvs, err = i.documentVisitFieldTermsOnSegment( 586 segmentIndex, localDocNum, fields, nil, visitor, dvs) 587 588 return dvs, err 589} 590 591func (i *IndexSnapshot) documentVisitFieldTermsOnSegment( 592 segmentIndex int, localDocNum uint64, fields []string, cFields []string, 593 visitor index.DocumentFieldTermVisitor, dvs segment.DocVisitState) ( 594 
cFieldsOut []string, dvsOut segment.DocVisitState, err error) { 595 ss := i.segment[segmentIndex] 596 597 var vFields []string // fields that are visitable via the segment 598 599 ssv, ssvOk := ss.segment.(segment.DocumentFieldTermVisitable) 600 if ssvOk && ssv != nil { 601 vFields, err = ssv.VisitableDocValueFields() 602 if err != nil { 603 return nil, nil, err 604 } 605 } 606 607 var errCh chan error 608 609 // cFields represents the fields that we'll need from the 610 // cachedDocs, and might be optionally be provided by the caller, 611 // if the caller happens to know we're on the same segmentIndex 612 // from a previous invocation 613 if cFields == nil { 614 cFields = subtractStrings(fields, vFields) 615 616 if !ss.cachedDocs.hasFields(cFields) { 617 errCh = make(chan error, 1) 618 619 go func() { 620 err := ss.cachedDocs.prepareFields(cFields, ss) 621 if err != nil { 622 errCh <- err 623 } 624 close(errCh) 625 }() 626 } 627 } 628 629 if ssvOk && ssv != nil && len(vFields) > 0 { 630 dvs, err = ssv.VisitDocumentFieldTerms(localDocNum, fields, visitor, dvs) 631 if err != nil { 632 return nil, nil, err 633 } 634 } 635 636 if errCh != nil { 637 err = <-errCh 638 if err != nil { 639 return nil, nil, err 640 } 641 } 642 643 if len(cFields) > 0 { 644 ss.cachedDocs.visitDoc(localDocNum, cFields, visitor) 645 } 646 647 return cFields, dvs, nil 648} 649 650func (i *IndexSnapshot) DocValueReader(fields []string) ( 651 index.DocValueReader, error) { 652 return &DocValueReader{i: i, fields: fields, currSegmentIndex: -1}, nil 653} 654 655type DocValueReader struct { 656 i *IndexSnapshot 657 fields []string 658 dvs segment.DocVisitState 659 660 currSegmentIndex int 661 currCachedFields []string 662} 663 664func (dvr *DocValueReader) VisitDocValues(id index.IndexInternalID, 665 visitor index.DocumentFieldTermVisitor) (err error) { 666 docNum, err := docInternalToNumber(id) 667 if err != nil { 668 return err 669 } 670 671 segmentIndex, localDocNum := 
dvr.i.segmentIndexAndLocalDocNumFromGlobal(docNum) 672 if segmentIndex >= len(dvr.i.segment) { 673 return nil 674 } 675 676 if dvr.currSegmentIndex != segmentIndex { 677 dvr.currSegmentIndex = segmentIndex 678 dvr.currCachedFields = nil 679 } 680 681 dvr.currCachedFields, dvr.dvs, err = dvr.i.documentVisitFieldTermsOnSegment( 682 dvr.currSegmentIndex, localDocNum, dvr.fields, dvr.currCachedFields, visitor, dvr.dvs) 683 684 return err 685} 686 687func (i *IndexSnapshot) DumpAll() chan interface{} { 688 rv := make(chan interface{}) 689 go func() { 690 close(rv) 691 }() 692 return rv 693} 694 695func (i *IndexSnapshot) DumpDoc(id string) chan interface{} { 696 rv := make(chan interface{}) 697 go func() { 698 close(rv) 699 }() 700 return rv 701} 702 703func (i *IndexSnapshot) DumpFields() chan interface{} { 704 rv := make(chan interface{}) 705 go func() { 706 close(rv) 707 }() 708 return rv 709} 710 711func (i *IndexSnapshot) diskSegmentsPaths() map[string]struct{} { 712 rv := make(map[string]struct{}, len(i.segment)) 713 for _, segmentSnapshot := range i.segment { 714 if seg, ok := segmentSnapshot.segment.(segment.PersistedSegment); ok { 715 rv[seg.Path()] = struct{}{} 716 } 717 } 718 return rv 719} 720 721// reClaimableDocsRatio gives a ratio about the obsoleted or 722// reclaimable documents present in a given index snapshot. 723func (i *IndexSnapshot) reClaimableDocsRatio() float64 { 724 var totalCount, liveCount uint64 725 for _, segmentSnapshot := range i.segment { 726 if _, ok := segmentSnapshot.segment.(segment.PersistedSegment); ok { 727 totalCount += uint64(segmentSnapshot.FullSize()) 728 liveCount += uint64(segmentSnapshot.Count()) 729 } 730 } 731 732 if totalCount > 0 { 733 return float64(totalCount-liveCount) / float64(totalCount) 734 } 735 return 0 736} 737 738// subtractStrings returns set a minus elements of set b. 
// Elements of a keep their relative order, and duplicates in a are kept unless
// they appear in b. When b is empty, a itself is returned without copying;
// otherwise the result is a new, non-nil slice.
func subtractStrings(a, b []string) []string {
	if len(b) == 0 {
		return a
	}

	inB := func(s string) bool {
		for _, candidate := range b {
			if candidate == s {
				return true
			}
		}
		return false
	}

	kept := make([]string, 0, len(a))
	for _, s := range a {
		if inB(s) {
			continue
		}
		kept = append(kept, s)
	}
	return kept
}