package tsi1

import (
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"regexp"
	"runtime"
	"strconv"
	"sync"
	"sync/atomic"
	"unsafe"

	"github.com/cespare/xxhash"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/pkg/estimator"
	"github.com/influxdata/influxdb/pkg/estimator/hll"
	"github.com/influxdata/influxdb/pkg/slices"
	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxql"
	"go.uber.org/zap"
)

// IndexName is the name of the index.
const IndexName = tsdb.TSI1IndexName

// ErrCompactionInterrupted is returned if compactions are disabled or
// an index is closed while a compaction is occurring.
var ErrCompactionInterrupted = errors.New("tsi1: compaction interrupted")

func init() {
	// FIXME(edd): Remove this.
	if os.Getenv("TSI_PARTITIONS") != "" {
		i, err := strconv.Atoi(os.Getenv("TSI_PARTITIONS"))
		if err != nil {
			panic(err)
		}
		DefaultPartitionN = uint64(i)
	}

	tsdb.RegisterIndex(IndexName, func(_ uint64, db, path string, _ *tsdb.SeriesIDSet, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Index {
		idx := NewIndex(sfile, db, WithPath(path), WithMaximumLogFileSize(int64(opt.Config.MaxIndexLogFileSize)))
		return idx
	})
}

// DefaultPartitionN determines how many partitions an index will be divided into.
//
// NOTE: Currently, this must not be changed once a database has been created.
// It must also be a power of 2.
var DefaultPartitionN uint64 = 8

// An IndexOption is a functional option for changing the configuration of
// an Index.
type IndexOption func(i *Index)

// WithPath sets the root path of the Index.
var WithPath = func(path string) IndexOption {
	return func(i *Index) {
		i.path = path
	}
}

// DisableCompactions disables compactions on the Index.
var DisableCompactions = func() IndexOption {
	return func(i *Index) {
		i.disableCompactions = true
	}
}

// WithLogger sets the logger for the Index.
var WithLogger = func(l zap.Logger) IndexOption {
	return func(i *Index) {
		i.logger = l.With(zap.String("index", "tsi"))
	}
}

// WithMaximumLogFileSize sets the maximum size of LogFiles before they're
// compacted into IndexFiles.
var WithMaximumLogFileSize = func(size int64) IndexOption {
	return func(i *Index) {
		i.maxLogFileSize = size
	}
}

// DisableFsync disables flushing and syncing of underlying files. Primarily this
// impacts the LogFiles. This option can be set when working with the index in
// an offline manner, for cases where a hard failure can be overcome by re-running the tooling.
var DisableFsync = func() IndexOption {
	return func(i *Index) {
		i.disableFsync = true
	}
}

// WithLogFileBufferSize sets the size of the buffer used within LogFiles.
// Typically appending an entry to a LogFile involves writing 11 or 12 bytes, so
// depending on how many new series are being created within a batch, it may
// be appropriate to set this.
var WithLogFileBufferSize = func(sz int) IndexOption {
	return func(i *Index) {
		if sz > 1<<17 { // 128K
			sz = 1 << 17
		} else if sz < 1<<12 {
			sz = 1 << 12 // 4K (runtime default)
		}
		i.logfileBufferSize = sz
	}
}
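
// As a rough sizing illustration for the option above (using the 11-12 byte
// per-entry estimate from its comment): a batch that creates about 10,000 new
// series appends on the order of 120 KB to the LogFile, which is roughly where
// the 128K cap sits; small batches gain little over the 4K default. The
// figures and values below are illustrative only, assuming an opened
// *tsdb.SeriesFile named sfile:
//
//	idx := NewIndex(sfile, "db0", WithLogFileBufferSize(1<<16)) // 64K buffer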

// Index represents a collection of layered index files and WAL.
type Index struct {
	mu         sync.RWMutex
	partitions []*Partition
	opened     bool

	// The following may be set when initializing an Index.
	path               string      // Root directory of the index partitions.
	disableCompactions bool        // Initially disables compactions on the index.
	maxLogFileSize     int64       // Maximum size of a LogFile before it's compacted.
	logfileBufferSize  int         // The size of the buffer used by the LogFile.
	disableFsync       bool        // Disables flushing buffers and fsyncing files. Used when working with indexes offline.
	logger             *zap.Logger // Index's logger.

	// The following must be set when initializing an Index.
	sfile    *tsdb.SeriesFile // series lookup file
	database string           // Name of database.

	// Cached sketches.
	mSketch, mTSketch estimator.Sketch // Measurement sketches
	sSketch, sTSketch estimator.Sketch // Series sketches

	// Index's version.
	version int

	// Number of partitions used by the index.
	PartitionN uint64
}

// UniqueReferenceID returns a unique identifier for this Index instance,
// derived from its memory address.
func (i *Index) UniqueReferenceID() uintptr {
	return uintptr(unsafe.Pointer(i))
}

// NewIndex returns a new instance of Index.
func NewIndex(sfile *tsdb.SeriesFile, database string, options ...IndexOption) *Index {
	idx := &Index{
		maxLogFileSize: tsdb.DefaultMaxIndexLogFileSize,
		logger:         zap.NewNop(),
		version:        Version,
		sfile:          sfile,
		database:       database,
		mSketch:        hll.NewDefaultPlus(),
		mTSketch:       hll.NewDefaultPlus(),
		sSketch:        hll.NewDefaultPlus(),
		sTSketch:       hll.NewDefaultPlus(),
		PartitionN:     DefaultPartitionN,
	}

	for _, option := range options {
		option(idx)
	}

	return idx
}
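
// A minimal usage sketch for NewIndex and the functional options above. The
// series file must be opened by the caller first; the paths and database name
// here are placeholders rather than values used elsewhere in this package:
//
//	sfile := tsdb.NewSeriesFile("/tmp/db0/_series")
//	if err := sfile.Open(); err != nil {
//		// handle error
//	}
//	idx := NewIndex(sfile, "db0",
//		WithPath("/tmp/db0/index"),
//		WithMaximumLogFileSize(1<<20), // compact LogFiles beyond 1MB
//		DisableFsync(),                // e.g. for offline tooling
//	)
//	if err := idx.Open(); err != nil {
//		// handle error
//	}
//	defer idx.Close()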

// Bytes estimates the memory footprint of this Index, in bytes.
func (i *Index) Bytes() int {
	var b int
	i.mu.RLock()
	b += 24 // mu RWMutex is 24 bytes
	b += int(unsafe.Sizeof(i.partitions))
	for _, p := range i.partitions {
		b += int(unsafe.Sizeof(p)) + p.bytes()
	}
	b += int(unsafe.Sizeof(i.opened))
	b += int(unsafe.Sizeof(i.path)) + len(i.path)
	b += int(unsafe.Sizeof(i.disableCompactions))
	b += int(unsafe.Sizeof(i.maxLogFileSize))
	b += int(unsafe.Sizeof(i.logger))
	b += int(unsafe.Sizeof(i.sfile))
	// Do not count SeriesFile because it belongs to the code that constructed this Index.
	b += int(unsafe.Sizeof(i.mSketch)) + i.mSketch.Bytes()
	b += int(unsafe.Sizeof(i.mTSketch)) + i.mTSketch.Bytes()
	b += int(unsafe.Sizeof(i.sSketch)) + i.sSketch.Bytes()
	b += int(unsafe.Sizeof(i.sTSketch)) + i.sTSketch.Bytes()
	b += int(unsafe.Sizeof(i.database)) + len(i.database)
	b += int(unsafe.Sizeof(i.version))
	b += int(unsafe.Sizeof(i.PartitionN))
	i.mu.RUnlock()
	return b
}

// Database returns the name of the database the index was initialized with.
func (i *Index) Database() string {
	return i.database
}

// WithLogger sets the logger on the index after it's been created.
//
// It's not safe to call WithLogger after the index has been opened, or before
// it has been closed.
func (i *Index) WithLogger(l *zap.Logger) {
	i.mu.Lock()
	defer i.mu.Unlock()
	i.logger = l.With(zap.String("index", "tsi"))
}

// Type returns the type of Index this is.
func (i *Index) Type() string { return IndexName }

// SeriesFile returns the series file attached to the index.
func (i *Index) SeriesFile() *tsdb.SeriesFile { return i.sfile }

// SeriesIDSet returns the set of series ids associated with series in this
// index. Any series IDs for series no longer present in the index are filtered out.
func (i *Index) SeriesIDSet() *tsdb.SeriesIDSet {
	seriesIDSet := tsdb.NewSeriesIDSet()
	others := make([]*tsdb.SeriesIDSet, 0, i.PartitionN)
	for _, p := range i.partitions {
		others = append(others, p.seriesIDSet)
	}
	seriesIDSet.Merge(others...)
	return seriesIDSet
}

// Open opens the index.
func (i *Index) Open() error {
	i.mu.Lock()
	defer i.mu.Unlock()

	if i.opened {
		return errors.New("index already open")
	}

	// Ensure root exists.
	if err := os.MkdirAll(i.path, 0777); err != nil {
		return err
	}

	// Initialize index partitions.
	i.partitions = make([]*Partition, i.PartitionN)
	for j := 0; j < len(i.partitions); j++ {
		p := NewPartition(i.sfile, filepath.Join(i.path, fmt.Sprint(j)))
		p.MaxLogFileSize = i.maxLogFileSize
		p.nosync = i.disableFsync
		p.logbufferSize = i.logfileBufferSize
		p.logger = i.logger.With(zap.String("tsi1_partition", fmt.Sprint(j+1)))
		i.partitions[j] = p
	}

	// Open all the Partitions in parallel.
	partitionN := len(i.partitions)
	n := i.availableThreads()

	// Store errors.
	errC := make(chan error, partitionN)

	// Open each partition using a fixed number of goroutines.
	var pidx uint32 // Index of the next partition to be processed.
	for k := 0; k < n; k++ {
		go func() {
			for {
				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
				if idx >= partitionN {
					return // No more work.
				}
				err := i.partitions[idx].Open()
				errC <- err
			}
		}()
	}

	// Check for errors.
	for j := 0; j < partitionN; j++ {
		if err := <-errC; err != nil {
			return err
		}
	}

	// Refresh cached sketches.
	if err := i.updateSeriesSketches(); err != nil {
		return err
	} else if err := i.updateMeasurementSketches(); err != nil {
		return err
	}

	// Mark opened.
	i.opened = true
	i.logger.Info(fmt.Sprintf("index opened with %d partitions", partitionN))
	return nil
}
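
// Open, and several methods below (MeasurementExists, DropMeasurement,
// CreateSeriesListIfNotExists, HasTagKey, ...), fan work out across partitions
// with the same pattern: a fixed pool of up to availableThreads() goroutines
// claims partition indexes from a shared atomic counter and reports one error
// per partition on a buffered channel. Stripped of the index-specific details,
// the pattern is roughly the following sketch, where work stands in for the
// per-partition call:
//
//	var pidx uint32
//	errC := make(chan error, partitionN)
//	for k := 0; k < n; k++ {
//		go func() {
//			for {
//				idx := int(atomic.AddUint32(&pidx, 1) - 1) // claim next partition
//				if idx >= partitionN {
//					return // no more work
//				}
//				errC <- work(idx)
//			}
//		}()
//	}
//	for j := 0; j < partitionN; j++ {
//		if err := <-errC; err != nil {
//			return err // safe: errC is buffered, so workers never block
//		}
//	}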

// Compact requests a compaction of partitions.
func (i *Index) Compact() {
	i.mu.Lock()
	defer i.mu.Unlock()
	for _, p := range i.partitions {
		p.Compact()
	}
}

// EnableCompactions enables compactions on all of the index's partitions.
func (i *Index) EnableCompactions() {
	for _, p := range i.partitions {
		p.EnableCompactions()
	}
}

// DisableCompactions disables compactions on all of the index's partitions.
func (i *Index) DisableCompactions() {
	for _, p := range i.partitions {
		p.DisableCompactions()
	}
}

// Wait blocks until all outstanding compactions have completed.
func (i *Index) Wait() {
	for _, p := range i.partitions {
		p.Wait()
	}
}

// Close closes the index.
func (i *Index) Close() error {
	// Lock index and close partitions.
	i.mu.Lock()
	defer i.mu.Unlock()

	for _, p := range i.partitions {
		if err := p.Close(); err != nil {
			return err
		}
	}

	// Mark index as closed.
	i.opened = false
	return nil
}

// Path returns the path the index was opened with.
func (i *Index) Path() string { return i.path }

// PartitionAt returns the partition by index.
func (i *Index) PartitionAt(index int) *Partition {
	return i.partitions[index]
}

// partition returns the appropriate Partition for a provided series key.
func (i *Index) partition(key []byte) *Partition {
	return i.partitions[int(xxhash.Sum64(key)&(i.PartitionN-1))]
}

// partitionIdx returns the index of the partition that key belongs in.
func (i *Index) partitionIdx(key []byte) int {
	return int(xxhash.Sum64(key) & (i.PartitionN - 1))
}
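
// Series keys are routed to partitions by masking their xxhash with
// PartitionN-1, which is why PartitionN must be a power of two: only then is
// the mask equivalent to a modulo over the number of partitions. A small
// illustration (the key is just an example value):
//
//	key := []byte("cpu,host=server01")
//	h := xxhash.Sum64(key)
//	// With PartitionN == 8 these two expressions are identical:
//	_ = h & (8 - 1) // mask, as used by partitionIdx
//	_ = h % 8       // modulo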

// availableThreads returns the minimum of GOMAXPROCS and the number of
// partitions in the Index.
func (i *Index) availableThreads() int {
	n := runtime.GOMAXPROCS(0)
	if len(i.partitions) < n {
		return len(i.partitions)
	}
	return n
}

// updateMeasurementSketches rebuilds the cached measurement sketches.
func (i *Index) updateMeasurementSketches() error {
	i.mSketch, i.mTSketch = hll.NewDefaultPlus(), hll.NewDefaultPlus()
	for j := 0; j < int(i.PartitionN); j++ {
		if s, t, err := i.partitions[j].MeasurementsSketches(); err != nil {
			return err
		} else if err := i.mSketch.Merge(s); err != nil {
			return err
		} else if err := i.mTSketch.Merge(t); err != nil {
			return err
		}
	}
	return nil
}

// updateSeriesSketches rebuilds the cached series sketches.
func (i *Index) updateSeriesSketches() error {
	i.sSketch, i.sTSketch = hll.NewDefaultPlus(), hll.NewDefaultPlus()
	for j := 0; j < int(i.PartitionN); j++ {
		if s, t, err := i.partitions[j].SeriesSketches(); err != nil {
			return err
		} else if err := i.sSketch.Merge(s); err != nil {
			return err
		} else if err := i.sTSketch.Merge(t); err != nil {
			return err
		}
	}
	return nil
}

// SetFieldSet sets a shared field set from the engine.
func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) {
	for _, p := range i.partitions {
		p.SetFieldSet(fs)
	}
}

// FieldSet returns the assigned fieldset.
func (i *Index) FieldSet() *tsdb.MeasurementFieldSet {
	if len(i.partitions) == 0 {
		return nil
	}
	return i.partitions[0].FieldSet()
}

// ForEachMeasurementName iterates over all measurement names in the index,
// applying fn. It returns the first error encountered, if any.
//
// ForEachMeasurementName does not call fn on each partition concurrently, so
// the caller may provide a fn that is not goroutine-safe.
func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error {
	itr, err := i.MeasurementIterator()
	if err != nil {
		return err
	} else if itr == nil {
		return nil
	}
	defer itr.Close()

	// Iterate over all measurements.
	for {
		e, err := itr.Next()
		if err != nil {
			return err
		} else if e == nil {
			break
		}

		if err := fn(e); err != nil {
			return err
		}
	}
	return nil
}

// MeasurementExists returns true if a measurement exists.
func (i *Index) MeasurementExists(name []byte) (bool, error) {
	n := i.availableThreads()

	// Store errors.
	var found uint32 // Use this to signal we found the measurement.
	errC := make(chan error, i.PartitionN)

	// Check each partition for the measurement concurrently.
	var pidx uint32 // Index of the next partition to be processed.
	for k := 0; k < n; k++ {
		go func() {
			for {
				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to check.
				if idx >= len(i.partitions) {
					return // No more work.
				}

				// Check if the measurement has already been found. If it has,
				// we don't need to check this partition and can just move on.
				if atomic.LoadUint32(&found) == 1 {
					errC <- nil
					continue
				}

				b, err := i.partitions[idx].MeasurementExists(name)
				if b {
					atomic.StoreUint32(&found, 1)
				}
				errC <- err
			}
		}()
	}

	// Check for errors.
	for j := 0; j < cap(errC); j++ {
		if err := <-errC; err != nil {
			return false, err
		}
	}

	// Check if we found the measurement.
	return atomic.LoadUint32(&found) == 1, nil
}

// MeasurementHasSeries returns true if a measurement has non-tombstoned series.
func (i *Index) MeasurementHasSeries(name []byte) (bool, error) {
	for _, p := range i.partitions {
		if v, err := p.MeasurementHasSeries(name); err != nil {
			return false, err
		} else if v {
			return true, nil
		}
	}
	return false, nil
}

// fetchByteValues is a helper for gathering values from each partition in the index,
// based on some criteria.
//
// fn is called with the index of each partition; it should call into some
// method on that partition which returns ordered values. The per-partition
// results are then merged into a single sorted set.
func (i *Index) fetchByteValues(fn func(idx int) ([][]byte, error)) ([][]byte, error) {
	n := i.availableThreads()

	// Store results.
	names := make([][][]byte, i.PartitionN)
	errC := make(chan error, i.PartitionN)

	var pidx uint32 // Index of the next partition to be processed.
	for k := 0; k < n; k++ {
		go func() {
			for {
				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
				if idx >= len(i.partitions) {
					return // No more work.
				}

				pnames, err := fn(idx)

				// This is safe since there are no readers on names until all
				// the writers are done.
				names[idx] = pnames
				errC <- err
			}
		}()
	}

	// Check for errors.
	for j := 0; j < cap(errC); j++ {
		if err := <-errC; err != nil {
			return nil, err
		}
	}

	// It's now safe to read from names.
	return slices.MergeSortedBytes(names[:]...), nil
}

// MeasurementIterator returns an iterator over all measurements.
func (i *Index) MeasurementIterator() (tsdb.MeasurementIterator, error) {
	itrs := make([]tsdb.MeasurementIterator, 0, len(i.partitions))
	for _, p := range i.partitions {
		itr, err := p.MeasurementIterator()
		if err != nil {
			tsdb.MeasurementIterators(itrs).Close()
			return nil, err
		} else if itr != nil {
			itrs = append(itrs, itr)
		}
	}
	return tsdb.MergeMeasurementIterators(itrs...), nil
}

// MeasurementSeriesIDIterator returns an iterator over all series in a measurement.
func (i *Index) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator, error) {
	itrs := make([]tsdb.SeriesIDIterator, 0, len(i.partitions))
	for _, p := range i.partitions {
		itr, err := p.MeasurementSeriesIDIterator(name)
		if err != nil {
			tsdb.SeriesIDIterators(itrs).Close()
			return nil, err
		} else if itr != nil {
			itrs = append(itrs, itr)
		}
	}
	return tsdb.MergeSeriesIDIterators(itrs...), nil
}

// MeasurementNamesByRegex returns measurement names for the provided regex.
func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
	return i.fetchByteValues(func(idx int) ([][]byte, error) {
		return i.partitions[idx].MeasurementNamesByRegex(re)
	})
}

// DropMeasurement deletes a measurement from the index. It returns the first
// error encountered, if any.
func (i *Index) DropMeasurement(name []byte) error {
	n := i.availableThreads()

	// Store errors.
	errC := make(chan error, i.PartitionN)

	var pidx uint32 // Index of the next partition to be processed.
	for k := 0; k < n; k++ {
		go func() {
			for {
				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
				if idx >= len(i.partitions) {
					return // No more work.
				}
				errC <- i.partitions[idx].DropMeasurement(name)
			}
		}()
	}

	// Check for errors.
	for j := 0; j < cap(errC); j++ {
		if err := <-errC; err != nil {
			return err
		}
	}

	// Update sketches.
	i.mTSketch.Add(name)
	if err := i.updateSeriesSketches(); err != nil {
		return err
	}

	return nil
}

// CreateSeriesListIfNotExists creates a list of series, in bulk, if they don't
// already exist. The keys, names and tagsSlice arguments must be parallel
// slices describing the same series.
func (i *Index) CreateSeriesListIfNotExists(keys [][]byte, names [][]byte, tagsSlice []models.Tags) error {
	// All slices must be of equal length.
	if len(names) != len(tagsSlice) {
		return errors.New("names/tags length mismatch in index")
	}

	// We need to move different series into collections for each partition
	// to process.
	pNames := make([][][]byte, i.PartitionN)
	pTags := make([][]models.Tags, i.PartitionN)

	// Determine the partition for each series using its series key.
	for ki, key := range keys {
		pidx := i.partitionIdx(key)
		pNames[pidx] = append(pNames[pidx], names[ki])
		pTags[pidx] = append(pTags[pidx], tagsSlice[ki])
	}

	// Process each subset of series on each partition.
	n := i.availableThreads()

	// Store errors.
	errC := make(chan error, i.PartitionN)

	var pidx uint32 // Index of the next partition to be processed.
	for k := 0; k < n; k++ {
		go func() {
			for {
				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
				if idx >= len(i.partitions) {
					return // No more work.
				}
				errC <- i.partitions[idx].createSeriesListIfNotExists(pNames[idx], pTags[idx])
			}
		}()
	}

	// Check for errors.
	for j := 0; j < cap(errC); j++ {
		if err := <-errC; err != nil {
			return err
		}
	}

	// Update sketches.
	for _, key := range keys {
		i.sSketch.Add(key)
	}
	for _, name := range names {
		i.mSketch.Add(name)
	}

	return nil
}
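
// A sketch of how the bulk-creation path above is typically fed: keys, names
// and tagsSlice are parallel slices describing the same series, with each key
// derived from its name and tags via the models package. The values are
// illustrative only:
//
//	names := [][]byte{[]byte("cpu"), []byte("mem")}
//	tagsSlice := []models.Tags{
//		models.NewTags(map[string]string{"host": "server01"}),
//		models.NewTags(map[string]string{"host": "server01"}),
//	}
//	keys := make([][]byte, 0, len(names))
//	for j, name := range names {
//		keys = append(keys, models.MakeKey(name, tagsSlice[j]))
//	}
//	if err := idx.CreateSeriesListIfNotExists(keys, names, tagsSlice); err != nil {
//		// handle error
//	}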

// CreateSeriesIfNotExists creates a series if it doesn't exist or is deleted.
func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
	if err := i.partition(key).createSeriesListIfNotExists([][]byte{name}, []models.Tags{tags}); err != nil {
		return err
	}
	i.sSketch.Add(key)
	i.mSketch.Add(name)
	return nil
}

// InitializeSeries is a no-op. This only applies to the in-memory index.
func (i *Index) InitializeSeries(keys, names [][]byte, tags []models.Tags) error {
	return nil
}

// DropSeries drops the provided series from the index. If cascade is true
// and this is the last series in the measurement, the measurement will also be dropped.
func (i *Index) DropSeries(seriesID uint64, key []byte, cascade bool) error {
	// Remove from partition.
	if err := i.partition(key).DropSeries(seriesID); err != nil {
		return err
	}

	// Add sketch tombstone.
	i.sTSketch.Add(key)

	if !cascade {
		return nil
	}

	// Extract measurement name.
	name, _ := models.ParseKeyBytes(key)

	// Check if that was the last series for the measurement in the entire index.
	if ok, err := i.MeasurementHasSeries(name); err != nil {
		return err
	} else if ok {
		return nil
	}

	// If no more series exist in the measurement then delete the measurement.
	if err := i.DropMeasurement(name); err != nil {
		return err
	}
	return nil
}

// DropSeriesGlobal is a no-op on the tsi1 index.
func (i *Index) DropSeriesGlobal(key []byte) error { return nil }

// DropMeasurementIfSeriesNotExist drops a measurement only if there are no more
// series for the measurement.
func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) error {
	// Check if that was the last series for the measurement in the entire index.
	if ok, err := i.MeasurementHasSeries(name); err != nil {
		return err
	} else if ok {
		return nil
	}

	// If no more series exist in the measurement then delete the measurement.
	return i.DropMeasurement(name)
}

// MeasurementsSketches returns the two measurement sketches for the index.
func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
	return i.mSketch, i.mTSketch, nil
}

// SeriesSketches returns the two series sketches for the index.
func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
	return i.sSketch, i.sTSketch, nil
}

// SeriesN returns the number of unique, non-tombstoned series in the index.
//
// Since indexes are not shared across shards, the count returned by SeriesN
// cannot be combined with other shards' results. If you need to count series
// across indexes then use either the database-wide series file, or merge the
// index-level bitsets or sketches.
func (i *Index) SeriesN() int64 {
	return int64(i.SeriesIDSet().Cardinality())
}
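
// As the comment above suggests, counting series across several shards'
// indexes should be done by merging their series ID bitsets (or sketches)
// rather than summing SeriesN results, because the same series may exist in
// more than one shard. A sketch of the bitset approach, assuming a slice of
// *Index values named indexes:
//
//	merged := tsdb.NewSeriesIDSet()
//	for _, idx := range indexes {
//		merged.Merge(idx.SeriesIDSet())
//	}
//	total := merged.Cardinality()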

// HasTagKey returns true if the tag key exists in any partition. It returns
// the first error encountered, if any.
func (i *Index) HasTagKey(name, key []byte) (bool, error) {
	n := i.availableThreads()

	// Store errors.
	var found uint32 // Use this to signal we found the tag key.
	errC := make(chan error, i.PartitionN)

	// Check each partition for the tag key concurrently.
	var pidx uint32 // Index of the next partition to be processed.
	for k := 0; k < n; k++ {
		go func() {
			for {
				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to check.
				if idx >= len(i.partitions) {
					return // No more work.
				}

				// Check if the tag key has already been found. If it has, we
				// don't need to check this partition and can just move on.
				if atomic.LoadUint32(&found) == 1 {
					errC <- nil
					continue
				}

				b, err := i.partitions[idx].HasTagKey(name, key)
				if b {
					atomic.StoreUint32(&found, 1)
				}
				errC <- err
			}
		}()
	}

	// Check for errors.
	for j := 0; j < cap(errC); j++ {
		if err := <-errC; err != nil {
			return false, err
		}
	}

	// Check if we found the tag key.
	return atomic.LoadUint32(&found) == 1, nil
}

// HasTagValue returns true if the tag value exists in any partition.
func (i *Index) HasTagValue(name, key, value []byte) (bool, error) {
	n := i.availableThreads()

	// Store errors.
	var found uint32 // Use this to signal we found the tag value.
	errC := make(chan error, i.PartitionN)

	// Check each partition for the tag value concurrently.
	var pidx uint32 // Index of the next partition to be processed.
	for k := 0; k < n; k++ {
		go func() {
			for {
				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to check.
				if idx >= len(i.partitions) {
					return // No more work.
				}

				// Check if the tag value has already been found. If it has, we
				// don't need to check this partition and can just move on.
				if atomic.LoadUint32(&found) == 1 {
					errC <- nil
					continue
				}

				b, err := i.partitions[idx].HasTagValue(name, key, value)
				if b {
					atomic.StoreUint32(&found, 1)
				}
				errC <- err
			}
		}()
	}

	// Check for errors.
	for j := 0; j < cap(errC); j++ {
		if err := <-errC; err != nil {
			return false, err
		}
	}

	// Check if we found the tag value.
	return atomic.LoadUint32(&found) == 1, nil
}

// TagKeyIterator returns an iterator for all keys across a single measurement.
func (i *Index) TagKeyIterator(name []byte) (tsdb.TagKeyIterator, error) {
	a := make([]tsdb.TagKeyIterator, 0, len(i.partitions))
	for _, p := range i.partitions {
		itr := p.TagKeyIterator(name)
		if itr != nil {
			a = append(a, itr)
		}
	}
	return tsdb.MergeTagKeyIterators(a...), nil
}

// TagValueIterator returns an iterator for all values across a single key.
func (i *Index) TagValueIterator(name, key []byte) (tsdb.TagValueIterator, error) {
	a := make([]tsdb.TagValueIterator, 0, len(i.partitions))
	for _, p := range i.partitions {
		itr := p.TagValueIterator(name, key)
		if itr != nil {
			a = append(a, itr)
		}
	}
	return tsdb.MergeTagValueIterators(a...), nil
}

// TagKeySeriesIDIterator returns a series iterator for all values across a single key.
func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) {
	a := make([]tsdb.SeriesIDIterator, 0, len(i.partitions))
	for _, p := range i.partitions {
		itr := p.TagKeySeriesIDIterator(name, key)
		if itr != nil {
			a = append(a, itr)
		}
	}
	return tsdb.MergeSeriesIDIterators(a...), nil
}

// TagValueSeriesIDIterator returns a series iterator for a single tag value.
func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
	a := make([]tsdb.SeriesIDIterator, 0, len(i.partitions))
	for _, p := range i.partitions {
		itr := p.TagValueSeriesIDIterator(name, key, value)
		if itr != nil {
			a = append(a, itr)
		}
	}
	return tsdb.MergeSeriesIDIterators(a...), nil
}

// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression.
func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
	n := i.availableThreads()

	// Store results.
	keys := make([]map[string]struct{}, i.PartitionN)
	errC := make(chan error, i.PartitionN)

	var pidx uint32 // Index of the next partition to be processed.
	for k := 0; k < n; k++ {
		go func() {
			for {
				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
				if idx >= len(i.partitions) {
					return // No more work.
				}

				// This is safe since there are no readers on keys until all
				// the writers are done.
				tagKeys, err := i.partitions[idx].MeasurementTagKeysByExpr(name, expr)
				keys[idx] = tagKeys
				errC <- err
			}
		}()
	}

	// Check for errors.
	for j := 0; j < cap(errC); j++ {
		if err := <-errC; err != nil {
			return nil, err
		}
	}

	// Merge the per-partition results into a single map.
	result := keys[0]
	if result == nil {
		result = make(map[string]struct{}) // Guard against the first partition returning no keys.
	}
	for j := 1; j < len(i.partitions); j++ {
		for key := range keys[j] {
			result[key] = struct{}{}
		}
	}
	return result, nil
}

// DiskSizeBytes returns the size of the index on disk.
func (i *Index) DiskSizeBytes() int64 {
	fs, err := i.RetainFileSet()
	if err != nil {
		i.logger.Warn("Index is closing down")
		return 0
	}
	defer fs.Release()

	// Get the MANIFEST sizes from each partition.
	var manifestSize int64
	for _, p := range i.partitions {
		manifestSize += p.manifestSize
	}
	return fs.Size() + manifestSize
}

// TagKeyCardinality always returns zero.
// It is not possible to determine cardinality of tags across index files, and
// thus it cannot be done across partitions.
func (i *Index) TagKeyCardinality(name, key []byte) int {
	return 0
}

// RetainFileSet returns the set of all files across all partitions.
// This is only needed when all files need to be retained for an operation.
// The caller is responsible for calling Release on the returned FileSet when
// finished with it.
func (i *Index) RetainFileSet() (*FileSet, error) {
	i.mu.RLock()
	defer i.mu.RUnlock()

	fs, _ := NewFileSet(nil, i.sfile, nil)
	for _, p := range i.partitions {
		pfs, err := p.RetainFileSet()
		if err != nil {
			fs.Close()
			return nil, err
		}
		fs.files = append(fs.files, pfs.files...)
	}
	return fs, nil
}

// SetFieldName is a no-op on this index.
func (i *Index) SetFieldName(measurement []byte, name string) {}

// Rebuild rebuilds an index. It's a no-op for this index.
func (i *Index) Rebuild() {}

// IsIndexDir returns true if the directory contains at least one partition directory.
func IsIndexDir(path string) (bool, error) {
	fis, err := ioutil.ReadDir(path)
	if err != nil {
		return false, err
	}
	for _, fi := range fis {
		if !fi.IsDir() {
			continue
		} else if ok, err := IsPartitionDir(filepath.Join(path, fi.Name())); err != nil {
			return false, err
		} else if ok {
			return true, nil
		}
	}
	return false, nil
}