1package tsdb
2
3import (
4	"bytes"
5	"encoding/binary"
6	"errors"
7	"fmt"
8	"os"
9	"path/filepath"
10	"runtime"
11	"sort"
12	"sync"
13
14	"github.com/cespare/xxhash"
15	"github.com/influxdata/influxdb/models"
16	"github.com/influxdata/influxdb/pkg/binaryutil"
17	"github.com/influxdata/influxdb/pkg/limiter"
18	"go.uber.org/zap"
19	"golang.org/x/sync/errgroup"
20)
21
var (
	// ErrSeriesFileClosed is returned when operating on a series file that has been closed.
	ErrSeriesFileClosed         = errors.New("tsdb: series file closed")
	// ErrInvalidSeriesPartitionID is returned when a series ID maps to a partition that does not exist.
	ErrInvalidSeriesPartitionID = errors.New("tsdb: invalid series partition id")
)
26
// SeriesIDSize is the size in bytes of a series key ID.
const SeriesIDSize = 8

const (
	// SeriesFilePartitionN is the number of partitions a series file is split into.
	// Keys are assigned to a partition by hashing (see SeriesKeyPartitionID);
	// IDs map back to a partition via SeriesIDPartitionID.
	SeriesFilePartitionN = 8
)
34
// SeriesFile represents the section of the index that holds series data.
// It is split into SeriesFilePartitionN partitions, each living in its own
// subdirectory under path (see SeriesPartitionPath).
type SeriesFile struct {
	path       string             // root directory of the series file
	partitions []*SeriesPartition // per-partition state; populated by Open

	maxSnapshotConcurrency int // cap on concurrent partition compactions

	refs sync.RWMutex // RWMutex to track references to the SeriesFile that are in use.

	Logger *zap.Logger // defaults to a no-op logger; set before Open
}
46
47// NewSeriesFile returns a new instance of SeriesFile.
48func NewSeriesFile(path string) *SeriesFile {
49	maxSnapshotConcurrency := runtime.GOMAXPROCS(0)
50	if maxSnapshotConcurrency < 1 {
51		maxSnapshotConcurrency = 1
52	}
53
54	return &SeriesFile{
55		path:                   path,
56		maxSnapshotConcurrency: maxSnapshotConcurrency,
57		Logger:                 zap.NewNop(),
58	}
59}
60
61func (f *SeriesFile) WithMaxCompactionConcurrency(maxCompactionConcurrency int) {
62	if maxCompactionConcurrency < 1 {
63		maxCompactionConcurrency = runtime.GOMAXPROCS(0)
64		if maxCompactionConcurrency < 1 {
65			maxCompactionConcurrency = 1
66		}
67	}
68
69	f.maxSnapshotConcurrency = maxCompactionConcurrency
70}
71
72// Open memory maps the data file at the file's path.
73func (f *SeriesFile) Open() error {
74	// Wait for all references to be released and prevent new ones from being acquired.
75	f.refs.Lock()
76	defer f.refs.Unlock()
77
78	// Create path if it doesn't exist.
79	if err := os.MkdirAll(filepath.Join(f.path), 0777); err != nil {
80		return err
81	}
82
83	// Limit concurrent series file compactions
84	compactionLimiter := limiter.NewFixed(f.maxSnapshotConcurrency)
85
86	// Open partitions.
87	f.partitions = make([]*SeriesPartition, 0, SeriesFilePartitionN)
88	for i := 0; i < SeriesFilePartitionN; i++ {
89		p := NewSeriesPartition(i, f.SeriesPartitionPath(i), compactionLimiter)
90		p.Logger = f.Logger.With(zap.Int("partition", p.ID()))
91		if err := p.Open(); err != nil {
92			f.Logger.Error("Unable to open series file",
93				zap.String("path", f.path),
94				zap.Int("partition", p.ID()),
95				zap.Error(err))
96			f.close()
97			return err
98		}
99		f.partitions = append(f.partitions, p)
100	}
101
102	return nil
103}
104
105func (f *SeriesFile) close() (err error) {
106	for _, p := range f.partitions {
107		if e := p.Close(); e != nil && err == nil {
108			err = e
109		}
110	}
111
112	return err
113}
114
115// Close unmaps the data file.
116func (f *SeriesFile) Close() (err error) {
117	f.refs.Lock()
118	defer f.refs.Unlock()
119	return f.close()
120}
121
122// Path returns the path to the file.
123func (f *SeriesFile) Path() string { return f.path }
124
125// SeriesPartitionPath returns the path to a given partition.
126func (f *SeriesFile) SeriesPartitionPath(i int) string {
127	return filepath.Join(f.path, fmt.Sprintf("%02x", i))
128}
129
130// Partitions returns all partitions.
131func (f *SeriesFile) Partitions() []*SeriesPartition { return f.partitions }
132
133// Retain adds a reference count to the file.  It returns a release func.
134func (f *SeriesFile) Retain() func() {
135	if f != nil {
136		f.refs.RLock()
137
138		// Return the RUnlock func as the release func to be called when done.
139		return f.refs.RUnlock
140	}
141	return nop
142}
143
144// EnableCompactions allows compactions to run.
145func (f *SeriesFile) EnableCompactions() {
146	for _, p := range f.partitions {
147		p.EnableCompactions()
148	}
149}
150
151// DisableCompactions prevents new compactions from running.
152func (f *SeriesFile) DisableCompactions() {
153	for _, p := range f.partitions {
154		p.DisableCompactions()
155	}
156}
157
158// Wait waits for all Retains to be released.
159func (f *SeriesFile) Wait() {
160	f.refs.Lock()
161	defer f.refs.Unlock()
162}
163
164// FileSize returns the size of all partitions, in bytes.
165func (f *SeriesFile) FileSize() (n int64, err error) {
166	for _, p := range f.partitions {
167		v, err := p.FileSize()
168		n += v
169		if err != nil {
170			return n, err
171		}
172	}
173	return n, err
174}
175
176// CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
177// The returned ids slice returns IDs for every name+tags, creating new series IDs as needed.
178func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags) ([]uint64, error) {
179	keys := GenerateSeriesKeys(names, tagsSlice)
180	keyPartitionIDs := f.SeriesKeysPartitionIDs(keys)
181	ids := make([]uint64, len(keys))
182
183	var g errgroup.Group
184	for i := range f.partitions {
185		p := f.partitions[i]
186		g.Go(func() error {
187			return p.CreateSeriesListIfNotExists(keys, keyPartitionIDs, ids)
188		})
189	}
190	if err := g.Wait(); err != nil {
191		return nil, err
192	}
193	return ids, nil
194}
195
196// DeleteSeriesID flags a series as permanently deleted.
197// If the series is reintroduced later then it must create a new id.
198func (f *SeriesFile) DeleteSeriesID(id uint64) error {
199	p := f.SeriesIDPartition(id)
200	if p == nil {
201		return ErrInvalidSeriesPartitionID
202	}
203	return p.DeleteSeriesID(id)
204}
205
206// IsDeleted returns true if the ID has been deleted before.
207func (f *SeriesFile) IsDeleted(id uint64) bool {
208	p := f.SeriesIDPartition(id)
209	if p == nil {
210		return false
211	}
212	return p.IsDeleted(id)
213}
214
215// SeriesKey returns the series key for a given id.
216func (f *SeriesFile) SeriesKey(id uint64) []byte {
217	if id == 0 {
218		return nil
219	}
220	p := f.SeriesIDPartition(id)
221	if p == nil {
222		return nil
223	}
224	return p.SeriesKey(id)
225}
226
227// SeriesKeys returns a list of series keys from a list of ids.
228func (f *SeriesFile) SeriesKeys(ids []uint64) [][]byte {
229	keys := make([][]byte, len(ids))
230	for i := range ids {
231		keys[i] = f.SeriesKey(ids[i])
232	}
233	return keys
234}
235
236// Series returns the parsed series name and tags for an offset.
237func (f *SeriesFile) Series(id uint64) ([]byte, models.Tags) {
238	key := f.SeriesKey(id)
239	if key == nil {
240		return nil, nil
241	}
242	return ParseSeriesKey(key)
243}
244
245// SeriesID return the series id for the series.
246func (f *SeriesFile) SeriesID(name []byte, tags models.Tags, buf []byte) uint64 {
247	key := AppendSeriesKey(buf[:0], name, tags)
248	keyPartition := f.SeriesKeyPartition(key)
249	if keyPartition == nil {
250		return 0
251	}
252	return keyPartition.FindIDBySeriesKey(key)
253}
254
255// HasSeries return true if the series exists.
256func (f *SeriesFile) HasSeries(name []byte, tags models.Tags, buf []byte) bool {
257	return f.SeriesID(name, tags, buf) > 0
258}
259
260// SeriesCount returns the number of series.
261func (f *SeriesFile) SeriesCount() uint64 {
262	var n uint64
263	for _, p := range f.partitions {
264		n += p.SeriesCount()
265	}
266	return n
267}
268
269// SeriesIterator returns an iterator over all the series.
270func (f *SeriesFile) SeriesIDIterator() SeriesIDIterator {
271	var ids []uint64
272	for _, p := range f.partitions {
273		ids = p.AppendSeriesIDs(ids)
274	}
275	sort.Sort(uint64Slice(ids))
276	return NewSeriesIDSliceIterator(ids)
277}
278
// SeriesIDPartitionID returns the partition index that owns id.
// Series IDs start at 1, so the ID is shifted down before the modulus.
func (f *SeriesFile) SeriesIDPartitionID(id uint64) int {
	return int((id - 1) % SeriesFilePartitionN)
}
282
283func (f *SeriesFile) SeriesIDPartition(id uint64) *SeriesPartition {
284	partitionID := f.SeriesIDPartitionID(id)
285	if partitionID >= len(f.partitions) {
286		return nil
287	}
288	return f.partitions[partitionID]
289}
290
291func (f *SeriesFile) SeriesKeysPartitionIDs(keys [][]byte) []int {
292	partitionIDs := make([]int, len(keys))
293	for i := range keys {
294		partitionIDs[i] = f.SeriesKeyPartitionID(keys[i])
295	}
296	return partitionIDs
297}
298
// SeriesKeyPartitionID returns the partition index for a series key,
// assigned by hashing the key bytes with xxhash.
func (f *SeriesFile) SeriesKeyPartitionID(key []byte) int {
	return int(xxhash.Sum64(key) % SeriesFilePartitionN)
}
302
303func (f *SeriesFile) SeriesKeyPartition(key []byte) *SeriesPartition {
304	partitionID := f.SeriesKeyPartitionID(key)
305	if partitionID >= len(f.partitions) {
306		return nil
307	}
308	return f.partitions[partitionID]
309}
310
// AppendSeriesKey serializes name and tags to a byte slice.
// The total length is prepended as a uvarint.
//
// Encoded layout:
//
//	uvarint total size of everything after this field
//	uint16  length of name (big endian)
//	name bytes
//	uvarint tag count
//	for each tag:
//	  uint16 key length (big endian), key bytes,
//	  uint16 value length (big endian), value bytes
//
// If dst is nil an exactly-sized buffer is allocated; otherwise the key is
// appended to dst and the (possibly reallocated) slice is returned.
func AppendSeriesKey(dst []byte, name []byte, tags models.Tags) []byte {
	// Scratch buffer reused for every fixed-width and varint encoding below.
	buf := make([]byte, binary.MaxVarintLen64)
	origLen := len(dst)

	// The tag count is variable encoded, so we need to know ahead of time what
	// the size of the tag count value will be.
	tcBuf := make([]byte, binary.MaxVarintLen64)
	tcSz := binary.PutUvarint(tcBuf, uint64(len(tags)))

	// Size of name/tags. Does not include total length.
	size := 0 + //
		2 + // size of measurement
		len(name) + // measurement
		tcSz + // size of number of tags
		(4 * len(tags)) + // length of each tag key and value
		tags.Size() // size of tag keys/values

	// Variable encode length.
	totalSz := binary.PutUvarint(buf, uint64(size))

	// If caller doesn't provide a buffer then pre-allocate an exact one.
	if dst == nil {
		dst = make([]byte, 0, size+totalSz)
	}

	// Append total length.
	dst = append(dst, buf[:totalSz]...)

	// Append name.
	binary.BigEndian.PutUint16(buf, uint16(len(name)))
	dst = append(dst, buf[:2]...)
	dst = append(dst, name...)

	// Append tag count.
	dst = append(dst, tcBuf[:tcSz]...)

	// Append tags.
	for _, tag := range tags {
		binary.BigEndian.PutUint16(buf, uint16(len(tag.Key)))
		dst = append(dst, buf[:2]...)
		dst = append(dst, tag.Key...)

		binary.BigEndian.PutUint16(buf, uint16(len(tag.Value)))
		dst = append(dst, buf[:2]...)
		dst = append(dst, tag.Value...)
	}

	// Verify that the total length equals the encoded byte count.
	// A mismatch indicates a bug in the size calculation above.
	if got, exp := len(dst)-origLen, size+totalSz; got != exp {
		panic(fmt.Sprintf("series key encoding does not match calculated total length: actual=%d, exp=%d, key=%x", got, exp, dst))
	}

	return dst
}
367
// ReadSeriesKey returns the series key from the beginning of the buffer.
// The key includes its uvarint length prefix; remainder is everything after it.
func ReadSeriesKey(data []byte) (key, remainder []byte) {
	length, width := binary.Uvarint(data)
	end := width + int(length)
	return data[:end], data[end:]
}
373
// ReadSeriesKeyLen decodes the uvarint length prefix of a series key and
// returns the buffer positioned just past it.
func ReadSeriesKeyLen(data []byte) (sz int, remainder []byte) {
	length, width := binary.Uvarint(data)
	return int(length), data[width:]
}
378
// ReadSeriesKeyMeasurement decodes the big-endian uint16 length-prefixed
// measurement name and returns it with the rest of the buffer.
func ReadSeriesKeyMeasurement(data []byte) (name, remainder []byte) {
	sz := binary.BigEndian.Uint16(data)
	rest := data[2:]
	return rest[:sz], rest[sz:]
}
383
// ReadSeriesKeyTagN decodes the uvarint tag count and returns the buffer
// positioned just past it.
func ReadSeriesKeyTagN(data []byte) (n int, remainder []byte) {
	count, width := binary.Uvarint(data)
	return int(count), data[width:]
}
388
// ReadSeriesKeyTag decodes one tag pair: a big-endian uint16 length-prefixed
// key followed by a length-prefixed value.
func ReadSeriesKeyTag(data []byte) (key, value, remainder []byte) {
	keyLen := binary.BigEndian.Uint16(data)
	data = data[2:]
	key, data = data[:keyLen], data[keyLen:]

	valueLen := binary.BigEndian.Uint16(data)
	data = data[2:]
	value, data = data[:valueLen], data[valueLen:]

	return key, value, data
}
397
// ParseSeriesKey extracts the name & tags from a series key.
// The returned slices alias data; they are not copies.
func ParseSeriesKey(data []byte) (name []byte, tags models.Tags) {
	return parseSeriesKey(data, nil)
}
402
// ParseSeriesKeyInto extracts the name and tags for data, parsing the tags into
// dstTags, which is then returned.
//
// The returned dstTags may have a different length and capacity.
func ParseSeriesKeyInto(data []byte, dstTags models.Tags) ([]byte, models.Tags) {
	return parseSeriesKey(data, dstTags)
}
410
411// parseSeriesKey extracts the name and tags from data, attempting to re-use the
412// provided tags value rather than allocating. The returned tags may have a
413// different length and capacity to those provided.
414func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) {
415	var name []byte
416	_, data = ReadSeriesKeyLen(data)
417	name, data = ReadSeriesKeyMeasurement(data)
418	tagN, data := ReadSeriesKeyTagN(data)
419
420	dst = dst[:cap(dst)] // Grow dst to use full capacity
421	if got, want := len(dst), tagN; got < want {
422		dst = append(dst, make(models.Tags, want-got)...)
423	} else if got > want {
424		dst = dst[:want]
425	}
426	dst = dst[:tagN]
427
428	for i := 0; i < tagN; i++ {
429		var key, value []byte
430		key, value, data = ReadSeriesKeyTag(data)
431		dst[i].Key, dst[i].Value = key, value
432	}
433
434	return name, dst
435}
436
437func CompareSeriesKeys(a, b []byte) int {
438	// Handle 'nil' keys.
439	if len(a) == 0 && len(b) == 0 {
440		return 0
441	} else if len(a) == 0 {
442		return -1
443	} else if len(b) == 0 {
444		return 1
445	}
446
447	// Read total size.
448	_, a = ReadSeriesKeyLen(a)
449	_, b = ReadSeriesKeyLen(b)
450
451	// Read names.
452	name0, a := ReadSeriesKeyMeasurement(a)
453	name1, b := ReadSeriesKeyMeasurement(b)
454
455	// Compare names, return if not equal.
456	if cmp := bytes.Compare(name0, name1); cmp != 0 {
457		return cmp
458	}
459
460	// Read tag counts.
461	tagN0, a := ReadSeriesKeyTagN(a)
462	tagN1, b := ReadSeriesKeyTagN(b)
463
464	// Compare each tag in order.
465	for i := 0; ; i++ {
466		// Check for EOF.
467		if i == tagN0 && i == tagN1 {
468			return 0
469		} else if i == tagN0 {
470			return -1
471		} else if i == tagN1 {
472			return 1
473		}
474
475		// Read keys.
476		var key0, key1, value0, value1 []byte
477		key0, value0, a = ReadSeriesKeyTag(a)
478		key1, value1, b = ReadSeriesKeyTag(b)
479
480		// Compare keys & values.
481		if cmp := bytes.Compare(key0, key1); cmp != 0 {
482			return cmp
483		} else if cmp := bytes.Compare(value0, value1); cmp != 0 {
484			return cmp
485		}
486	}
487}
488
489// GenerateSeriesKeys generates series keys for a list of names & tags using
490// a single large memory block.
491func GenerateSeriesKeys(names [][]byte, tagsSlice []models.Tags) [][]byte {
492	buf := make([]byte, 0, SeriesKeysSize(names, tagsSlice))
493	keys := make([][]byte, len(names))
494	for i := range names {
495		offset := len(buf)
496		buf = AppendSeriesKey(buf, names[i], tagsSlice[i])
497		keys[i] = buf[offset:]
498	}
499	return keys
500}
501
502// SeriesKeysSize returns the number of bytes required to encode a list of name/tags.
503func SeriesKeysSize(names [][]byte, tagsSlice []models.Tags) int {
504	var n int
505	for i := range names {
506		n += SeriesKeySize(names[i], tagsSlice[i])
507	}
508	return n
509}
510
511// SeriesKeySize returns the number of bytes required to encode a series key.
512func SeriesKeySize(name []byte, tags models.Tags) int {
513	var n int
514	n += 2 + len(name)
515	n += binaryutil.UvarintSize(uint64(len(tags)))
516	for _, tag := range tags {
517		n += 2 + len(tag.Key)
518		n += 2 + len(tag.Value)
519	}
520	n += binaryutil.UvarintSize(uint64(n))
521	return n
522}
523
// seriesKeys implements sort.Interface for a slice of encoded series keys,
// ordering them with CompareSeriesKeys.
type seriesKeys [][]byte

func (a seriesKeys) Len() int      { return len(a) }
func (a seriesKeys) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a seriesKeys) Less(i, j int) bool {
	return CompareSeriesKeys(a[i], a[j]) == -1
}
531
// uint64Slice implements sort.Interface for a slice of series IDs,
// ordering them ascending.
type uint64Slice []uint64

func (a uint64Slice) Len() int           { return len(a) }
func (a uint64Slice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }
537
// nop is the no-op release function returned by Retain on a nil SeriesFile.
func nop() {}
539