1package tsdb
2
3import (
4	"bytes"
5	"encoding/binary"
6	"errors"
7	"fmt"
8	"os"
9	"path/filepath"
10	"sort"
11	"sync"
12
13	"github.com/cespare/xxhash"
14	"github.com/influxdata/influxdb/models"
15	"github.com/influxdata/influxdb/pkg/binaryutil"
16	"go.uber.org/zap"
17	"golang.org/x/sync/errgroup"
18)
19
var (
	// ErrSeriesFileClosed is returned when an operation is attempted on a closed series file.
	ErrSeriesFileClosed = errors.New("tsdb: series file closed")

	// ErrInvalidSeriesPartitionID is returned when a series id maps to a partition that does not exist.
	ErrInvalidSeriesPartitionID = errors.New("tsdb: invalid series partition id")
)

// SeriesIDSize is the size in bytes of a series key ID.
const SeriesIDSize = 8

const (
	// SeriesFilePartitionN is the number of partitions a series file is split into.
	SeriesFilePartitionN = 8
)
32
// SeriesFile represents the section of the index that holds series data.
type SeriesFile struct {
	path       string             // root directory; partition directories live beneath it
	partitions []*SeriesPartition // one entry per partition, indexed by partition id

	// refs tracks in-use references to the SeriesFile. Retain takes the read
	// lock; Open/Close/Wait take the write lock to exclude active references.
	refs sync.RWMutex

	// Logger reports partition open failures; defaults to a no-op logger.
	Logger *zap.Logger
}
42
43// NewSeriesFile returns a new instance of SeriesFile.
44func NewSeriesFile(path string) *SeriesFile {
45	return &SeriesFile{
46		path:   path,
47		Logger: zap.NewNop(),
48	}
49}
50
51// Open memory maps the data file at the file's path.
52func (f *SeriesFile) Open() error {
53	// Wait for all references to be released and prevent new ones from being acquired.
54	f.refs.Lock()
55	defer f.refs.Unlock()
56
57	// Create path if it doesn't exist.
58	if err := os.MkdirAll(filepath.Join(f.path), 0777); err != nil {
59		return err
60	}
61
62	// Open partitions.
63	f.partitions = make([]*SeriesPartition, 0, SeriesFilePartitionN)
64	for i := 0; i < SeriesFilePartitionN; i++ {
65		p := NewSeriesPartition(i, f.SeriesPartitionPath(i))
66		p.Logger = f.Logger.With(zap.Int("partition", p.ID()))
67		if err := p.Open(); err != nil {
68			f.Logger.Error("Unable to open series file",
69				zap.String("path", f.path),
70				zap.Int("partition", p.ID()),
71				zap.Error(err))
72			f.close()
73			return err
74		}
75		f.partitions = append(f.partitions, p)
76	}
77
78	return nil
79}
80
81func (f *SeriesFile) close() (err error) {
82	for _, p := range f.partitions {
83		if e := p.Close(); e != nil && err == nil {
84			err = e
85		}
86	}
87
88	return err
89}
90
91// Close unmaps the data file.
92func (f *SeriesFile) Close() (err error) {
93	f.refs.Lock()
94	defer f.refs.Unlock()
95	return f.close()
96}
97
98// Path returns the path to the file.
99func (f *SeriesFile) Path() string { return f.path }
100
101// SeriesPartitionPath returns the path to a given partition.
102func (f *SeriesFile) SeriesPartitionPath(i int) string {
103	return filepath.Join(f.path, fmt.Sprintf("%02x", i))
104}
105
106// Partitions returns all partitions.
107func (f *SeriesFile) Partitions() []*SeriesPartition { return f.partitions }
108
109// Retain adds a reference count to the file.  It returns a release func.
110func (f *SeriesFile) Retain() func() {
111	if f != nil {
112		f.refs.RLock()
113
114		// Return the RUnlock func as the release func to be called when done.
115		return f.refs.RUnlock
116	}
117	return nop
118}
119
120// EnableCompactions allows compactions to run.
121func (f *SeriesFile) EnableCompactions() {
122	for _, p := range f.partitions {
123		p.EnableCompactions()
124	}
125}
126
127// DisableCompactions prevents new compactions from running.
128func (f *SeriesFile) DisableCompactions() {
129	for _, p := range f.partitions {
130		p.DisableCompactions()
131	}
132}
133
134// Wait waits for all Retains to be released.
135func (f *SeriesFile) Wait() {
136	f.refs.Lock()
137	defer f.refs.Unlock()
138}
139
140// CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
141// The returned ids slice returns IDs for every name+tags, creating new series IDs as needed.
142func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags) ([]uint64, error) {
143	keys := GenerateSeriesKeys(names, tagsSlice)
144	keyPartitionIDs := f.SeriesKeysPartitionIDs(keys)
145	ids := make([]uint64, len(keys))
146
147	var g errgroup.Group
148	for i := range f.partitions {
149		p := f.partitions[i]
150		g.Go(func() error {
151			return p.CreateSeriesListIfNotExists(keys, keyPartitionIDs, ids)
152		})
153	}
154	if err := g.Wait(); err != nil {
155		return nil, err
156	}
157	return ids, nil
158}
159
160// DeleteSeriesID flags a series as permanently deleted.
161// If the series is reintroduced later then it must create a new id.
162func (f *SeriesFile) DeleteSeriesID(id uint64) error {
163	p := f.SeriesIDPartition(id)
164	if p == nil {
165		return ErrInvalidSeriesPartitionID
166	}
167	return p.DeleteSeriesID(id)
168}
169
170// IsDeleted returns true if the ID has been deleted before.
171func (f *SeriesFile) IsDeleted(id uint64) bool {
172	p := f.SeriesIDPartition(id)
173	if p == nil {
174		return false
175	}
176	return p.IsDeleted(id)
177}
178
179// SeriesKey returns the series key for a given id.
180func (f *SeriesFile) SeriesKey(id uint64) []byte {
181	if id == 0 {
182		return nil
183	}
184	p := f.SeriesIDPartition(id)
185	if p == nil {
186		return nil
187	}
188	return p.SeriesKey(id)
189}
190
191// SeriesKeys returns a list of series keys from a list of ids.
192func (f *SeriesFile) SeriesKeys(ids []uint64) [][]byte {
193	keys := make([][]byte, len(ids))
194	for i := range ids {
195		keys[i] = f.SeriesKey(ids[i])
196	}
197	return keys
198}
199
200// Series returns the parsed series name and tags for an offset.
201func (f *SeriesFile) Series(id uint64) ([]byte, models.Tags) {
202	key := f.SeriesKey(id)
203	if key == nil {
204		return nil, nil
205	}
206	return ParseSeriesKey(key)
207}
208
209// SeriesID return the series id for the series.
210func (f *SeriesFile) SeriesID(name []byte, tags models.Tags, buf []byte) uint64 {
211	key := AppendSeriesKey(buf[:0], name, tags)
212	keyPartition := f.SeriesKeyPartition(key)
213	if keyPartition == nil {
214		return 0
215	}
216	return keyPartition.FindIDBySeriesKey(key)
217}
218
219// HasSeries return true if the series exists.
220func (f *SeriesFile) HasSeries(name []byte, tags models.Tags, buf []byte) bool {
221	return f.SeriesID(name, tags, buf) > 0
222}
223
224// SeriesCount returns the number of series.
225func (f *SeriesFile) SeriesCount() uint64 {
226	var n uint64
227	for _, p := range f.partitions {
228		n += p.SeriesCount()
229	}
230	return n
231}
232
// SeriesIDIterator returns an iterator over all series ids, merged from
// every partition and sorted ascending.
func (f *SeriesFile) SeriesIDIterator() SeriesIDIterator {
	var ids []uint64
	for _, p := range f.partitions {
		ids = p.AppendSeriesIDs(ids)
	}
	sort.Sort(uint64Slice(ids))
	return NewSeriesIDSliceIterator(ids)
}
242
// SeriesIDPartitionID returns the partition id that owns the given series id.
// IDs are assigned round-robin across partitions starting at id 1.
func (f *SeriesFile) SeriesIDPartitionID(id uint64) int {
	return int((id - 1) % SeriesFilePartitionN)
}
246
247func (f *SeriesFile) SeriesIDPartition(id uint64) *SeriesPartition {
248	partitionID := f.SeriesIDPartitionID(id)
249	if partitionID >= len(f.partitions) {
250		return nil
251	}
252	return f.partitions[partitionID]
253}
254
255func (f *SeriesFile) SeriesKeysPartitionIDs(keys [][]byte) []int {
256	partitionIDs := make([]int, len(keys))
257	for i := range keys {
258		partitionIDs[i] = f.SeriesKeyPartitionID(keys[i])
259	}
260	return partitionIDs
261}
262
// SeriesKeyPartitionID hashes a series key to the partition id that owns it.
func (f *SeriesFile) SeriesKeyPartitionID(key []byte) int {
	return int(xxhash.Sum64(key) % SeriesFilePartitionN)
}
266
267func (f *SeriesFile) SeriesKeyPartition(key []byte) *SeriesPartition {
268	partitionID := f.SeriesKeyPartitionID(key)
269	if partitionID >= len(f.partitions) {
270		return nil
271	}
272	return f.partitions[partitionID]
273}
274
// AppendSeriesKey serializes name and tags to a byte slice.
// The total length is prepended as a uvarint.
//
// Encoded layout:
//
//	uvarint(size) | uint16 name length | name | uvarint tag count |
//	repeated: uint16 key length | key | uint16 value length | value
//
// The size field covers everything after it but not its own width.
// Panics if the computed size does not match the bytes actually written.
func AppendSeriesKey(dst []byte, name []byte, tags models.Tags) []byte {
	// Scratch buffer reused for every fixed/varint encoding below.
	buf := make([]byte, binary.MaxVarintLen64)
	origLen := len(dst)

	// The tag count is variable encoded, so we need to know ahead of time what
	// the size of the tag count value will be.
	tcBuf := make([]byte, binary.MaxVarintLen64)
	tcSz := binary.PutUvarint(tcBuf, uint64(len(tags)))

	// Size of name/tags. Does not include total length.
	size := 0 + //
		2 + // size of measurement
		len(name) + // measurement
		tcSz + // size of number of tags
		(4 * len(tags)) + // length of each tag key and value
		tags.Size() // size of tag keys/values

	// Variable encode length.
	totalSz := binary.PutUvarint(buf, uint64(size))

	// If caller doesn't provide a buffer then pre-allocate an exact one.
	if dst == nil {
		dst = make([]byte, 0, size+totalSz)
	}

	// Append total length.
	dst = append(dst, buf[:totalSz]...)

	// Append name.
	binary.BigEndian.PutUint16(buf, uint16(len(name)))
	dst = append(dst, buf[:2]...)
	dst = append(dst, name...)

	// Append tag count.
	dst = append(dst, tcBuf[:tcSz]...)

	// Append tags.
	for _, tag := range tags {
		binary.BigEndian.PutUint16(buf, uint16(len(tag.Key)))
		dst = append(dst, buf[:2]...)
		dst = append(dst, tag.Key...)

		binary.BigEndian.PutUint16(buf, uint16(len(tag.Value)))
		dst = append(dst, buf[:2]...)
		dst = append(dst, tag.Value...)
	}

	// Verify that the total length equals the encoded byte count.
	if got, exp := len(dst)-origLen, size+totalSz; got != exp {
		panic(fmt.Sprintf("series key encoding does not match calculated total length: actual=%d, exp=%d, key=%x", got, exp, dst))
	}

	return dst
}
331
// ReadSeriesKey returns the series key from the beginning of the buffer.
// The returned key includes its uvarint length prefix.
func ReadSeriesKey(data []byte) (key, remainder []byte) {
	sz, n := binary.Uvarint(data)
	end := n + int(sz)
	return data[:end], data[end:]
}
337
// ReadSeriesKeyLen reads the uvarint total-size prefix of a series key and
// returns it along with the bytes following the prefix.
func ReadSeriesKeyLen(data []byte) (sz int, remainder []byte) {
	n, width := binary.Uvarint(data)
	return int(n), data[width:]
}
342
// ReadSeriesKeyMeasurement reads the big-endian uint16 length-prefixed
// measurement name and returns it with the remaining bytes.
func ReadSeriesKeyMeasurement(data []byte) (name, remainder []byte) {
	sz := binary.BigEndian.Uint16(data)
	data = data[2:]
	return data[:sz], data[sz:]
}
347
// ReadSeriesKeyTagN reads the uvarint tag count and returns it with the
// remaining bytes.
func ReadSeriesKeyTagN(data []byte) (n int, remainder []byte) {
	count, width := binary.Uvarint(data)
	return int(count), data[width:]
}
352
// ReadSeriesKeyTag reads one length-prefixed tag key/value pair and returns
// the pair along with the remaining bytes.
func ReadSeriesKeyTag(data []byte) (key, value, remainder []byte) {
	sz := int(binary.BigEndian.Uint16(data))
	data = data[2:]
	key, data = data[:sz], data[sz:]

	sz = int(binary.BigEndian.Uint16(data))
	data = data[2:]
	value, data = data[:sz], data[sz:]

	return key, value, data
}
361
362// ParseSeriesKey extracts the name & tags from a series key.
363func ParseSeriesKey(data []byte) (name []byte, tags models.Tags) {
364	return parseSeriesKey(data, nil)
365}
366
// ParseSeriesKeyInto extracts the name and tags for data, parsing the tags into
// dstTags, which is then returned.
//
// The returned dstTags may have a different length and capacity.
func ParseSeriesKeyInto(data []byte, dstTags models.Tags) ([]byte, models.Tags) {
	return parseSeriesKey(data, dstTags)
}
374
375// parseSeriesKey extracts the name and tags from data, attempting to re-use the
376// provided tags value rather than allocating. The returned tags may have a
377// different length and capacity to those provided.
378func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) {
379	var name []byte
380	_, data = ReadSeriesKeyLen(data)
381	name, data = ReadSeriesKeyMeasurement(data)
382	tagN, data := ReadSeriesKeyTagN(data)
383
384	dst = dst[:cap(dst)] // Grow dst to use full capacity
385	if got, want := len(dst), tagN; got < want {
386		dst = append(dst, make(models.Tags, want-got)...)
387	} else if got > want {
388		dst = dst[:want]
389	}
390	dst = dst[:tagN]
391
392	for i := 0; i < tagN; i++ {
393		var key, value []byte
394		key, value, data = ReadSeriesKeyTag(data)
395		dst[i].Key, dst[i].Value = key, value
396	}
397
398	return name, dst
399}
400
401func CompareSeriesKeys(a, b []byte) int {
402	// Handle 'nil' keys.
403	if len(a) == 0 && len(b) == 0 {
404		return 0
405	} else if len(a) == 0 {
406		return -1
407	} else if len(b) == 0 {
408		return 1
409	}
410
411	// Read total size.
412	_, a = ReadSeriesKeyLen(a)
413	_, b = ReadSeriesKeyLen(b)
414
415	// Read names.
416	name0, a := ReadSeriesKeyMeasurement(a)
417	name1, b := ReadSeriesKeyMeasurement(b)
418
419	// Compare names, return if not equal.
420	if cmp := bytes.Compare(name0, name1); cmp != 0 {
421		return cmp
422	}
423
424	// Read tag counts.
425	tagN0, a := ReadSeriesKeyTagN(a)
426	tagN1, b := ReadSeriesKeyTagN(b)
427
428	// Compare each tag in order.
429	for i := 0; ; i++ {
430		// Check for EOF.
431		if i == tagN0 && i == tagN1 {
432			return 0
433		} else if i == tagN0 {
434			return -1
435		} else if i == tagN1 {
436			return 1
437		}
438
439		// Read keys.
440		var key0, key1, value0, value1 []byte
441		key0, value0, a = ReadSeriesKeyTag(a)
442		key1, value1, b = ReadSeriesKeyTag(b)
443
444		// Compare keys & values.
445		if cmp := bytes.Compare(key0, key1); cmp != 0 {
446			return cmp
447		} else if cmp := bytes.Compare(value0, value1); cmp != 0 {
448			return cmp
449		}
450	}
451}
452
453// GenerateSeriesKeys generates series keys for a list of names & tags using
454// a single large memory block.
455func GenerateSeriesKeys(names [][]byte, tagsSlice []models.Tags) [][]byte {
456	buf := make([]byte, 0, SeriesKeysSize(names, tagsSlice))
457	keys := make([][]byte, len(names))
458	for i := range names {
459		offset := len(buf)
460		buf = AppendSeriesKey(buf, names[i], tagsSlice[i])
461		keys[i] = buf[offset:]
462	}
463	return keys
464}
465
466// SeriesKeysSize returns the number of bytes required to encode a list of name/tags.
467func SeriesKeysSize(names [][]byte, tagsSlice []models.Tags) int {
468	var n int
469	for i := range names {
470		n += SeriesKeySize(names[i], tagsSlice[i])
471	}
472	return n
473}
474
// SeriesKeySize returns the number of bytes required to encode a series key.
func SeriesKeySize(name []byte, tags models.Tags) int {
	var n int
	n += 2 + len(name) // uint16 name length + name
	n += binaryutil.UvarintSize(uint64(len(tags)))
	for _, tag := range tags {
		n += 2 + len(tag.Key)
		n += 2 + len(tag.Value)
	}
	// The total-length prefix encodes the size of everything counted above,
	// so its own uvarint width is derived from the running total before
	// being added in. Order matters here.
	n += binaryutil.UvarintSize(uint64(n))
	return n
}
487
// seriesKeys implements sort.Interface for a slice of encoded series keys,
// ordered by CompareSeriesKeys.
type seriesKeys [][]byte

func (a seriesKeys) Len() int      { return len(a) }
func (a seriesKeys) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a seriesKeys) Less(i, j int) bool {
	return CompareSeriesKeys(a[i], a[j]) == -1
}
495
// uint64Slice implements sort.Interface for a slice of series ids.
type uint64Slice []uint64

func (a uint64Slice) Len() int           { return len(a) }
func (a uint64Slice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }
501
// nop is the no-op release func returned by Retain on a nil SeriesFile.
func nop() {}
503