//  Copyright (c) 2018 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 		http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scorch

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"sync"
	"sync/atomic"
	"time"

	"github.com/RoaringBitmap/roaring"
	"github.com/blevesearch/bleve/analysis"
	"github.com/blevesearch/bleve/document"
	"github.com/blevesearch/bleve/index"
	"github.com/blevesearch/bleve/index/scorch/segment"
	"github.com/blevesearch/bleve/index/scorch/segment/zap"
	"github.com/blevesearch/bleve/index/store"
	"github.com/blevesearch/bleve/registry"
	bolt "github.com/etcd-io/bbolt"
)

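// Name is the name under which this index type is registered.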
const Name = "scorch"

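// Version is the scorch index format version.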
const Version uint8 = 2

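// ErrClosed is returned when an operation is attempted against an
// index that has already been closed.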
var ErrClosed = fmt.Errorf("scorch closed")

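// Scorch is an index.Index implementation built around immutable index
// snapshots: batches are analyzed into new segments which are handed to
// the introducer, while background persister and merger goroutines
// maintain the on-disk representation.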
type Scorch struct {
	nextSegmentID uint64
	stats         Stats
	iStats        internalStats

	readOnly      bool
	version       uint8
	config        map[string]interface{}
	analysisQueue *index.AnalysisQueue
	path          string

	unsafeBatch bool

	rootLock             sync.RWMutex
	root                 *IndexSnapshot // holds 1 ref-count on the root
	rootPersisted        []chan error   // closed when root is persisted
	persistedCallbacks   []index.BatchCallback
	nextSnapshotEpoch    uint64
	eligibleForRemoval   []uint64        // Index snapshot epochs that are safe to GC.
	ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet.

	numSnapshotsToKeep int
	closeCh            chan struct{}
	introductions      chan *segmentIntroduction
	persists           chan *persistIntroduction
	merges             chan *segmentMerge
	introducerNotifier chan *epochWatcher
	revertToSnapshots  chan *snapshotReversion
	persisterNotifier  chan *epochWatcher
	rootBolt           *bolt.DB
	asyncTasks         sync.WaitGroup

	onEvent      func(event Event)
	onAsyncError func(err error)

	pauseLock sync.RWMutex

	pauseCount uint64
}

type internalStats struct {
	persistEpoch          uint64
	persistSnapshotSize   uint64
	mergeEpoch            uint64
	mergeSnapshotSize     uint64
	newSegBufBytesAdded   uint64
	newSegBufBytesRemoved uint64
	analysisBytesAdded    uint64
	analysisBytesRemoved  uint64
}

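// NewScorch returns a new scorch index with the given configuration and
// analysis queue; the index does not touch disk until Open is called.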
func NewScorch(storeName string,
	config map[string]interface{},
	analysisQueue *index.AnalysisQueue) (index.Index, error) {
	rv := &Scorch{
		version:              Version,
		config:               config,
		analysisQueue:        analysisQueue,
		nextSnapshotEpoch:    1,
		closeCh:              make(chan struct{}),
		ineligibleForRemoval: map[string]bool{},
	}
	rv.root = &IndexSnapshot{parent: rv, refs: 1, creator: "NewScorch"}
	ro, ok := config["read_only"].(bool)
	if ok {
		rv.readOnly = ro
	}
	ub, ok := config["unsafe_batch"].(bool)
	if ok {
		rv.unsafeBatch = ub
	}
	ecbName, ok := config["eventCallbackName"].(string)
	if ok {
		rv.onEvent = RegistryEventCallbacks[ecbName]
	}
	aecbName, ok := config["asyncErrorCallbackName"].(string)
	if ok {
		rv.onAsyncError = RegistryAsyncErrorCallbacks[aecbName]
	}
	return rv, nil
}

func (s *Scorch) paused() uint64 {
	s.pauseLock.Lock()
	pc := s.pauseCount
	s.pauseLock.Unlock()
	return pc
}

func (s *Scorch) incrPause() {
	s.pauseLock.Lock()
	s.pauseCount++
	s.pauseLock.Unlock()
}

func (s *Scorch) decrPause() {
	s.pauseLock.Lock()
	s.pauseCount--
	s.pauseLock.Unlock()
}

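// fireEvent invokes the registered event callback, if any, bumping the
// pause count for the duration of the callback.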
func (s *Scorch) fireEvent(kind EventKind, dur time.Duration) {
	if s.onEvent != nil {
		s.incrPause()
		s.onEvent(Event{Kind: kind, Scorch: s, Duration: dur})
		s.decrPause()
	}
}

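// fireAsyncError forwards an asynchronous error to the registered
// callback, if any, and increments the error counter.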
func (s *Scorch) fireAsyncError(err error) {
	if s.onAsyncError != nil {
		s.onAsyncError(err)
	}
	atomic.AddUint64(&s.stats.TotOnErrors, 1)
}

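// Open opens the index described by the configuration and starts the
// background goroutines: the introducer always, plus the persister and
// merger when the index is writable and backed by a path.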
func (s *Scorch) Open() error {
	err := s.openBolt()
	if err != nil {
		return err
	}

	s.asyncTasks.Add(1)
	go s.mainLoop()

	if !s.readOnly && s.path != "" {
		s.asyncTasks.Add(1)
		go s.persisterLoop()
		s.asyncTasks.Add(1)
		go s.mergerLoop()
	}

	return nil
}

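// openBolt validates the configured path, opens (or creates) the root
// bolt file, loads any previously persisted state, and initializes the
// channels used by the background goroutines.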
func (s *Scorch) openBolt() error {
	var ok bool
	s.path, ok = s.config["path"].(string)
	if !ok {
		return fmt.Errorf("must specify path")
	}
	if s.path == "" {
		s.unsafeBatch = true
	}

	var rootBoltOpt *bolt.Options
	if s.readOnly {
		rootBoltOpt = &bolt.Options{
			ReadOnly: true,
		}
	} else {
		if s.path != "" {
			err := os.MkdirAll(s.path, 0700)
			if err != nil {
				return err
			}
		}
	}

	rootBoltPath := s.path + string(os.PathSeparator) + "root.bolt"
	var err error
	if s.path != "" {
		s.rootBolt, err = bolt.Open(rootBoltPath, 0600, rootBoltOpt)
		if err != nil {
			return err
		}

		// now see if there is any existing state to load
		err = s.loadFromBolt()
		if err != nil {
			_ = s.Close()
			return err
		}
	}

	atomic.StoreUint64(&s.stats.TotFileSegmentsAtRoot, uint64(len(s.root.segment)))

	s.introductions = make(chan *segmentIntroduction)
	s.persists = make(chan *persistIntroduction)
	s.merges = make(chan *segmentMerge)
	s.introducerNotifier = make(chan *epochWatcher, 1)
	s.revertToSnapshots = make(chan *snapshotReversion)
	s.persisterNotifier = make(chan *epochWatcher, 1)

	if !s.readOnly && s.path != "" {
		err := s.removeOldZapFiles() // Before persister or merger create any new files.
		if err != nil {
			_ = s.Close()
			return err
		}
	}

	s.numSnapshotsToKeep = NumSnapshotsToKeep
	if v, ok := s.config["numSnapshotsToKeep"]; ok {
		var t int
		if t, err = parseToInteger(v); err != nil {
			return fmt.Errorf("numSnapshotsToKeep parse err: %v", err)
		}
		if t > 0 {
			s.numSnapshotsToKeep = t
		}
	}

	return nil
}

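// Close signals the background goroutines to stop, waits for them to
// finish, then releases the root snapshot and closes the root bolt.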
func (s *Scorch) Close() (err error) {
	startTime := time.Now()
	defer func() {
		s.fireEvent(EventKindClose, time.Since(startTime))
	}()

	s.fireEvent(EventKindCloseStart, 0)

	// signal to async tasks we want to close
	close(s.closeCh)
	// wait for them to close
	s.asyncTasks.Wait()
	// now close the root bolt
	if s.rootBolt != nil {
		err = s.rootBolt.Close()
		s.rootLock.Lock()
		if s.root != nil {
			_ = s.root.DecRef()
		}
		s.root = nil
		s.rootLock.Unlock()
	}

	return
}

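// Update indexes a single document by applying a one-item batch.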
func (s *Scorch) Update(doc *document.Document) error {
	b := index.NewBatch()
	b.Update(doc)
	return s.Batch(b)
}

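// Delete removes a single document by id, again via a one-item batch.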
func (s *Scorch) Delete(id string) error {
	b := index.NewBatch()
	b.Delete(id)
	return s.Batch(b)
}

// Batch applies a batch of changes to the index atomically.
func (s *Scorch) Batch(batch *index.Batch) (err error) {
	start := time.Now()

	defer func() {
		s.fireEvent(EventKindBatchIntroduction, time.Since(start))
	}()

	resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))

	var numUpdates uint64
	var numDeletes uint64
	var numPlainTextBytes uint64
	var ids []string
	for docID, doc := range batch.IndexOps {
		if doc != nil {
			// insert _id field
			doc.AddField(document.NewTextFieldCustom("_id", nil, []byte(doc.ID), document.IndexField|document.StoreField, nil))
			numUpdates++
			numPlainTextBytes += doc.NumPlainTextBytes()
		} else {
			numDeletes++
		}
		ids = append(ids, docID)
	}

	// FIXME could sort ids list concurrent with analysis?

	if numUpdates > 0 {
		go func() {
			for _, doc := range batch.IndexOps {
				if doc != nil {
					aw := index.NewAnalysisWork(s, doc, resultChan)
					// put the work on the queue
					s.analysisQueue.Queue(aw)
				}
			}
		}()
	}

	// wait for analysis result
	analysisResults := make([]*index.AnalysisResult, int(numUpdates))
	var itemsDeQueued uint64
	var totalAnalysisSize int
	for itemsDeQueued < numUpdates {
		result := <-resultChan
		resultSize := result.Size()
		atomic.AddUint64(&s.iStats.analysisBytesAdded, uint64(resultSize))
		totalAnalysisSize += resultSize
		analysisResults[itemsDeQueued] = result
		itemsDeQueued++
	}
	close(resultChan)
	defer atomic.AddUint64(&s.iStats.analysisBytesRemoved, uint64(totalAnalysisSize))

	atomic.AddUint64(&s.stats.TotAnalysisTime, uint64(time.Since(start)))

	indexStart := time.Now()

	// notify handlers that we're about to introduce a segment
	s.fireEvent(EventKindBatchIntroductionStart, 0)

	var newSegment segment.Segment
	var bufBytes uint64
	if len(analysisResults) > 0 {
		newSegment, bufBytes, err = zap.AnalysisResultsToSegmentBase(analysisResults, DefaultChunkFactor)
		if err != nil {
			return err
		}
		atomic.AddUint64(&s.iStats.newSegBufBytesAdded, bufBytes)
	} else {
		atomic.AddUint64(&s.stats.TotBatchesEmpty, 1)
	}

	err = s.prepareSegment(newSegment, ids, batch.InternalOps, batch.PersistedCallback())
	if err != nil {
		if newSegment != nil {
			_ = newSegment.Close()
		}
		atomic.AddUint64(&s.stats.TotOnErrors, 1)
	} else {
		atomic.AddUint64(&s.stats.TotUpdates, numUpdates)
		atomic.AddUint64(&s.stats.TotDeletes, numDeletes)
		atomic.AddUint64(&s.stats.TotBatches, 1)
		atomic.AddUint64(&s.stats.TotIndexedPlainTextBytes, numPlainTextBytes)
	}

	atomic.AddUint64(&s.iStats.newSegBufBytesRemoved, bufBytes)
	atomic.AddUint64(&s.stats.TotIndexTime, uint64(time.Since(indexStart)))

	return err
}

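// prepareSegment wraps the new segment in a segmentIntroduction, computes
// the doc numbers it obsoletes in the current root, hands it to the
// introducer, and blocks until it has been applied (and, for safe
// batches, persisted).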
func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string,
	internalOps map[string][]byte, persistedCallback index.BatchCallback) error {

	// new introduction
	introduction := &segmentIntroduction{
		id:                atomic.AddUint64(&s.nextSegmentID, 1),
		data:              newSegment,
		ids:               ids,
		obsoletes:         make(map[uint64]*roaring.Bitmap),
		internal:          internalOps,
		applied:           make(chan error),
		persistedCallback: persistedCallback,
	}

	if !s.unsafeBatch {
		introduction.persisted = make(chan error, 1)
	}

	// optimistically prepare obsoletes outside of rootLock
	s.rootLock.RLock()
	root := s.root
	root.AddRef()
	s.rootLock.RUnlock()

	defer func() { _ = root.DecRef() }()

	for _, seg := range root.segment {
		delta, err := seg.segment.DocNumbers(ids)
		if err != nil {
			return err
		}
		introduction.obsoletes[seg.id] = delta
	}

	introStartTime := time.Now()

	s.introductions <- introduction

	// block until this segment is applied
	err := <-introduction.applied
	if err != nil {
		return err
	}

	if introduction.persisted != nil {
		err = <-introduction.persisted
	}

	introTime := uint64(time.Since(introStartTime))
	atomic.AddUint64(&s.stats.TotBatchIntroTime, introTime)
	if atomic.LoadUint64(&s.stats.MaxBatchIntroTime) < introTime {
		atomic.StoreUint64(&s.stats.MaxBatchIntroTime, introTime)
	}

	return err
}

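// SetInternal stores an internal key/value pair via a one-item batch.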
func (s *Scorch) SetInternal(key, val []byte) error {
	b := index.NewBatch()
	b.SetInternal(key, val)
	return s.Batch(b)
}

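// DeleteInternal removes an internal key via a one-item batch.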
func (s *Scorch) DeleteInternal(key []byte) error {
	b := index.NewBatch()
	b.DeleteInternal(key)
	return s.Batch(b)
}

// Reader returns a low-level accessor on the index data. Close it to
// release associated resources.
func (s *Scorch) Reader() (index.IndexReader, error) {
	return s.currentSnapshot(), nil
}

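// currentSnapshot returns the current root snapshot after acquiring an
// additional reference to it; callers are responsible for releasing it.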
func (s *Scorch) currentSnapshot() *IndexSnapshot {
	s.rootLock.RLock()
	rv := s.root
	if rv != nil {
		rv.AddRef()
	}
	s.rootLock.RUnlock()
	return rv
}

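// Stats returns a JSON-marshalable view of the index statistics.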
func (s *Scorch) Stats() json.Marshaler {
	return &s.stats
}

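// diskFileStats walks the index directory and returns the number of
// files on disk, the total bytes they occupy, and the bytes belonging
// to the given root segment paths (or the total, if none were given).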
func (s *Scorch) diskFileStats(rootSegmentPaths map[string]struct{}) (uint64,
	uint64, uint64) {
	var numFilesOnDisk, numBytesUsedDisk, numBytesOnDiskByRoot uint64
	if s.path != "" {
		finfos, err := ioutil.ReadDir(s.path)
		if err == nil {
			for _, finfo := range finfos {
				if !finfo.IsDir() {
					numBytesUsedDisk += uint64(finfo.Size())
					numFilesOnDisk++
					if rootSegmentPaths != nil {
						fname := s.path + string(os.PathSeparator) + finfo.Name()
						if _, fileAtRoot := rootSegmentPaths[fname]; fileAtRoot {
							numBytesOnDiskByRoot += uint64(finfo.Size())
						}
					}
				}
			}
		}
	}
	// if no root segment paths were given, consider all disk files.
	if rootSegmentPaths == nil {
		return numFilesOnDisk, numBytesUsedDisk, numBytesUsedDisk
	}

	return numFilesOnDisk, numBytesUsedDisk, numBytesOnDiskByRoot
}

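// rootDiskSegmentsPaths returns the set of file paths for the zap
// segments in the current root snapshot.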
func (s *Scorch) rootDiskSegmentsPaths() map[string]struct{} {
	rv := make(map[string]struct{}, len(s.root.segment))
	for _, segmentSnapshot := range s.root.segment {
		switch seg := segmentSnapshot.segment.(type) {
		case *zap.Segment:
			rv[seg.Path()] = struct{}{}
		}
	}
	return rv
}

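// StatsMap returns the index statistics as a map, including
// backwards-compatible aliases for the older stat names.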
func (s *Scorch) StatsMap() map[string]interface{} {
	m := s.stats.ToMap()

	s.rootLock.RLock()
	rootSegPaths := s.rootDiskSegmentsPaths()
	m["CurFilesIneligibleForRemoval"] = uint64(len(s.ineligibleForRemoval))
	s.rootLock.RUnlock()

	numFilesOnDisk, numBytesUsedDisk, numBytesOnDiskByRoot := s.diskFileStats(rootSegPaths)

	m["CurOnDiskBytes"] = numBytesUsedDisk
	m["CurOnDiskFiles"] = numFilesOnDisk

	// TODO: consider one day removing these backwards compatible
	// names for apps using the old names
	m["updates"] = m["TotUpdates"]
	m["deletes"] = m["TotDeletes"]
	m["batches"] = m["TotBatches"]
	m["errors"] = m["TotOnErrors"]
	m["analysis_time"] = m["TotAnalysisTime"]
	m["index_time"] = m["TotIndexTime"]
	m["term_searchers_started"] = m["TotTermSearchersStarted"]
	m["term_searchers_finished"] = m["TotTermSearchersFinished"]
	m["num_plain_text_bytes_indexed"] = m["TotIndexedPlainTextBytes"]
	m["num_items_introduced"] = m["TotIntroducedItems"]
	m["num_items_persisted"] = m["TotPersistedItems"]
	m["num_recs_to_persist"] = m["TotItemsToPersist"]
	// total disk bytes found in index directory inclusive of older snapshots
	m["num_bytes_used_disk"] = numBytesUsedDisk
	// total disk bytes by the latest root index, exclusive of older snapshots
	m["num_bytes_used_disk_by_root"] = numBytesOnDiskByRoot
	m["num_files_on_disk"] = numFilesOnDisk
	m["num_root_memorysegments"] = m["TotMemorySegmentsAtRoot"]
	m["num_root_filesegments"] = m["TotFileSegmentsAtRoot"]
	m["num_persister_nap_pause_completed"] = m["TotPersisterNapPauseCompleted"]
	m["num_persister_nap_merger_break"] = m["TotPersisterMergerNapBreak"]
	m["total_compaction_written_bytes"] = m["TotFileMergeWrittenBytes"]

	return m
}

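// Analyze runs analysis over every indexed field of the document and
// feeds the results into any composite fields.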
func (s *Scorch) Analyze(d *document.Document) *index.AnalysisResult {
	rv := &index.AnalysisResult{
		Document: d,
		Analyzed: make([]analysis.TokenFrequencies, len(d.Fields)+len(d.CompositeFields)),
		Length:   make([]int, len(d.Fields)+len(d.CompositeFields)),
	}

	for i, field := range d.Fields {
		if field.Options().IsIndexed() {
			fieldLength, tokenFreqs := field.Analyze()
			rv.Analyzed[i] = tokenFreqs
			rv.Length[i] = fieldLength

			if len(d.CompositeFields) > 0 && field.Name() != "_id" {
				// see if any of the composite fields need this
				for _, compositeField := range d.CompositeFields {
					compositeField.Compose(field.Name(), fieldLength, tokenFreqs)
				}
			}
		}
	}

	return rv
}

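// Advanced returns the underlying KVStore; scorch does not use one, so
// it always returns nil.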
func (s *Scorch) Advanced() (store.KVStore, error) {
	return nil, nil
}

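// AddEligibleForRemoval records the given snapshot epoch as safe to
// garbage collect, unless it is still the current root epoch.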
func (s *Scorch) AddEligibleForRemoval(epoch uint64) {
	s.rootLock.Lock()
	if s.root == nil || s.root.epoch != epoch {
		s.eligibleForRemoval = append(s.eligibleForRemoval, epoch)
	}
	s.rootLock.Unlock()
}

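// MemoryUsed estimates the current memory footprint of the index: the
// root snapshot, any older snapshots the persister or merger are still
// working on, and in-flight analysis and segment buffers.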
func (s *Scorch) MemoryUsed() (memUsed uint64) {
	indexSnapshot := s.currentSnapshot()
	if indexSnapshot == nil {
		return
	}

	defer func() {
		_ = indexSnapshot.Close()
	}()

	// Account for current root snapshot overhead
	memUsed += uint64(indexSnapshot.Size())

	// Account for snapshot that the persister may be working on
	persistEpoch := atomic.LoadUint64(&s.iStats.persistEpoch)
	persistSnapshotSize := atomic.LoadUint64(&s.iStats.persistSnapshotSize)
	if persistEpoch != 0 && indexSnapshot.epoch > persistEpoch {
		// the snapshot that the persister is working on isn't the same as
		// the current snapshot
		memUsed += persistSnapshotSize
	}

	// Account for snapshot that the merger may be working on
	mergeEpoch := atomic.LoadUint64(&s.iStats.mergeEpoch)
	mergeSnapshotSize := atomic.LoadUint64(&s.iStats.mergeSnapshotSize)
	if mergeEpoch != 0 && indexSnapshot.epoch > mergeEpoch {
		// the snapshot that the merger is working on isn't the same as
		// the current snapshot
		memUsed += mergeSnapshotSize
	}

	memUsed += (atomic.LoadUint64(&s.iStats.newSegBufBytesAdded) -
		atomic.LoadUint64(&s.iStats.newSegBufBytesRemoved))

	memUsed += (atomic.LoadUint64(&s.iStats.analysisBytesAdded) -
		atomic.LoadUint64(&s.iStats.analysisBytesRemoved))

	return memUsed
}

func (s *Scorch) markIneligibleForRemoval(filename string) {
	s.rootLock.Lock()
	s.ineligibleForRemoval[filename] = true
	s.rootLock.Unlock()
}

func (s *Scorch) unmarkIneligibleForRemoval(filename string) {
	s.rootLock.Lock()
	delete(s.ineligibleForRemoval, filename)
	s.rootLock.Unlock()
}

func init() {
	registry.RegisterIndexType(Name, NewScorch)
}

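// parseToInteger converts a config value to an int, accepting either an
// int or a float64 (the type produced when numbers are decoded from JSON).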
func parseToInteger(i interface{}) (int, error) {
	switch v := i.(type) {
	case float64:
		return int(v), nil
	case int:
		return v, nil

	default:
		return 0, fmt.Errorf("expects int or float64 value")
	}
}
