1package tsi1
2
3import (
4	"errors"
5	"fmt"
6	"io/ioutil"
7	"os"
8	"path/filepath"
9	"regexp"
10	"runtime"
11	"strconv"
12	"sync"
13	"sync/atomic"
14	"unsafe"
15
16	"github.com/cespare/xxhash"
17	"github.com/influxdata/influxdb/models"
18	"github.com/influxdata/influxdb/pkg/estimator"
19	"github.com/influxdata/influxdb/pkg/estimator/hll"
20	"github.com/influxdata/influxdb/pkg/slices"
21	"github.com/influxdata/influxdb/tsdb"
22	"github.com/influxdata/influxql"
23	"go.uber.org/zap"
24)
25
26// IndexName is the name of the index.
27const IndexName = tsdb.TSI1IndexName
28
// ErrCompactionInterrupted is the sentinel error reported when a compaction is
// cut short, either because compactions were disabled or because the index was
// closed while the compaction was still running.
var (
	ErrCompactionInterrupted = errors.New("tsi1: compaction interrupted")
)
32
33func init() {
34	if os.Getenv("INFLUXDB_EXP_TSI_PARTITIONS") != "" {
35		i, err := strconv.Atoi(os.Getenv("INFLUXDB_EXP_TSI_PARTITIONS"))
36		if err != nil {
37			panic(err)
38		}
39		DefaultPartitionN = uint64(i)
40	}
41
42	tsdb.RegisterIndex(IndexName, func(_ uint64, db, path string, _ *tsdb.SeriesIDSet, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Index {
43		idx := NewIndex(sfile, db,
44			WithPath(path),
45			WithMaximumLogFileSize(int64(opt.Config.MaxIndexLogFileSize)),
46			WithSeriesIDCacheSize(opt.Config.SeriesIDSetCacheSize),
47		)
48		return idx
49	})
50}
51
// DefaultPartitionN is the number of partitions a newly constructed Index is
// split into.
//
// NOTE: Currently, this must not be changed once a database is created.
// Further, it must also be a power of 2.
var DefaultPartitionN = uint64(8)
58
59// An IndexOption is a functional option for changing the configuration of
60// an Index.
61type IndexOption func(i *Index)
62
63// WithPath sets the root path of the Index
64var WithPath = func(path string) IndexOption {
65	return func(i *Index) {
66		i.path = path
67	}
68}
69
70// DisableCompactions disables compactions on the Index.
71var DisableCompactions = func() IndexOption {
72	return func(i *Index) {
73		i.disableCompactions = true
74	}
75}
76
77// WithLogger sets the logger for the Index.
78var WithLogger = func(l zap.Logger) IndexOption {
79	return func(i *Index) {
80		i.logger = l.With(zap.String("index", "tsi"))
81	}
82}
83
84// WithMaximumLogFileSize sets the maximum size of LogFiles before they're
85// compacted into IndexFiles.
86var WithMaximumLogFileSize = func(size int64) IndexOption {
87	return func(i *Index) {
88		i.maxLogFileSize = size
89	}
90}
91
92// DisableFsync disables flushing and syncing of underlying files. Primarily this
93// impacts the LogFiles. This option can be set when working with the index in
94// an offline manner, for cases where a hard failure can be overcome by re-running the tooling.
95var DisableFsync = func() IndexOption {
96	return func(i *Index) {
97		i.disableFsync = true
98	}
99}
100
101// WithLogFileBufferSize sets the size of the buffer used within LogFiles.
102// Typically appending an entry to a LogFile involves writing 11 or 12 bytes, so
103// depending on how many new series are being created within a batch, it may
104// be appropriate to set this.
105var WithLogFileBufferSize = func(sz int) IndexOption {
106	return func(i *Index) {
107		if sz > 1<<17 { // 128K
108			sz = 1 << 17
109		} else if sz < 1<<12 {
110			sz = 1 << 12 // 4K (runtime default)
111		}
112		i.logfileBufferSize = sz
113	}
114}
115
116// WithSeriesIDCacheSize sets the size of the series id set cache.
117// If set to 0, then the cache is disabled.
118var WithSeriesIDCacheSize = func(sz int) IndexOption {
119	return func(i *Index) {
120		i.tagValueCacheSize = sz
121	}
122}
123
124// Index represents a collection of layered index files and WAL.
125type Index struct {
126	mu         sync.RWMutex
127	partitions []*Partition
128	opened     bool
129
130	tagValueCache     *TagValueSeriesIDCache
131	tagValueCacheSize int
132
133	// The following may be set when initializing an Index.
134	path               string      // Root directory of the index partitions.
135	disableCompactions bool        // Initially disables compactions on the index.
136	maxLogFileSize     int64       // Maximum size of a LogFile before it's compacted.
137	logfileBufferSize  int         // The size of the buffer used by the LogFile.
138	disableFsync       bool        // Disables flushing buffers and fsyning files. Used when working with indexes offline.
139	logger             *zap.Logger // Index's logger.
140
141	// The following must be set when initializing an Index.
142	sfile    *tsdb.SeriesFile // series lookup file
143	database string           // Name of database.
144
145	// Cached sketches.
146	mSketch, mTSketch estimator.Sketch // Measurement sketches
147	sSketch, sTSketch estimator.Sketch // Series sketches
148
149	// Index's version.
150	version int
151
152	// Number of partitions used by the index.
153	PartitionN uint64
154}
155
156func (i *Index) UniqueReferenceID() uintptr {
157	return uintptr(unsafe.Pointer(i))
158}
159
160// NewIndex returns a new instance of Index.
161func NewIndex(sfile *tsdb.SeriesFile, database string, options ...IndexOption) *Index {
162	idx := &Index{
163		tagValueCacheSize: tsdb.DefaultSeriesIDSetCacheSize,
164		maxLogFileSize:    tsdb.DefaultMaxIndexLogFileSize,
165		logger:            zap.NewNop(),
166		version:           Version,
167		sfile:             sfile,
168		database:          database,
169		mSketch:           hll.NewDefaultPlus(),
170		mTSketch:          hll.NewDefaultPlus(),
171		sSketch:           hll.NewDefaultPlus(),
172		sTSketch:          hll.NewDefaultPlus(),
173		PartitionN:        DefaultPartitionN,
174	}
175
176	for _, option := range options {
177		option(idx)
178	}
179
180	idx.tagValueCache = NewTagValueSeriesIDCache(idx.tagValueCacheSize)
181	return idx
182}
183
184// Bytes estimates the memory footprint of this Index, in bytes.
185func (i *Index) Bytes() int {
186	var b int
187	i.mu.RLock()
188	b += 24 // mu RWMutex is 24 bytes
189	b += int(unsafe.Sizeof(i.partitions))
190	for _, p := range i.partitions {
191		b += int(unsafe.Sizeof(p)) + p.bytes()
192	}
193	b += int(unsafe.Sizeof(i.opened))
194	b += int(unsafe.Sizeof(i.path)) + len(i.path)
195	b += int(unsafe.Sizeof(i.disableCompactions))
196	b += int(unsafe.Sizeof(i.maxLogFileSize))
197	b += int(unsafe.Sizeof(i.logger))
198	b += int(unsafe.Sizeof(i.sfile))
199	// Do not count SeriesFile because it belongs to the code that constructed this Index.
200	b += int(unsafe.Sizeof(i.mSketch)) + i.mSketch.Bytes()
201	b += int(unsafe.Sizeof(i.mTSketch)) + i.mTSketch.Bytes()
202	b += int(unsafe.Sizeof(i.sSketch)) + i.sSketch.Bytes()
203	b += int(unsafe.Sizeof(i.sTSketch)) + i.sTSketch.Bytes()
204	b += int(unsafe.Sizeof(i.database)) + len(i.database)
205	b += int(unsafe.Sizeof(i.version))
206	b += int(unsafe.Sizeof(i.PartitionN))
207	i.mu.RUnlock()
208	return b
209}
210
211// Database returns the name of the database the index was initialized with.
212func (i *Index) Database() string {
213	return i.database
214}
215
216// WithLogger sets the logger on the index after it's been created.
217//
218// It's not safe to call WithLogger after the index has been opened, or before
219// it has been closed.
220func (i *Index) WithLogger(l *zap.Logger) {
221	i.mu.Lock()
222	defer i.mu.Unlock()
223	i.logger = l.With(zap.String("index", "tsi"))
224}
225
226// Type returns the type of Index this is.
227func (i *Index) Type() string { return IndexName }
228
229// SeriesFile returns the series file attached to the index.
230func (i *Index) SeriesFile() *tsdb.SeriesFile { return i.sfile }
231
232// SeriesIDSet returns the set of series ids associated with series in this
233// index. Any series IDs for series no longer present in the index are filtered out.
234func (i *Index) SeriesIDSet() *tsdb.SeriesIDSet {
235	seriesIDSet := tsdb.NewSeriesIDSet()
236	others := make([]*tsdb.SeriesIDSet, 0, i.PartitionN)
237	for _, p := range i.partitions {
238		others = append(others, p.seriesIDSet)
239	}
240	seriesIDSet.Merge(others...)
241	return seriesIDSet
242}
243
244// Open opens the index.
245func (i *Index) Open() error {
246	i.mu.Lock()
247	defer i.mu.Unlock()
248
249	if i.opened {
250		return errors.New("index already open")
251	}
252
253	// Ensure root exists.
254	if err := os.MkdirAll(i.path, 0777); err != nil {
255		return err
256	}
257
258	// Initialize index partitions.
259	i.partitions = make([]*Partition, i.PartitionN)
260	for j := 0; j < len(i.partitions); j++ {
261		p := NewPartition(i.sfile, filepath.Join(i.path, fmt.Sprint(j)))
262		p.MaxLogFileSize = i.maxLogFileSize
263		p.nosync = i.disableFsync
264		p.logbufferSize = i.logfileBufferSize
265		p.logger = i.logger.With(zap.String("tsi1_partition", fmt.Sprint(j+1)))
266		i.partitions[j] = p
267	}
268
269	// Open all the Partitions in parallel.
270	partitionN := len(i.partitions)
271	n := i.availableThreads()
272
273	// Store results.
274	errC := make(chan error, partitionN)
275
276	// Run fn on each partition using a fixed number of goroutines.
277	var pidx uint32 // Index of maximum Partition being worked on.
278	for k := 0; k < n; k++ {
279		go func(k int) {
280			for {
281				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
282				if idx >= partitionN {
283					return // No more work.
284				}
285				err := i.partitions[idx].Open()
286				errC <- err
287			}
288		}(k)
289	}
290
291	// Check for error
292	for i := 0; i < partitionN; i++ {
293		if err := <-errC; err != nil {
294			return err
295		}
296	}
297
298	// Refresh cached sketches.
299	if err := i.updateSeriesSketches(); err != nil {
300		return err
301	} else if err := i.updateMeasurementSketches(); err != nil {
302		return err
303	}
304
305	// Mark opened.
306	i.opened = true
307	i.logger.Info(fmt.Sprintf("index opened with %d partitions", partitionN))
308	return nil
309}
310
311// Compact requests a compaction of partitions.
312func (i *Index) Compact() {
313	i.mu.Lock()
314	defer i.mu.Unlock()
315	for _, p := range i.partitions {
316		p.Compact()
317	}
318}
319
320func (i *Index) EnableCompactions() {
321	for _, p := range i.partitions {
322		p.EnableCompactions()
323	}
324}
325
326func (i *Index) DisableCompactions() {
327	for _, p := range i.partitions {
328		p.DisableCompactions()
329	}
330}
331
332// Wait blocks until all outstanding compactions have completed.
333func (i *Index) Wait() {
334	for _, p := range i.partitions {
335		p.Wait()
336	}
337}
338
339// Close closes the index.
340func (i *Index) Close() error {
341	// Lock index and close partitions.
342	i.mu.Lock()
343	defer i.mu.Unlock()
344
345	for _, p := range i.partitions {
346		if err := p.Close(); err != nil {
347			return err
348		}
349	}
350
351	// Mark index as closed.
352	i.opened = false
353	return nil
354}
355
356// Path returns the path the index was opened with.
357func (i *Index) Path() string { return i.path }
358
359// PartitionAt returns the partition by index.
360func (i *Index) PartitionAt(index int) *Partition {
361	return i.partitions[index]
362}
363
364// partition returns the appropriate Partition for a provided series key.
365func (i *Index) partition(key []byte) *Partition {
366	return i.partitions[int(xxhash.Sum64(key)&(i.PartitionN-1))]
367}
368
369// partitionIdx returns the index of the partition that key belongs in.
370func (i *Index) partitionIdx(key []byte) int {
371	return int(xxhash.Sum64(key) & (i.PartitionN - 1))
372}
373
374// availableThreads returns the minimum of GOMAXPROCS and the number of
375// partitions in the Index.
376func (i *Index) availableThreads() int {
377	n := runtime.GOMAXPROCS(0)
378	if len(i.partitions) < n {
379		return len(i.partitions)
380	}
381	return n
382}
383
384// updateMeasurementSketches rebuilds the cached measurement sketches.
385func (i *Index) updateMeasurementSketches() error {
386	for j := 0; j < int(i.PartitionN); j++ {
387		if s, t, err := i.partitions[j].MeasurementsSketches(); err != nil {
388			return err
389		} else if i.mSketch.Merge(s); err != nil {
390			return err
391		} else if i.mTSketch.Merge(t); err != nil {
392			return err
393		}
394	}
395	return nil
396}
397
398// updateSeriesSketches rebuilds the cached series sketches.
399func (i *Index) updateSeriesSketches() error {
400	for j := 0; j < int(i.PartitionN); j++ {
401		if s, t, err := i.partitions[j].SeriesSketches(); err != nil {
402			return err
403		} else if i.sSketch.Merge(s); err != nil {
404			return err
405		} else if i.sTSketch.Merge(t); err != nil {
406			return err
407		}
408	}
409	return nil
410}
411
412// SetFieldSet sets a shared field set from the engine.
413func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) {
414	for _, p := range i.partitions {
415		p.SetFieldSet(fs)
416	}
417}
418
419// FieldSet returns the assigned fieldset.
420func (i *Index) FieldSet() *tsdb.MeasurementFieldSet {
421	if len(i.partitions) == 0 {
422		return nil
423	}
424	return i.partitions[0].FieldSet()
425}
426
427// ForEachMeasurementName iterates over all measurement names in the index,
428// applying fn. It returns the first error encountered, if any.
429//
430// ForEachMeasurementName does not call fn on each partition concurrently so the
431// call may provide a non-goroutine safe fn.
432func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error {
433	itr, err := i.MeasurementIterator()
434	if err != nil {
435		return err
436	} else if itr == nil {
437		return nil
438	}
439	defer itr.Close()
440
441	// Iterate over all measurements.
442	for {
443		e, err := itr.Next()
444		if err != nil {
445			return err
446		} else if e == nil {
447			break
448		}
449
450		if err := fn(e); err != nil {
451			return err
452		}
453	}
454	return nil
455}
456
457// MeasurementExists returns true if a measurement exists.
458func (i *Index) MeasurementExists(name []byte) (bool, error) {
459	n := i.availableThreads()
460
461	// Store errors
462	var found uint32 // Use this to signal we found the measurement.
463	errC := make(chan error, i.PartitionN)
464
465	// Check each partition for the measurement concurrently.
466	var pidx uint32 // Index of maximum Partition being worked on.
467	for k := 0; k < n; k++ {
468		go func() {
469			for {
470				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to check
471				if idx >= len(i.partitions) {
472					return // No more work.
473				}
474
475				// Check if the measurement has been found. If it has don't
476				// need to check this partition and can just move on.
477				if atomic.LoadUint32(&found) == 1 {
478					errC <- nil
479					continue
480				}
481
482				b, err := i.partitions[idx].MeasurementExists(name)
483				if b {
484					atomic.StoreUint32(&found, 1)
485				}
486				errC <- err
487			}
488		}()
489	}
490
491	// Check for error
492	for i := 0; i < cap(errC); i++ {
493		if err := <-errC; err != nil {
494			return false, err
495		}
496	}
497
498	// Check if we found the measurement.
499	return atomic.LoadUint32(&found) == 1, nil
500}
501
502// MeasurementHasSeries returns true if a measurement has non-tombstoned series.
503func (i *Index) MeasurementHasSeries(name []byte) (bool, error) {
504	for _, p := range i.partitions {
505		if v, err := p.MeasurementHasSeries(name); err != nil {
506			return false, err
507		} else if v {
508			return true, nil
509		}
510	}
511	return false, nil
512}
513
514// fetchByteValues is a helper for gathering values from each partition in the index,
515// based on some criteria.
516//
517// fn is a function that works on partition idx and calls into some method on
518// the partition that returns some ordered values.
519func (i *Index) fetchByteValues(fn func(idx int) ([][]byte, error)) ([][]byte, error) {
520	n := i.availableThreads()
521
522	// Store results.
523	names := make([][][]byte, i.PartitionN)
524	errC := make(chan error, i.PartitionN)
525
526	var pidx uint32 // Index of maximum Partition being worked on.
527	for k := 0; k < n; k++ {
528		go func() {
529			for {
530				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
531				if idx >= len(i.partitions) {
532					return // No more work.
533				}
534
535				pnames, err := fn(idx)
536
537				// This is safe since there are no readers on names until all
538				// the writers are done.
539				names[idx] = pnames
540				errC <- err
541			}
542		}()
543	}
544
545	// Check for error
546	for i := 0; i < cap(errC); i++ {
547		if err := <-errC; err != nil {
548			return nil, err
549		}
550	}
551
552	// It's now safe to read from names.
553	return slices.MergeSortedBytes(names...), nil
554}
555
556// MeasurementIterator returns an iterator over all measurements.
557func (i *Index) MeasurementIterator() (tsdb.MeasurementIterator, error) {
558	itrs := make([]tsdb.MeasurementIterator, 0, len(i.partitions))
559	for _, p := range i.partitions {
560		itr, err := p.MeasurementIterator()
561		if err != nil {
562			tsdb.MeasurementIterators(itrs).Close()
563			return nil, err
564		} else if itr != nil {
565			itrs = append(itrs, itr)
566		}
567	}
568	return tsdb.MergeMeasurementIterators(itrs...), nil
569}
570
571// MeasurementSeriesIDIterator returns an iterator over all series in a measurement.
572func (i *Index) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator, error) {
573	itrs := make([]tsdb.SeriesIDIterator, 0, len(i.partitions))
574	for _, p := range i.partitions {
575		itr, err := p.MeasurementSeriesIDIterator(name)
576		if err != nil {
577			tsdb.SeriesIDIterators(itrs).Close()
578			return nil, err
579		} else if itr != nil {
580			itrs = append(itrs, itr)
581		}
582	}
583	return tsdb.MergeSeriesIDIterators(itrs...), nil
584}
585
586// MeasurementNamesByRegex returns measurement names for the provided regex.
587func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
588	return i.fetchByteValues(func(idx int) ([][]byte, error) {
589		return i.partitions[idx].MeasurementNamesByRegex(re)
590	})
591}
592
593// DropMeasurement deletes a measurement from the index. It returns the first
594// error encountered, if any.
595func (i *Index) DropMeasurement(name []byte) error {
596	n := i.availableThreads()
597
598	// Store results.
599	errC := make(chan error, i.PartitionN)
600
601	var pidx uint32 // Index of maximum Partition being worked on.
602	for k := 0; k < n; k++ {
603		go func() {
604			for {
605				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
606				if idx >= len(i.partitions) {
607					return // No more work.
608				}
609				errC <- i.partitions[idx].DropMeasurement(name)
610			}
611		}()
612	}
613
614	// Check for error
615	for i := 0; i < cap(errC); i++ {
616		if err := <-errC; err != nil {
617			return err
618		}
619	}
620
621	// Update sketches under lock.
622	i.mu.Lock()
623	defer i.mu.Unlock()
624
625	i.mTSketch.Add(name)
626	if err := i.updateSeriesSketches(); err != nil {
627		return err
628	}
629
630	return nil
631}
632
633// CreateSeriesListIfNotExists creates a list of series if they doesn't exist in bulk.
634func (i *Index) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []models.Tags, tracker tsdb.StatsTracker) error {
635	// All slices must be of equal length.
636	if len(names) != len(tagsSlice) {
637		return errors.New("names/tags length mismatch in index")
638	}
639
640	// We need to move different series into collections for each partition
641	// to process.
642	pNames := make([][][]byte, i.PartitionN)
643	pTags := make([][]models.Tags, i.PartitionN)
644
645	// Determine partition for series using each series key.
646	for ki, key := range keys {
647		pidx := i.partitionIdx(key)
648		pNames[pidx] = append(pNames[pidx], names[ki])
649		pTags[pidx] = append(pTags[pidx], tagsSlice[ki])
650	}
651
652	// Process each subset of series on each partition.
653	n := i.availableThreads()
654
655	// Store errors.
656	errC := make(chan error, i.PartitionN)
657
658	var pidx uint32 // Index of maximum Partition being worked on.
659	for k := 0; k < n; k++ {
660		go func() {
661			for {
662				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
663				if idx >= len(i.partitions) {
664					return // No more work.
665				}
666
667				ids, err := i.partitions[idx].createSeriesListIfNotExists(pNames[idx], pTags[idx], tracker)
668
669				var updateCache bool
670				for _, id := range ids {
671					if id != 0 {
672						updateCache = true
673						break
674					}
675				}
676
677				if !updateCache {
678					errC <- err
679					continue
680				}
681
682				// Some cached bitset results may need to be updated.
683				i.tagValueCache.RLock()
684				for j, id := range ids {
685					if id == 0 {
686						continue
687					}
688
689					name := pNames[idx][j]
690					tags := pTags[idx][j]
691					if i.tagValueCache.measurementContainsSets(name) {
692						for _, pair := range tags {
693							// TODO(edd): It's not clear to me yet whether it will be better to take a lock
694							// on every series id set, or whether to gather them all up under the cache rlock
695							// and then take the cache lock and update them all at once (without invoking a lock
696							// on each series id set).
697							//
698							// Taking the cache lock will block all queries, but is one lock. Taking each series set
699							// lock might be many lock/unlocks but will only block a query that needs that particular set.
700							//
701							// Need to think on it, but I think taking a lock on each series id set is the way to go.
702							//
703							// One other option here is to take a lock on the series id set when we first encounter it
704							// and then keep it locked until we're done with all the ids.
705							//
706							// Note: this will only add `id` to the set if it exists.
707							i.tagValueCache.addToSet(name, pair.Key, pair.Value, id) // Takes a lock on the series id set
708						}
709					}
710				}
711				i.tagValueCache.RUnlock()
712
713				errC <- err
714			}
715		}()
716	}
717
718	// Check for error
719	for i := 0; i < cap(errC); i++ {
720		if err := <-errC; err != nil {
721			return err
722		}
723	}
724
725	// Update sketches under lock.
726	i.mu.Lock()
727	defer i.mu.Unlock()
728
729	for _, key := range keys {
730		i.sSketch.Add(key)
731	}
732	for _, name := range names {
733		i.mSketch.Add(name)
734	}
735
736	return nil
737}
738
739// CreateSeriesIfNotExists creates a series if it doesn't exist or is deleted.
740func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags, tracker tsdb.StatsTracker) error {
741	ids, err := i.partition(key).createSeriesListIfNotExists([][]byte{name}, []models.Tags{tags}, tracker)
742	if err != nil {
743		return err
744	}
745
746	i.mu.Lock()
747	i.sSketch.Add(key)
748	i.mSketch.Add(name)
749	i.mu.Unlock()
750
751	if ids[0] == 0 {
752		return nil // No new series, nothing further to update.
753	}
754
755	// If there are cached sets for any of the tag pairs, they will need to be
756	// updated with the series id.
757	i.tagValueCache.RLock()
758	if i.tagValueCache.measurementContainsSets(name) {
759		for _, pair := range tags {
760			// TODO(edd): It's not clear to me yet whether it will be better to take a lock
761			// on every series id set, or whether to gather them all up under the cache rlock
762			// and then take the cache lock and update them all at once (without invoking a lock
763			// on each series id set).
764			//
765			// Taking the cache lock will block all queries, but is one lock. Taking each series set
766			// lock might be many lock/unlocks but will only block a query that needs that particular set.
767			//
768			// Need to think on it, but I think taking a lock on each series id set is the way to go.
769			//
770			// Note this will only add `id` to the set if it exists.
771			i.tagValueCache.addToSet(name, pair.Key, pair.Value, ids[0]) // Takes a lock on the series id set
772		}
773	}
774	i.tagValueCache.RUnlock()
775	return nil
776}
777
778// InitializeSeries is a no-op. This only applies to the in-memory index.
779func (i *Index) InitializeSeries(keys, names [][]byte, tags []models.Tags) error {
780	return nil
781}
782
783// DropSeries drops the provided series from the index.  If cascade is true
784// and this is the last series to the measurement, the measurment will also be dropped.
785func (i *Index) DropSeries(seriesID uint64, key []byte, cascade bool) error {
786	// Remove from partition.
787	if err := i.partition(key).DropSeries(seriesID); err != nil {
788		return err
789	}
790
791	// Add sketch tombstone.
792	i.mu.Lock()
793	i.sTSketch.Add(key)
794	i.mu.Unlock()
795
796	if !cascade {
797		return nil
798	}
799
800	// Extract measurement name & tags.
801	name, tags := models.ParseKeyBytes(key)
802
803	// If there are cached sets for any of the tag pairs, they will need to be
804	// updated with the series id.
805	i.tagValueCache.RLock()
806	if i.tagValueCache.measurementContainsSets(name) {
807		for _, pair := range tags {
808			i.tagValueCache.delete(name, pair.Key, pair.Value, seriesID) // Takes a lock on the series id set
809		}
810	}
811	i.tagValueCache.RUnlock()
812
813	// Check if that was the last series for the measurement in the entire index.
814	if ok, err := i.MeasurementHasSeries(name); err != nil {
815		return err
816	} else if ok {
817		return nil
818	}
819
820	// If no more series exist in the measurement then delete the measurement.
821	if err := i.DropMeasurement(name); err != nil {
822		return err
823	}
824	return nil
825}
826
827// DropSeries drops the provided series from the index.  If cascade is true
828// and this is the last series to the measurement, the measurment will also be dropped.
829func (i *Index) DropSeriesList(seriesIDs []uint64, keys [][]byte, _ bool) error {
830	// All slices must be of equal length.
831	if len(seriesIDs) != len(keys) {
832		return errors.New("seriesIDs/keys length mismatch in index")
833	}
834
835	// We need to move different series into collections for each partition
836	// to process.
837	pSeriesIDs := make([][]uint64, i.PartitionN)
838	pKeys := make([][][]byte, i.PartitionN)
839
840	for idx, key := range keys {
841		pidx := i.partitionIdx(key)
842		pSeriesIDs[pidx] = append(pSeriesIDs[pidx], seriesIDs[idx])
843		pKeys[pidx] = append(pKeys[pidx], key)
844	}
845
846	// Process each subset of series on each partition.
847	n := i.availableThreads()
848
849	// Store errors.
850	errC := make(chan error, i.PartitionN)
851
852	var pidx uint32 // Index of maximum Partition being worked on.
853	for k := 0; k < n; k++ {
854		go func() {
855			for {
856				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
857				if idx >= len(i.partitions) {
858					return // No more work.
859				}
860
861				// Drop from partition.
862				err := i.partitions[idx].DropSeriesList(pSeriesIDs[idx])
863				errC <- err
864			}
865		}()
866	}
867
868	// Check for error
869	for i := 0; i < cap(errC); i++ {
870		if err := <-errC; err != nil {
871			return err
872		}
873	}
874
875	// Add sketch tombstone.
876	i.mu.Lock()
877	for _, key := range keys {
878		i.sTSketch.Add(key)
879	}
880	i.mu.Unlock()
881
882	return nil
883}
884
885// DropSeriesGlobal is a no-op on the tsi1 index.
886func (i *Index) DropSeriesGlobal(key []byte) error { return nil }
887
888// DropMeasurementIfSeriesNotExist drops a measurement only if there are no more
889// series for the measurment.
890func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) (bool, error) {
891	// Check if that was the last series for the measurement in the entire index.
892	if ok, err := i.MeasurementHasSeries(name); err != nil {
893		return false, err
894	} else if ok {
895		return false, nil
896	}
897
898	// If no more series exist in the measurement then delete the measurement.
899	return true, i.DropMeasurement(name)
900}
901
902// MeasurementsSketches returns the two measurement sketches for the index.
903func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
904	i.mu.RLock()
905	defer i.mu.RUnlock()
906	return i.mSketch.Clone(), i.mTSketch.Clone(), nil
907}
908
909// SeriesSketches returns the two series sketches for the index.
910func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
911	i.mu.RLock()
912	defer i.mu.RUnlock()
913	return i.sSketch.Clone(), i.sTSketch.Clone(), nil
914}
915
916// Since indexes are not shared across shards, the count returned by SeriesN
917// cannot be combined with other shard's results. If you need to count series
918// across indexes then use either the database-wide series file, or merge the
919// index-level bitsets or sketches.
920func (i *Index) SeriesN() int64 {
921	return int64(i.SeriesIDSet().Cardinality())
922}
923
924// HasTagKey returns true if tag key exists. It returns the first error
925// encountered if any.
926func (i *Index) HasTagKey(name, key []byte) (bool, error) {
927	n := i.availableThreads()
928
929	// Store errors
930	var found uint32 // Use this to signal we found the tag key.
931	errC := make(chan error, i.PartitionN)
932
933	// Check each partition for the tag key concurrently.
934	var pidx uint32 // Index of maximum Partition being worked on.
935	for k := 0; k < n; k++ {
936		go func() {
937			for {
938				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to check
939				if idx >= len(i.partitions) {
940					return // No more work.
941				}
942
943				// Check if the tag key has already been found. If it has, we
944				// don't need to check this partition and can just move on.
945				if atomic.LoadUint32(&found) == 1 {
946					errC <- nil
947					continue
948				}
949
950				b, err := i.partitions[idx].HasTagKey(name, key)
951				if b {
952					atomic.StoreUint32(&found, 1)
953				}
954				errC <- err
955			}
956		}()
957	}
958
959	// Check for error
960	for i := 0; i < cap(errC); i++ {
961		if err := <-errC; err != nil {
962			return false, err
963		}
964	}
965
966	// Check if we found the tag key.
967	return atomic.LoadUint32(&found) == 1, nil
968}
969
970// HasTagValue returns true if tag value exists.
971func (i *Index) HasTagValue(name, key, value []byte) (bool, error) {
972	n := i.availableThreads()
973
974	// Store errors
975	var found uint32 // Use this to signal we found the tag key.
976	errC := make(chan error, i.PartitionN)
977
978	// Check each partition for the tag key concurrently.
979	var pidx uint32 // Index of maximum Partition being worked on.
980	for k := 0; k < n; k++ {
981		go func() {
982			for {
983				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to check
984				if idx >= len(i.partitions) {
985					return // No more work.
986				}
987
988				// Check if the tag key has already been found. If it has, we
989				// don't need to check this partition and can just move on.
990				if atomic.LoadUint32(&found) == 1 {
991					errC <- nil
992					continue
993				}
994
995				b, err := i.partitions[idx].HasTagValue(name, key, value)
996				if b {
997					atomic.StoreUint32(&found, 1)
998				}
999				errC <- err
1000			}
1001		}()
1002	}
1003
1004	// Check for error
1005	for i := 0; i < cap(errC); i++ {
1006		if err := <-errC; err != nil {
1007			return false, err
1008		}
1009	}
1010
1011	// Check if we found the tag key.
1012	return atomic.LoadUint32(&found) == 1, nil
1013}
1014
1015// TagKeyIterator returns an iterator for all keys across a single measurement.
1016func (i *Index) TagKeyIterator(name []byte) (tsdb.TagKeyIterator, error) {
1017	a := make([]tsdb.TagKeyIterator, 0, len(i.partitions))
1018	for _, p := range i.partitions {
1019		itr := p.TagKeyIterator(name)
1020		if itr != nil {
1021			a = append(a, itr)
1022		}
1023	}
1024	return tsdb.MergeTagKeyIterators(a...), nil
1025}
1026
1027// TagValueIterator returns an iterator for all values across a single key.
1028func (i *Index) TagValueIterator(name, key []byte) (tsdb.TagValueIterator, error) {
1029	a := make([]tsdb.TagValueIterator, 0, len(i.partitions))
1030	for _, p := range i.partitions {
1031		itr := p.TagValueIterator(name, key)
1032		if itr != nil {
1033			a = append(a, itr)
1034		}
1035	}
1036	return tsdb.MergeTagValueIterators(a...), nil
1037}
1038
1039// TagKeySeriesIDIterator returns a series iterator for all values across a single key.
1040func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) {
1041	a := make([]tsdb.SeriesIDIterator, 0, len(i.partitions))
1042	for _, p := range i.partitions {
1043		itr, err := p.TagKeySeriesIDIterator(name, key)
1044		if err != nil {
1045			return nil, err
1046		} else if itr != nil {
1047			a = append(a, itr)
1048		}
1049	}
1050	return tsdb.MergeSeriesIDIterators(a...), nil
1051}
1052
1053// TagValueSeriesIDIterator returns a series iterator for a single tag value.
1054func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
1055	// Check series ID set cache...
1056	if i.tagValueCacheSize > 0 {
1057		if ss := i.tagValueCache.Get(name, key, value); ss != nil {
1058			// Return a clone because the set is mutable.
1059			return tsdb.NewSeriesIDSetIterator(ss.Clone()), nil
1060		}
1061	}
1062
1063	a := make([]tsdb.SeriesIDIterator, 0, len(i.partitions))
1064	for _, p := range i.partitions {
1065		itr, err := p.TagValueSeriesIDIterator(name, key, value)
1066		if err != nil {
1067			return nil, err
1068		} else if itr != nil {
1069			a = append(a, itr)
1070		}
1071	}
1072
1073	itr := tsdb.MergeSeriesIDIterators(a...)
1074	if i.tagValueCacheSize == 0 {
1075		return itr, nil
1076	}
1077
1078	// Check if the iterator contains only series id sets. Cache them...
1079	if ssitr, ok := itr.(tsdb.SeriesIDSetIterator); ok {
1080		ss := ssitr.SeriesIDSet()
1081		i.tagValueCache.Put(name, key, value, ss)
1082	}
1083	return itr, nil
1084}
1085
1086// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression.
1087func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
1088	n := i.availableThreads()
1089
1090	// Store results.
1091	keys := make([]map[string]struct{}, i.PartitionN)
1092	errC := make(chan error, i.PartitionN)
1093
1094	var pidx uint32 // Index of maximum Partition being worked on.
1095	for k := 0; k < n; k++ {
1096		go func() {
1097			for {
1098				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
1099				if idx >= len(i.partitions) {
1100					return // No more work.
1101				}
1102
1103				// This is safe since there are no readers on keys until all
1104				// the writers are done.
1105				tagKeys, err := i.partitions[idx].MeasurementTagKeysByExpr(name, expr)
1106				keys[idx] = tagKeys
1107				errC <- err
1108			}
1109		}()
1110	}
1111
1112	// Check for error
1113	for i := 0; i < cap(errC); i++ {
1114		if err := <-errC; err != nil {
1115			return nil, err
1116		}
1117	}
1118
1119	// Merge into single map.
1120	result := keys[0]
1121	for k := 1; k < len(i.partitions); k++ {
1122		for k := range keys[k] {
1123			result[k] = struct{}{}
1124		}
1125	}
1126	return result, nil
1127}
1128
1129// DiskSizeBytes returns the size of the index on disk.
1130func (i *Index) DiskSizeBytes() int64 {
1131	fs, err := i.RetainFileSet()
1132	if err != nil {
1133		i.logger.Warn("Index is closing down")
1134		return 0
1135	}
1136	defer fs.Release()
1137
1138	var manifestSize int64
1139	// Get MANIFEST sizes from each partition.
1140	for _, p := range i.partitions {
1141		manifestSize += p.manifestSize
1142	}
1143	return fs.Size() + manifestSize
1144}
1145
// TagKeyCardinality always returns zero, regardless of name and key.
// It is not possible to determine cardinality of tags across index files, and
// thus it cannot be done across partitions.
func (i *Index) TagKeyCardinality(name, key []byte) int {
	return 0
}
1152
1153// RetainFileSet returns the set of all files across all partitions.
1154// This is only needed when all files need to be retained for an operation.
1155func (i *Index) RetainFileSet() (*FileSet, error) {
1156	i.mu.RLock()
1157	defer i.mu.RUnlock()
1158
1159	fs, _ := NewFileSet(nil, i.sfile, nil)
1160	for _, p := range i.partitions {
1161		pfs, err := p.RetainFileSet()
1162		if err != nil {
1163			fs.Close()
1164			return nil, err
1165		}
1166		fs.files = append(fs.files, pfs.files...)
1167	}
1168	return fs, nil
1169}
1170
// SetFieldName is a no-op on this index; both arguments are ignored.
func (i *Index) SetFieldName(measurement []byte, name string) {}
1173
// Rebuild rebuilds an index. It's a no-op for this index: the method body is
// empty and nothing is modified.
func (i *Index) Rebuild() {}
1176
1177// IsIndexDir returns true if directory contains at least one partition directory.
1178func IsIndexDir(path string) (bool, error) {
1179	fis, err := ioutil.ReadDir(path)
1180	if err != nil {
1181		return false, err
1182	}
1183	for _, fi := range fis {
1184		if !fi.IsDir() {
1185			continue
1186		} else if ok, err := IsPartitionDir(filepath.Join(path, fi.Name())); err != nil {
1187			return false, err
1188		} else if ok {
1189			return true, nil
1190		}
1191	}
1192	return false, nil
1193}
1194