1package tsdb
2
3import (
4	"bytes"
5	"context"
6	"errors"
7	"fmt"
8	"io"
9	"io/ioutil"
10	"os"
11	"path/filepath"
12	"regexp"
13	"runtime"
14	"sort"
15	"strings"
16	"sync"
17	"sync/atomic"
18	"time"
19	"unsafe"
20
21	"github.com/gogo/protobuf/proto"
22	"github.com/influxdata/influxdb/models"
23	"github.com/influxdata/influxdb/pkg/bytesutil"
24	"github.com/influxdata/influxdb/pkg/estimator"
25	"github.com/influxdata/influxdb/pkg/file"
26	"github.com/influxdata/influxdb/pkg/limiter"
27	"github.com/influxdata/influxdb/pkg/slices"
28	"github.com/influxdata/influxdb/query"
29	internal "github.com/influxdata/influxdb/tsdb/internal"
30	"github.com/influxdata/influxql"
31	"go.uber.org/zap"
32)
33
// Names of the shard-level statistics reported by (*Shard).Statistics.
const (
	statWriteReq           = "writeReq"
	statWriteReqOK         = "writeReqOk"
	statWriteReqErr        = "writeReqErr"
	statSeriesCreate       = "seriesCreate"
	statFieldsCreate       = "fieldsCreate"
	statWritePointsErr     = "writePointsErr"
	statWritePointsDropped = "writePointsDropped"
	statWritePointsOK      = "writePointsOk"
	statWriteBytes         = "writeBytes"
	statDiskBytes          = "diskBytes"
)
46
var (
	// ErrFieldOverflow is returned when too many fields are created on a measurement.
	ErrFieldOverflow = errors.New("field overflow")

	// ErrFieldTypeConflict is returned when a new field already exists with a different type.
	ErrFieldTypeConflict = errors.New("field type conflict")

	// ErrFieldNotFound is returned when a field cannot be found.
	ErrFieldNotFound = errors.New("field not found")

	// ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID
	// there is no mapping for.
	ErrFieldUnmappedID = errors.New("field ID not mapped")

	// ErrEngineClosed is returned when a caller attempts indirectly to
	// access the shard's underlying engine.
	ErrEngineClosed = errors.New("engine is closed")

	// ErrShardDisabled is returned when the shard is not available for
	// queries or writes.
	ErrShardDisabled = errors.New("shard is disabled")

	// ErrUnknownFieldsFormat is returned when the fields index file is not identifiable by
	// the file's magic number.
	ErrUnknownFieldsFormat = errors.New("unknown field index format")

	// ErrUnknownFieldType is returned when the type of a field cannot be determined.
	ErrUnknownFieldType = errors.New("unknown field type")

	// ErrShardNotIdle is returned when an operation requiring the shard to be idle/cold is
	// attempted on a hot shard.
	ErrShardNotIdle = errors.New("shard not idle")

	// fieldsIndexMagicNumber is the file magic number for the fields index file.
	fieldsIndexMagicNumber = []byte{0, 6, 1, 3}
)
83
var (
	// Static objects to prevent small allocs.
	// timeBytes is the byte form of the reserved "time" tag/field name.
	timeBytes = []byte("time")
)
88
// A ShardError implements the error interface, and contains extra
// context about the shard that generated the error.
type ShardError struct {
	id  uint64 // ID of the shard that produced the error
	Err error  // underlying cause
}
95
96// NewShardError returns a new ShardError.
97func NewShardError(id uint64, err error) error {
98	if err == nil {
99		return nil
100	}
101	return ShardError{id: id, Err: err}
102}
103
// Error returns the string representation of the error, to satisfy the error
// interface. The shard ID is prefixed to the wrapped error's message.
func (e ShardError) Error() string {
	return fmt.Sprintf("[shard %d] %s", e.id, e.Err)
}
108
// PartialWriteError indicates a write request could only write a portion of the
// requested values.
type PartialWriteError struct {
	Reason  string // human-readable cause of the first drop encountered
	Dropped int    // number of points dropped from the request

	// A sorted slice of series keys that were dropped.
	DroppedKeys [][]byte
}
118
// Error returns a summary of the partial write, satisfying the error interface.
func (e PartialWriteError) Error() string {
	return fmt.Sprintf("partial write: %s dropped=%d", e.Reason, e.Dropped)
}
122
// Shard represents a self-contained time series database. An inverted index of
// the measurement and tag data is kept along with the raw time series data.
// Data can be split across many shards. The query engine in TSDB is responsible
// for combining the output of many shards into a single query result.
type Shard struct {
	// On-disk locations and identity.
	path    string
	walPath string
	id      uint64

	database        string
	retentionPolicy string

	// Series file shared across the database, and engine construction options.
	sfile   *SeriesFile
	options EngineOptions

	// mu guards _engine, index and enabled. _engine must be accessed via
	// engine()/engineNoLock(); it is nil while the shard is closed.
	mu      sync.RWMutex
	_engine Engine
	index   Index
	enabled bool

	// expvar-based stats.
	stats       *ShardStatistics
	defaultTags models.StatisticTags

	// baseLogger is the logger supplied via WithLogger; logger carries the
	// additional "service"="shard" field.
	baseLogger *zap.Logger
	logger     *zap.Logger

	// EnableOnOpen controls whether Open enables the shard automatically.
	EnableOnOpen bool
}
152
153// NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
154func NewShard(id uint64, path string, walPath string, sfile *SeriesFile, opt EngineOptions) *Shard {
155	db, rp := decodeStorePath(path)
156	logger := zap.NewNop()
157	if opt.FieldValidator == nil {
158		opt.FieldValidator = defaultFieldValidator{}
159	}
160
161	s := &Shard{
162		id:      id,
163		path:    path,
164		walPath: walPath,
165		sfile:   sfile,
166		options: opt,
167
168		stats: &ShardStatistics{},
169		defaultTags: models.StatisticTags{
170			"path":            path,
171			"walPath":         walPath,
172			"id":              fmt.Sprintf("%d", id),
173			"database":        db,
174			"retentionPolicy": rp,
175			"engine":          opt.EngineVersion,
176		},
177
178		database:        db,
179		retentionPolicy: rp,
180
181		logger:       logger,
182		baseLogger:   logger,
183		EnableOnOpen: true,
184	}
185	return s
186}
187
188// WithLogger sets the logger on the shard. It must be called before Open.
189func (s *Shard) WithLogger(log *zap.Logger) {
190	s.baseLogger = log
191	engine, err := s.engine()
192	if err == nil {
193		engine.WithLogger(s.baseLogger)
194		s.index.WithLogger(s.baseLogger)
195	}
196	s.logger = s.baseLogger.With(zap.String("service", "shard"))
197}
198
199// SetEnabled enables the shard for queries and write.  When disabled, all
200// writes and queries return an error and compactions are stopped for the shard.
201func (s *Shard) SetEnabled(enabled bool) {
202	s.mu.Lock()
203	// Prevent writes and queries
204	s.enabled = enabled
205	if s._engine != nil {
206		// Disable background compactions and snapshotting
207		s._engine.SetEnabled(enabled)
208	}
209	s.mu.Unlock()
210}
211
212// ScheduleFullCompaction forces a full compaction to be schedule on the shard.
213func (s *Shard) ScheduleFullCompaction() error {
214	engine, err := s.engine()
215	if err != nil {
216		return err
217	}
218	return engine.ScheduleFullCompaction()
219}
220
// ID returns the shard's ID.
func (s *Shard) ID() uint64 {
	return s.id
}
225
// Database returns the name of the database the shard belongs to.
func (s *Shard) Database() string {
	return s.database
}
230
// RetentionPolicy returns the name of the retention policy the shard belongs to.
func (s *Shard) RetentionPolicy() string {
	return s.retentionPolicy
}
235
// ShardStatistics maintains statistics for a shard.
//
// All fields are updated with sync/atomic and must also be read atomically.
type ShardStatistics struct {
	WriteReq           int64
	WriteReqOK         int64
	WriteReqErr        int64
	FieldsCreated      int64
	WritePointsErr     int64
	WritePointsDropped int64
	WritePointsOK      int64
	BytesWritten       int64
	DiskBytes          int64
}
248
// Statistics returns statistics for periodic monitoring.
//
// It returns nil (no statistics) when the engine is closed or disabled, or
// when refreshing the disk-size stat fails. Engine and index statistics are
// appended after the shard's own counters.
func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
	engine, err := s.engine()
	if err != nil {
		return nil
	}

	// Refresh our disk size stat
	if _, err := s.DiskSize(); err != nil {
		return nil
	}
	seriesN := engine.SeriesN()

	tags = s.defaultTags.Merge(tags)
	statistics := []models.Statistic{{
		Name: "shard",
		Tags: tags,
		Values: map[string]interface{}{
			statWriteReq:           atomic.LoadInt64(&s.stats.WriteReq),
			statWriteReqOK:         atomic.LoadInt64(&s.stats.WriteReqOK),
			statWriteReqErr:        atomic.LoadInt64(&s.stats.WriteReqErr),
			statSeriesCreate:       seriesN,
			statFieldsCreate:       atomic.LoadInt64(&s.stats.FieldsCreated),
			statWritePointsErr:     atomic.LoadInt64(&s.stats.WritePointsErr),
			statWritePointsDropped: atomic.LoadInt64(&s.stats.WritePointsDropped),
			statWritePointsOK:      atomic.LoadInt64(&s.stats.WritePointsOK),
			statWriteBytes:         atomic.LoadInt64(&s.stats.BytesWritten),
			statDiskBytes:          atomic.LoadInt64(&s.stats.DiskBytes),
		},
	}}

	// Add the index and engine statistics.
	statistics = append(statistics, engine.Statistics(tags)...)
	return statistics
}
284
// Path returns the on-disk path set on the shard when it was created.
func (s *Shard) Path() string { return s.path }
287
// Open initializes and opens the shard's store.
//
// The index is built and opened first, then the engine is created with
// compactions disabled while its metadata index loads. On any failure the
// shard is closed and a ShardError wrapping the cause is returned. If
// EnableOnOpen is set, the shard is enabled for writes, queries and
// compactions once opening succeeds.
func (s *Shard) Open() error {
	if err := func() error {
		s.mu.Lock()
		defer s.mu.Unlock()

		// Return if the shard is already open
		if s._engine != nil {
			return nil
		}

		seriesIDSet := NewSeriesIDSet()

		// Initialize underlying index.
		ipath := filepath.Join(s.path, "index")
		idx, err := NewIndex(s.id, s.database, ipath, seriesIDSet, s.sfile, s.options)
		if err != nil {
			return err
		}

		// Open index.
		if err := idx.Open(); err != nil {
			return err
		}
		s.index = idx
		idx.WithLogger(s.baseLogger)

		// Initialize underlying engine.
		e, err := NewEngine(s.id, idx, s.path, s.walPath, s.sfile, s.options)
		if err != nil {
			return err
		}

		// Set log output on the engine.
		e.WithLogger(s.baseLogger)

		// Disable compactions while loading the index
		e.SetEnabled(false)

		// Open engine.
		if err := e.Open(); err != nil {
			return err
		}

		// Load metadata index for the inmem index only.
		if err := e.LoadMetadataIndex(s.id, s.index); err != nil {
			return err
		}
		s._engine = e

		return nil
	}(); err != nil {
		// Tear down any partially-opened index/engine state.
		s.close()
		return NewShardError(s.id, err)
	}

	if s.EnableOnOpen {
		// enable writes, queries and compactions
		s.SetEnabled(true)
	}

	return nil
}
351
// Close shuts down the shard's store. It is safe to call on an already-closed
// shard.
func (s *Shard) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.close()
}
358
// close closes the shard's engine and index and drops the references to them
// so the shard can be reopened. Callers must hold s.mu.
//
// The engine reference is cleared only when the engine closes cleanly, and
// the index reference only when the index closes cleanly.
// NOTE(review): an index Close error is discarded here; only the engine's
// close error is returned — confirm that is intentional.
func (s *Shard) close() error {
	if s._engine == nil {
		return nil
	}

	err := s._engine.Close()
	if err == nil {
		s._engine = nil
	}

	if e := s.index.Close(); e == nil {
		s.index = nil
	}
	return err
}
376
377// IndexType returns the index version being used for this shard.
378//
379// IndexType returns the empty string if it is called before the shard is opened,
380// since it is only that point that the underlying index type is known.
381func (s *Shard) IndexType() string {
382	s.mu.RLock()
383	defer s.mu.RUnlock()
384	if s._engine == nil || s.index == nil { // Shard not open yet.
385		return ""
386	}
387	return s.index.Type()
388}
389
390// ready determines if the Shard is ready for queries or writes.
391// It returns nil if ready, otherwise ErrShardClosed or ErrShardDisabled
392func (s *Shard) ready() error {
393	var err error
394	if s._engine == nil {
395		err = ErrEngineClosed
396	} else if !s.enabled {
397		err = ErrShardDisabled
398	}
399	return err
400}
401
402// LastModified returns the time when this shard was last modified.
403func (s *Shard) LastModified() time.Time {
404	engine, err := s.engine()
405	if err != nil {
406		return time.Time{}
407	}
408	return engine.LastModified()
409}
410
// Index returns a reference to the underlying index. It returns an error if
// the shard is closed or disabled.
func (s *Shard) Index() (Index, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if err := s.ready(); err != nil {
		return nil, err
	}
	return s.index, nil
}
421
// seriesFile returns the shard's series file, or an error if the shard is
// closed or disabled.
func (s *Shard) seriesFile() (*SeriesFile, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if err := s.ready(); err != nil {
		return nil, err
	}
	return s.sfile, nil
}
430
431// IsIdle return true if the shard is not receiving writes and is fully compacted.
432func (s *Shard) IsIdle() bool {
433	engine, err := s.engine()
434	if err != nil {
435		return true
436	}
437	return engine.IsIdle()
438}
439
440func (s *Shard) Free() error {
441	engine, err := s.engine()
442	if err != nil {
443		return err
444	}
445
446	// Disable compactions to stop background goroutines
447	s.SetCompactionsEnabled(false)
448
449	return engine.Free()
450}
451
452// SetCompactionsEnabled enables or disable shard background compactions.
453func (s *Shard) SetCompactionsEnabled(enabled bool) {
454	engine, err := s.engine()
455	if err != nil {
456		return
457	}
458	engine.SetCompactionsEnabled(enabled)
459}
460
// DiskSize returns the size on disk of this shard, and refreshes the shard's
// DiskBytes statistic as a side effect.
func (s *Shard) DiskSize() (int64, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	// We don't use engine() because we still want to report the shard's disk
	// size even if the shard has been disabled.
	if s._engine == nil {
		return 0, ErrEngineClosed
	}
	size := s._engine.DiskSize()
	atomic.StoreInt64(&s.stats.DiskBytes, size)
	return size, nil
}
474
// FieldCreate holds information for a field to create on a measurement.
type FieldCreate struct {
	Measurement []byte // name of the measurement the field belongs to
	Field       *Field // name and type of the field to create
}
480
// WritePoints will write the raw data points and any new metadata to the index in the shard.
//
// Invalid points are dropped during validation; the remaining points are
// still written and a PartialWriteError describing the drops is returned.
// Any other validation, field-creation or engine error aborts the write.
func (s *Shard) WritePoints(points []models.Point) error {
	s.mu.RLock()
	defer s.mu.RUnlock()

	engine, err := s.engineNoLock()
	if err != nil {
		return err
	}

	var writeError error
	atomic.AddInt64(&s.stats.WriteReq, 1)

	points, fieldsToCreate, err := s.validateSeriesAndFields(points)
	if err != nil {
		if _, ok := err.(PartialWriteError); !ok {
			return err
		}
		// There was a partial write (points dropped), hold onto the error to return
		// to the caller, but continue on writing the remaining points.
		writeError = err
	}
	atomic.AddInt64(&s.stats.FieldsCreated, int64(len(fieldsToCreate)))

	// add any new fields and keep track of what needs to be saved
	// NOTE(review): this error path does not bump WriteReqErr, unlike the
	// engine write failure below — confirm whether that is intentional.
	if err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil {
		return err
	}

	// Write to the engine.
	if err := engine.WritePoints(points); err != nil {
		atomic.AddInt64(&s.stats.WritePointsErr, int64(len(points)))
		atomic.AddInt64(&s.stats.WriteReqErr, 1)
		return fmt.Errorf("engine: %s", err)
	}
	atomic.AddInt64(&s.stats.WritePointsOK, int64(len(points)))
	atomic.AddInt64(&s.stats.WriteReqOK, 1)

	return writeError
}
521
// validateSeriesAndFields checks which series and fields are new and whose metadata should be saved and indexed.
//
// It returns the subset of points that passed validation (the input slice is
// compacted in place), the fields that must be created before the write, and
// either nil or a PartialWriteError summarizing dropped points. Any
// non-partial error aborts validation entirely.
func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point, []*FieldCreate, error) {
	var (
		fieldsToCreate []*FieldCreate
		err            error
		dropped        int
		reason         string // only first error reason is set unless returned from CreateSeriesListIfNotExists
	)

	// Create all series against the index in bulk.
	keys := make([][]byte, len(points))
	names := make([][]byte, len(points))
	tagsSlice := make([]models.Tags, len(points))

	var j int
	for i, p := range points {
		tags := p.Tags()

		// Drop any series w/ a "time" tag, these are illegal
		if v := tags.Get(timeBytes); v != nil {
			dropped++
			if reason == "" {
				reason = fmt.Sprintf(
					"invalid tag key: input tag \"%s\" on measurement \"%s\" is invalid",
					"time", string(p.Name()))
			}
			continue
		}

		// Keep the point: compact valid entries to the front of the slices.
		keys[j] = p.Key()
		names[j] = p.Name()
		tagsSlice[j] = tags
		points[j] = points[i]
		j++
	}
	points, keys, names, tagsSlice = points[:j], keys[:j], names[:j], tagsSlice[:j]

	engine, err := s.engineNoLock()
	if err != nil {
		return nil, nil, err
	}

	// Add new series. Check for partial writes.
	var droppedKeys [][]byte
	if err := engine.CreateSeriesListIfNotExists(keys, names, tagsSlice); err != nil {
		switch err := err.(type) {
		// TODO(jmw): why is this a *PartialWriteError when everything else is not a pointer?
		// Maybe we can just change it to be consistent if we change it also in all
		// the places that construct it.
		case *PartialWriteError:
			reason = err.Reason
			dropped += err.Dropped
			droppedKeys = err.DroppedKeys
			atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped))
		default:
			return nil, nil, err
		}
	}

	// Create a MeasurementFields cache.
	mfCache := make(map[string]*MeasurementFields, 16)
	j = 0
	for i, p := range points {
		// Skip any points with only invalid fields.
		iter := p.FieldIterator()
		validField := false
		for iter.Next() {
			if bytes.Equal(iter.FieldKey(), timeBytes) {
				continue
			}
			validField = true
			break
		}
		if !validField {
			if reason == "" {
				reason = fmt.Sprintf(
					"invalid field name: input field \"%s\" on measurement \"%s\" is invalid",
					"time", string(p.Name()))
			}
			dropped++
			continue
		}

		// Skip any points whos keys have been dropped. Dropped has already been incremented for them.
		if len(droppedKeys) > 0 && bytesutil.Contains(droppedKeys, keys[i]) {
			continue
		}

		// Grab the MeasurementFields checking the local cache to avoid lock contention.
		name := p.Name()
		mf := mfCache[string(name)]
		if mf == nil {
			mf = engine.MeasurementFields(name).Clone()
			mfCache[string(name)] = mf
		}

		// Check with the field validator.
		if err := s.options.FieldValidator.Validate(mf, p); err != nil {
			switch err := err.(type) {
			case PartialWriteError:
				if reason == "" {
					reason = err.Reason
				}
				dropped += err.Dropped
				atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped))
			default:
				return nil, nil, err
			}
			continue
		}

		// Point passed validation; keep it.
		points[j] = points[i]
		j++

		// Create any fields that are missing.
		iter.Reset()
		for iter.Next() {
			fieldKey := iter.FieldKey()

			// Skip fields named "time". They are illegal.
			if bytes.Equal(fieldKey, timeBytes) {
				continue
			}

			if mf.FieldBytes(fieldKey) != nil {
				continue
			}

			dataType := dataTypeFromModelsFieldType(iter.Type())
			if dataType == influxql.Unknown {
				continue
			}

			fieldsToCreate = append(fieldsToCreate, &FieldCreate{
				Measurement: name,
				Field: &Field{
					Name: string(fieldKey),
					Type: dataType,
				},
			})
		}
	}

	if dropped > 0 {
		err = PartialWriteError{Reason: reason, Dropped: dropped}
	}

	return points[:j], fieldsToCreate, err
}
671
672func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error {
673	if len(fieldsToCreate) == 0 {
674		return nil
675	}
676
677	engine, err := s.engineNoLock()
678	if err != nil {
679		return err
680	}
681
682	// add fields
683	for _, f := range fieldsToCreate {
684		mf := engine.MeasurementFields(f.Measurement)
685		if err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil {
686			return err
687		}
688
689		s.index.SetFieldName(f.Measurement, f.Field.Name)
690	}
691
692	if len(fieldsToCreate) > 0 {
693		return engine.MeasurementFieldSet().Save()
694	}
695
696	return nil
697}
698
699// DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive)
700func (s *Shard) DeleteSeriesRange(itr SeriesIterator, min, max int64) error {
701	engine, err := s.engine()
702	if err != nil {
703		return err
704	}
705	return engine.DeleteSeriesRange(itr, min, max)
706}
707
708// DeleteSeriesRangeWithPredicate deletes all values from for seriesKeys between min and max (inclusive)
709// for which predicate() returns true. If predicate() is nil, then all values in range are deleted.
710func (s *Shard) DeleteSeriesRangeWithPredicate(itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error {
711	engine, err := s.engine()
712	if err != nil {
713		return err
714	}
715	return engine.DeleteSeriesRangeWithPredicate(itr, predicate)
716}
717
718// DeleteMeasurement deletes a measurement and all underlying series.
719func (s *Shard) DeleteMeasurement(name []byte) error {
720	engine, err := s.engine()
721	if err != nil {
722		return err
723	}
724	return engine.DeleteMeasurement(name)
725}
726
727// SeriesN returns the unique number of series in the shard.
728func (s *Shard) SeriesN() int64 {
729	engine, err := s.engine()
730	if err != nil {
731		return 0
732	}
733	return engine.SeriesN()
734}
735
736// SeriesSketches returns the measurement sketches for the shard.
737func (s *Shard) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
738	engine, err := s.engine()
739	if err != nil {
740		return nil, nil, err
741	}
742	return engine.SeriesSketches()
743}
744
745// MeasurementsSketches returns the measurement sketches for the shard.
746func (s *Shard) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
747	engine, err := s.engine()
748	if err != nil {
749		return nil, nil, err
750	}
751	return engine.MeasurementsSketches()
752}
753
754// MeasurementNamesByRegex returns names of measurements matching the regular expression.
755func (s *Shard) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
756	engine, err := s.engine()
757	if err != nil {
758		return nil, err
759	}
760	return engine.MeasurementNamesByRegex(re)
761}
762
763// MeasurementTagKeysByExpr returns all the tag keys for the provided expression.
764func (s *Shard) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
765	engine, err := s.engine()
766	if err != nil {
767		return nil, err
768	}
769	return engine.MeasurementTagKeysByExpr(name, expr)
770}
771
772// MeasurementTagKeyValuesByExpr returns all the tag keys values for the
773// provided expression.
774func (s *Shard) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
775	index, err := s.Index()
776	if err != nil {
777		return nil, err
778	}
779	indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}
780	return indexSet.MeasurementTagKeyValuesByExpr(auth, name, key, expr, keysSorted)
781}
782
783// MeasurementFields returns fields for a measurement.
784// TODO(edd): This method is currently only being called from tests; do we
785// really need it?
786func (s *Shard) MeasurementFields(name []byte) *MeasurementFields {
787	engine, err := s.engine()
788	if err != nil {
789		return nil
790	}
791	return engine.MeasurementFields(name)
792}
793
794// MeasurementExists returns true if the shard contains name.
795// TODO(edd): This method is currently only being called from tests; do we
796// really need it?
797func (s *Shard) MeasurementExists(name []byte) (bool, error) {
798	engine, err := s.engine()
799	if err != nil {
800		return false, err
801	}
802	return engine.MeasurementExists(name)
803}
804
805// WriteTo writes the shard's data to w.
806func (s *Shard) WriteTo(w io.Writer) (int64, error) {
807	engine, err := s.engine()
808	if err != nil {
809		return 0, err
810	}
811	n, err := engine.WriteTo(w)
812	atomic.AddInt64(&s.stats.BytesWritten, int64(n))
813	return n, err
814}
815
// CreateIterator returns an iterator for the data in the shard.
//
// The system measurements _fieldKeys, _series and _tagKeys are served by
// dedicated iterators; all other measurements are delegated to the engine.
func (s *Shard) CreateIterator(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
	engine, err := s.engine()
	if err != nil {
		return nil, err
	}
	switch m.SystemIterator {
	case "_fieldKeys":
		return NewFieldKeysIterator(s, opt)
	case "_series":
		// TODO(benbjohnson): Move up to the Shards.CreateIterator().
		index, err := s.Index()
		if err != nil {
			return nil, err
		}
		indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}

		itr, err := NewSeriesPointIterator(indexSet, opt)
		if err != nil {
			return nil, err
		}

		// Wrap so the iterator stops when the query is interrupted.
		return query.NewInterruptIterator(itr, opt.InterruptCh), nil
	case "_tagKeys":
		return NewTagKeysIterator(s, opt)
	}
	return engine.CreateIterator(ctx, m.Name, opt)
}
844
845func (s *Shard) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest, cond influxql.Expr) (SeriesCursor, error) {
846	index, err := s.Index()
847	if err != nil {
848		return nil, err
849	}
850	return newSeriesCursor(req, IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}, cond)
851}
852
853func (s *Shard) CreateCursorIterator(ctx context.Context) (CursorIterator, error) {
854	engine, err := s.engine()
855	if err != nil {
856		return nil, err
857	}
858	return engine.CreateCursorIterator(ctx)
859}
860
// FieldDimensions returns unique sets of fields and dimensions across a list of sources.
//
// fields maps field name to its widest observed type; dimensions is the set
// of tag keys. System sources (_fieldKeys, _series, _tagKeys) contribute
// their fixed string fields. Unknown measurements are skipped silently.
func (s *Shard) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
	engine, err := s.engine()
	if err != nil {
		return nil, nil, err
	}

	fields = make(map[string]influxql.DataType)
	dimensions = make(map[string]struct{})

	index, err := s.Index()
	if err != nil {
		return nil, nil, err
	}
	for _, name := range measurements {
		// Handle system sources.
		if strings.HasPrefix(name, "_") {
			var keys []string
			switch name {
			case "_fieldKeys":
				keys = []string{"fieldKey", "fieldType"}
			case "_series":
				keys = []string{"key"}
			case "_tagKeys":
				keys = []string{"tagKey"}
			}

			if len(keys) > 0 {
				for _, k := range keys {
					// Widen the recorded type to String if needed.
					if fields[k].LessThan(influxql.String) {
						fields[k] = influxql.String
					}
				}
				continue
			}
			// Unknown system source so default to looking for a measurement.
		}

		// Retrieve measurement.
		if exists, err := engine.MeasurementExists([]byte(name)); err != nil {
			return nil, nil, err
		} else if !exists {
			continue
		}

		// Append fields and dimensions.
		mf := engine.MeasurementFields([]byte(name))
		if mf != nil {
			for k, typ := range mf.FieldSet() {
				// Keep the widest type seen across all measurements.
				if fields[k].LessThan(typ) {
					fields[k] = typ
				}
			}
		}

		indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}
		if err := indexSet.ForEachMeasurementTagKey([]byte(name), func(key []byte) error {
			dimensions[string(key)] = struct{}{}
			return nil
		}); err != nil {
			return nil, nil, err
		}
	}

	return fields, dimensions, nil
}
927
// mapType returns the data type for the field within the measurement.
//
// System fields and measurements map to influxql.String; anything that cannot
// be resolved maps to influxql.Unknown rather than an error.
// NOTE(review): this calls engineNoLock, which appears to assume the caller
// already holds s.mu — confirm at call sites.
func (s *Shard) mapType(measurement, field string) (influxql.DataType, error) {
	engine, err := s.engineNoLock()
	if err != nil {
		return 0, err
	}

	switch field {
	case "_name", "_tagKey", "_tagValue", "_seriesKey":
		return influxql.String, nil
	}

	// Process system measurements.
	switch measurement {
	case "_fieldKeys":
		if field == "fieldKey" || field == "fieldType" {
			return influxql.String, nil
		}
		return influxql.Unknown, nil
	case "_series":
		if field == "key" {
			return influxql.String, nil
		}
		return influxql.Unknown, nil
	case "_tagKeys":
		if field == "tagKey" {
			return influxql.String, nil
		}
		return influxql.Unknown, nil
	}
	// Unknown system source so default to looking for a measurement.

	// Lookup errors are ignored here; a failed lookup is treated as "not found".
	if exists, _ := engine.MeasurementExists([]byte(measurement)); !exists {
		return influxql.Unknown, nil
	}

	mf := engine.MeasurementFields([]byte(measurement))
	if mf != nil {
		f := mf.Field(field)
		if f != nil {
			return f.Type, nil
		}
	}

	// Not a field; check whether it is a tag key on the measurement.
	if exists, _ := engine.HasTagKey([]byte(measurement), []byte(field)); exists {
		return influxql.Tag, nil
	}

	return influxql.Unknown, nil
}
978
979// expandSources expands regex sources and removes duplicates.
980// NOTE: sources must be normalized (db and rp set) before calling this function.
981func (s *Shard) expandSources(sources influxql.Sources) (influxql.Sources, error) {
982	engine, err := s.engineNoLock()
983	if err != nil {
984		return nil, err
985	}
986
987	// Use a map as a set to prevent duplicates.
988	set := map[string]influxql.Source{}
989
990	// Iterate all sources, expanding regexes when they're found.
991	for _, source := range sources {
992		switch src := source.(type) {
993		case *influxql.Measurement:
994			// Add non-regex measurements directly to the set.
995			if src.Regex == nil {
996				set[src.String()] = src
997				continue
998			}
999
1000			// Loop over matching measurements.
1001			names, err := engine.MeasurementNamesByRegex(src.Regex.Val)
1002			if err != nil {
1003				return nil, err
1004			}
1005
1006			for _, name := range names {
1007				other := &influxql.Measurement{
1008					Database:        src.Database,
1009					RetentionPolicy: src.RetentionPolicy,
1010					Name:            string(name),
1011				}
1012				set[other.String()] = other
1013			}
1014
1015		default:
1016			return nil, fmt.Errorf("expandSources: unsupported source type: %T", source)
1017		}
1018	}
1019
1020	// Convert set to sorted slice.
1021	names := make([]string, 0, len(set))
1022	for name := range set {
1023		names = append(names, name)
1024	}
1025	sort.Strings(names)
1026
1027	// Convert set to a list of Sources.
1028	expanded := make(influxql.Sources, 0, len(set))
1029	for _, name := range names {
1030		expanded = append(expanded, set[name])
1031	}
1032
1033	return expanded, nil
1034}
1035
1036// Backup backs up the shard by creating a tar archive of all TSM files that
1037// have been modified since the provided time. See Engine.Backup for more details.
1038func (s *Shard) Backup(w io.Writer, basePath string, since time.Time) error {
1039	engine, err := s.engine()
1040	if err != nil {
1041		return err
1042	}
1043	return engine.Backup(w, basePath, since)
1044}
1045
1046func (s *Shard) Export(w io.Writer, basePath string, start time.Time, end time.Time) error {
1047	engine, err := s.engine()
1048	if err != nil {
1049		return err
1050	}
1051	return engine.Export(w, basePath, start, end)
1052}
1053
// Restore restores data to the underlying engine for the shard.
// The shard is reopened after restore.
//
// Restore is permitted on a disabled shard; only a closed engine is rejected.
func (s *Shard) Restore(r io.Reader, basePath string) error {
	if err := func() error {
		s.mu.Lock()
		defer s.mu.Unlock()

		// Special case - we can still restore to a disabled shard, so we should
		// only check if the engine is closed and not care if the shard is
		// disabled.
		if s._engine == nil {
			return ErrEngineClosed
		}

		// Restore to engine.
		return s._engine.Restore(r, basePath)
	}(); err != nil {
		return err
	}

	// Close shard.
	if err := s.Close(); err != nil {
		return err
	}

	// Reopen engine.
	return s.Open()
}
1082
// Import imports data to the underlying engine for the shard. r should
// be a reader from a backup created by Backup.
//
// Unlike Restore, the shard is not reopened afterwards.
func (s *Shard) Import(r io.Reader, basePath string) error {
	// Special case - we can still import to a disabled shard, so we should
	// only check if the engine is closed and not care if the shard is
	// disabled.
	s.mu.Lock()
	defer s.mu.Unlock()
	if s._engine == nil {
		return ErrEngineClosed
	}

	// Import to engine.
	return s._engine.Import(r, basePath)
}
1098
1099// CreateSnapshot will return a path to a temp directory
1100// containing hard links to the underlying shard files.
1101func (s *Shard) CreateSnapshot() (string, error) {
1102	engine, err := s.engine()
1103	if err != nil {
1104		return "", err
1105	}
1106	return engine.CreateSnapshot()
1107}
1108
1109// ForEachMeasurementName iterates over each measurement in the shard.
1110func (s *Shard) ForEachMeasurementName(fn func(name []byte) error) error {
1111	engine, err := s.engine()
1112	if err != nil {
1113		return err
1114	}
1115	return engine.ForEachMeasurementName(fn)
1116}
1117
1118func (s *Shard) TagKeyCardinality(name, key []byte) int {
1119	engine, err := s.engine()
1120	if err != nil {
1121		return 0
1122	}
1123	return engine.TagKeyCardinality(name, key)
1124}
1125
1126// Digest returns a digest of the shard.
1127func (s *Shard) Digest() (io.ReadCloser, int64, error) {
1128	engine, err := s.engine()
1129	if err != nil {
1130		return nil, 0, err
1131	}
1132
1133	// Make sure the shard is idle/cold. (No use creating a digest of a
1134	// hot shard that is rapidly changing.)
1135	if !engine.IsIdle() {
1136		return nil, 0, ErrShardNotIdle
1137	}
1138
1139	return engine.Digest()
1140}
1141
1142// engine safely (under an RLock) returns a reference to the shard's Engine, or
1143// an error if the Engine is closed, or the shard is currently disabled.
1144//
1145// The shard's Engine should always be accessed via a call to engine(), rather
1146// than directly referencing Shard.engine.
1147//
1148// If a caller needs an Engine reference but is already under a lock, then they
1149// should use engineNoLock().
1150func (s *Shard) engine() (Engine, error) {
1151	s.mu.RLock()
1152	defer s.mu.RUnlock()
1153	return s.engineNoLock()
1154}
1155
1156// engineNoLock is similar to calling engine(), but the caller must guarantee
1157// that they already hold an appropriate lock.
1158func (s *Shard) engineNoLock() (Engine, error) {
1159	if err := s.ready(); err != nil {
1160		return nil, err
1161	}
1162	return s._engine, nil
1163}
1164
// ShardGroup represents a collection of shards that can be queried as a
// single unit: it exposes the combined measurement, field, and iterator
// surface used by the query engine.
type ShardGroup interface {
	MeasurementsByRegex(re *regexp.Regexp) []string
	FieldKeysByMeasurement(name []byte) []string
	FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
	MapType(measurement, field string) influxql.DataType
	CreateIterator(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error)
	IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
	ExpandSources(sources influxql.Sources) (influxql.Sources, error)
}
1174
// Shards represents a sortable list of shards.
type Shards []*Shard

// Len implements sort.Interface.
func (a Shards) Len() int { return len(a) }

// Less implements sort.Interface; shards are ordered by ascending shard ID.
func (a Shards) Less(i, j int) bool { return a[i].id < a[j].id }

// Swap implements sort.Interface.
func (a Shards) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
1186
1187// MeasurementsByRegex returns the unique set of measurements matching the
1188// provided regex, for all the shards.
1189func (a Shards) MeasurementsByRegex(re *regexp.Regexp) []string {
1190	var m map[string]struct{}
1191	for _, sh := range a {
1192		names, err := sh.MeasurementNamesByRegex(re)
1193		if err != nil {
1194			continue // Skip this shard's results—previous behaviour.
1195		}
1196
1197		if m == nil {
1198			m = make(map[string]struct{}, len(names))
1199		}
1200
1201		for _, name := range names {
1202			m[string(name)] = struct{}{}
1203		}
1204	}
1205
1206	if len(m) == 0 {
1207		return nil
1208	}
1209
1210	names := make([]string, 0, len(m))
1211	for key := range m {
1212		names = append(names, key)
1213	}
1214	sort.Strings(names)
1215	return names
1216}
1217
1218// FieldKeysByMeasurement returns a de-duplicated, sorted, set of field keys for
1219// the provided measurement name.
1220func (a Shards) FieldKeysByMeasurement(name []byte) []string {
1221	if len(a) == 1 {
1222		mf := a[0].MeasurementFields(name)
1223		if mf == nil {
1224			return nil
1225		}
1226		return mf.FieldKeys()
1227	}
1228
1229	all := make([][]string, 0, len(a))
1230	for _, shard := range a {
1231		mf := shard.MeasurementFields(name)
1232		if mf == nil {
1233			continue
1234		}
1235		all = append(all, mf.FieldKeys())
1236	}
1237	return slices.MergeSortedStrings(all...)
1238}
1239
1240func (a Shards) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
1241	fields = make(map[string]influxql.DataType)
1242	dimensions = make(map[string]struct{})
1243
1244	for _, sh := range a {
1245		f, d, err := sh.FieldDimensions(measurements)
1246		if err != nil {
1247			return nil, nil, err
1248		}
1249		for k, typ := range f {
1250			if fields[k].LessThan(typ) {
1251				fields[k] = typ
1252			}
1253		}
1254		for k := range d {
1255			dimensions[k] = struct{}{}
1256		}
1257	}
1258	return
1259}
1260
1261func (a Shards) MapType(measurement, field string) influxql.DataType {
1262	var typ influxql.DataType
1263	for _, sh := range a {
1264		sh.mu.RLock()
1265		if t, err := sh.mapType(measurement, field); err == nil && typ.LessThan(t) {
1266			typ = t
1267		}
1268		sh.mu.RUnlock()
1269	}
1270	return typ
1271}
1272
1273func (a Shards) CallType(name string, args []influxql.DataType) (influxql.DataType, error) {
1274	typmap := query.CallTypeMapper{}
1275	return typmap.CallType(name, args)
1276}
1277
1278func (a Shards) CreateIterator(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
1279	switch measurement.SystemIterator {
1280	case "_series":
1281		return a.createSeriesIterator(ctx, opt)
1282	}
1283
1284	itrs := make([]query.Iterator, 0, len(a))
1285	for _, sh := range a {
1286		itr, err := sh.CreateIterator(ctx, measurement, opt)
1287		if err != nil {
1288			query.Iterators(itrs).Close()
1289			return nil, err
1290		} else if itr == nil {
1291			continue
1292		}
1293		itrs = append(itrs, itr)
1294
1295		select {
1296		case <-opt.InterruptCh:
1297			query.Iterators(itrs).Close()
1298			return nil, query.ErrQueryInterrupted
1299		default:
1300		}
1301
1302		// Enforce series limit at creation time.
1303		if opt.MaxSeriesN > 0 {
1304			stats := itr.Stats()
1305			if stats.SeriesN > opt.MaxSeriesN {
1306				query.Iterators(itrs).Close()
1307				return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", stats.SeriesN, opt.MaxSeriesN)
1308			}
1309		}
1310	}
1311	return query.Iterators(itrs).Merge(opt)
1312}
1313
1314func (a Shards) createSeriesIterator(ctx context.Context, opt query.IteratorOptions) (_ query.Iterator, err error) {
1315	var (
1316		idxs  = make([]Index, 0, len(a))
1317		sfile *SeriesFile
1318	)
1319	for _, sh := range a {
1320		var idx Index
1321		if idx, err = sh.Index(); err == nil {
1322			idxs = append(idxs, idx)
1323		}
1324		if sfile == nil {
1325			sfile, _ = sh.seriesFile()
1326		}
1327	}
1328
1329	if sfile == nil {
1330		return nil, nil
1331	}
1332
1333	return NewSeriesPointIterator(IndexSet{Indexes: idxs, SeriesFile: sfile}, opt)
1334}
1335
1336func (a Shards) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
1337	var costs query.IteratorCost
1338	var costerr error
1339	var mu sync.RWMutex
1340
1341	setErr := func(err error) {
1342		mu.Lock()
1343		defer mu.Unlock()
1344		if costerr == nil {
1345			costerr = err
1346		}
1347	}
1348
1349	limit := limiter.NewFixed(runtime.GOMAXPROCS(0))
1350	var wg sync.WaitGroup
1351	for _, sh := range a {
1352		limit.Take()
1353		wg.Add(1)
1354
1355		mu.RLock()
1356		if costerr != nil {
1357			mu.RUnlock()
1358			break
1359		}
1360		mu.RUnlock()
1361
1362		go func(sh *Shard) {
1363			defer limit.Release()
1364			defer wg.Done()
1365
1366			engine, err := sh.engine()
1367			if err != nil {
1368				setErr(err)
1369				return
1370			}
1371
1372			cost, err := engine.IteratorCost(measurement, opt)
1373			if err != nil {
1374				setErr(err)
1375				return
1376			}
1377
1378			mu.Lock()
1379			costs = costs.Combine(cost)
1380			mu.Unlock()
1381		}(sh)
1382	}
1383	wg.Wait()
1384	return costs, costerr
1385}
1386
1387func (a Shards) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest, cond influxql.Expr) (_ SeriesCursor, err error) {
1388	var (
1389		idxs  []Index
1390		sfile *SeriesFile
1391	)
1392	for _, sh := range a {
1393		var idx Index
1394		if idx, err = sh.Index(); err == nil {
1395			idxs = append(idxs, idx)
1396		}
1397		if sfile == nil {
1398			sfile, _ = sh.seriesFile()
1399		}
1400	}
1401
1402	if sfile == nil {
1403		return nil, errors.New("CreateSeriesCursor: no series file")
1404	}
1405
1406	return newSeriesCursor(req, IndexSet{Indexes: idxs, SeriesFile: sfile}, cond)
1407}
1408
1409func (a Shards) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
1410	// Use a map as a set to prevent duplicates.
1411	set := map[string]influxql.Source{}
1412
1413	// Iterate through every shard and expand the sources.
1414	for _, sh := range a {
1415		sh.mu.RLock()
1416		expanded, err := sh.expandSources(sources)
1417		sh.mu.RUnlock()
1418		if err != nil {
1419			return nil, err
1420		}
1421
1422		for _, src := range expanded {
1423			switch src := src.(type) {
1424			case *influxql.Measurement:
1425				set[src.String()] = src
1426			default:
1427				return nil, fmt.Errorf("Store.ExpandSources: unsupported source type: %T", src)
1428			}
1429		}
1430	}
1431
1432	// Convert set to sorted slice.
1433	names := make([]string, 0, len(set))
1434	for name := range set {
1435		names = append(names, name)
1436	}
1437	sort.Strings(names)
1438
1439	// Convert set to a list of Sources.
1440	sorted := make([]influxql.Source, 0, len(set))
1441	for _, name := range names {
1442		sorted = append(sorted, set[name])
1443	}
1444	return sorted, nil
1445}
1446
// MeasurementFields holds the fields of a measurement and their codec.
type MeasurementFields struct {
	mu sync.RWMutex // guards fields

	fields map[string]*Field // field name -> field definition
}
1453
1454// NewMeasurementFields returns an initialised *MeasurementFields value.
1455func NewMeasurementFields() *MeasurementFields {
1456	return &MeasurementFields{fields: make(map[string]*Field)}
1457}
1458
1459func (m *MeasurementFields) FieldKeys() []string {
1460	m.mu.RLock()
1461	defer m.mu.RUnlock()
1462
1463	a := make([]string, 0, len(m.fields))
1464	for key := range m.fields {
1465		a = append(a, key)
1466	}
1467	sort.Strings(a)
1468	return a
1469}
1470
1471// bytes estimates the memory footprint of this MeasurementFields, in bytes.
1472func (m *MeasurementFields) bytes() int {
1473	var b int
1474	m.mu.RLock()
1475	b += 24 // mu RWMutex is 24 bytes
1476	b += int(unsafe.Sizeof(m.fields))
1477	for k, v := range m.fields {
1478		b += int(unsafe.Sizeof(k)) + len(k)
1479		b += int(unsafe.Sizeof(v)+unsafe.Sizeof(*v)) + len(v.Name)
1480	}
1481	m.mu.RUnlock()
1482	return b
1483}
1484
1485// CreateFieldIfNotExists creates a new field with an autoincrementing ID.
1486// Returns an error if 255 fields have already been created on the measurement or
1487// the fields already exists with a different type.
1488func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) error {
1489	m.mu.RLock()
1490
1491	// Ignore if the field already exists.
1492	if f := m.fields[string(name)]; f != nil {
1493		if f.Type != typ {
1494			m.mu.RUnlock()
1495			return ErrFieldTypeConflict
1496		}
1497		m.mu.RUnlock()
1498		return nil
1499	}
1500	m.mu.RUnlock()
1501
1502	m.mu.Lock()
1503	defer m.mu.Unlock()
1504
1505	// Re-check field and type under write lock.
1506	if f := m.fields[string(name)]; f != nil {
1507		if f.Type != typ {
1508			return ErrFieldTypeConflict
1509		}
1510		return nil
1511	}
1512
1513	// Create and append a new field.
1514	f := &Field{
1515		ID:   uint8(len(m.fields) + 1),
1516		Name: string(name),
1517		Type: typ,
1518	}
1519	m.fields[string(name)] = f
1520
1521	return nil
1522}
1523
1524func (m *MeasurementFields) FieldN() int {
1525	m.mu.RLock()
1526	n := len(m.fields)
1527	m.mu.RUnlock()
1528	return n
1529}
1530
1531// Field returns the field for name, or nil if there is no field for name.
1532func (m *MeasurementFields) Field(name string) *Field {
1533	m.mu.RLock()
1534	f := m.fields[name]
1535	m.mu.RUnlock()
1536	return f
1537}
1538
1539func (m *MeasurementFields) HasField(name string) bool {
1540	if m == nil {
1541		return false
1542	}
1543	m.mu.RLock()
1544	f := m.fields[name]
1545	m.mu.RUnlock()
1546	return f != nil
1547}
1548
1549// FieldBytes returns the field for name, or nil if there is no field for name.
1550// FieldBytes should be preferred to Field when the caller has a []byte, because
1551// it avoids a string allocation, which can't be avoided if the caller converts
1552// the []byte to a string and calls Field.
1553func (m *MeasurementFields) FieldBytes(name []byte) *Field {
1554	m.mu.RLock()
1555	f := m.fields[string(name)]
1556	m.mu.RUnlock()
1557	return f
1558}
1559
1560// FieldSet returns the set of fields and their types for the measurement.
1561func (m *MeasurementFields) FieldSet() map[string]influxql.DataType {
1562	m.mu.RLock()
1563	defer m.mu.RUnlock()
1564
1565	fields := make(map[string]influxql.DataType)
1566	for name, f := range m.fields {
1567		fields[name] = f.Type
1568	}
1569	return fields
1570}
1571
1572func (m *MeasurementFields) ForEachField(fn func(name string, typ influxql.DataType) bool) {
1573	m.mu.RLock()
1574	defer m.mu.RUnlock()
1575	for name, f := range m.fields {
1576		if !fn(name, f.Type) {
1577			return
1578		}
1579	}
1580}
1581
1582// Clone returns copy of the MeasurementFields
1583func (m *MeasurementFields) Clone() *MeasurementFields {
1584	m.mu.RLock()
1585	defer m.mu.RUnlock()
1586	fields := make(map[string]*Field, len(m.fields))
1587	for key, field := range m.fields {
1588		fields[key] = field
1589	}
1590	return &MeasurementFields{
1591		fields: fields,
1592	}
1593}
1594
// MeasurementFieldSet represents a collection of fields by measurement.
// This is safe for concurrent use.
type MeasurementFieldSet struct {
	mu     sync.RWMutex
	fields map[string]*MeasurementFields // measurement name -> its fields

	// path is the location to persist field sets
	path string
}
1604
1605// NewMeasurementFieldSet returns a new instance of MeasurementFieldSet.
1606func NewMeasurementFieldSet(path string) (*MeasurementFieldSet, error) {
1607	fs := &MeasurementFieldSet{
1608		fields: make(map[string]*MeasurementFields),
1609		path:   path,
1610	}
1611
1612	// If there is a load error, return the error and an empty set so
1613	// it can be rebuild manually.
1614	return fs, fs.load()
1615}
1616
1617// Bytes estimates the memory footprint of this MeasurementFieldSet, in bytes.
1618func (fs *MeasurementFieldSet) Bytes() int {
1619	var b int
1620	fs.mu.RLock()
1621	b += 24 // mu RWMutex is 24 bytes
1622	for k, v := range fs.fields {
1623		b += int(unsafe.Sizeof(k)) + len(k)
1624		b += int(unsafe.Sizeof(v)) + v.bytes()
1625	}
1626	b += int(unsafe.Sizeof(fs.fields))
1627	b += int(unsafe.Sizeof(fs.path)) + len(fs.path)
1628	fs.mu.RUnlock()
1629	return b
1630}
1631
1632// Fields returns fields for a measurement by name.
1633func (fs *MeasurementFieldSet) Fields(name []byte) *MeasurementFields {
1634	fs.mu.RLock()
1635	mf := fs.fields[string(name)]
1636	fs.mu.RUnlock()
1637	return mf
1638}
1639
1640// FieldsByString returns fields for a measurment by name.
1641func (fs *MeasurementFieldSet) FieldsByString(name string) *MeasurementFields {
1642	fs.mu.RLock()
1643	mf := fs.fields[name]
1644	fs.mu.RUnlock()
1645	return mf
1646}
1647
1648// CreateFieldsIfNotExists returns fields for a measurement by name.
1649func (fs *MeasurementFieldSet) CreateFieldsIfNotExists(name []byte) *MeasurementFields {
1650	fs.mu.RLock()
1651	mf := fs.fields[string(name)]
1652	fs.mu.RUnlock()
1653
1654	if mf != nil {
1655		return mf
1656	}
1657
1658	fs.mu.Lock()
1659	mf = fs.fields[string(name)]
1660	if mf == nil {
1661		mf = NewMeasurementFields()
1662		fs.fields[string(name)] = mf
1663	}
1664	fs.mu.Unlock()
1665	return mf
1666}
1667
1668// Delete removes a field set for a measurement.
1669func (fs *MeasurementFieldSet) Delete(name string) {
1670	fs.mu.Lock()
1671	delete(fs.fields, name)
1672	fs.mu.Unlock()
1673}
1674
1675// DeleteWithLock executes fn and removes a field set from a measurement under lock.
1676func (fs *MeasurementFieldSet) DeleteWithLock(name string, fn func() error) error {
1677	fs.mu.Lock()
1678	defer fs.mu.Unlock()
1679
1680	if err := fn(); err != nil {
1681		return err
1682	}
1683
1684	delete(fs.fields, name)
1685	return nil
1686}
1687
1688func (fs *MeasurementFieldSet) IsEmpty() bool {
1689	fs.mu.RLock()
1690	defer fs.mu.RUnlock()
1691	return len(fs.fields) == 0
1692}
1693
1694func (fs *MeasurementFieldSet) Save() error {
1695	fs.mu.Lock()
1696	defer fs.mu.Unlock()
1697
1698	return fs.saveNoLock()
1699}
1700
1701func (fs *MeasurementFieldSet) saveNoLock() error {
1702	// No fields left, remove the fields index file
1703	if len(fs.fields) == 0 {
1704		return os.RemoveAll(fs.path)
1705	}
1706
1707	// Write the new index to a temp file and rename when it's sync'd
1708	path := fs.path + ".tmp"
1709	fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
1710	if err != nil {
1711		return err
1712	}
1713	defer os.RemoveAll(path)
1714
1715	if _, err := fd.Write(fieldsIndexMagicNumber); err != nil {
1716		return err
1717	}
1718
1719	pb := internal.MeasurementFieldSet{
1720		Measurements: make([]*internal.MeasurementFields, 0, len(fs.fields)),
1721	}
1722	for name, mf := range fs.fields {
1723		fs := &internal.MeasurementFields{
1724			Name:   name,
1725			Fields: make([]*internal.Field, 0, mf.FieldN()),
1726		}
1727
1728		mf.ForEachField(func(field string, typ influxql.DataType) bool {
1729			fs.Fields = append(fs.Fields, &internal.Field{Name: field, Type: int32(typ)})
1730			return true
1731		})
1732
1733		pb.Measurements = append(pb.Measurements, fs)
1734	}
1735
1736	b, err := proto.Marshal(&pb)
1737	if err != nil {
1738		return err
1739	}
1740
1741	if _, err := fd.Write(b); err != nil {
1742		return err
1743	}
1744
1745	if err = fd.Sync(); err != nil {
1746		return err
1747	}
1748
1749	//close file handle before renaming to support Windows
1750	if err = fd.Close(); err != nil {
1751		return err
1752	}
1753
1754	if err := file.RenameFile(path, fs.path); err != nil {
1755		return err
1756	}
1757
1758	return file.SyncDir(filepath.Dir(fs.path))
1759}
1760
1761func (fs *MeasurementFieldSet) load() error {
1762	fs.mu.Lock()
1763	defer fs.mu.Unlock()
1764
1765	fd, err := os.Open(fs.path)
1766	if os.IsNotExist(err) {
1767		return nil
1768	} else if err != nil {
1769		return err
1770	}
1771	defer fd.Close()
1772
1773	var magic [4]byte
1774	if _, err := fd.Read(magic[:]); err != nil {
1775		return err
1776	}
1777
1778	if !bytes.Equal(magic[:], fieldsIndexMagicNumber) {
1779		return ErrUnknownFieldsFormat
1780	}
1781
1782	var pb internal.MeasurementFieldSet
1783	b, err := ioutil.ReadAll(fd)
1784	if err != nil {
1785		return err
1786	}
1787
1788	if err := proto.Unmarshal(b, &pb); err != nil {
1789		return err
1790	}
1791
1792	fs.fields = make(map[string]*MeasurementFields, len(pb.GetMeasurements()))
1793	for _, measurement := range pb.GetMeasurements() {
1794		set := &MeasurementFields{
1795			fields: make(map[string]*Field, len(measurement.GetFields())),
1796		}
1797		for _, field := range measurement.GetFields() {
1798			set.fields[field.GetName()] = &Field{Name: field.GetName(), Type: influxql.DataType(field.GetType())}
1799		}
1800		fs.fields[measurement.GetName()] = set
1801	}
1802	return nil
1803}
1804
// Field represents a series field.
type Field struct {
	ID   uint8             `json:"id,omitempty"`   // per-measurement ID, assigned starting at 1
	Name string            `json:"name,omitempty"` // field key
	Type influxql.DataType `json:"type,omitempty"` // value data type
}
1811
1812// NewFieldKeysIterator returns an iterator that can be iterated over to
1813// retrieve field keys.
1814func NewFieldKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, error) {
1815	itr := &fieldKeysIterator{shard: sh}
1816
1817	index, err := sh.Index()
1818	if err != nil {
1819		return nil, err
1820	}
1821
1822	// Retrieve measurements from shard. Filter if condition specified.
1823	//
1824	// FGA is currently not supported when retrieving field keys.
1825	indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile}
1826	names, err := indexSet.MeasurementNamesByExpr(query.OpenAuthorizer, opt.Condition)
1827	if err != nil {
1828		return nil, err
1829	}
1830	itr.names = names
1831
1832	return itr, nil
1833}
1834
// fieldKeysIterator iterates over measurements and gets field keys from each measurement.
type fieldKeysIterator struct {
	shard *Shard
	names [][]byte // remaining measurement names
	buf   struct {
		name   []byte  // current measurement name
		fields []Field // current measurement's fields, sorted by name, not yet emitted
	}
}
1844
// Stats returns stats about the points processed. Always empty for this iterator.
func (itr *fieldKeysIterator) Stats() query.IteratorStats { return query.IteratorStats{} }

// Close closes the iterator. It holds no resources, so this is a no-op.
func (itr *fieldKeysIterator) Close() error { return nil }
1850
1851// Next emits the next tag key name.
1852func (itr *fieldKeysIterator) Next() (*query.FloatPoint, error) {
1853	for {
1854		// If there are no more keys then move to the next measurements.
1855		if len(itr.buf.fields) == 0 {
1856			if len(itr.names) == 0 {
1857				return nil, nil
1858			}
1859
1860			itr.buf.name = itr.names[0]
1861			mf := itr.shard.MeasurementFields(itr.buf.name)
1862			if mf != nil {
1863				fset := mf.FieldSet()
1864				if len(fset) == 0 {
1865					itr.names = itr.names[1:]
1866					continue
1867				}
1868
1869				keys := make([]string, 0, len(fset))
1870				for k := range fset {
1871					keys = append(keys, k)
1872				}
1873				sort.Strings(keys)
1874
1875				itr.buf.fields = make([]Field, len(keys))
1876				for i, name := range keys {
1877					itr.buf.fields[i] = Field{Name: name, Type: fset[name]}
1878				}
1879			}
1880			itr.names = itr.names[1:]
1881			continue
1882		}
1883
1884		// Return next key.
1885		field := itr.buf.fields[0]
1886		p := &query.FloatPoint{
1887			Name: string(itr.buf.name),
1888			Aux:  []interface{}{field.Name, field.Type.String()},
1889		}
1890		itr.buf.fields = itr.buf.fields[1:]
1891
1892		return p, nil
1893	}
1894}
1895
1896// NewTagKeysIterator returns a new instance of TagKeysIterator.
1897func NewTagKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, error) {
1898	fn := func(name []byte) ([][]byte, error) {
1899		index, err := sh.Index()
1900		if err != nil {
1901			return nil, err
1902		}
1903
1904		indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile}
1905		var keys [][]byte
1906		if err := indexSet.ForEachMeasurementTagKey(name, func(key []byte) error {
1907			keys = append(keys, key)
1908			return nil
1909		}); err != nil {
1910			return nil, err
1911		}
1912		return keys, nil
1913	}
1914	return newMeasurementKeysIterator(sh, fn, opt)
1915}
1916
// measurementKeyFunc is the function called by measurementKeysIterator to
// fetch the keys for a single measurement.
type measurementKeyFunc func(name []byte) ([][]byte, error)
1919
1920func newMeasurementKeysIterator(sh *Shard, fn measurementKeyFunc, opt query.IteratorOptions) (*measurementKeysIterator, error) {
1921	index, err := sh.Index()
1922	if err != nil {
1923		return nil, err
1924	}
1925
1926	indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile}
1927	itr := &measurementKeysIterator{fn: fn}
1928	names, err := indexSet.MeasurementNamesByExpr(opt.Authorizer, opt.Condition)
1929	if err != nil {
1930		return nil, err
1931	}
1932	itr.names = names
1933
1934	return itr, nil
1935}
1936
// measurementKeysIterator iterates over measurements and gets keys from each measurement.
type measurementKeysIterator struct {
	names [][]byte // remaining measurement names
	buf   struct {
		name []byte   // current measurement name
		keys [][]byte // current measurement's keys not yet emitted
	}
	fn measurementKeyFunc // fetches the keys for one measurement
}
1946
// Stats returns stats about the points processed. Always empty for this iterator.
func (itr *measurementKeysIterator) Stats() query.IteratorStats { return query.IteratorStats{} }

// Close closes the iterator. It holds no resources, so this is a no-op.
func (itr *measurementKeysIterator) Close() error { return nil }
1952
1953// Next emits the next tag key name.
1954func (itr *measurementKeysIterator) Next() (*query.FloatPoint, error) {
1955	for {
1956		// If there are no more keys then move to the next measurements.
1957		if len(itr.buf.keys) == 0 {
1958			if len(itr.names) == 0 {
1959				return nil, nil
1960			}
1961
1962			itr.buf.name, itr.names = itr.names[0], itr.names[1:]
1963
1964			keys, err := itr.fn(itr.buf.name)
1965			if err != nil {
1966				return nil, err
1967			}
1968			itr.buf.keys = keys
1969			continue
1970		}
1971
1972		// Return next key.
1973		p := &query.FloatPoint{
1974			Name: string(itr.buf.name),
1975			Aux:  []interface{}{string(itr.buf.keys[0])},
1976		}
1977		itr.buf.keys = itr.buf.keys[1:]
1978
1979		return p, nil
1980	}
1981}
1982
// LimitError represents an error caused by a configurable limit.
type LimitError struct {
	// Reason is the human-readable description of the exceeded limit.
	Reason string
}

// Error implements the error interface.
func (e *LimitError) Error() string { return e.Reason }
1989