1//  Copyright (c) 2017 Couchbase, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// 		http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package scorch
16
17import (
18	"container/heap"
19	"encoding/binary"
20	"fmt"
21	"reflect"
22	"sort"
23	"sync"
24	"sync/atomic"
25
26	"github.com/RoaringBitmap/roaring"
27	"github.com/blevesearch/bleve/document"
28	"github.com/blevesearch/bleve/index"
29	"github.com/blevesearch/bleve/index/scorch/segment"
30	"github.com/couchbase/vellum"
31	lev "github.com/couchbase/vellum/levenshtein"
32)
33
34// re usable, threadsafe levenshtein builders
35var lb1, lb2 *lev.LevenshteinAutomatonBuilder
36
// asynchSegmentResult carries one segment's outcome from a per-segment
// worker goroutine back to the loop that collects the results.
//
// Only the fields relevant to the producing operation are set:
//   - dict / dictItr: set by newIndexSnapshotFieldDict's workers (the raw
//     dictionary for random lookups, otherwise an iterator built from it);
//   - index / docs: set by the DocIDReaderAll / DocIDReaderOnly workers
//     (the segment's position in the snapshot and its matching doc numbers);
//   - postings: NOTE(review): not set anywhere in this file, possibly unused;
//   - err: non-nil when the segment's operation failed, in which case the
//     other fields are left at their zero values.
type asynchSegmentResult struct {
	dict    segment.TermDictionary
	dictItr segment.DictionaryIterator

	index int
	docs  *roaring.Bitmap

	postings segment.PostingsList

	err error
}
48
49var reflectStaticSizeIndexSnapshot int
50
51func init() {
52	var is interface{} = IndexSnapshot{}
53	reflectStaticSizeIndexSnapshot = int(reflect.TypeOf(is).Size())
54	var err error
55	lb1, err = lev.NewLevenshteinAutomatonBuilder(1, true)
56	if err != nil {
57		panic(fmt.Errorf("Levenshtein automaton ed1 builder err: %v", err))
58	}
59	lb2, err = lev.NewLevenshteinAutomatonBuilder(2, true)
60	if err != nil {
61		panic(fmt.Errorf("Levenshtein automaton ed2 builder err: %v", err))
62	}
63}
64
// IndexSnapshot is a reference-counted view over a fixed set of segment
// snapshots (see AddRef/DecRef).
//
// Fields:
//   - parent: the owning Scorch; may be nil (DecRef checks for that), though
//     TermFieldReader and recycleTermFieldReader dereference it.
//   - segment: the per-segment snapshots making up this view.
//   - offsets: for each segment, the global doc number at which it starts;
//     segmentIndexAndLocalDocNumFromGlobal binary-searches it, so it is
//     assumed to be sorted ascending.
//   - internal: key/value pairs returned by GetInternal.
//   - epoch: passed to parent.AddEligibleForRemoval when the last
//     reference is dropped.
//   - size: memory estimate accumulated by updateSize.
//   - creator: NOTE(review): not read anywhere in this file.
//   - refs (guarded by m): the reference count.
//   - fieldTFRs (guarded by m2): per-field stacks of recycled term field
//     readers, lazily created by recycleTermFieldReader.
type IndexSnapshot struct {
	parent   *Scorch
	segment  []*SegmentSnapshot
	offsets  []uint64
	internal map[string][]byte
	epoch    uint64
	size     uint64
	creator  string

	m    sync.Mutex // Protects the fields that follow.
	refs int64

	m2        sync.Mutex                                 // Protects the fields that follow.
	fieldTFRs map[string][]*IndexSnapshotTermFieldReader // keyed by field, recycled TFR's
}
80
81func (i *IndexSnapshot) Segments() []*SegmentSnapshot {
82	return i.segment
83}
84
85func (i *IndexSnapshot) Internal() map[string][]byte {
86	return i.internal
87}
88
89func (i *IndexSnapshot) AddRef() {
90	i.m.Lock()
91	i.refs++
92	i.m.Unlock()
93}
94
95func (i *IndexSnapshot) DecRef() (err error) {
96	i.m.Lock()
97	i.refs--
98	if i.refs == 0 {
99		for _, s := range i.segment {
100			if s != nil {
101				err2 := s.segment.DecRef()
102				if err == nil {
103					err = err2
104				}
105			}
106		}
107		if i.parent != nil {
108			go i.parent.AddEligibleForRemoval(i.epoch)
109		}
110	}
111	i.m.Unlock()
112	return err
113}
114
115func (i *IndexSnapshot) Close() error {
116	return i.DecRef()
117}
118
119func (i *IndexSnapshot) Size() int {
120	return int(i.size)
121}
122
123func (i *IndexSnapshot) updateSize() {
124	i.size += uint64(reflectStaticSizeIndexSnapshot)
125	for _, s := range i.segment {
126		i.size += uint64(s.Size())
127	}
128}
129
130func (i *IndexSnapshot) newIndexSnapshotFieldDict(field string,
131	makeItr func(i segment.TermDictionary) segment.DictionaryIterator,
132	randomLookup bool) (*IndexSnapshotFieldDict, error) {
133
134	results := make(chan *asynchSegmentResult)
135	for index, segment := range i.segment {
136		go func(index int, segment *SegmentSnapshot) {
137			dict, err := segment.segment.Dictionary(field)
138			if err != nil {
139				results <- &asynchSegmentResult{err: err}
140			} else {
141				if randomLookup {
142					results <- &asynchSegmentResult{dict: dict}
143				} else {
144					results <- &asynchSegmentResult{dictItr: makeItr(dict)}
145				}
146			}
147		}(index, segment)
148	}
149
150	var err error
151	rv := &IndexSnapshotFieldDict{
152		snapshot: i,
153		cursors:  make([]*segmentDictCursor, 0, len(i.segment)),
154	}
155	for count := 0; count < len(i.segment); count++ {
156		asr := <-results
157		if asr.err != nil && err == nil {
158			err = asr.err
159		} else {
160			if !randomLookup {
161				next, err2 := asr.dictItr.Next()
162				if err2 != nil && err == nil {
163					err = err2
164				}
165				if next != nil {
166					rv.cursors = append(rv.cursors, &segmentDictCursor{
167						itr:  asr.dictItr,
168						curr: *next,
169					})
170				}
171			} else {
172				rv.cursors = append(rv.cursors, &segmentDictCursor{
173					dict: asr.dict,
174				})
175			}
176		}
177	}
178	// after ensuring we've read all items on channel
179	if err != nil {
180		return nil, err
181	}
182
183	if !randomLookup {
184		// prepare heap
185		heap.Init(rv)
186	}
187
188	return rv, nil
189}
190
191func (i *IndexSnapshot) FieldDict(field string) (index.FieldDict, error) {
192	return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator {
193		return i.Iterator()
194	}, false)
195}
196
197func (i *IndexSnapshot) FieldDictRange(field string, startTerm []byte,
198	endTerm []byte) (index.FieldDict, error) {
199	return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator {
200		return i.RangeIterator(string(startTerm), string(endTerm))
201	}, false)
202}
203
204func (i *IndexSnapshot) FieldDictPrefix(field string,
205	termPrefix []byte) (index.FieldDict, error) {
206	return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator {
207		return i.PrefixIterator(string(termPrefix))
208	}, false)
209}
210
211func (i *IndexSnapshot) FieldDictRegexp(field string,
212	termRegex string) (index.FieldDict, error) {
213	// TODO: potential optimization where the literal prefix represents the,
214	//       entire regexp, allowing us to use PrefixIterator(prefixTerm)?
215
216	a, prefixBeg, prefixEnd, err := segment.ParseRegexp(termRegex)
217	if err != nil {
218		return nil, err
219	}
220
221	return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator {
222		return i.AutomatonIterator(a, prefixBeg, prefixEnd)
223	}, false)
224}
225
226func (i *IndexSnapshot) getLevAutomaton(term string,
227	fuzziness uint8) (vellum.Automaton, error) {
228	if fuzziness == 1 {
229		return lb1.BuildDfa(term, fuzziness)
230	} else if fuzziness == 2 {
231		return lb2.BuildDfa(term, fuzziness)
232	}
233	return nil, fmt.Errorf("fuzziness exceeds the max limit")
234}
235
236func (i *IndexSnapshot) FieldDictFuzzy(field string,
237	term string, fuzziness int, prefix string) (index.FieldDict, error) {
238	a, err := i.getLevAutomaton(term, uint8(fuzziness))
239	if err != nil {
240		return nil, err
241	}
242
243	var prefixBeg, prefixEnd []byte
244	if prefix != "" {
245		prefixBeg = []byte(prefix)
246		prefixEnd = segment.IncrementBytes(prefixBeg)
247	}
248
249	return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator {
250		return i.AutomatonIterator(a, prefixBeg, prefixEnd)
251	}, false)
252}
253
254func (i *IndexSnapshot) FieldDictOnly(field string,
255	onlyTerms [][]byte, includeCount bool) (index.FieldDict, error) {
256	return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator {
257		return i.OnlyIterator(onlyTerms, includeCount)
258	}, false)
259}
260
261func (i *IndexSnapshot) FieldDictContains(field string) (index.FieldDictContains, error) {
262	return i.newIndexSnapshotFieldDict(field, nil, true)
263}
264
265func (i *IndexSnapshot) DocIDReaderAll() (index.DocIDReader, error) {
266	results := make(chan *asynchSegmentResult)
267	for index, segment := range i.segment {
268		go func(index int, segment *SegmentSnapshot) {
269			results <- &asynchSegmentResult{
270				index: index,
271				docs:  segment.DocNumbersLive(),
272			}
273		}(index, segment)
274	}
275
276	return i.newDocIDReader(results)
277}
278
279func (i *IndexSnapshot) DocIDReaderOnly(ids []string) (index.DocIDReader, error) {
280	results := make(chan *asynchSegmentResult)
281	for index, segment := range i.segment {
282		go func(index int, segment *SegmentSnapshot) {
283			docs, err := segment.DocNumbers(ids)
284			if err != nil {
285				results <- &asynchSegmentResult{err: err}
286			} else {
287				results <- &asynchSegmentResult{
288					index: index,
289					docs:  docs,
290				}
291			}
292		}(index, segment)
293	}
294
295	return i.newDocIDReader(results)
296}
297
298func (i *IndexSnapshot) newDocIDReader(results chan *asynchSegmentResult) (index.DocIDReader, error) {
299	rv := &IndexSnapshotDocIDReader{
300		snapshot:  i,
301		iterators: make([]roaring.IntIterable, len(i.segment)),
302	}
303	var err error
304	for count := 0; count < len(i.segment); count++ {
305		asr := <-results
306		if asr.err != nil {
307			if err == nil {
308				// returns the first error encountered
309				err = asr.err
310			}
311		} else if err == nil {
312			rv.iterators[asr.index] = asr.docs.Iterator()
313		}
314	}
315
316	if err != nil {
317		return nil, err
318	}
319
320	return rv, nil
321}
322
323func (i *IndexSnapshot) Fields() ([]string, error) {
324	// FIXME not making this concurrent for now as it's not used in hot path
325	// of any searches at the moment (just a debug aid)
326	fieldsMap := map[string]struct{}{}
327	for _, segment := range i.segment {
328		fields := segment.Fields()
329		for _, field := range fields {
330			fieldsMap[field] = struct{}{}
331		}
332	}
333	rv := make([]string, 0, len(fieldsMap))
334	for k := range fieldsMap {
335		rv = append(rv, k)
336	}
337	return rv, nil
338}
339
340func (i *IndexSnapshot) GetInternal(key []byte) ([]byte, error) {
341	return i.internal[string(key)], nil
342}
343
344func (i *IndexSnapshot) DocCount() (uint64, error) {
345	var rv uint64
346	for _, segment := range i.segment {
347		rv += segment.Count()
348	}
349	return rv, nil
350}
351
// Document loads the stored fields of the document whose external ID is id.
//
// It returns (nil, nil) when no document with that ID exists. The "_id"
// field is skipped, so it is not added to the returned document. Stored
// values whose type byte is not 't', 'n', 'd', 'b' or 'g' are silently
// ignored.
//
// The internal doc number is found by reusing a TermFieldReader on the
// "_id" field. That reader is closed on return. The deferred close writes
// to the named result err, so a Close failure is reported only when no
// other error occurred, and it may then accompany a non-nil rv.
func (i *IndexSnapshot) Document(id string) (rv *document.Document, err error) {
	// FIXME could be done more efficiently directly, but reusing for simplicity
	tfr, err := i.TermFieldReader([]byte(id), "_id", false, false, false)
	if err != nil {
		return nil, err
	}
	defer func() {
		if cerr := tfr.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()

	next, err := tfr.Next(nil)
	if err != nil {
		return nil, err
	}

	if next == nil {
		// no such doc exists
		return nil, nil
	}

	docNum, err := docInternalToNumber(next.ID)
	if err != nil {
		return nil, err
	}
	// Map the snapshot-wide doc number to (segment, segment-local number).
	segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)

	rv = document.NewDocument(id)
	err = i.segment[segmentIndex].VisitDocument(localDocNum, func(name string, typ byte, val []byte, pos []uint64) bool {
		if name == "_id" {
			return true
		}

		// copy value, array positions to preserve them beyond the scope of this callback
		value := append([]byte(nil), val...)
		arrayPos := append([]uint64(nil), pos...)

		// typ is a one-byte tag selecting which field constructor to use.
		switch typ {
		case 't':
			rv.AddField(document.NewTextField(name, arrayPos, value))
		case 'n':
			rv.AddField(document.NewNumericFieldFromBytes(name, arrayPos, value))
		case 'd':
			rv.AddField(document.NewDateTimeFieldFromBytes(name, arrayPos, value))
		case 'b':
			rv.AddField(document.NewBooleanFieldFromBytes(name, arrayPos, value))
		case 'g':
			rv.AddField(document.NewGeoPointFieldFromBytes(name, arrayPos, value))
		}

		return true
	})
	if err != nil {
		return nil, err
	}

	return rv, nil
}
411
412func (i *IndexSnapshot) segmentIndexAndLocalDocNumFromGlobal(docNum uint64) (int, uint64) {
413	segmentIndex := sort.Search(len(i.offsets),
414		func(x int) bool {
415			return i.offsets[x] > docNum
416		}) - 1
417
418	localDocNum := docNum - i.offsets[segmentIndex]
419	return int(segmentIndex), localDocNum
420}
421
422func (i *IndexSnapshot) ExternalID(id index.IndexInternalID) (string, error) {
423	docNum, err := docInternalToNumber(id)
424	if err != nil {
425		return "", err
426	}
427	segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)
428
429	v, err := i.segment[segmentIndex].DocID(localDocNum)
430	if err != nil {
431		return "", err
432	}
433	if v == nil {
434		return "", fmt.Errorf("document number %d not found", docNum)
435	}
436
437	return string(v), nil
438}
439
// InternalID returns the internal ID of the document whose external ID is
// id, or (nil, nil) if no such document exists.
//
// It looks the ID up through a TermFieldReader on the "_id" field. The
// reader is closed on return, and because the deferred close assigns to the
// named result err, a Close failure is reported only when no other error
// occurred.
func (i *IndexSnapshot) InternalID(id string) (rv index.IndexInternalID, err error) {
	// FIXME could be done more efficiently directly, but reusing for simplicity
	tfr, err := i.TermFieldReader([]byte(id), "_id", false, false, false)
	if err != nil {
		return nil, err
	}
	defer func() {
		if cerr := tfr.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()

	// Covers both failure and "not found" (next == nil with err == nil).
	next, err := tfr.Next(nil)
	if err != nil || next == nil {
		return nil, err
	}

	return next.ID, nil
}
459
460func (i *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,
461	includeNorm, includeTermVectors bool) (index.TermFieldReader, error) {
462	rv := i.allocTermFieldReaderDicts(field)
463
464	rv.term = term
465	rv.field = field
466	rv.snapshot = i
467	if rv.postings == nil {
468		rv.postings = make([]segment.PostingsList, len(i.segment))
469	}
470	if rv.iterators == nil {
471		rv.iterators = make([]segment.PostingsIterator, len(i.segment))
472	}
473	rv.segmentOffset = 0
474	rv.includeFreq = includeFreq
475	rv.includeNorm = includeNorm
476	rv.includeTermVectors = includeTermVectors
477	rv.currPosting = nil
478	rv.currID = rv.currID[:0]
479
480	if rv.dicts == nil {
481		rv.dicts = make([]segment.TermDictionary, len(i.segment))
482		for i, segment := range i.segment {
483			dict, err := segment.segment.Dictionary(field)
484			if err != nil {
485				return nil, err
486			}
487			rv.dicts[i] = dict
488		}
489	}
490
491	for i, segment := range i.segment {
492		pl, err := rv.dicts[i].PostingsList(term, segment.deleted, rv.postings[i])
493		if err != nil {
494			return nil, err
495		}
496		rv.postings[i] = pl
497		rv.iterators[i] = pl.Iterator(includeFreq, includeNorm, includeTermVectors, rv.iterators[i])
498	}
499	atomic.AddUint64(&i.parent.stats.TotTermSearchersStarted, uint64(1))
500	return rv, nil
501}
502
503func (i *IndexSnapshot) allocTermFieldReaderDicts(field string) (tfr *IndexSnapshotTermFieldReader) {
504	i.m2.Lock()
505	if i.fieldTFRs != nil {
506		tfrs := i.fieldTFRs[field]
507		last := len(tfrs) - 1
508		if last >= 0 {
509			tfr = tfrs[last]
510			tfrs[last] = nil
511			i.fieldTFRs[field] = tfrs[:last]
512			i.m2.Unlock()
513			return
514		}
515	}
516	i.m2.Unlock()
517	return &IndexSnapshotTermFieldReader{
518		recycle: true,
519	}
520}
521
522func (i *IndexSnapshot) recycleTermFieldReader(tfr *IndexSnapshotTermFieldReader) {
523	if !tfr.recycle {
524		// Do not recycle an optimized unadorned term field reader (used for
525		// ConjunctionUnadorned or DisjunctionUnadorned), during when a fresh
526		// roaring.Bitmap is built by AND-ing or OR-ing individual bitmaps,
527		// and we'll need to release them for GC. (See MB-40916)
528		return
529	}
530
531	i.parent.rootLock.RLock()
532	obsolete := i.parent.root != i
533	i.parent.rootLock.RUnlock()
534	if obsolete {
535		// if we're not the current root (mutations happened), don't bother recycling
536		return
537	}
538
539	i.m2.Lock()
540	if i.fieldTFRs == nil {
541		i.fieldTFRs = map[string][]*IndexSnapshotTermFieldReader{}
542	}
543	i.fieldTFRs[tfr.field] = append(i.fieldTFRs[tfr.field], tfr)
544	i.m2.Unlock()
545}
546
// docNumberToBytes encodes in as 8 big-endian bytes. It writes into buf
// when buf already has length 8 or capacity for 8 bytes, and allocates a
// new slice otherwise. The (possibly re-sliced) buffer is returned.
func docNumberToBytes(buf []byte, in uint64) []byte {
	switch {
	case len(buf) == 8:
		// already the right shape; reuse as-is
	case cap(buf) >= 8:
		buf = buf[:8]
	default:
		buf = make([]byte, 8)
	}
	binary.BigEndian.PutUint64(buf, in)
	return buf
}
558
559func docInternalToNumber(in index.IndexInternalID) (uint64, error) {
560	if len(in) != 8 {
561		return 0, fmt.Errorf("wrong len for IndexInternalID: %q", in)
562	}
563	return binary.BigEndian.Uint64(in), nil
564}
565
566func (i *IndexSnapshot) DocumentVisitFieldTerms(id index.IndexInternalID,
567	fields []string, visitor index.DocumentFieldTermVisitor) error {
568	_, err := i.documentVisitFieldTerms(id, fields, visitor, nil)
569	return err
570}
571
572func (i *IndexSnapshot) documentVisitFieldTerms(id index.IndexInternalID,
573	fields []string, visitor index.DocumentFieldTermVisitor,
574	dvs segment.DocVisitState) (segment.DocVisitState, error) {
575	docNum, err := docInternalToNumber(id)
576	if err != nil {
577		return nil, err
578	}
579
580	segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)
581	if segmentIndex >= len(i.segment) {
582		return nil, nil
583	}
584
585	_, dvs, err = i.documentVisitFieldTermsOnSegment(
586		segmentIndex, localDocNum, fields, nil, visitor, dvs)
587
588	return dvs, err
589}
590
// documentVisitFieldTermsOnSegment calls visitor for the terms of the
// requested fields of document localDocNum in segment segmentIndex.
//
// Terms come from two sources:
//   - if the segment implements segment.DocumentFieldTermVisitable and
//     reports visitable doc-value fields, it is asked to visit the document
//     directly;
//   - fields it cannot serve (cFields = fields minus vFields) are read from
//     ss.cachedDocs. If that cache lacks any of them, it is filled by
//     prepareFields in a background goroutine while the segment visit runs.
//
// cFields may be supplied by a caller that computed it for this same
// segment on an earlier call. In that case it is trusted as-is and no cache
// preparation is started. The cFields actually used and the updated
// DocVisitState are returned so the caller can pass them back next time.
func (i *IndexSnapshot) documentVisitFieldTermsOnSegment(
	segmentIndex int, localDocNum uint64, fields []string, cFields []string,
	visitor index.DocumentFieldTermVisitor, dvs segment.DocVisitState) (
	cFieldsOut []string, dvsOut segment.DocVisitState, err error) {
	ss := i.segment[segmentIndex]

	var vFields []string // fields that are visitable via the segment

	ssv, ssvOk := ss.segment.(segment.DocumentFieldTermVisitable)
	if ssvOk && ssv != nil {
		vFields, err = ssv.VisitableDocValueFields()
		if err != nil {
			return nil, nil, err
		}
	}

	// Non-nil only while a prepareFields goroutine is outstanding.
	var errCh chan error

	// cFields represents the fields that we'll need from the
	// cachedDocs, and might be optionally be provided by the caller,
	// if the caller happens to know we're on the same segmentIndex
	// from a previous invocation
	if cFields == nil {
		cFields = subtractStrings(fields, vFields)

		if !ss.cachedDocs.hasFields(cFields) {
			// Buffered so the goroutine never blocks, even if we return
			// early below without receiving from errCh.
			errCh = make(chan error, 1)

			go func() {
				err := ss.cachedDocs.prepareFields(cFields, ss)
				if err != nil {
					errCh <- err
				}
				close(errCh)
			}()
		}
	}

	// NOTE(review): the full fields list (not vFields) is passed here;
	// presumably the segment skips fields it cannot visit. Confirm.
	if ssvOk && ssv != nil && len(vFields) > 0 {
		dvs, err = ssv.VisitDocumentFieldTerms(localDocNum, fields, visitor, dvs)
		if err != nil {
			return nil, nil, err
		}
	}

	// Wait for cache preparation; a closed channel with no error yields nil.
	if errCh != nil {
		err = <-errCh
		if err != nil {
			return nil, nil, err
		}
	}

	if len(cFields) > 0 {
		ss.cachedDocs.visitDoc(localDocNum, cFields, visitor)
	}

	return cFields, dvs, nil
}
649
650func (i *IndexSnapshot) DocValueReader(fields []string) (
651	index.DocValueReader, error) {
652	return &DocValueReader{i: i, fields: fields, currSegmentIndex: -1}, nil
653}
654
// DocValueReader visits doc values for a fixed set of fields across
// successive documents of one IndexSnapshot.
//
// Fields:
//   - dvs: a DocVisitState threaded through every visit so it can be reused.
//   - currSegmentIndex: the segment of the previous visit (-1 before the
//     first one).
//   - currCachedFields: the cache-served field list computed for that
//     segment. VisitDocValues resets it to nil whenever the segment changes.
type DocValueReader struct {
	i      *IndexSnapshot
	fields []string
	dvs    segment.DocVisitState

	currSegmentIndex int
	currCachedFields []string
}
663
664func (dvr *DocValueReader) VisitDocValues(id index.IndexInternalID,
665	visitor index.DocumentFieldTermVisitor) (err error) {
666	docNum, err := docInternalToNumber(id)
667	if err != nil {
668		return err
669	}
670
671	segmentIndex, localDocNum := dvr.i.segmentIndexAndLocalDocNumFromGlobal(docNum)
672	if segmentIndex >= len(dvr.i.segment) {
673		return nil
674	}
675
676	if dvr.currSegmentIndex != segmentIndex {
677		dvr.currSegmentIndex = segmentIndex
678		dvr.currCachedFields = nil
679	}
680
681	dvr.currCachedFields, dvr.dvs, err = dvr.i.documentVisitFieldTermsOnSegment(
682		dvr.currSegmentIndex, localDocNum, dvr.fields, dvr.currCachedFields, visitor, dvr.dvs)
683
684	return err
685}
686
687func (i *IndexSnapshot) DumpAll() chan interface{} {
688	rv := make(chan interface{})
689	go func() {
690		close(rv)
691	}()
692	return rv
693}
694
695func (i *IndexSnapshot) DumpDoc(id string) chan interface{} {
696	rv := make(chan interface{})
697	go func() {
698		close(rv)
699	}()
700	return rv
701}
702
703func (i *IndexSnapshot) DumpFields() chan interface{} {
704	rv := make(chan interface{})
705	go func() {
706		close(rv)
707	}()
708	return rv
709}
710
711func (i *IndexSnapshot) diskSegmentsPaths() map[string]struct{} {
712	rv := make(map[string]struct{}, len(i.segment))
713	for _, segmentSnapshot := range i.segment {
714		if seg, ok := segmentSnapshot.segment.(segment.PersistedSegment); ok {
715			rv[seg.Path()] = struct{}{}
716		}
717	}
718	return rv
719}
720
721// reClaimableDocsRatio gives a ratio about the obsoleted or
722// reclaimable documents present in a given index snapshot.
723func (i *IndexSnapshot) reClaimableDocsRatio() float64 {
724	var totalCount, liveCount uint64
725	for _, segmentSnapshot := range i.segment {
726		if _, ok := segmentSnapshot.segment.(segment.PersistedSegment); ok {
727			totalCount += uint64(segmentSnapshot.FullSize())
728			liveCount += uint64(segmentSnapshot.Count())
729		}
730	}
731
732	if totalCount > 0 {
733		return float64(totalCount-liveCount) / float64(totalCount)
734	}
735	return 0
736}
737
// subtractStrings returns the elements of a that do not appear in b, in
// their original order.
//
// When b is empty, a itself is returned (not a copy). Otherwise the result
// is a freshly allocated, non-nil slice. Runs in O(len(a)*len(b)), which
// suits the short field lists it is used with.
func subtractStrings(a, b []string) []string {
	if len(b) == 0 {
		return a
	}

	kept := make([]string, 0, len(a))
	for _, candidate := range a {
		excluded := false
		for _, drop := range b {
			if candidate == drop {
				excluded = true
				break
			}
		}
		if !excluded {
			kept = append(kept, candidate)
		}
	}
	return kept
}
756