1package tsdb
2
3import (
4	"bytes"
5	"errors"
6	"fmt"
7	"io"
8	"os"
9	"regexp"
10	"sort"
11	"sync"
12
13	"github.com/influxdata/influxdb/models"
14	"github.com/influxdata/influxdb/pkg/bytesutil"
15	"github.com/influxdata/influxdb/pkg/estimator"
16	"github.com/influxdata/influxdb/pkg/slices"
17	"github.com/influxdata/influxdb/query"
18	"github.com/influxdata/influxql"
19	"go.uber.org/zap"
20)
21
// Available index types.
//
// These string constants name the index implementations referred to in this
// package.
const (
	// InmemIndexName is the name of the "inmem" index type.
	InmemIndexName = "inmem"
	// TSI1IndexName is the name of the "tsi1" index type.
	TSI1IndexName  = "tsi1"
)
27
// ErrIndexClosing can be returned from an Index method if the index is
// currently closing.
var ErrIndexClosing = errors.New("index is closing")
30
// Index describes the operations an index implementation provides: lifecycle
// management, measurement and series bookkeeping, cardinality sketches, tag
// lookups, and the iterators used by InfluxQL system queries.
type Index interface {
	// Lifecycle and logging.
	Open() error
	Close() error
	WithLogger(*zap.Logger)

	// Database name and measurement-level operations.
	Database() string
	MeasurementExists(name []byte) (bool, error)
	MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
	DropMeasurement(name []byte) error
	ForEachMeasurementName(fn func(name []byte) error) error

	// Series creation and removal. The slice-based variants take parallel
	// slices of keys, names and tags.
	InitializeSeries(keys, names [][]byte, tags []models.Tags) error
	CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
	CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
	DropSeries(seriesID uint64, key []byte, cascade bool) error
	DropSeriesList(seriesID []uint64, key [][]byte, cascade bool) error
	DropMeasurementIfSeriesNotExist(name []byte) (bool, error)

	// Used to clean up series in inmem index that were dropped with a shard.
	DropSeriesGlobal(key []byte) error

	// Cardinality information for measurements and series.
	MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
	SeriesN() int64
	SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
	SeriesIDSet() *SeriesIDSet

	// Tag existence checks scoped to a measurement name.
	HasTagKey(name, key []byte) (bool, error)
	HasTagValue(name, key, value []byte) (bool, error)

	// MeasurementTagKeysByExpr returns a set of tag keys for the measurement,
	// selected by expr.
	MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)

	TagKeyCardinality(name, key []byte) int

	// InfluxQL system iterators
	MeasurementIterator() (MeasurementIterator, error)
	TagKeyIterator(name []byte) (TagKeyIterator, error)
	TagValueIterator(name, key []byte) (TagValueIterator, error)
	MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error)
	TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error)
	TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error)

	// Sets a shared fieldset from the engine.
	FieldSet() *MeasurementFieldSet
	SetFieldSet(fs *MeasurementFieldSet)

	// Size of the index on disk, if applicable.
	DiskSizeBytes() int64

	// Bytes estimates the memory footprint of this Index, in bytes.
	Bytes() int

	// To be removed w/ tsi1.
	SetFieldName(measurement []byte, name string)

	// Type returns the index type as a string (presumably one of the
	// *IndexName constants — confirm against the implementations).
	Type() string
	// Returns a unique reference ID to the index instance.
	// For inmem, returns a reference to the backing Index, not ShardIndex.
	UniqueReferenceID() uintptr

	Rebuild()
}
92
// SeriesElem represents a generic series element: a measurement name, its
// tags, a deletion flag and an optional expression.
type SeriesElem interface {
	// Name returns the name component of the series.
	Name() []byte
	// Tags returns the tag set of the series.
	Tags() models.Tags
	// Deleted reports whether the series is marked as deleted.
	Deleted() bool

	// InfluxQL expression associated with series during filtering.
	Expr() influxql.Expr
}
102
// SeriesIterator represents an iterator over a list of series.
//
// The adapter in this file returns a nil SeriesElem with a nil error from
// Next once the iteration is finished.
type SeriesIterator interface {
	Close() error
	Next() (SeriesElem, error)
}
108
109// NewSeriesIteratorAdapter returns an adapter for converting series ids to series.
110func NewSeriesIteratorAdapter(sfile *SeriesFile, itr SeriesIDIterator) SeriesIterator {
111	return &seriesIteratorAdapter{
112		sfile: sfile,
113		itr:   itr,
114	}
115}
116
117type seriesIteratorAdapter struct {
118	sfile *SeriesFile
119	itr   SeriesIDIterator
120}
121
122func (itr *seriesIteratorAdapter) Close() error { return itr.itr.Close() }
123
124func (itr *seriesIteratorAdapter) Next() (SeriesElem, error) {
125	for {
126		elem, err := itr.itr.Next()
127		if err != nil {
128			return nil, err
129		} else if elem.SeriesID == 0 {
130			return nil, nil
131		}
132
133		// Skip if this key has been tombstoned.
134		key := itr.sfile.SeriesKey(elem.SeriesID)
135		if len(key) == 0 {
136			continue
137		}
138
139		name, tags := ParseSeriesKey(key)
140		deleted := itr.sfile.IsDeleted(elem.SeriesID)
141		return &seriesElemAdapter{
142			name:    name,
143			tags:    tags,
144			deleted: deleted,
145			expr:    elem.Expr,
146		}, nil
147	}
148}
149
150type seriesElemAdapter struct {
151	name    []byte
152	tags    models.Tags
153	deleted bool
154	expr    influxql.Expr
155}
156
157func (e *seriesElemAdapter) Name() []byte        { return e.name }
158func (e *seriesElemAdapter) Tags() models.Tags   { return e.tags }
159func (e *seriesElemAdapter) Deleted() bool       { return e.deleted }
160func (e *seriesElemAdapter) Expr() influxql.Expr { return e.expr }
161
// SeriesIDElem represents a single series and optional expression.
//
// The iterators in this file use a SeriesID of zero to signal that there are
// no more elements.
type SeriesIDElem struct {
	SeriesID uint64
	Expr     influxql.Expr
}
167
168// SeriesIDElems represents a list of series id elements.
169type SeriesIDElems []SeriesIDElem
170
171func (a SeriesIDElems) Len() int           { return len(a) }
172func (a SeriesIDElems) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
173func (a SeriesIDElems) Less(i, j int) bool { return a[i].SeriesID < a[j].SeriesID }
174
// SeriesIDIterator represents an iterator over a list of series ids.
//
// Consumers in this file treat an element whose SeriesID is zero, returned
// with a nil error, as the end of the iteration.
type SeriesIDIterator interface {
	Next() (SeriesIDElem, error)
	Close() error
}
180
// SeriesIDSetIterator represents an iterator that can produce a SeriesIDSet.
//
// The merge, intersect, union and difference constructors in this file use
// SeriesIDSet to combine iterators as sets when every input implements this
// interface.
type SeriesIDSetIterator interface {
	SeriesIDIterator
	SeriesIDSet() *SeriesIDSet
}
186
187type seriesIDSetIterator struct {
188	ss     *SeriesIDSet
189	itr    SeriesIDSetIterable
190	closer io.Closer
191}
192
193func NewSeriesIDSetIterator(ss *SeriesIDSet) SeriesIDSetIterator {
194	if ss == nil || ss.bitmap == nil {
195		return nil
196	}
197	return &seriesIDSetIterator{ss: ss, itr: ss.Iterator()}
198}
199
200func NewSeriesIDSetIteratorWithCloser(ss *SeriesIDSet, closer io.Closer) SeriesIDSetIterator {
201	if ss == nil || ss.bitmap == nil {
202		return nil
203	}
204	return &seriesIDSetIterator{ss: ss, itr: ss.Iterator(), closer: closer}
205}
206
207func (itr *seriesIDSetIterator) Next() (SeriesIDElem, error) {
208	if !itr.itr.HasNext() {
209		return SeriesIDElem{}, nil
210	}
211	return SeriesIDElem{SeriesID: uint64(itr.itr.Next())}, nil
212}
213
214func (itr *seriesIDSetIterator) Close() error {
215	if itr.closer != nil {
216		return itr.closer.Close()
217	}
218	return nil
219}
220
221func (itr *seriesIDSetIterator) SeriesIDSet() *SeriesIDSet { return itr.ss }
222
223type SeriesIDSetIterators []SeriesIDSetIterator
224
225func (a SeriesIDSetIterators) Close() (err error) {
226	for i := range a {
227		if e := a[i].Close(); e != nil && err == nil {
228			err = e
229		}
230	}
231	return err
232}
233
234// NewSeriesIDSetIterators returns a slice of SeriesIDSetIterator if all itrs
235// can be type casted. Otherwise returns nil.
236func NewSeriesIDSetIterators(itrs []SeriesIDIterator) []SeriesIDSetIterator {
237	if len(itrs) == 0 {
238		return nil
239	}
240
241	a := make([]SeriesIDSetIterator, len(itrs))
242	for i := range itrs {
243		if itr, ok := itrs[i].(SeriesIDSetIterator); ok {
244			a[i] = itr
245		} else {
246			return nil
247		}
248	}
249	return a
250}
251
252// ReadAllSeriesIDIterator returns all ids from the iterator.
253func ReadAllSeriesIDIterator(itr SeriesIDIterator) ([]uint64, error) {
254	if itr == nil {
255		return nil, nil
256	}
257
258	var a []uint64
259	for {
260		e, err := itr.Next()
261		if err != nil {
262			return nil, err
263		} else if e.SeriesID == 0 {
264			break
265		}
266		a = append(a, e.SeriesID)
267	}
268	return a, nil
269}
270
271// NewSeriesIDSliceIterator returns a SeriesIDIterator that iterates over a slice.
272func NewSeriesIDSliceIterator(ids []uint64) *SeriesIDSliceIterator {
273	return &SeriesIDSliceIterator{ids: ids}
274}
275
276// SeriesIDSliceIterator iterates over a slice of series ids.
277type SeriesIDSliceIterator struct {
278	ids []uint64
279}
280
281// Next returns the next series id in the slice.
282func (itr *SeriesIDSliceIterator) Next() (SeriesIDElem, error) {
283	if len(itr.ids) == 0 {
284		return SeriesIDElem{}, nil
285	}
286	id := itr.ids[0]
287	itr.ids = itr.ids[1:]
288	return SeriesIDElem{SeriesID: id}, nil
289}
290
291func (itr *SeriesIDSliceIterator) Close() error { return nil }
292
293// SeriesIDSet returns a set of all remaining ids.
294func (itr *SeriesIDSliceIterator) SeriesIDSet() *SeriesIDSet {
295	s := NewSeriesIDSet()
296	for _, id := range itr.ids {
297		s.AddNoLock(id)
298	}
299	return s
300}
301
302type SeriesIDIterators []SeriesIDIterator
303
304func (a SeriesIDIterators) Close() (err error) {
305	for i := range a {
306		if e := a[i].Close(); e != nil && err == nil {
307			err = e
308		}
309	}
310	return err
311}
312
313func (a SeriesIDIterators) filterNonNil() []SeriesIDIterator {
314	other := make([]SeriesIDIterator, 0, len(a))
315	for _, itr := range a {
316		if itr == nil {
317			continue
318		}
319		other = append(other, itr)
320	}
321	return other
322}
323
324// seriesQueryAdapterIterator adapts SeriesIDIterator to an influxql.Iterator.
325type seriesQueryAdapterIterator struct {
326	once     sync.Once
327	sfile    *SeriesFile
328	itr      SeriesIDIterator
329	fieldset *MeasurementFieldSet
330	opt      query.IteratorOptions
331
332	point query.FloatPoint // reusable point
333}
334
335// NewSeriesQueryAdapterIterator returns a new instance of SeriesQueryAdapterIterator.
336func NewSeriesQueryAdapterIterator(sfile *SeriesFile, itr SeriesIDIterator, fieldset *MeasurementFieldSet, opt query.IteratorOptions) query.Iterator {
337	return &seriesQueryAdapterIterator{
338		sfile:    sfile,
339		itr:      itr,
340		fieldset: fieldset,
341		point: query.FloatPoint{
342			Aux: make([]interface{}, len(opt.Aux)),
343		},
344		opt: opt,
345	}
346}
347
348// Stats returns stats about the points processed.
349func (itr *seriesQueryAdapterIterator) Stats() query.IteratorStats { return query.IteratorStats{} }
350
351// Close closes the iterator.
352func (itr *seriesQueryAdapterIterator) Close() error {
353	itr.once.Do(func() {
354		itr.itr.Close()
355	})
356	return nil
357}
358
359// Next emits the next point in the iterator.
360func (itr *seriesQueryAdapterIterator) Next() (*query.FloatPoint, error) {
361	for {
362		// Read next series element.
363		e, err := itr.itr.Next()
364		if err != nil {
365			return nil, err
366		} else if e.SeriesID == 0 {
367			return nil, nil
368		}
369
370		// Skip if key has been tombstoned.
371		seriesKey := itr.sfile.SeriesKey(e.SeriesID)
372		if len(seriesKey) == 0 {
373			continue
374		}
375
376		// Convert to a key.
377		name, tags := ParseSeriesKey(seriesKey)
378		key := string(models.MakeKey(name, tags))
379
380		// Write auxiliary fields.
381		for i, f := range itr.opt.Aux {
382			switch f.Val {
383			case "key":
384				itr.point.Aux[i] = key
385			}
386		}
387		return &itr.point, nil
388	}
389}
390
391// filterUndeletedSeriesIDIterator returns all series which are not deleted.
392type filterUndeletedSeriesIDIterator struct {
393	sfile *SeriesFile
394	itr   SeriesIDIterator
395}
396
397// FilterUndeletedSeriesIDIterator returns an iterator which filters all deleted series.
398func FilterUndeletedSeriesIDIterator(sfile *SeriesFile, itr SeriesIDIterator) SeriesIDIterator {
399	if itr == nil {
400		return nil
401	}
402	return &filterUndeletedSeriesIDIterator{sfile: sfile, itr: itr}
403}
404
405func (itr *filterUndeletedSeriesIDIterator) Close() error {
406	return itr.itr.Close()
407}
408
409func (itr *filterUndeletedSeriesIDIterator) Next() (SeriesIDElem, error) {
410	for {
411		e, err := itr.itr.Next()
412		if err != nil {
413			return SeriesIDElem{}, err
414		} else if e.SeriesID == 0 {
415			return SeriesIDElem{}, nil
416		} else if itr.sfile.IsDeleted(e.SeriesID) {
417			continue
418		}
419		return e, nil
420	}
421}
422
423// seriesIDExprIterator is an iterator that attaches an associated expression.
424type seriesIDExprIterator struct {
425	itr  SeriesIDIterator
426	expr influxql.Expr
427}
428
429// newSeriesIDExprIterator returns a new instance of seriesIDExprIterator.
430func newSeriesIDExprIterator(itr SeriesIDIterator, expr influxql.Expr) SeriesIDIterator {
431	if itr == nil {
432		return nil
433	}
434
435	return &seriesIDExprIterator{
436		itr:  itr,
437		expr: expr,
438	}
439}
440
441func (itr *seriesIDExprIterator) Close() error {
442	return itr.itr.Close()
443}
444
445// Next returns the next element in the iterator.
446func (itr *seriesIDExprIterator) Next() (SeriesIDElem, error) {
447	elem, err := itr.itr.Next()
448	if err != nil {
449		return SeriesIDElem{}, err
450	} else if elem.SeriesID == 0 {
451		return SeriesIDElem{}, nil
452	}
453	elem.Expr = itr.expr
454	return elem, nil
455}
456
457// MergeSeriesIDIterators returns an iterator that merges a set of iterators.
458// Iterators that are first in the list take precedence and a deletion by those
459// early iterators will invalidate elements by later iterators.
460func MergeSeriesIDIterators(itrs ...SeriesIDIterator) SeriesIDIterator {
461	if n := len(itrs); n == 0 {
462		return nil
463	} else if n == 1 {
464		return itrs[0]
465	}
466	itrs = SeriesIDIterators(itrs).filterNonNil()
467
468	// Merge as series id sets, if available.
469	if a := NewSeriesIDSetIterators(itrs); a != nil {
470		sets := make([]*SeriesIDSet, len(a))
471		for i := range a {
472			sets[i] = a[i].SeriesIDSet()
473		}
474
475		ss := NewSeriesIDSet()
476		ss.Merge(sets...)
477
478		// Attach underlying iterators as the closer
479		return NewSeriesIDSetIteratorWithCloser(ss, SeriesIDSetIterators(a))
480	}
481
482	return &seriesIDMergeIterator{
483		buf:  make([]SeriesIDElem, len(itrs)),
484		itrs: itrs,
485	}
486}
487
488// seriesIDMergeIterator is an iterator that merges multiple iterators together.
489type seriesIDMergeIterator struct {
490	buf  []SeriesIDElem
491	itrs []SeriesIDIterator
492}
493
494func (itr *seriesIDMergeIterator) Close() error {
495	SeriesIDIterators(itr.itrs).Close()
496	return nil
497}
498
499// Next returns the element with the next lowest name/tags across the iterators.
500func (itr *seriesIDMergeIterator) Next() (SeriesIDElem, error) {
501	// Find next lowest id amongst the buffers.
502	var elem SeriesIDElem
503	for i := range itr.buf {
504		buf := &itr.buf[i]
505
506		// Fill buffer.
507		if buf.SeriesID == 0 {
508			elem, err := itr.itrs[i].Next()
509			if err != nil {
510				return SeriesIDElem{}, nil
511			} else if elem.SeriesID == 0 {
512				continue
513			}
514			itr.buf[i] = elem
515		}
516
517		if elem.SeriesID == 0 || buf.SeriesID < elem.SeriesID {
518			elem = *buf
519		}
520	}
521
522	// Return EOF if no elements remaining.
523	if elem.SeriesID == 0 {
524		return SeriesIDElem{}, nil
525	}
526
527	// Clear matching buffers.
528	for i := range itr.buf {
529		if itr.buf[i].SeriesID == elem.SeriesID {
530			itr.buf[i].SeriesID = 0
531		}
532	}
533	return elem, nil
534}
535
536// IntersectSeriesIDIterators returns an iterator that only returns series which
537// occur in both iterators. If both series have associated expressions then
538// they are combined together.
539func IntersectSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator {
540	if itr0 == nil || itr1 == nil {
541		if itr0 != nil {
542			itr0.Close()
543		}
544		if itr1 != nil {
545			itr1.Close()
546		}
547		return nil
548	}
549
550	// Create series id set, if available.
551	if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil {
552		return NewSeriesIDSetIteratorWithCloser(a[0].SeriesIDSet().And(a[1].SeriesIDSet()), SeriesIDSetIterators(a))
553	}
554
555	return &seriesIDIntersectIterator{itrs: [2]SeriesIDIterator{itr0, itr1}}
556}
557
558// seriesIDIntersectIterator is an iterator that merges two iterators together.
559type seriesIDIntersectIterator struct {
560	buf  [2]SeriesIDElem
561	itrs [2]SeriesIDIterator
562}
563
564func (itr *seriesIDIntersectIterator) Close() (err error) {
565	if e := itr.itrs[0].Close(); e != nil && err == nil {
566		err = e
567	}
568	if e := itr.itrs[1].Close(); e != nil && err == nil {
569		err = e
570	}
571	return err
572}
573
574// Next returns the next element which occurs in both iterators.
575func (itr *seriesIDIntersectIterator) Next() (_ SeriesIDElem, err error) {
576	for {
577		// Fill buffers.
578		if itr.buf[0].SeriesID == 0 {
579			if itr.buf[0], err = itr.itrs[0].Next(); err != nil {
580				return SeriesIDElem{}, err
581			}
582		}
583		if itr.buf[1].SeriesID == 0 {
584			if itr.buf[1], err = itr.itrs[1].Next(); err != nil {
585				return SeriesIDElem{}, err
586			}
587		}
588
589		// Exit if either buffer is still empty.
590		if itr.buf[0].SeriesID == 0 || itr.buf[1].SeriesID == 0 {
591			return SeriesIDElem{}, nil
592		}
593
594		// Skip if both series are not equal.
595		if a, b := itr.buf[0].SeriesID, itr.buf[1].SeriesID; a < b {
596			itr.buf[0].SeriesID = 0
597			continue
598		} else if a > b {
599			itr.buf[1].SeriesID = 0
600			continue
601		}
602
603		// Merge series together if equal.
604		elem := itr.buf[0]
605
606		// Attach expression.
607		expr0 := itr.buf[0].Expr
608		expr1 := itr.buf[1].Expr
609		if expr0 == nil {
610			elem.Expr = expr1
611		} else if expr1 == nil {
612			elem.Expr = expr0
613		} else {
614			elem.Expr = influxql.Reduce(&influxql.BinaryExpr{
615				Op:  influxql.AND,
616				LHS: expr0,
617				RHS: expr1,
618			}, nil)
619		}
620
621		itr.buf[0].SeriesID, itr.buf[1].SeriesID = 0, 0
622		return elem, nil
623	}
624}
625
626// UnionSeriesIDIterators returns an iterator that returns series from both
627// both iterators. If both series have associated expressions then they are
628// combined together.
629func UnionSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator {
630	// Return other iterator if either one is nil.
631	if itr0 == nil {
632		return itr1
633	} else if itr1 == nil {
634		return itr0
635	}
636
637	// Create series id set, if available.
638	if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil {
639		ss := NewSeriesIDSet()
640		ss.Merge(a[0].SeriesIDSet(), a[1].SeriesIDSet())
641		return NewSeriesIDSetIteratorWithCloser(ss, SeriesIDSetIterators(a))
642	}
643
644	return &seriesIDUnionIterator{itrs: [2]SeriesIDIterator{itr0, itr1}}
645}
646
647// seriesIDUnionIterator is an iterator that unions two iterators together.
648type seriesIDUnionIterator struct {
649	buf  [2]SeriesIDElem
650	itrs [2]SeriesIDIterator
651}
652
653func (itr *seriesIDUnionIterator) Close() (err error) {
654	if e := itr.itrs[0].Close(); e != nil && err == nil {
655		err = e
656	}
657	if e := itr.itrs[1].Close(); e != nil && err == nil {
658		err = e
659	}
660	return err
661}
662
663// Next returns the next element which occurs in both iterators.
664func (itr *seriesIDUnionIterator) Next() (_ SeriesIDElem, err error) {
665	// Fill buffers.
666	if itr.buf[0].SeriesID == 0 {
667		if itr.buf[0], err = itr.itrs[0].Next(); err != nil {
668			return SeriesIDElem{}, err
669		}
670	}
671	if itr.buf[1].SeriesID == 0 {
672		if itr.buf[1], err = itr.itrs[1].Next(); err != nil {
673			return SeriesIDElem{}, err
674		}
675	}
676
677	// Return non-zero or lesser series.
678	if a, b := itr.buf[0].SeriesID, itr.buf[1].SeriesID; a == 0 && b == 0 {
679		return SeriesIDElem{}, nil
680	} else if b == 0 || (a != 0 && a < b) {
681		elem := itr.buf[0]
682		itr.buf[0].SeriesID = 0
683		return elem, nil
684	} else if a == 0 || (b != 0 && a > b) {
685		elem := itr.buf[1]
686		itr.buf[1].SeriesID = 0
687		return elem, nil
688	}
689
690	// Attach element.
691	elem := itr.buf[0]
692
693	// Attach expression.
694	expr0 := itr.buf[0].Expr
695	expr1 := itr.buf[1].Expr
696	if expr0 != nil && expr1 != nil {
697		elem.Expr = influxql.Reduce(&influxql.BinaryExpr{
698			Op:  influxql.OR,
699			LHS: expr0,
700			RHS: expr1,
701		}, nil)
702	} else {
703		elem.Expr = nil
704	}
705
706	itr.buf[0].SeriesID, itr.buf[1].SeriesID = 0, 0
707	return elem, nil
708}
709
710// DifferenceSeriesIDIterators returns an iterator that only returns series which
711// occur the first iterator but not the second iterator.
712func DifferenceSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator {
713	if itr0 == nil && itr1 == nil {
714		return nil
715	} else if itr1 == nil {
716		return itr0
717	} else if itr0 == nil {
718		itr1.Close()
719		return nil
720	}
721
722	// Create series id set, if available.
723	if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil {
724		return NewSeriesIDSetIteratorWithCloser(a[0].SeriesIDSet().AndNot(a[1].SeriesIDSet()), SeriesIDSetIterators(a))
725	}
726
727	return &seriesIDDifferenceIterator{itrs: [2]SeriesIDIterator{itr0, itr1}}
728}
729
730// seriesIDDifferenceIterator is an iterator that merges two iterators together.
731type seriesIDDifferenceIterator struct {
732	buf  [2]SeriesIDElem
733	itrs [2]SeriesIDIterator
734}
735
736func (itr *seriesIDDifferenceIterator) Close() (err error) {
737	if e := itr.itrs[0].Close(); e != nil && err == nil {
738		err = e
739	}
740	if e := itr.itrs[1].Close(); e != nil && err == nil {
741		err = e
742	}
743	return err
744}
745
746// Next returns the next element which occurs only in the first iterator.
747func (itr *seriesIDDifferenceIterator) Next() (_ SeriesIDElem, err error) {
748	for {
749		// Fill buffers.
750		if itr.buf[0].SeriesID == 0 {
751			if itr.buf[0], err = itr.itrs[0].Next(); err != nil {
752				return SeriesIDElem{}, err
753			}
754		}
755		if itr.buf[1].SeriesID == 0 {
756			if itr.buf[1], err = itr.itrs[1].Next(); err != nil {
757				return SeriesIDElem{}, err
758			}
759		}
760
761		// Exit if first buffer is still empty.
762		if itr.buf[0].SeriesID == 0 {
763			return SeriesIDElem{}, nil
764		} else if itr.buf[1].SeriesID == 0 {
765			elem := itr.buf[0]
766			itr.buf[0].SeriesID = 0
767			return elem, nil
768		}
769
770		// Return first series if it's less.
771		// If second series is less then skip it.
772		// If both series are equal then skip both.
773		if a, b := itr.buf[0].SeriesID, itr.buf[1].SeriesID; a < b {
774			elem := itr.buf[0]
775			itr.buf[0].SeriesID = 0
776			return elem, nil
777		} else if a > b {
778			itr.buf[1].SeriesID = 0
779			continue
780		} else {
781			itr.buf[0].SeriesID, itr.buf[1].SeriesID = 0, 0
782			continue
783		}
784	}
785}
786
// seriesPointIterator adapts SeriesIterator to an influxql.Iterator.
//
// It reads measurement names from mitr, buffers the series keys of the
// current measurement in keys, and emits the same reusable point for every
// key, so callers must not retain the pointer returned by Next across calls.
type seriesPointIterator struct {
	once     sync.Once
	indexSet IndexSet
	mitr     MeasurementIterator
	keys     [][]byte
	opt      query.IteratorOptions

	point query.FloatPoint // reusable point
}
797
798// newSeriesPointIterator returns a new instance of seriesPointIterator.
799func NewSeriesPointIterator(indexSet IndexSet, opt query.IteratorOptions) (_ query.Iterator, err error) {
800	// Only equality operators are allowed.
801	influxql.WalkFunc(opt.Condition, func(n influxql.Node) {
802		switch n := n.(type) {
803		case *influxql.BinaryExpr:
804			switch n.Op {
805			case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX,
806				influxql.OR, influxql.AND:
807			default:
808				err = errors.New("invalid tag comparison operator")
809			}
810		}
811	})
812	if err != nil {
813		return nil, err
814	}
815
816	mitr, err := indexSet.MeasurementIterator()
817	if err != nil {
818		return nil, err
819	}
820
821	return &seriesPointIterator{
822		indexSet: indexSet,
823		mitr:     mitr,
824		point: query.FloatPoint{
825			Aux: make([]interface{}, len(opt.Aux)),
826		},
827		opt: opt,
828	}, nil
829}
830
831// Stats returns stats about the points processed.
832func (itr *seriesPointIterator) Stats() query.IteratorStats { return query.IteratorStats{} }
833
834// Close closes the iterator.
835func (itr *seriesPointIterator) Close() (err error) {
836	itr.once.Do(func() {
837		if itr.mitr != nil {
838			err = itr.mitr.Close()
839		}
840	})
841	return err
842}
843
844// Next emits the next point in the iterator.
845func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) {
846	for {
847		// Read series keys for next measurement if no more keys remaining.
848		// Exit if there are no measurements remaining.
849		if len(itr.keys) == 0 {
850			m, err := itr.mitr.Next()
851			if err != nil {
852				return nil, err
853			} else if m == nil {
854				return nil, nil
855			}
856
857			if err := itr.readSeriesKeys(m); err != nil {
858				return nil, err
859			}
860			continue
861		}
862
863		name, tags := ParseSeriesKey(itr.keys[0])
864		itr.keys = itr.keys[1:]
865
866		// TODO(edd): It seems to me like this authorisation check should be
867		// further down in the index. At this point we're going to be filtering
868		// series that have already been materialised in the LogFiles and
869		// IndexFiles.
870		if itr.opt.Authorizer != nil && !itr.opt.Authorizer.AuthorizeSeriesRead(itr.indexSet.Database(), name, tags) {
871			continue
872		}
873
874		// Convert to a key.
875		key := string(models.MakeKey(name, tags))
876
877		// Write auxiliary fields.
878		for i, f := range itr.opt.Aux {
879			switch f.Val {
880			case "key":
881				itr.point.Aux[i] = key
882			}
883		}
884
885		return &itr.point, nil
886	}
887}
888
889func (itr *seriesPointIterator) readSeriesKeys(name []byte) error {
890	sitr, err := itr.indexSet.MeasurementSeriesByExprIterator(name, itr.opt.Condition)
891	if err != nil {
892		return err
893	} else if sitr == nil {
894		return nil
895	}
896	defer sitr.Close()
897
898	// Slurp all series keys.
899	itr.keys = itr.keys[:0]
900	for i := 0; ; i++ {
901		elem, err := sitr.Next()
902		if err != nil {
903			return err
904		} else if elem.SeriesID == 0 {
905			break
906		}
907
908		// Periodically check for interrupt.
909		if i&0xFF == 0xFF {
910			select {
911			case <-itr.opt.InterruptCh:
912				return itr.Close()
913			default:
914			}
915		}
916
917		key := itr.indexSet.SeriesFile.SeriesKey(elem.SeriesID)
918		if len(key) == 0 {
919			continue
920		}
921		itr.keys = append(itr.keys, key)
922	}
923
924	// Sort keys.
925	sort.Sort(seriesKeys(itr.keys))
926	return nil
927}
928
// MeasurementIterator represents an iterator over a list of measurements.
//
// The implementations and consumers in this file use a nil name with a nil
// error from Next to signal the end of the iteration.
type MeasurementIterator interface {
	Close() error
	Next() ([]byte, error)
}
934
935type MeasurementIterators []MeasurementIterator
936
937func (a MeasurementIterators) Close() (err error) {
938	for i := range a {
939		if e := a[i].Close(); e != nil && err == nil {
940			err = e
941		}
942	}
943	return err
944}
945
// measurementSliceIterator iterates over a slice of measurement names,
// consuming the slice from the front.
type measurementSliceIterator struct {
	names [][]byte
}

// NewMeasurementSliceIterator returns an iterator over a slice of in-memory
// measurement names. The slice is not copied.
func NewMeasurementSliceIterator(names [][]byte) *measurementSliceIterator {
	itr := new(measurementSliceIterator)
	itr.names = names
	return itr
}

// Close is a no-op and always returns nil.
func (itr *measurementSliceIterator) Close() (err error) {
	return nil
}

// Next returns the next measurement name, or nil when none remain. The
// returned error is always nil.
func (itr *measurementSliceIterator) Next() (name []byte, err error) {
	if len(itr.names) > 0 {
		name = itr.names[0]
		itr.names = itr.names[1:]
	}
	return name, nil
}
964
965// MergeMeasurementIterators returns an iterator that merges a set of iterators.
966// Iterators that are first in the list take precedence and a deletion by those
967// early iterators will invalidate elements by later iterators.
968func MergeMeasurementIterators(itrs ...MeasurementIterator) MeasurementIterator {
969	if len(itrs) == 0 {
970		return nil
971	} else if len(itrs) == 1 {
972		return itrs[0]
973	}
974
975	return &measurementMergeIterator{
976		buf:  make([][]byte, len(itrs)),
977		itrs: itrs,
978	}
979}
980
981type measurementMergeIterator struct {
982	buf  [][]byte
983	itrs []MeasurementIterator
984}
985
986func (itr *measurementMergeIterator) Close() (err error) {
987	for i := range itr.itrs {
988		if e := itr.itrs[i].Close(); e != nil && err == nil {
989			err = e
990		}
991	}
992	return err
993}
994
995// Next returns the element with the next lowest name across the iterators.
996//
997// If multiple iterators contain the same name then the first is returned
998// and the remaining ones are skipped.
999func (itr *measurementMergeIterator) Next() (_ []byte, err error) {
1000	// Find next lowest name amongst the buffers.
1001	var name []byte
1002	for i, buf := range itr.buf {
1003		// Fill buffer if empty.
1004		if buf == nil {
1005			if buf, err = itr.itrs[i].Next(); err != nil {
1006				return nil, err
1007			} else if buf != nil {
1008				itr.buf[i] = buf
1009			} else {
1010				continue
1011			}
1012		}
1013
1014		// Find next lowest name.
1015		if name == nil || bytes.Compare(itr.buf[i], name) == -1 {
1016			name = itr.buf[i]
1017		}
1018	}
1019
1020	// Return nil if no elements remaining.
1021	if name == nil {
1022		return nil, nil
1023	}
1024
1025	// Merge all elements together and clear buffers.
1026	for i, buf := range itr.buf {
1027		if buf == nil || !bytes.Equal(buf, name) {
1028			continue
1029		}
1030		itr.buf[i] = nil
1031	}
1032	return name, nil
1033}
1034
// TagKeyIterator represents an iterator over a list of tag keys.
//
// The implementations in this file use a nil key with a nil error from Next
// to signal the end of the iteration.
type TagKeyIterator interface {
	Close() error
	Next() ([]byte, error)
}
1040
1041type TagKeyIterators []TagKeyIterator
1042
1043func (a TagKeyIterators) Close() (err error) {
1044	for i := range a {
1045		if e := a[i].Close(); e != nil && err == nil {
1046			err = e
1047		}
1048	}
1049	return err
1050}
1051
// NewTagKeySliceIterator returns a TagKeyIterator that walks keys in order.
// The slice is not copied.
func NewTagKeySliceIterator(keys [][]byte) *tagKeySliceIterator {
	itr := new(tagKeySliceIterator)
	itr.keys = keys
	return itr
}

// tagKeySliceIterator iterates over a slice of tag keys, consuming the slice
// from the front.
type tagKeySliceIterator struct {
	keys [][]byte
}

// Next returns the next tag key in the slice, or nil when none remain. The
// returned error is always nil.
func (itr *tagKeySliceIterator) Next() ([]byte, error) {
	var key []byte
	if len(itr.keys) > 0 {
		key, itr.keys = itr.keys[0], itr.keys[1:]
	}
	return key, nil
}

// Close is a no-op and always returns nil.
func (itr *tagKeySliceIterator) Close() error {
	return nil
}
1073
1074// MergeTagKeyIterators returns an iterator that merges a set of iterators.
1075func MergeTagKeyIterators(itrs ...TagKeyIterator) TagKeyIterator {
1076	if len(itrs) == 0 {
1077		return nil
1078	} else if len(itrs) == 1 {
1079		return itrs[0]
1080	}
1081
1082	return &tagKeyMergeIterator{
1083		buf:  make([][]byte, len(itrs)),
1084		itrs: itrs,
1085	}
1086}
1087
1088type tagKeyMergeIterator struct {
1089	buf  [][]byte
1090	itrs []TagKeyIterator
1091}
1092
1093func (itr *tagKeyMergeIterator) Close() error {
1094	for i := range itr.itrs {
1095		itr.itrs[i].Close()
1096	}
1097	return nil
1098}
1099
1100// Next returns the element with the next lowest key across the iterators.
1101//
1102// If multiple iterators contain the same key then the first is returned
1103// and the remaining ones are skipped.
1104func (itr *tagKeyMergeIterator) Next() (_ []byte, err error) {
1105	// Find next lowest key amongst the buffers.
1106	var key []byte
1107	for i, buf := range itr.buf {
1108		// Fill buffer.
1109		if buf == nil {
1110			if buf, err = itr.itrs[i].Next(); err != nil {
1111				return nil, err
1112			} else if buf != nil {
1113				itr.buf[i] = buf
1114			} else {
1115				continue
1116			}
1117		}
1118
1119		// Find next lowest key.
1120		if key == nil || bytes.Compare(buf, key) == -1 {
1121			key = buf
1122		}
1123	}
1124
1125	// Return nil if no elements remaining.
1126	if key == nil {
1127		return nil, nil
1128	}
1129
1130	// Merge elements and clear buffers.
1131	for i, buf := range itr.buf {
1132		if buf == nil || !bytes.Equal(buf, key) {
1133			continue
1134		}
1135		itr.buf[i] = nil
1136	}
1137	return key, nil
1138}
1139
// TagValueIterator represents an iterator over a list of tag values.
//
// The slice-backed implementation in this file uses a nil value with a nil
// error from Next to signal the end of the iteration.
type TagValueIterator interface {
	Close() error
	Next() ([]byte, error)
}
1145
1146type TagValueIterators []TagValueIterator
1147
1148func (a TagValueIterators) Close() (err error) {
1149	for i := range a {
1150		if e := a[i].Close(); e != nil && err == nil {
1151			err = e
1152		}
1153	}
1154	return err
1155}
1156
// NewTagValueSliceIterator returns an iterator that yields the entries of
// values in order. The slice is not copied.
func NewTagValueSliceIterator(values [][]byte) *tagValueSliceIterator {
	itr := new(tagValueSliceIterator)
	itr.values = values
	return itr
}

// tagValueSliceIterator walks a slice of tag values from front to back.
type tagValueSliceIterator struct {
	values [][]byte
}

// Next pops and returns the first remaining value. It returns nil once the
// slice has been fully consumed; the error is always nil.
func (itr *tagValueSliceIterator) Next() ([]byte, error) {
	if len(itr.values) > 0 {
		head := itr.values[0]
		itr.values = itr.values[1:]
		return head, nil
	}
	return nil, nil
}

// Close does nothing and always returns nil.
func (itr *tagValueSliceIterator) Close() error {
	return nil
}
1178
1179// MergeTagValueIterators returns an iterator that merges a set of iterators.
1180func MergeTagValueIterators(itrs ...TagValueIterator) TagValueIterator {
1181	if len(itrs) == 0 {
1182		return nil
1183	} else if len(itrs) == 1 {
1184		return itrs[0]
1185	}
1186
1187	return &tagValueMergeIterator{
1188		buf:  make([][]byte, len(itrs)),
1189		itrs: itrs,
1190	}
1191}
1192
// tagValueMergeIterator merges the values produced by several
// TagValueIterators.
type tagValueMergeIterator struct {
	// buf has one slot per iterator: the value read from itrs[i] that has not
	// been returned yet, or nil if the slot needs refilling.
	buf  [][]byte
	// itrs are the source iterators being merged.
	itrs []TagValueIterator
}
1197
1198func (itr *tagValueMergeIterator) Close() error {
1199	for i := range itr.itrs {
1200		itr.itrs[i].Close()
1201	}
1202	return nil
1203}
1204
1205// Next returns the element with the next lowest value across the iterators.
1206//
1207// If multiple iterators contain the same value then the first is returned
1208// and the remaining ones are skipped.
1209func (itr *tagValueMergeIterator) Next() (_ []byte, err error) {
1210	// Find next lowest value amongst the buffers.
1211	var value []byte
1212	for i, buf := range itr.buf {
1213		// Fill buffer.
1214		if buf == nil {
1215			if buf, err = itr.itrs[i].Next(); err != nil {
1216				return nil, err
1217			} else if buf != nil {
1218				itr.buf[i] = buf
1219			} else {
1220				continue
1221			}
1222		}
1223
1224		// Find next lowest value.
1225		if value == nil || bytes.Compare(buf, value) == -1 {
1226			value = buf
1227		}
1228	}
1229
1230	// Return nil if no elements remaining.
1231	if value == nil {
1232		return nil, nil
1233	}
1234
1235	// Merge elements and clear buffers.
1236	for i, buf := range itr.buf {
1237		if buf == nil || !bytes.Equal(buf, value) {
1238			continue
1239		}
1240		itr.buf[i] = nil
1241	}
1242	return value, nil
1243}
1244
// IndexSet represents a list of indexes, all belonging to one database.
//
// The methods defined on IndexSet in this part of the file use value
// receivers, so each call works on a copy of the struct; the slices and the
// SeriesFile pointer inside the copy still refer to the caller's data.
type IndexSet struct {
	Indexes    []Index                // The set of indexes comprising this IndexSet.
	SeriesFile *SeriesFile            // The Series File associated with the db for this set.
	fieldSets  []*MeasurementFieldSet // field sets for _all_ indexes in this set's DB.
}
1251
1252// HasInmemIndex returns true if any in-memory index is in use.
1253func (is IndexSet) HasInmemIndex() bool {
1254	for _, idx := range is.Indexes {
1255		if idx.Type() == InmemIndexName {
1256			return true
1257		}
1258	}
1259	return false
1260}
1261
1262// Database returns the database name of the first index.
1263func (is IndexSet) Database() string {
1264	if len(is.Indexes) == 0 {
1265		return ""
1266	}
1267	return is.Indexes[0].Database()
1268}
1269
1270// HasField determines if any of the field sets on the set of indexes in the
1271// IndexSet have the provided field for the provided measurement.
1272func (is IndexSet) HasField(measurement []byte, field string) bool {
1273	if len(is.Indexes) == 0 {
1274		return false
1275	}
1276
1277	if len(is.fieldSets) == 0 {
1278		// field sets may not have been initialised yet.
1279		is.fieldSets = make([]*MeasurementFieldSet, 0, len(is.Indexes))
1280		for _, idx := range is.Indexes {
1281			is.fieldSets = append(is.fieldSets, idx.FieldSet())
1282		}
1283	}
1284
1285	for _, fs := range is.fieldSets {
1286		if fs.Fields(measurement).HasField(field) {
1287			return true
1288		}
1289	}
1290	return false
1291}
1292
1293// DedupeInmemIndexes returns an index set which removes duplicate indexes.
1294// Useful because inmem indexes are shared by shards per database.
1295func (is IndexSet) DedupeInmemIndexes() IndexSet {
1296	other := IndexSet{
1297		Indexes:    make([]Index, 0, len(is.Indexes)),
1298		SeriesFile: is.SeriesFile,
1299		fieldSets:  make([]*MeasurementFieldSet, 0, len(is.Indexes)),
1300	}
1301
1302	uniqueIndexes := make(map[uintptr]Index)
1303	for _, idx := range is.Indexes {
1304		uniqueIndexes[idx.UniqueReferenceID()] = idx
1305	}
1306
1307	for _, idx := range uniqueIndexes {
1308		other.Indexes = append(other.Indexes, idx)
1309		other.fieldSets = append(other.fieldSets, idx.FieldSet())
1310	}
1311
1312	return other
1313}
1314
1315// MeasurementNamesByExpr returns a slice of measurement names matching the
1316// provided condition. If no condition is provided then all names are returned.
1317func (is IndexSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
1318	release := is.SeriesFile.Retain()
1319	defer release()
1320
1321	// Return filtered list if expression exists.
1322	if expr != nil {
1323		names, err := is.measurementNamesByExpr(auth, expr)
1324		if err != nil {
1325			return nil, err
1326		}
1327		return slices.CopyChunkedByteSlices(names, 1000), nil
1328	}
1329
1330	itr, err := is.measurementIterator()
1331	if err != nil {
1332		return nil, err
1333	} else if itr == nil {
1334		return nil, nil
1335	}
1336	defer itr.Close()
1337
1338	// Iterate over all measurements if no condition exists.
1339	var names [][]byte
1340	for {
1341		e, err := itr.Next()
1342		if err != nil {
1343			return nil, err
1344		} else if e == nil {
1345			break
1346		}
1347
1348		// Determine if there exists at least one authorised series for the
1349		// measurement name.
1350		if is.measurementAuthorizedSeries(auth, e, nil) {
1351			names = append(names, e)
1352		}
1353	}
1354	return slices.CopyChunkedByteSlices(names, 1000), nil
1355}
1356
1357func (is IndexSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
1358	if expr == nil {
1359		return nil, nil
1360	}
1361
1362	switch e := expr.(type) {
1363	case *influxql.BinaryExpr:
1364		switch e.Op {
1365		case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
1366			tag, ok := e.LHS.(*influxql.VarRef)
1367			if !ok {
1368				return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String())
1369			}
1370
1371			// Retrieve value or regex expression from RHS.
1372			var value string
1373			var regex *regexp.Regexp
1374			if influxql.IsRegexOp(e.Op) {
1375				re, ok := e.RHS.(*influxql.RegexLiteral)
1376				if !ok {
1377					return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String())
1378				}
1379				regex = re.Val
1380			} else {
1381				s, ok := e.RHS.(*influxql.StringLiteral)
1382				if !ok {
1383					return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String())
1384				}
1385				value = s.Val
1386			}
1387
1388			// Match on name, if specified.
1389			if tag.Val == "_name" {
1390				return is.measurementNamesByNameFilter(auth, e.Op, value, regex)
1391			} else if influxql.IsSystemName(tag.Val) {
1392				return nil, nil
1393			}
1394			return is.measurementNamesByTagFilter(auth, e.Op, tag.Val, value, regex)
1395
1396		case influxql.OR, influxql.AND:
1397			lhs, err := is.measurementNamesByExpr(auth, e.LHS)
1398			if err != nil {
1399				return nil, err
1400			}
1401
1402			rhs, err := is.measurementNamesByExpr(auth, e.RHS)
1403			if err != nil {
1404				return nil, err
1405			}
1406
1407			if e.Op == influxql.OR {
1408				return bytesutil.Union(lhs, rhs), nil
1409			}
1410			return bytesutil.Intersect(lhs, rhs), nil
1411
1412		default:
1413			return nil, fmt.Errorf("invalid tag comparison operator")
1414		}
1415
1416	case *influxql.ParenExpr:
1417		return is.measurementNamesByExpr(auth, e.Expr)
1418	default:
1419		return nil, fmt.Errorf("%#v", expr)
1420	}
1421}
1422
1423// measurementNamesByNameFilter returns matching measurement names in sorted order.
1424func (is IndexSet) measurementNamesByNameFilter(auth query.Authorizer, op influxql.Token, val string, regex *regexp.Regexp) ([][]byte, error) {
1425	itr, err := is.measurementIterator()
1426	if err != nil {
1427		return nil, err
1428	} else if itr == nil {
1429		return nil, nil
1430	}
1431	defer itr.Close()
1432
1433	var names [][]byte
1434	for {
1435		e, err := itr.Next()
1436		if err != nil {
1437			return nil, err
1438		} else if e == nil {
1439			break
1440		}
1441
1442		var matched bool
1443		switch op {
1444		case influxql.EQ:
1445			matched = string(e) == val
1446		case influxql.NEQ:
1447			matched = string(e) != val
1448		case influxql.EQREGEX:
1449			matched = regex.Match(e)
1450		case influxql.NEQREGEX:
1451			matched = !regex.Match(e)
1452		}
1453
1454		if matched && is.measurementAuthorizedSeries(auth, e, nil) {
1455			names = append(names, e)
1456		}
1457	}
1458	bytesutil.Sort(names)
1459	return names, nil
1460}
1461
1462// MeasurementNamesByPredicate returns a slice of measurement names matching the
1463// provided condition. If no condition is provided then all names are returned.
1464// This behaves differently from MeasurementNamesByExpr because it will
1465// return measurements using flux predicates.
1466func (is IndexSet) MeasurementNamesByPredicate(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
1467	release := is.SeriesFile.Retain()
1468	defer release()
1469
1470	// Return filtered list if expression exists.
1471	if expr != nil {
1472		names, err := is.measurementNamesByPredicate(auth, expr)
1473		if err != nil {
1474			return nil, err
1475		}
1476		return slices.CopyChunkedByteSlices(names, 1000), nil
1477	}
1478
1479	itr, err := is.measurementIterator()
1480	if err != nil {
1481		return nil, err
1482	} else if itr == nil {
1483		return nil, nil
1484	}
1485	defer itr.Close()
1486
1487	// Iterate over all measurements if no condition exists.
1488	var names [][]byte
1489	for {
1490		e, err := itr.Next()
1491		if err != nil {
1492			return nil, err
1493		} else if e == nil {
1494			break
1495		}
1496
1497		// Determine if there exists at least one authorised series for the
1498		// measurement name.
1499		if is.measurementAuthorizedSeries(auth, e, nil) {
1500			names = append(names, e)
1501		}
1502	}
1503	return slices.CopyChunkedByteSlices(names, 1000), nil
1504}
1505
1506func (is IndexSet) measurementNamesByPredicate(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
1507	if expr == nil {
1508		return nil, nil
1509	}
1510
1511	switch e := expr.(type) {
1512	case *influxql.BinaryExpr:
1513		switch e.Op {
1514		case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
1515			tag, ok := e.LHS.(*influxql.VarRef)
1516			if !ok {
1517				return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String())
1518			}
1519
1520			// Retrieve value or regex expression from RHS.
1521			var value string
1522			var regex *regexp.Regexp
1523			if influxql.IsRegexOp(e.Op) {
1524				re, ok := e.RHS.(*influxql.RegexLiteral)
1525				if !ok {
1526					return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String())
1527				}
1528				regex = re.Val
1529			} else {
1530				s, ok := e.RHS.(*influxql.StringLiteral)
1531				if !ok {
1532					return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String())
1533				}
1534				value = s.Val
1535			}
1536
1537			// Match on name, if specified.
1538			if tag.Val == "_name" {
1539				return is.measurementNamesByNameFilter(auth, e.Op, value, regex)
1540			} else if influxql.IsSystemName(tag.Val) {
1541				return nil, nil
1542			}
1543			return is.measurementNamesByTagPredicate(auth, e.Op, tag.Val, value, regex)
1544
1545		case influxql.OR, influxql.AND:
1546			lhs, err := is.measurementNamesByPredicate(auth, e.LHS)
1547			if err != nil {
1548				return nil, err
1549			}
1550
1551			rhs, err := is.measurementNamesByPredicate(auth, e.RHS)
1552			if err != nil {
1553				return nil, err
1554			}
1555
1556			if e.Op == influxql.OR {
1557				return bytesutil.Union(lhs, rhs), nil
1558			}
1559			return bytesutil.Intersect(lhs, rhs), nil
1560
1561		default:
1562			return nil, fmt.Errorf("invalid tag comparison operator")
1563		}
1564
1565	case *influxql.ParenExpr:
1566		return is.measurementNamesByPredicate(auth, e.Expr)
1567	default:
1568		return nil, fmt.Errorf("%#v", expr)
1569	}
1570}
1571
1572func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
1573	var names [][]byte
1574
1575	mitr, err := is.measurementIterator()
1576	if err != nil {
1577		return nil, err
1578	} else if mitr == nil {
1579		return nil, nil
1580	}
1581	defer mitr.Close()
1582
1583	// valEqual determines if the provided []byte is equal to the tag value
1584	// to be filtered on.
1585	valEqual := regex.Match
1586	if op == influxql.EQ || op == influxql.NEQ {
1587		vb := []byte(val)
1588		valEqual = func(b []byte) bool { return bytes.Equal(vb, b) }
1589	}
1590
1591	var tagMatch bool
1592	var authorized bool
1593	for {
1594		me, err := mitr.Next()
1595		if err != nil {
1596			return nil, err
1597		} else if me == nil {
1598			break
1599		}
1600		// If the measurement doesn't have the tag key, then it won't be considered.
1601		if ok, err := is.hasTagKey(me, []byte(key)); err != nil {
1602			return nil, err
1603		} else if !ok {
1604			continue
1605		}
1606		tagMatch = false
1607		// Authorization must be explicitly granted when an authorizer is present.
1608		authorized = query.AuthorizerIsOpen(auth)
1609
1610		vitr, err := is.tagValueIterator(me, []byte(key))
1611		if err != nil {
1612			return nil, err
1613		}
1614
1615		if vitr != nil {
1616			defer vitr.Close()
1617			for {
1618				ve, err := vitr.Next()
1619				if err != nil {
1620					return nil, err
1621				} else if ve == nil {
1622					break
1623				}
1624				if !valEqual(ve) {
1625					continue
1626				}
1627
1628				tagMatch = true
1629				if query.AuthorizerIsOpen(auth) {
1630					break
1631				}
1632
1633				// When an authorizer is present, the measurement should be
1634				// included only if one of it's series is authorized.
1635				sitr, err := is.tagValueSeriesIDIterator(me, []byte(key), ve)
1636				if err != nil {
1637					return nil, err
1638				} else if sitr == nil {
1639					continue
1640				}
1641				defer sitr.Close()
1642				sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)
1643
1644				// Locate a series with this matching tag value that's authorized.
1645				for {
1646					se, err := sitr.Next()
1647					if err != nil {
1648						return nil, err
1649					}
1650
1651					if se.SeriesID == 0 {
1652						break
1653					}
1654
1655					name, tags := is.SeriesFile.Series(se.SeriesID)
1656					if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
1657						authorized = true
1658						break
1659					}
1660				}
1661
1662				if err := sitr.Close(); err != nil {
1663					return nil, err
1664				}
1665
1666				if tagMatch && authorized {
1667					// The measurement can definitely be included or rejected.
1668					break
1669				}
1670			}
1671			if err := vitr.Close(); err != nil {
1672				return nil, err
1673			}
1674		}
1675
1676		// For negation operators, to determine if the measurement is authorized,
1677		// an authorized series belonging to the measurement must be located.
1678		// Then, the measurement can be added iff !tagMatch && authorized.
1679		if (op == influxql.NEQ || op == influxql.NEQREGEX) && !tagMatch {
1680			authorized = is.measurementAuthorizedSeries(auth, me, nil)
1681		}
1682
1683		// tags match | operation is EQ | measurement matches
1684		// --------------------------------------------------
1685		//     True   |       True      |      True
1686		//     True   |       False     |      False
1687		//     False  |       True      |      False
1688		//     False  |       False     |      True
1689		if tagMatch == (op == influxql.EQ || op == influxql.EQREGEX) && authorized {
1690			names = append(names, me)
1691			continue
1692		}
1693	}
1694
1695	bytesutil.Sort(names)
1696	return names, nil
1697}
1698
1699func (is IndexSet) measurementNamesByTagPredicate(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
1700	var names [][]byte
1701
1702	mitr, err := is.measurementIterator()
1703	if err != nil {
1704		return nil, err
1705	} else if mitr == nil {
1706		return nil, nil
1707	}
1708	defer mitr.Close()
1709
1710	var checkMeasurement func(auth query.Authorizer, me []byte) (bool, error)
1711	switch op {
1712	case influxql.EQ:
1713		checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
1714			return is.measurementHasTagValue(auth, me, []byte(key), []byte(val))
1715		}
1716	case influxql.NEQ:
1717		checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
1718			// If there is an authorized series in this measurement and that series
1719			// does not contain the tag key/value.
1720			ok := is.measurementAuthorizedSeries(auth, me, func(tags models.Tags) bool {
1721				return tags.GetString(key) == val
1722			})
1723			return ok, nil
1724		}
1725	case influxql.EQREGEX:
1726		checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
1727			return is.measurementHasTagValueRegex(auth, me, []byte(key), regex)
1728		}
1729	case influxql.NEQREGEX:
1730		checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
1731			// If there is an authorized series in this measurement and that series
1732			// does not contain the tag key/value.
1733			ok := is.measurementAuthorizedSeries(auth, me, func(tags models.Tags) bool {
1734				return regex.MatchString(tags.GetString(key))
1735			})
1736			return ok, nil
1737		}
1738	default:
1739		return nil, fmt.Errorf("unsupported operand: %s", op)
1740	}
1741
1742	for {
1743		me, err := mitr.Next()
1744		if err != nil {
1745			return nil, err
1746		} else if me == nil {
1747			break
1748		}
1749
1750		ok, err := checkMeasurement(auth, me)
1751		if err != nil {
1752			return nil, err
1753		} else if ok {
1754			names = append(names, me)
1755		}
1756	}
1757
1758	bytesutil.Sort(names)
1759	return names, nil
1760}
1761
1762// measurementAuthorizedSeries determines if the measurement contains a series
1763// that is authorized to be read.
1764func (is IndexSet) measurementAuthorizedSeries(auth query.Authorizer, name []byte, exclude func(tags models.Tags) bool) bool {
1765	if query.AuthorizerIsOpen(auth) && exclude == nil {
1766		return true
1767	}
1768
1769	if auth == nil {
1770		auth = query.OpenAuthorizer
1771	}
1772
1773	sitr, err := is.measurementSeriesIDIterator(name)
1774	if err != nil || sitr == nil {
1775		return false
1776	}
1777	defer sitr.Close()
1778	sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)
1779
1780	for {
1781		series, err := sitr.Next()
1782		if err != nil {
1783			return false
1784		}
1785
1786		if series.SeriesID == 0 {
1787			return false // End of iterator
1788		}
1789
1790		name, tags := is.SeriesFile.Series(series.SeriesID)
1791		if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
1792			if exclude != nil && exclude(tags) {
1793				continue
1794			}
1795			return true
1796		}
1797	}
1798}
1799
1800func (is IndexSet) measurementHasTagValue(auth query.Authorizer, me, key, value []byte) (bool, error) {
1801	if len(value) == 0 {
1802		return is.measurementHasEmptyTagValue(auth, me, key)
1803	}
1804
1805	hasTagValue, err := is.HasTagValue(me, key, value)
1806	if err != nil || !hasTagValue {
1807		return false, err
1808	}
1809
1810	// If the authorizer is open, return true.
1811	if query.AuthorizerIsOpen(auth) {
1812		return true, nil
1813	}
1814
1815	// When an authorizer is present, the measurement should be
1816	// included only if one of it's series is authorized.
1817	sitr, err := is.tagValueSeriesIDIterator(me, key, value)
1818	if err != nil || sitr == nil {
1819		return false, err
1820	}
1821	defer sitr.Close()
1822	sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)
1823
1824	// Locate a series with this matching tag value that's authorized.
1825	for {
1826		se, err := sitr.Next()
1827		if err != nil || se.SeriesID == 0 {
1828			return false, err
1829		}
1830
1831		name, tags := is.SeriesFile.Series(se.SeriesID)
1832		if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
1833			return true, nil
1834		}
1835	}
1836}
1837
1838func (is IndexSet) measurementHasEmptyTagValue(auth query.Authorizer, me, key []byte) (bool, error) {
1839	// Any series that does not have a tag key
1840	// has an empty tag value for that key.
1841	// Iterate through all of the series to find one
1842	// series that does not have the tag key.
1843	sitr, err := is.measurementSeriesIDIterator(me)
1844	if err != nil || sitr == nil {
1845		return false, err
1846	}
1847	defer sitr.Close()
1848	sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)
1849
1850	for {
1851		series, err := sitr.Next()
1852		if err != nil || series.SeriesID == 0 {
1853			return false, err
1854		}
1855
1856		name, tags := is.SeriesFile.Series(series.SeriesID)
1857		if len(tags.Get(key)) > 0 {
1858			// The tag key exists in this series. We need
1859			// at least one series that does not have the tag
1860			// keys.
1861			continue
1862		}
1863
1864		// Verify that we can see this series.
1865		if query.AuthorizerIsOpen(auth) {
1866			return true, nil
1867		} else if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
1868			return true, nil
1869		}
1870	}
1871}
1872
1873func (is IndexSet) measurementHasTagValueRegex(auth query.Authorizer, me, key []byte, value *regexp.Regexp) (bool, error) {
1874	// If the regex matches the empty string, do a special check to see
1875	// if we have an empty tag value.
1876	if matchEmpty := value.MatchString(""); matchEmpty {
1877		if ok, err := is.measurementHasEmptyTagValue(auth, me, key); err != nil {
1878			return false, err
1879		} else if ok {
1880			return true, nil
1881		}
1882	}
1883
1884	// Iterate over the tag values and find one that matches the value.
1885	vitr, err := is.tagValueIterator(me, key)
1886	if err != nil || vitr == nil {
1887		return false, err
1888	}
1889	defer vitr.Close()
1890
1891	for {
1892		ve, err := vitr.Next()
1893		if err != nil || ve == nil {
1894			return false, err
1895		}
1896
1897		if !value.Match(ve) {
1898			// The regex does not match this tag value.
1899			continue
1900		}
1901
1902		// If the authorizer is open, then we have found a suitable tag value.
1903		if query.AuthorizerIsOpen(auth) {
1904			return true, nil
1905		}
1906
1907		// When an authorizer is present, the measurement should only be included
1908		// if one of the series is authorized.
1909		if authorized, err := func() (bool, error) {
1910			sitr, err := is.tagValueSeriesIDIterator(me, key, ve)
1911			if err != nil || sitr == nil {
1912				return false, err
1913			}
1914			defer sitr.Close()
1915			sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)
1916
1917			// Locate an authorized series.
1918			for {
1919				se, err := sitr.Next()
1920				if err != nil || se.SeriesID == 0 {
1921					return false, err
1922				}
1923
1924				name, tags := is.SeriesFile.Series(se.SeriesID)
1925				if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
1926					return true, nil
1927				}
1928			}
1929		}(); err != nil {
1930			return false, err
1931		} else if authorized {
1932			return true, nil
1933		}
1934	}
1935}
1936
// HasTagKey returns true if the tag key exists in any index for the provided
// measurement. It is the exported entry point and delegates to hasTagKey.
func (is IndexSet) HasTagKey(name, key []byte) (bool, error) {
	return is.hasTagKey(name, key)
}
1942
1943// hasTagKey returns true if the tag key exists in any index for the provided
1944// measurement, and guarantees to never take a lock on the series file.
1945func (is IndexSet) hasTagKey(name, key []byte) (bool, error) {
1946	for _, idx := range is.Indexes {
1947		if ok, err := idx.HasTagKey(name, key); err != nil {
1948			return false, err
1949		} else if ok {
1950			return true, nil
1951		}
1952	}
1953	return false, nil
1954}
1955
1956// HasTagValue returns true if the tag value exists in any index for the provided
1957// measurement and tag key.
1958func (is IndexSet) HasTagValue(name, key, value []byte) (bool, error) {
1959	for _, idx := range is.Indexes {
1960		if ok, err := idx.HasTagValue(name, key, value); err != nil {
1961			return false, err
1962		} else if ok {
1963			return true, nil
1964		}
1965	}
1966	return false, nil
1967}
1968
// MeasurementIterator returns an iterator over all measurements in the index.
// It is the exported entry point and delegates to measurementIterator.
func (is IndexSet) MeasurementIterator() (MeasurementIterator, error) {
	return is.measurementIterator()
}
1973
1974// measurementIterator returns an iterator over all measurements in the index.
1975// It guarantees to never take any locks on the underlying series file.
1976func (is IndexSet) measurementIterator() (MeasurementIterator, error) {
1977	a := make([]MeasurementIterator, 0, len(is.Indexes))
1978	for _, idx := range is.Indexes {
1979		itr, err := idx.MeasurementIterator()
1980		if err != nil {
1981			MeasurementIterators(a).Close()
1982			return nil, err
1983		} else if itr != nil {
1984			a = append(a, itr)
1985		}
1986	}
1987	return MergeMeasurementIterators(a...), nil
1988}
1989
// TagKeyIterator returns a key iterator for a measurement. It is the exported
// entry point and delegates to tagKeyIterator.
func (is IndexSet) TagKeyIterator(name []byte) (TagKeyIterator, error) {
	return is.tagKeyIterator(name)
}
1994
1995// tagKeyIterator returns a key iterator for a measurement. It guarantees to never
1996// take any locks on the underlying series file.
1997func (is IndexSet) tagKeyIterator(name []byte) (TagKeyIterator, error) {
1998	a := make([]TagKeyIterator, 0, len(is.Indexes))
1999	for _, idx := range is.Indexes {
2000		itr, err := idx.TagKeyIterator(name)
2001		if err != nil {
2002			TagKeyIterators(a).Close()
2003			return nil, err
2004		} else if itr != nil {
2005			a = append(a, itr)
2006		}
2007	}
2008	return MergeTagKeyIterators(a...), nil
2009}
2010
// TagValueIterator returns a value iterator for a tag key. It is the exported
// entry point and delegates to tagValueIterator.
func (is IndexSet) TagValueIterator(name, key []byte) (TagValueIterator, error) {
	return is.tagValueIterator(name, key)
}
2015
2016// tagValueIterator returns a value iterator for a tag key. It guarantees to never
2017// take any locks on the underlying series file.
2018func (is IndexSet) tagValueIterator(name, key []byte) (TagValueIterator, error) {
2019	a := make([]TagValueIterator, 0, len(is.Indexes))
2020	for _, idx := range is.Indexes {
2021		itr, err := idx.TagValueIterator(name, key)
2022		if err != nil {
2023			TagValueIterators(a).Close()
2024			return nil, err
2025		} else if itr != nil {
2026			a = append(a, itr)
2027		}
2028	}
2029	return MergeTagValueIterators(a...), nil
2030}
2031
2032// TagKeyHasAuthorizedSeries determines if there exists an authorized series for
2033// the provided measurement name and tag key.
2034func (is IndexSet) TagKeyHasAuthorizedSeries(auth query.Authorizer, name, tagKey []byte) (bool, error) {
2035	if !is.HasInmemIndex() && query.AuthorizerIsOpen(auth) {
2036		return true, nil
2037	}
2038
2039	release := is.SeriesFile.Retain()
2040	defer release()
2041
2042	itr, err := is.tagKeySeriesIDIterator(name, tagKey)
2043	if err != nil {
2044		return false, err
2045	} else if itr == nil {
2046		return false, nil
2047	}
2048	defer itr.Close()
2049	itr = FilterUndeletedSeriesIDIterator(is.SeriesFile, itr)
2050
2051	for {
2052		e, err := itr.Next()
2053		if err != nil {
2054			return false, err
2055		}
2056
2057		if e.SeriesID == 0 {
2058			return false, nil
2059		}
2060
2061		if query.AuthorizerIsOpen(auth) {
2062			return true, nil
2063		}
2064
2065		name, tags := is.SeriesFile.Series(e.SeriesID)
2066		if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
2067			return true, nil
2068		}
2069	}
2070}
2071
2072// MeasurementSeriesIDIterator returns an iterator over all non-tombstoned series
2073// for the provided measurement.
2074func (is IndexSet) MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) {
2075	release := is.SeriesFile.Retain()
2076	defer release()
2077
2078	itr, err := is.measurementSeriesIDIterator(name)
2079	if err != nil {
2080		return nil, err
2081	}
2082	return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
2083}
2084
2085// measurementSeriesIDIterator does not provide any locking on the Series file.
2086//
2087// See  MeasurementSeriesIDIterator for more details.
2088func (is IndexSet) measurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) {
2089	a := make([]SeriesIDIterator, 0, len(is.Indexes))
2090	for _, idx := range is.Indexes {
2091		itr, err := idx.MeasurementSeriesIDIterator(name)
2092		if err != nil {
2093			SeriesIDIterators(a).Close()
2094			return nil, err
2095		} else if itr != nil {
2096			a = append(a, itr)
2097		}
2098	}
2099	return MergeSeriesIDIterators(a...), nil
2100}
2101
2102// ForEachMeasurementTagKey iterates over all tag keys in a measurement and applies
2103// the provided function.
2104func (is IndexSet) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
2105	release := is.SeriesFile.Retain()
2106	defer release()
2107
2108	itr, err := is.tagKeyIterator(name)
2109	if err != nil {
2110		return err
2111	} else if itr == nil {
2112		return nil
2113	}
2114	defer itr.Close()
2115
2116	for {
2117		key, err := itr.Next()
2118		if err != nil {
2119			return err
2120		} else if key == nil {
2121			return nil
2122		}
2123
2124		if err := fn(key); err != nil {
2125			return err
2126		}
2127	}
2128}
2129
2130// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression.
2131func (is IndexSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
2132	release := is.SeriesFile.Retain()
2133	defer release()
2134
2135	keys := make(map[string]struct{})
2136	for _, idx := range is.Indexes {
2137		m, err := idx.MeasurementTagKeysByExpr(name, expr)
2138		if err != nil {
2139			return nil, err
2140		}
2141		for k := range m {
2142			keys[k] = struct{}{}
2143		}
2144	}
2145	return keys, nil
2146}
2147
2148// TagKeySeriesIDIterator returns a series iterator for all values across a single key.
2149func (is IndexSet) TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) {
2150	release := is.SeriesFile.Retain()
2151	defer release()
2152
2153	itr, err := is.tagKeySeriesIDIterator(name, key)
2154	if err != nil {
2155		return nil, err
2156	}
2157	return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
2158}
2159
2160// tagKeySeriesIDIterator returns a series iterator for all values across a
2161// single key.
2162//
2163// It guarantees to never take any locks on the series file.
2164func (is IndexSet) tagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) {
2165	a := make([]SeriesIDIterator, 0, len(is.Indexes))
2166	for _, idx := range is.Indexes {
2167		itr, err := idx.TagKeySeriesIDIterator(name, key)
2168		if err != nil {
2169			SeriesIDIterators(a).Close()
2170			return nil, err
2171		} else if itr != nil {
2172			a = append(a, itr)
2173		}
2174	}
2175	return MergeSeriesIDIterators(a...), nil
2176}
2177
2178// TagValueSeriesIDIterator returns a series iterator for a single tag value.
2179func (is IndexSet) TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) {
2180	release := is.SeriesFile.Retain()
2181	defer release()
2182
2183	itr, err := is.tagValueSeriesIDIterator(name, key, value)
2184	if err != nil {
2185		return nil, err
2186	}
2187	return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
2188}
2189
2190// tagValueSeriesIDIterator does not provide any locking on the Series File.
2191//
2192// See TagValueSeriesIDIterator for more details.
2193func (is IndexSet) tagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) {
2194	a := make([]SeriesIDIterator, 0, len(is.Indexes))
2195	for _, idx := range is.Indexes {
2196		itr, err := idx.TagValueSeriesIDIterator(name, key, value)
2197		if err != nil {
2198			SeriesIDIterators(a).Close()
2199			return nil, err
2200		} else if itr != nil {
2201			a = append(a, itr)
2202		}
2203	}
2204	return MergeSeriesIDIterators(a...), nil
2205}
2206
2207// MeasurementSeriesByExprIterator returns a series iterator for a measurement
2208// that is filtered by expr. If expr only contains time expressions then this
2209// call is equivalent to MeasurementSeriesIDIterator().
2210func (is IndexSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) {
2211	release := is.SeriesFile.Retain()
2212	defer release()
2213	return is.measurementSeriesByExprIterator(name, expr)
2214}
2215
2216// measurementSeriesByExprIterator returns a series iterator for a measurement
2217// that is filtered by expr. See MeasurementSeriesByExprIterator for more details.
2218//
2219// measurementSeriesByExprIterator guarantees to never take any locks on the
2220// series file.
2221func (is IndexSet) measurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) {
2222	// Return all series for the measurement if there are no tag expressions.
2223	if expr == nil {
2224		itr, err := is.measurementSeriesIDIterator(name)
2225		if err != nil {
2226			return nil, err
2227		}
2228		return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
2229	}
2230
2231	itr, err := is.seriesByExprIterator(name, expr)
2232	if err != nil {
2233		return nil, err
2234	}
2235	return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
2236}
2237
2238// MeasurementSeriesKeysByExpr returns a list of series keys matching expr.
2239func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
2240	release := is.SeriesFile.Retain()
2241	defer release()
2242
2243	// Create iterator for all matching series.
2244	itr, err := is.measurementSeriesByExprIterator(name, expr)
2245	if err != nil {
2246		return nil, err
2247	} else if itr == nil {
2248		return nil, nil
2249	}
2250	defer itr.Close()
2251
2252	// measurementSeriesByExprIterator filters deleted series; no need to do so here.
2253
2254	// Iterate over all series and generate keys.
2255	var keys [][]byte
2256	for {
2257		e, err := itr.Next()
2258		if err != nil {
2259			return nil, err
2260		} else if e.SeriesID == 0 {
2261			break
2262		}
2263
2264		// Check for unsupported field filters.
2265		// Any remaining filters means there were fields (e.g., `WHERE value = 1.2`).
2266		if e.Expr != nil {
2267			if v, ok := e.Expr.(*influxql.BooleanLiteral); !ok || !v.Val {
2268				return nil, errors.New("fields not supported in WHERE clause during deletion")
2269			}
2270		}
2271
2272		seriesKey := is.SeriesFile.SeriesKey(e.SeriesID)
2273		if len(seriesKey) == 0 {
2274			continue
2275		}
2276
2277		name, tags := ParseSeriesKey(seriesKey)
2278		keys = append(keys, models.MakeKey(name, tags))
2279	}
2280
2281	bytesutil.Sort(keys)
2282
2283	return keys, nil
2284}
2285
2286func (is IndexSet) seriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) {
2287	switch expr := expr.(type) {
2288	case *influxql.BinaryExpr:
2289		switch expr.Op {
2290		case influxql.AND, influxql.OR:
2291			// Get the series IDs and filter expressions for the LHS.
2292			litr, err := is.seriesByExprIterator(name, expr.LHS)
2293			if err != nil {
2294				return nil, err
2295			}
2296
2297			// Get the series IDs and filter expressions for the RHS.
2298			ritr, err := is.seriesByExprIterator(name, expr.RHS)
2299			if err != nil {
2300				if litr != nil {
2301					litr.Close()
2302				}
2303				return nil, err
2304			}
2305
2306			// Intersect iterators if expression is "AND".
2307			if expr.Op == influxql.AND {
2308				return IntersectSeriesIDIterators(litr, ritr), nil
2309			}
2310
2311			// Union iterators if expression is "OR".
2312			return UnionSeriesIDIterators(litr, ritr), nil
2313
2314		default:
2315			return is.seriesByBinaryExprIterator(name, expr)
2316		}
2317
2318	case *influxql.ParenExpr:
2319		return is.seriesByExprIterator(name, expr.Expr)
2320
2321	case *influxql.BooleanLiteral:
2322		if expr.Val {
2323			return is.measurementSeriesIDIterator(name)
2324		}
2325		return nil, nil
2326
2327	default:
2328		return nil, nil
2329	}
2330}
2331
2332// seriesByBinaryExprIterator returns a series iterator and a filtering expression.
2333func (is IndexSet) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr) (SeriesIDIterator, error) {
2334	// If this binary expression has another binary expression, then this
2335	// is some expression math and we should just pass it to the underlying query.
2336	if _, ok := n.LHS.(*influxql.BinaryExpr); ok {
2337		itr, err := is.measurementSeriesIDIterator(name)
2338		if err != nil {
2339			return nil, err
2340		}
2341		return newSeriesIDExprIterator(itr, n), nil
2342	} else if _, ok := n.RHS.(*influxql.BinaryExpr); ok {
2343		itr, err := is.measurementSeriesIDIterator(name)
2344		if err != nil {
2345			return nil, err
2346		}
2347		return newSeriesIDExprIterator(itr, n), nil
2348	}
2349
2350	// Retrieve the variable reference from the correct side of the expression.
2351	key, ok := n.LHS.(*influxql.VarRef)
2352	value := n.RHS
2353	if !ok {
2354		key, ok = n.RHS.(*influxql.VarRef)
2355		if !ok {
2356			// This is an expression we do not know how to evaluate. Let the
2357			// query engine take care of this.
2358			itr, err := is.measurementSeriesIDIterator(name)
2359			if err != nil {
2360				return nil, err
2361			}
2362			return newSeriesIDExprIterator(itr, n), nil
2363		}
2364		value = n.LHS
2365	}
2366
2367	// For fields, return all series from this measurement.
2368	if key.Val != "_name" && ((key.Type == influxql.Unknown && is.HasField(name, key.Val)) || key.Type == influxql.AnyField || (key.Type != influxql.Tag && key.Type != influxql.Unknown)) {
2369		itr, err := is.measurementSeriesIDIterator(name)
2370		if err != nil {
2371			return nil, err
2372		}
2373		return newSeriesIDExprIterator(itr, n), nil
2374	} else if value, ok := value.(*influxql.VarRef); ok {
2375		// Check if the RHS is a variable and if it is a field.
2376		if value.Val != "_name" && ((value.Type == influxql.Unknown && is.HasField(name, value.Val)) || key.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) {
2377			itr, err := is.measurementSeriesIDIterator(name)
2378			if err != nil {
2379				return nil, err
2380			}
2381			return newSeriesIDExprIterator(itr, n), nil
2382		}
2383	}
2384
2385	// Create iterator based on value type.
2386	switch value := value.(type) {
2387	case *influxql.StringLiteral:
2388		return is.seriesByBinaryExprStringIterator(name, []byte(key.Val), []byte(value.Val), n.Op)
2389	case *influxql.RegexLiteral:
2390		return is.seriesByBinaryExprRegexIterator(name, []byte(key.Val), value.Val, n.Op)
2391	case *influxql.VarRef:
2392		return is.seriesByBinaryExprVarRefIterator(name, []byte(key.Val), value, n.Op)
2393	default:
2394		// We do not know how to evaluate this expression so pass it
2395		// on to the query engine.
2396		itr, err := is.measurementSeriesIDIterator(name)
2397		if err != nil {
2398			return nil, err
2399		}
2400		return newSeriesIDExprIterator(itr, n), nil
2401	}
2402}
2403
2404func (is IndexSet) seriesByBinaryExprStringIterator(name, key, value []byte, op influxql.Token) (SeriesIDIterator, error) {
2405	// Special handling for "_name" to match measurement name.
2406	if bytes.Equal(key, []byte("_name")) {
2407		if (op == influxql.EQ && bytes.Equal(value, name)) || (op == influxql.NEQ && !bytes.Equal(value, name)) {
2408			return is.measurementSeriesIDIterator(name)
2409		}
2410		return nil, nil
2411	}
2412
2413	if op == influxql.EQ {
2414		// Match a specific value.
2415		if len(value) != 0 {
2416			return is.tagValueSeriesIDIterator(name, key, value)
2417		}
2418
2419		mitr, err := is.measurementSeriesIDIterator(name)
2420		if err != nil {
2421			return nil, err
2422		}
2423
2424		kitr, err := is.tagKeySeriesIDIterator(name, key)
2425		if err != nil {
2426			if mitr != nil {
2427				mitr.Close()
2428			}
2429			return nil, err
2430		}
2431
2432		// Return all measurement series that have no values from this tag key.
2433		return DifferenceSeriesIDIterators(mitr, kitr), nil
2434	}
2435
2436	// Return all measurement series without this tag value.
2437	if len(value) != 0 {
2438		mitr, err := is.measurementSeriesIDIterator(name)
2439		if err != nil {
2440			return nil, err
2441		}
2442
2443		vitr, err := is.tagValueSeriesIDIterator(name, key, value)
2444		if err != nil {
2445			if mitr != nil {
2446				mitr.Close()
2447			}
2448			return nil, err
2449		}
2450
2451		return DifferenceSeriesIDIterators(mitr, vitr), nil
2452	}
2453
2454	// Return all series across all values of this tag key.
2455	return is.tagKeySeriesIDIterator(name, key)
2456}
2457
2458func (is IndexSet) seriesByBinaryExprRegexIterator(name, key []byte, value *regexp.Regexp, op influxql.Token) (SeriesIDIterator, error) {
2459	// Special handling for "_name" to match measurement name.
2460	if bytes.Equal(key, []byte("_name")) {
2461		match := value.Match(name)
2462		if (op == influxql.EQREGEX && match) || (op == influxql.NEQREGEX && !match) {
2463			mitr, err := is.measurementSeriesIDIterator(name)
2464			if err != nil {
2465				return nil, err
2466			}
2467			return newSeriesIDExprIterator(mitr, &influxql.BooleanLiteral{Val: true}), nil
2468		}
2469		return nil, nil
2470	}
2471	return is.matchTagValueSeriesIDIterator(name, key, value, op == influxql.EQREGEX)
2472}
2473
2474func (is IndexSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *influxql.VarRef, op influxql.Token) (SeriesIDIterator, error) {
2475	itr0, err := is.tagKeySeriesIDIterator(name, key)
2476	if err != nil {
2477		return nil, err
2478	}
2479
2480	itr1, err := is.tagKeySeriesIDIterator(name, []byte(value.Val))
2481	if err != nil {
2482		if itr0 != nil {
2483			itr0.Close()
2484		}
2485		return nil, err
2486	}
2487
2488	if op == influxql.EQ {
2489		return IntersectSeriesIDIterators(itr0, itr1), nil
2490	}
2491	return DifferenceSeriesIDIterators(itr0, itr1), nil
2492}
2493
2494// MatchTagValueSeriesIDIterator returns a series iterator for tags which match value.
2495// If matches is false, returns iterators which do not match value.
2496func (is IndexSet) MatchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (SeriesIDIterator, error) {
2497	release := is.SeriesFile.Retain()
2498	defer release()
2499	itr, err := is.matchTagValueSeriesIDIterator(name, key, value, matches)
2500	if err != nil {
2501		return nil, err
2502	}
2503	return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
2504}
2505
2506// matchTagValueSeriesIDIterator returns a series iterator for tags which match
2507// value. See MatchTagValueSeriesIDIterator for more details.
2508//
2509// It guarantees to never take any locks on the underlying series file.
2510func (is IndexSet) matchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (SeriesIDIterator, error) {
2511	matchEmpty := value.MatchString("")
2512	if matches {
2513		if matchEmpty {
2514			return is.matchTagValueEqualEmptySeriesIDIterator(name, key, value)
2515		}
2516		return is.matchTagValueEqualNotEmptySeriesIDIterator(name, key, value)
2517	}
2518
2519	if matchEmpty {
2520		return is.matchTagValueNotEqualEmptySeriesIDIterator(name, key, value)
2521	}
2522	return is.matchTagValueNotEqualNotEmptySeriesIDIterator(name, key, value)
2523}
2524
2525func (is IndexSet) matchTagValueEqualEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) {
2526	vitr, err := is.tagValueIterator(name, key)
2527	if err != nil {
2528		return nil, err
2529	} else if vitr == nil {
2530		return is.measurementSeriesIDIterator(name)
2531	}
2532	defer vitr.Close()
2533
2534	var itrs []SeriesIDIterator
2535	if err := func() error {
2536		for {
2537			e, err := vitr.Next()
2538			if err != nil {
2539				return err
2540			} else if e == nil {
2541				break
2542			}
2543
2544			if !value.Match(e) {
2545				itr, err := is.tagValueSeriesIDIterator(name, key, e)
2546				if err != nil {
2547					return err
2548				} else if itr != nil {
2549					itrs = append(itrs, itr)
2550				}
2551			}
2552		}
2553		return nil
2554	}(); err != nil {
2555		SeriesIDIterators(itrs).Close()
2556		return nil, err
2557	}
2558
2559	mitr, err := is.measurementSeriesIDIterator(name)
2560	if err != nil {
2561		SeriesIDIterators(itrs).Close()
2562		return nil, err
2563	}
2564
2565	return DifferenceSeriesIDIterators(mitr, MergeSeriesIDIterators(itrs...)), nil
2566}
2567
2568func (is IndexSet) matchTagValueEqualNotEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) {
2569	vitr, err := is.tagValueIterator(name, key)
2570	if err != nil {
2571		return nil, err
2572	} else if vitr == nil {
2573		return nil, nil
2574	}
2575	defer vitr.Close()
2576
2577	var itrs []SeriesIDIterator
2578	for {
2579		e, err := vitr.Next()
2580		if err != nil {
2581			SeriesIDIterators(itrs).Close()
2582			return nil, err
2583		} else if e == nil {
2584			break
2585		}
2586
2587		if value.Match(e) {
2588			itr, err := is.tagValueSeriesIDIterator(name, key, e)
2589			if err != nil {
2590				SeriesIDIterators(itrs).Close()
2591				return nil, err
2592			} else if itr != nil {
2593				itrs = append(itrs, itr)
2594			}
2595		}
2596	}
2597	return MergeSeriesIDIterators(itrs...), nil
2598}
2599
2600func (is IndexSet) matchTagValueNotEqualEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) {
2601	vitr, err := is.tagValueIterator(name, key)
2602	if err != nil {
2603		return nil, err
2604	} else if vitr == nil {
2605		return nil, nil
2606	}
2607	defer vitr.Close()
2608
2609	var itrs []SeriesIDIterator
2610	for {
2611		e, err := vitr.Next()
2612		if err != nil {
2613			SeriesIDIterators(itrs).Close()
2614			return nil, err
2615		} else if e == nil {
2616			break
2617		}
2618
2619		if !value.Match(e) {
2620			itr, err := is.tagValueSeriesIDIterator(name, key, e)
2621			if err != nil {
2622				SeriesIDIterators(itrs).Close()
2623				return nil, err
2624			} else if itr != nil {
2625				itrs = append(itrs, itr)
2626			}
2627		}
2628	}
2629	return MergeSeriesIDIterators(itrs...), nil
2630}
2631
2632func (is IndexSet) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) {
2633	vitr, err := is.tagValueIterator(name, key)
2634	if err != nil {
2635		return nil, err
2636	} else if vitr == nil {
2637		return is.measurementSeriesIDIterator(name)
2638	}
2639	defer vitr.Close()
2640
2641	var itrs []SeriesIDIterator
2642	for {
2643		e, err := vitr.Next()
2644		if err != nil {
2645			SeriesIDIterators(itrs).Close()
2646			return nil, err
2647		} else if e == nil {
2648			break
2649		}
2650		if value.Match(e) {
2651			itr, err := is.tagValueSeriesIDIterator(name, key, e)
2652			if err != nil {
2653				SeriesIDIterators(itrs).Close()
2654				return nil, err
2655			} else if itr != nil {
2656				itrs = append(itrs, itr)
2657			}
2658		}
2659	}
2660
2661	mitr, err := is.measurementSeriesIDIterator(name)
2662	if err != nil {
2663		SeriesIDIterators(itrs).Close()
2664		return nil, err
2665	}
2666	return DifferenceSeriesIDIterators(mitr, MergeSeriesIDIterators(itrs...)), nil
2667}
2668
2669// TagValuesByKeyAndExpr retrieves tag values for the provided tag keys.
2670//
2671// TagValuesByKeyAndExpr returns sets of values for each key, indexable by the
2672// position of the tag key in the keys argument.
2673//
2674// N.B tagValuesByKeyAndExpr relies on keys being sorted in ascending
2675// lexicographic order.
2676func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, fieldset *MeasurementFieldSet) ([]map[string]struct{}, error) {
2677	release := is.SeriesFile.Retain()
2678	defer release()
2679	return is.tagValuesByKeyAndExpr(auth, name, keys, expr)
2680}
2681
2682// tagValuesByKeyAndExpr retrieves tag values for the provided tag keys. See
2683// TagValuesByKeyAndExpr for more details.
2684//
2685// tagValuesByKeyAndExpr guarantees to never take any locks on the underlying
2686// series file.
2687func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr) ([]map[string]struct{}, error) {
2688	database := is.Database()
2689
2690	valueExpr := influxql.CloneExpr(expr)
2691	valueExpr = influxql.Reduce(influxql.RewriteExpr(valueExpr, func(e influxql.Expr) influxql.Expr {
2692		switch e := e.(type) {
2693		case *influxql.BinaryExpr:
2694			switch e.Op {
2695			case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
2696				tag, ok := e.LHS.(*influxql.VarRef)
2697				if !ok || tag.Val != "value" {
2698					return nil
2699				}
2700			}
2701		}
2702		return e
2703	}), nil)
2704
2705	itr, err := is.seriesByExprIterator(name, expr)
2706	if err != nil {
2707		return nil, err
2708	} else if itr == nil {
2709		return nil, nil
2710	}
2711	itr = FilterUndeletedSeriesIDIterator(is.SeriesFile, itr)
2712	defer itr.Close()
2713
2714	keyIdxs := make(map[string]int, len(keys))
2715	for ki, key := range keys {
2716		keyIdxs[key] = ki
2717
2718		// Check that keys are in order.
2719		if ki > 0 && key < keys[ki-1] {
2720			return nil, fmt.Errorf("keys %v are not in ascending order", keys)
2721		}
2722	}
2723
2724	resultSet := make([]map[string]struct{}, len(keys))
2725	for i := 0; i < len(resultSet); i++ {
2726		resultSet[i] = make(map[string]struct{})
2727	}
2728
2729	// Iterate all series to collect tag values.
2730	for {
2731		e, err := itr.Next()
2732		if err != nil {
2733			return nil, err
2734		} else if e.SeriesID == 0 {
2735			break
2736		}
2737
2738		buf := is.SeriesFile.SeriesKey(e.SeriesID)
2739		if len(buf) == 0 {
2740			continue
2741		}
2742
2743		if auth != nil {
2744			name, tags := ParseSeriesKey(buf)
2745			if !auth.AuthorizeSeriesRead(database, name, tags) {
2746				continue
2747			}
2748		}
2749
2750		_, buf = ReadSeriesKeyLen(buf)
2751		_, buf = ReadSeriesKeyMeasurement(buf)
2752		tagN, buf := ReadSeriesKeyTagN(buf)
2753		for i := 0; i < tagN; i++ {
2754			var key, value []byte
2755			key, value, buf = ReadSeriesKeyTag(buf)
2756			if valueExpr != nil {
2757				if !influxql.EvalBool(valueExpr, map[string]interface{}{"value": string(value)}) {
2758					continue
2759				}
2760			}
2761
2762			if idx, ok := keyIdxs[string(key)]; ok {
2763				resultSet[idx][string(value)] = struct{}{}
2764			} else if string(key) > keys[len(keys)-1] {
2765				// The tag key is > the largest key we're interested in.
2766				break
2767			}
2768		}
2769	}
2770	return resultSet, nil
2771}
2772
2773// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression.
2774func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
2775	if len(keys) == 0 {
2776		return nil, nil
2777	}
2778
2779	results := make([][]string, len(keys))
2780	// If the keys are not sorted, then sort them.
2781	if !keysSorted {
2782		sort.Strings(keys)
2783	}
2784
2785	release := is.SeriesFile.Retain()
2786	defer release()
2787
2788	// No expression means that the values shouldn't be filtered; so fetch them
2789	// all.
2790	if expr == nil {
2791		for ki, key := range keys {
2792			vitr, err := is.tagValueIterator(name, []byte(key))
2793			if err != nil {
2794				return nil, err
2795			} else if vitr == nil {
2796				break
2797			}
2798			defer vitr.Close()
2799
2800			// If no authorizer present then return all values.
2801			if query.AuthorizerIsOpen(auth) {
2802				for {
2803					val, err := vitr.Next()
2804					if err != nil {
2805						return nil, err
2806					} else if val == nil {
2807						break
2808					}
2809					results[ki] = append(results[ki], string(val))
2810				}
2811				continue
2812			}
2813
2814			// Authorization is present — check all series with matching tag values
2815			// and measurements for the presence of an authorized series.
2816			for {
2817				val, err := vitr.Next()
2818				if err != nil {
2819					return nil, err
2820				} else if val == nil {
2821					break
2822				}
2823
2824				sitr, err := is.tagValueSeriesIDIterator(name, []byte(key), val)
2825				if err != nil {
2826					return nil, err
2827				} else if sitr == nil {
2828					continue
2829				}
2830				defer sitr.Close()
2831				sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)
2832
2833				for {
2834					se, err := sitr.Next()
2835					if err != nil {
2836						return nil, err
2837					}
2838
2839					if se.SeriesID == 0 {
2840						break
2841					}
2842
2843					name, tags := is.SeriesFile.Series(se.SeriesID)
2844					if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
2845						results[ki] = append(results[ki], string(val))
2846						break
2847					}
2848				}
2849				if err := sitr.Close(); err != nil {
2850					return nil, err
2851				}
2852			}
2853		}
2854		return results, nil
2855	}
2856
2857	// This is the case where we have filtered series by some WHERE condition.
2858	// We only care about the tag values for the keys given the
2859	// filtered set of series ids.
2860	resultSet, err := is.tagValuesByKeyAndExpr(auth, name, keys, expr)
2861	if err != nil {
2862		return nil, err
2863	}
2864
2865	// Convert result sets into []string
2866	for i, s := range resultSet {
2867		values := make([]string, 0, len(s))
2868		for v := range s {
2869			values = append(values, v)
2870		}
2871		sort.Strings(values)
2872		results[i] = values
2873	}
2874	return results, nil
2875}
2876
2877// TagSets returns an ordered list of tag sets for a measurement by dimension
2878// and filtered by an optional conditional expression.
2879func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) {
2880	release := is.SeriesFile.Retain()
2881	defer release()
2882
2883	itr, err := is.measurementSeriesByExprIterator(name, opt.Condition)
2884	if err != nil {
2885		return nil, err
2886	} else if itr == nil {
2887		return nil, nil
2888	}
2889	defer itr.Close()
2890	// measurementSeriesByExprIterator filters deleted series IDs; no need to
2891	// do so here.
2892
2893	var dims []string
2894	if len(opt.Dimensions) > 0 {
2895		dims = make([]string, len(opt.Dimensions))
2896		copy(dims, opt.Dimensions)
2897		sort.Strings(dims)
2898	}
2899
2900	// For every series, get the tag values for the requested tag keys i.e.
2901	// dimensions. This is the TagSet for that series. Series with the same
2902	// TagSet are then grouped together, because for the purpose of GROUP BY
2903	// they are part of the same composite series.
2904	tagSets := make(map[string]*query.TagSet, 64)
2905	var (
2906		seriesN, maxSeriesN int
2907		db                  = is.Database()
2908	)
2909
2910	if opt.MaxSeriesN > 0 {
2911		maxSeriesN = opt.MaxSeriesN
2912	} else {
2913		maxSeriesN = int(^uint(0) >> 1)
2914	}
2915
2916	// The tag sets require a string for each series key in the set, The series
2917	// file formatted keys need to be parsed into models format. Since they will
2918	// end up as strings we can re-use an intermediate buffer for this process.
2919	var keyBuf []byte
2920	var tagsBuf models.Tags // Buffer for tags. Tags are not needed outside of each loop iteration.
2921	for {
2922		se, err := itr.Next()
2923		if err != nil {
2924			return nil, err
2925		} else if se.SeriesID == 0 {
2926			break
2927		}
2928
2929		// Skip if the series has been tombstoned.
2930		key := sfile.SeriesKey(se.SeriesID)
2931		if len(key) == 0 {
2932			continue
2933		}
2934
2935		if seriesN&0x3fff == 0x3fff {
2936			// check every 16384 series if the query has been canceled
2937			select {
2938			case <-opt.InterruptCh:
2939				return nil, query.ErrQueryInterrupted
2940			default:
2941			}
2942		}
2943
2944		if seriesN > maxSeriesN {
2945			return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN)
2946		}
2947
2948		// NOTE - must not escape this loop iteration.
2949		_, tagsBuf = ParseSeriesKeyInto(key, tagsBuf)
2950		if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(db, name, tagsBuf) {
2951			continue
2952		}
2953
2954		var tagsAsKey []byte
2955		if len(dims) > 0 {
2956			tagsAsKey = MakeTagsKey(dims, tagsBuf)
2957		}
2958
2959		tagSet, ok := tagSets[string(tagsAsKey)]
2960		if !ok {
2961			// This TagSet is new, create a new entry for it.
2962			tagSet = &query.TagSet{
2963				Tags: nil,
2964				Key:  tagsAsKey,
2965			}
2966		}
2967
2968		// Associate the series and filter with the Tagset.
2969		keyBuf = models.AppendMakeKey(keyBuf, name, tagsBuf)
2970		tagSet.AddFilter(string(keyBuf), se.Expr)
2971		keyBuf = keyBuf[:0]
2972
2973		// Ensure it's back in the map.
2974		tagSets[string(tagsAsKey)] = tagSet
2975		seriesN++
2976	}
2977
2978	// Sort the series in each tag set.
2979	for _, t := range tagSets {
2980		sort.Sort(t)
2981	}
2982
2983	// The TagSets have been created, as a map of TagSets. Just send
2984	// the values back as a slice, sorting for consistency.
2985	sortedTagsSets := make([]*query.TagSet, 0, len(tagSets))
2986	for _, v := range tagSets {
2987		sortedTagsSets = append(sortedTagsSets, v)
2988	}
2989	sort.Sort(byTagKey(sortedTagsSets))
2990
2991	return sortedTagsSets, nil
2992}
2993
// IndexFormat identifies the on-disk/in-memory format of an index.
type IndexFormat int

// Known index formats.
const (
	// InMemFormat is the format used by the original in-memory shared index.
	InMemFormat = IndexFormat(1)

	// TSI1Format is the format used by the tsi1 index.
	TSI1Format = IndexFormat(2)
)
3004
// NewIndexFunc is the constructor signature an index implementation supplies
// to RegisterIndex. NewIndex calls the registered function with the arguments
// it received and returns the resulting Index.
type NewIndexFunc func(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) Index

// newIndexFuncs is a lookup of index constructors by name. It is written by
// RegisterIndex and read by RegisteredIndexes and NewIndex.
var newIndexFuncs = make(map[string]NewIndexFunc)
3010
3011// RegisterIndex registers a storage index initializer by name.
3012func RegisterIndex(name string, fn NewIndexFunc) {
3013	if _, ok := newIndexFuncs[name]; ok {
3014		panic("index already registered: " + name)
3015	}
3016	newIndexFuncs[name] = fn
3017}
3018
3019// RegisteredIndexes returns the slice of currently registered indexes.
3020func RegisteredIndexes() []string {
3021	a := make([]string, 0, len(newIndexFuncs))
3022	for k := range newIndexFuncs {
3023		a = append(a, k)
3024	}
3025	sort.Strings(a)
3026	return a
3027}
3028
3029// NewIndex returns an instance of an index based on its format.
3030// If the path does not exist then the DefaultFormat is used.
3031func NewIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) (Index, error) {
3032	format := options.IndexVersion
3033
3034	// Use default format unless existing directory exists.
3035	_, err := os.Stat(path)
3036	if os.IsNotExist(err) {
3037		// nop, use default
3038	} else if err != nil {
3039		return nil, err
3040	} else if err == nil {
3041		format = TSI1IndexName
3042	}
3043
3044	// Lookup index by format.
3045	fn := newIndexFuncs[format]
3046	if fn == nil {
3047		return nil, fmt.Errorf("invalid index format: %q", format)
3048	}
3049	return fn(id, database, path, seriesIDSet, sfile, options), nil
3050}
3051
3052func MustOpenIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) Index {
3053	idx, err := NewIndex(id, database, path, seriesIDSet, sfile, options)
3054	if err != nil {
3055		panic(err)
3056	} else if err := idx.Open(); err != nil {
3057		panic(err)
3058	}
3059	return idx
3060}
3061
// assert panics when condition is false. The panic value is the string
// "assert failed: " followed by msg formatted with v.
func assert(condition bool, msg string, v ...interface{}) {
	if condition {
		return
	}
	panic(fmt.Sprintf("assert failed: "+msg, v...))
}
3068
3069type byTagKey []*query.TagSet
3070
3071func (t byTagKey) Len() int           { return len(t) }
3072func (t byTagKey) Less(i, j int) bool { return bytes.Compare(t[i].Key, t[j].Key) < 0 }
3073func (t byTagKey) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }
3074