package tsm1_test

import (
	"archive/tar"
	"bytes"
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"math/rand"
	"os"
	"path"
	"path/filepath"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	"github.com/influxdata/influxdb/logger"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/pkg/deep"
	"github.com/influxdata/influxdb/query"
	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
	"github.com/influxdata/influxdb/tsdb/index/inmem"
	"github.com/influxdata/influxql"
)
// Ensure that deletes sent only to the WAL clear the data from the cache on restart.
func TestEngine_DeleteWALLoadMetadata(t *testing.T) {
	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			e := MustOpenEngine(index)
			defer e.Close()

			if err := e.WritePointsString(
				`cpu,host=A value=1.1 1000000000`,
				`cpu,host=B value=1.2 2000000000`,
			); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}

			// Remove series.
			itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}}
			if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil {
				t.Fatalf("failed to delete series: %s", err.Error())
			}

			// Ensure we can close and load index from the WAL
			if err := e.Reopen(); err != nil {
				t.Fatal(err)
			}

			if exp, got := 0, len(e.Cache.Values(tsm1.SeriesFieldKeyBytes("cpu,host=A", "value"))); exp != got {
				t.Fatalf("unexpected number of values: got: %d. exp: %d", got, exp)
			}

			if exp, got := 1, len(e.Cache.Values(tsm1.SeriesFieldKeyBytes("cpu,host=B", "value"))); exp != got {
				t.Fatalf("unexpected number of values: got: %d. exp: %d", got, exp)
			}
		})
	}
}

// See https://github.com/influxdata/influxdb/issues/14229
func TestEngine_DeleteSeriesAfterCacheSnapshot(t *testing.T) {
	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			e := MustOpenEngine(index)
			defer e.Close()

			if err := e.WritePointsString(
				`cpu,host=A value=1.1 1000000000`,
				`cpu,host=B value=1.2 2000000000`,
			); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}

			e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
			e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
			e.CreateSeriesIfNotExists([]byte("cpu,host=B"), []byte("cpu"), models.NewTags(map[string]string{"host": "B"}))

			// Verify series exist.
			n, err := seriesExist(e, "cpu", []string{"host"})
			if err != nil {
				t.Fatal(err)
			} else if got, exp := n, 2; got != exp {
				t.Fatalf("got %d points, expected %d", got, exp)
			}

			// Simulate restart of server
			if err := e.Reopen(); err != nil {
				t.Fatal(err)
			}

			// Snapshot the cache
			if err := e.WriteSnapshot(); err != nil {
				t.Fatalf("failed to snapshot: %s", err.Error())
			}

			// Verify series exist.
			n, err = seriesExist(e, "cpu", []string{"host"})
			if err != nil {
				t.Fatal(err)
			} else if got, exp := n, 2; got != exp {
				t.Fatalf("got %d points, expected %d", got, exp)
			}

			// Delete the series
			itr := &seriesIterator{keys: [][]byte{
				[]byte("cpu,host=A"),
				[]byte("cpu,host=B"),
			}}
			if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil {
				t.Fatalf("failed to delete series: %s", err.Error())
			}

			// Verify the series are no longer present.
			n, err = seriesExist(e, "cpu", []string{"host"})
			if err != nil {
				t.Fatal(err)
			} else if got, exp := n, 0; got != exp {
				t.Fatalf("got %d points, expected %d", got, exp)
			}

			// Simulate restart of server
			if err := e.Reopen(); err != nil {
				t.Fatal(err)
			}

			// Verify the series are no longer present.
			n, err = seriesExist(e, "cpu", []string{"host"})
			if err != nil {
				t.Fatal(err)
			} else if got, exp := n, 0; got != exp {
				t.Fatalf("got %d points, expected %d", got, exp)
			}
		})
	}
}

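// seriesExist returns the number of points found for measurement m, grouped by
// dims, by draining an iterator over the full time range. A zero count means
// the series are no longer queryable.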
func seriesExist(e *Engine, m string, dims []string) (int, error) {
	itr, err := e.CreateIterator(context.Background(), m, query.IteratorOptions{
		Expr:       influxql.MustParseExpr(`value`),
		Dimensions: dims,
		StartTime:  influxql.MinTime,
		EndTime:    influxql.MaxTime,
		Ascending:  false,
	})
	if err != nil {
		return 0, err
	} else if itr == nil {
		return 0, nil
	}
	defer itr.Close()
	fitr := itr.(query.FloatIterator)

	var n int
	for {
		p, err := fitr.Next()
		if err != nil {
			return 0, err
		} else if p == nil {
			return n, nil
		}
		n++
	}
}

// Ensure that the engine can write & read shard digest files.
func TestEngine_Digest(t *testing.T) {
	e := MustOpenEngine(inmem.IndexName)
	defer e.Close()

	if err := e.Open(); err != nil {
		t.Fatalf("failed to open tsm1 engine: %s", err.Error())
	}

	// Create a few points.
	points := []models.Point{
		MustParsePointString("cpu,host=A value=1.1 1000000000"),
		MustParsePointString("cpu,host=B value=1.2 2000000000"),
	}

	if err := e.WritePoints(points); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}

	// Force a compaction.
	e.ScheduleFullCompaction()

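	// digest reads the shard's digest stream and collects the time span
	// recorded for each series key so the result can be compared against
	// expectations.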
	digest := func() ([]span, error) {
		// Get a reader for the shard's digest.
		r, sz, err := e.Digest()
		if err != nil {
			return nil, err
		}

		if sz <= 0 {
			t.Fatalf("expected digest size > 0")
		}

		// Make sure the digest can be read.
		dr, err := tsm1.NewDigestReader(r)
		if err != nil {
			r.Close()
			return nil, err
		}
		defer dr.Close()

		_, err = dr.ReadManifest()
		if err != nil {
			t.Fatal(err)
		}

		got := []span{}

		for {
			k, s, err := dr.ReadTimeSpan()
			if err == io.EOF {
				break
			} else if err != nil {
				return nil, err
			}

			got = append(got, span{
				key:   k,
				tspan: s,
			})
		}

		return got, nil
	}

	exp := []span{
		span{
			key: "cpu,host=A#!~#value",
			tspan: &tsm1.DigestTimeSpan{
				Ranges: []tsm1.DigestTimeRange{
					tsm1.DigestTimeRange{
						Min: 1000000000,
						Max: 1000000000,
						N:   1,
						CRC: 1048747083,
					},
				},
			},
		},
		span{
			key: "cpu,host=B#!~#value",
			tspan: &tsm1.DigestTimeSpan{
				Ranges: []tsm1.DigestTimeRange{
					tsm1.DigestTimeRange{
						Min: 2000000000,
						Max: 2000000000,
						N:   1,
						CRC: 734984746,
					},
				},
			},
		},
	}

	for n := 0; n < 2; n++ {
		got, err := digest()
		if err != nil {
			t.Fatalf("n = %d: %s", n, err)
		}

		// Make sure the data in the digest was valid.
		if !reflect.DeepEqual(exp, got) {
			t.Fatalf("n = %d\nexp = %v\ngot = %v\n", n, exp, got)
		}
	}

	// Test that writing more points causes the digest to be updated.
	points = []models.Point{
		MustParsePointString("cpu,host=C value=1.1 3000000000"),
	}

	if err := e.WritePoints(points); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}

	// Force a compaction.
	e.ScheduleFullCompaction()

	// Get new digest.
	got, err := digest()
	if err != nil {
		t.Fatal(err)
	}

	exp = append(exp, span{
		key: "cpu,host=C#!~#value",
		tspan: &tsm1.DigestTimeSpan{
			Ranges: []tsm1.DigestTimeRange{
				tsm1.DigestTimeRange{
					Min: 3000000000,
					Max: 3000000000,
					N:   1,
					CRC: 2553233514,
				},
			},
		},
	})

	if !reflect.DeepEqual(exp, got) {
		t.Fatalf("\nexp = %v\ngot = %v\n", exp, got)
	}
}

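// span pairs a series key with its digest time span so expected and actual
// digest contents can be compared with reflect.DeepEqual.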
type span struct {
	key   string
	tspan *tsm1.DigestTimeSpan
}

// Ensure engine handles concurrent calls to Digest().
func TestEngine_Digest_Concurrent(t *testing.T) {
	e := MustOpenEngine(inmem.IndexName)
	defer e.Close()

	if err := e.Open(); err != nil {
		t.Fatalf("failed to open tsm1 engine: %s", err.Error())
	}

	// Create a few points.
	points := []models.Point{
		MustParsePointString("cpu,host=A value=1.1 1000000000"),
		MustParsePointString("cpu,host=B value=1.2 2000000000"),
	}

	if err := e.WritePoints(points); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}

	// Force a compaction.
	e.ScheduleFullCompaction()

	// Start multiple waiting goroutines, ready to call Digest().
	start := make(chan struct{})
	errs := make(chan error)
	wg := &sync.WaitGroup{}
	for n := 0; n < 100; n++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-start
			if _, _, err := e.Digest(); err != nil {
				errs <- err
			}
		}()
	}

	// Goroutine to close errs channel after all routines have finished.
	go func() { wg.Wait(); close(errs) }()

	// Signal all goroutines to call Digest().
	close(start)

	// Check for digest errors.
	for err := range errs {
		if err != nil {
			t.Fatal(err)
		}
	}
}

// Ensure that the engine will back up any TSM files created since the passed-in time.
func TestEngine_Backup(t *testing.T) {
	sfile := MustOpenSeriesFile()
	defer sfile.Close()

	// Generate temporary file.
	f, _ := ioutil.TempFile("", "tsm")
	f.Close()
	os.Remove(f.Name())
	walPath := filepath.Join(f.Name(), "wal")
	os.MkdirAll(walPath, 0777)
	defer os.RemoveAll(f.Name())

	// Create a few points.
	p1 := MustParsePointString("cpu,host=A value=1.1 1000000000")
	p2 := MustParsePointString("cpu,host=B value=1.2 2000000000")
	p3 := MustParsePointString("cpu,host=C value=1.3 3000000000")

	// Write those points to the engine.
	db := path.Base(f.Name())
	opt := tsdb.NewEngineOptions()
	opt.InmemIndex = inmem.NewIndex(db, sfile.SeriesFile)
	idx := tsdb.MustOpenIndex(1, db, filepath.Join(f.Name(), "index"), tsdb.NewSeriesIDSet(), sfile.SeriesFile, opt)
	defer idx.Close()

	e := tsm1.NewEngine(1, idx, f.Name(), walPath, sfile.SeriesFile, opt).(*tsm1.Engine)

	// mock the planner so compactions don't run during the test
	e.CompactionPlan = &mockPlanner{}

	if err := e.Open(); err != nil {
		t.Fatalf("failed to open tsm1 engine: %s", err.Error())
	}

	if err := e.WritePoints([]models.Point{p1}); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}
	if err := e.WriteSnapshot(); err != nil {
		t.Fatalf("failed to snapshot: %s", err.Error())
	}

	if err := e.WritePoints([]models.Point{p2}); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}

	b := bytes.NewBuffer(nil)
	if err := e.Backup(b, "", time.Unix(0, 0)); err != nil {
		t.Fatalf("failed to backup: %s", err.Error())
	}

	tr := tar.NewReader(b)
	if len(e.FileStore.Files()) != 2 {
		t.Fatalf("file count wrong: exp: %d, got: %d", 2, len(e.FileStore.Files()))
	}

	fileNames := map[string]bool{}
	for _, f := range e.FileStore.Files() {
		fileNames[filepath.Base(f.Path())] = true
	}

	th, err := tr.Next()
	for err == nil {
		if !fileNames[th.Name] {
			t.Errorf("Extra file in backup: %q", th.Name)
		}
		delete(fileNames, th.Name)
		th, err = tr.Next()
	}

	if err != nil && err != io.EOF {
		t.Fatalf("Problem reading tar header: %s", err)
	}

	for f := range fileNames {
		t.Errorf("File missing from backup: %s", f)
	}

	if t.Failed() {
		t.FailNow()
	}

	lastBackup := time.Now()

	// We have to sleep for a second because last-modified times only have second-level
	// precision, so the new file must be at least a second newer than the previous one.
	time.Sleep(time.Second)

	if err := e.WritePoints([]models.Point{p3}); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}

	b = bytes.NewBuffer(nil)
	if err := e.Backup(b, "", lastBackup); err != nil {
		t.Fatalf("failed to backup: %s", err.Error())
	}

	tr = tar.NewReader(b)
	th, err = tr.Next()
	if err != nil {
		t.Fatalf("error getting next tar header: %s", err.Error())
	}

	mostRecentFile := e.FileStore.Files()[e.FileStore.Count()-1].Path()
	if !strings.Contains(mostRecentFile, th.Name) || th.Name == "" {
		t.Fatalf("file name doesn't match:\n\tgot: %s\n\texp: %s", th.Name, mostRecentFile)
	}
}

func TestEngine_Export(t *testing.T) {
	// Generate temporary file.
	f, _ := ioutil.TempFile("", "tsm")
	f.Close()
	os.Remove(f.Name())
	walPath := filepath.Join(f.Name(), "wal")
	os.MkdirAll(walPath, 0777)
	defer os.RemoveAll(f.Name())

	// Create a few points.
	p1 := MustParsePointString("cpu,host=A value=1.1 1000000000")
	p2 := MustParsePointString("cpu,host=B value=1.2 2000000000")
	p3 := MustParsePointString("cpu,host=C value=1.3 3000000000")

	sfile := MustOpenSeriesFile()
	defer sfile.Close()

	// Write those points to the engine.
	db := path.Base(f.Name())
	opt := tsdb.NewEngineOptions()
	opt.InmemIndex = inmem.NewIndex(db, sfile.SeriesFile)
	idx := tsdb.MustOpenIndex(1, db, filepath.Join(f.Name(), "index"), tsdb.NewSeriesIDSet(), sfile.SeriesFile, opt)
	defer idx.Close()

	e := tsm1.NewEngine(1, idx, f.Name(), walPath, sfile.SeriesFile, opt).(*tsm1.Engine)

	// mock the planner so compactions don't run during the test
	e.CompactionPlan = &mockPlanner{}

	if err := e.Open(); err != nil {
		t.Fatalf("failed to open tsm1 engine: %s", err.Error())
	}

	if err := e.WritePoints([]models.Point{p1}); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}
	if err := e.WriteSnapshot(); err != nil {
		t.Fatalf("failed to snapshot: %s", err.Error())
	}

	if err := e.WritePoints([]models.Point{p2}); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}
	if err := e.WriteSnapshot(); err != nil {
		t.Fatalf("failed to snapshot: %s", err.Error())
	}

	if err := e.WritePoints([]models.Point{p3}); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}

	// export the whole DB
	var exBuf bytes.Buffer
	if err := e.Export(&exBuf, "", time.Unix(0, 0), time.Unix(0, 4000000000)); err != nil {
		t.Fatalf("failed to export: %s", err.Error())
	}

	var bkBuf bytes.Buffer
	if err := e.Backup(&bkBuf, "", time.Unix(0, 0)); err != nil {
		t.Fatalf("failed to backup: %s", err.Error())
	}

	if len(e.FileStore.Files()) != 3 {
		t.Fatalf("file count wrong: exp: %d, got: %d", 3, len(e.FileStore.Files()))
	}

	fileNames := map[string]bool{}
	for _, f := range e.FileStore.Files() {
		fileNames[filepath.Base(f.Path())] = true
	}

	fileData, err := getExportData(&exBuf)
	if err != nil {
		t.Errorf("Error extracting data from export: %s", err.Error())
	}

	// TEST 1: did we get any extra files not found in the store?
	for k := range fileData {
		if _, ok := fileNames[k]; !ok {
			t.Errorf("exported a file not in the store: %s", k)
		}
	}

	// TEST 2: did we miss any files that the store had?
	for k := range fileNames {
		if _, ok := fileData[k]; !ok {
			t.Errorf("failed to export a file from the store: %s", k)
		}
	}

	// TEST 3: Does 'backup' get the same files + bits?
	tr := tar.NewReader(&bkBuf)

	th, err := tr.Next()
	for err == nil {
		expData, ok := fileData[th.Name]
		if !ok {
			t.Errorf("Extra file in backup: %q", th.Name)
			th, err = tr.Next()
			continue
		}

		buf := new(bytes.Buffer)
		if _, err := io.Copy(buf, tr); err != nil {
			t.Fatal(err)
		}

		if !equalBuffers(expData, buf) {
			t.Errorf("Difference in data between backup and Export for file %s", th.Name)
		}

		th, err = tr.Next()
	}

	if t.Failed() {
		t.FailNow()
	}

	// TEST 4: Are subsets (1), (2), (3), (1,2), (2,3) accurately found in the larger export?
	// Export the first time range.
	var ex1 bytes.Buffer
	if err := e.Export(&ex1, "", time.Unix(0, 0), time.Unix(0, 1000000000)); err != nil {
		t.Fatalf("failed to export: %s", err.Error())
	}
	ex1Data, err := getExportData(&ex1)
	if err != nil {
		t.Errorf("Error extracting data from export: %s", err.Error())
	}

	for k, v := range ex1Data {
		fullExp, ok := fileData[k]
		if !ok {
			t.Errorf("Extracting subset resulted in file not found in full export: %s", k)
			continue
		}
		if !equalBuffers(fullExp, v) {
			t.Errorf("Difference in data between full export and subset export for file %s", k)
		}
	}

	var ex2 bytes.Buffer
	if err := e.Export(&ex2, "", time.Unix(0, 1000000001), time.Unix(0, 2000000000)); err != nil {
		t.Fatalf("failed to export: %s", err.Error())
	}

	ex2Data, err := getExportData(&ex2)
	if err != nil {
		t.Errorf("Error extracting data from export: %s", err.Error())
	}

	for k, v := range ex2Data {
		fullExp, ok := fileData[k]
		if !ok {
			t.Errorf("Extracting subset resulted in file not found in full export: %s", k)
			continue
		}
		if !equalBuffers(fullExp, v) {
			t.Errorf("Difference in data between full export and subset export for file %s", k)
		}
	}

	var ex3 bytes.Buffer
	if err := e.Export(&ex3, "", time.Unix(0, 2000000001), time.Unix(0, 3000000000)); err != nil {
		t.Fatalf("failed to export: %s", err.Error())
	}

	ex3Data, err := getExportData(&ex3)
	if err != nil {
		t.Errorf("Error extracting data from export: %s", err.Error())
	}

	for k, v := range ex3Data {
		fullExp, ok := fileData[k]
		if !ok {
			t.Errorf("Extracting subset resulted in file not found in full export: %s", k)
			continue
		}
		if !equalBuffers(fullExp, v) {
			t.Errorf("Difference in data between full export and subset export for file %s", k)
		}
	}

	var ex12 bytes.Buffer
	if err := e.Export(&ex12, "", time.Unix(0, 0), time.Unix(0, 2000000000)); err != nil {
		t.Fatalf("failed to export: %s", err.Error())
	}

	ex12Data, err := getExportData(&ex12)
	if err != nil {
		t.Errorf("Error extracting data from export: %s", err.Error())
	}

	for k, v := range ex12Data {
		fullExp, ok := fileData[k]
		if !ok {
			t.Errorf("Extracting subset resulted in file not found in full export: %s", k)
			continue
		}
		if !equalBuffers(fullExp, v) {
			t.Errorf("Difference in data between full export and subset export for file %s", k)
		}
	}

	var ex23 bytes.Buffer
	if err := e.Export(&ex23, "", time.Unix(0, 1000000001), time.Unix(0, 3000000000)); err != nil {
		t.Fatalf("failed to export: %s", err.Error())
	}

	ex23Data, err := getExportData(&ex23)
	if err != nil {
		t.Errorf("Error extracting data from export: %s", err.Error())
	}

	for k, v := range ex23Data {
		fullExp, ok := fileData[k]
		if !ok {
			t.Errorf("Extracting subset resulted in file not found in full export: %s", k)
			continue
		}
		if !equalBuffers(fullExp, v) {
			t.Errorf("Difference in data between full export and subset export for file %s", k)
		}
	}
}

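// equalBuffers reports whether two buffers contain exactly the same bytes.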
func equalBuffers(bufA, bufB *bytes.Buffer) bool {
	return bytes.Equal(bufA.Bytes(), bufB.Bytes())
}

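// getExportData unpacks a tar-format export stream into a map of file name to
// file contents.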
func getExportData(exBuf *bytes.Buffer) (map[string]*bytes.Buffer, error) {
	tr := tar.NewReader(exBuf)

	fileData := make(map[string]*bytes.Buffer)

	// Read every file in the archive into its own buffer.
	for {
		th, err := tr.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}

		buf := new(bytes.Buffer)
		if _, err := io.Copy(buf, tr); err != nil {
			return nil, err
		}
		fileData[th.Name] = buf
	}

	return fileData, nil
}

// Ensure engine can create an ascending iterator for cached values.
func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) {
	t.Parallel()

	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			e := MustOpenEngine(index)
			defer e.Close()

			e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
			e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))

			if err := e.WritePointsString(
				`cpu,host=A value=1.1 1000000000`,
				`cpu,host=A value=1.2 2000000000`,
				`cpu,host=A value=1.3 3000000000`,
			); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}

			itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{
				Expr:       influxql.MustParseExpr(`value`),
				Dimensions: []string{"host"},
				StartTime:  influxql.MinTime,
				EndTime:    influxql.MaxTime,
				Ascending:  true,
			})
			if err != nil {
				t.Fatal(err)
			}
			fitr := itr.(query.FloatIterator)

			if p, err := fitr.Next(); err != nil {
				t.Fatalf("unexpected error(0): %v", err)
			} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1}) {
				t.Fatalf("unexpected point(0): %v", p)
			}
			if p, err := fitr.Next(); err != nil {
				t.Fatalf("unexpected error(1): %v", err)
			} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 2000000000, Value: 1.2}) {
				t.Fatalf("unexpected point(1): %v", p)
			}
			if p, err := fitr.Next(); err != nil {
				t.Fatalf("unexpected error(2): %v", err)
			} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3}) {
				t.Fatalf("unexpected point(2): %v", p)
			}
			if p, err := fitr.Next(); err != nil {
				t.Fatalf("expected eof, got error: %v", err)
			} else if p != nil {
				t.Fatalf("expected eof: %v", p)
			}
		})
	}
}

// Ensure engine can create a descending iterator for cached values.
func TestEngine_CreateIterator_Cache_Descending(t *testing.T) {
	t.Parallel()

	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			e := MustOpenEngine(index)
			defer e.Close()

			e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
			e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))

			if err := e.WritePointsString(
				`cpu,host=A value=1.1 1000000000`,
				`cpu,host=A value=1.2 2000000000`,
				`cpu,host=A value=1.3 3000000000`,
			); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}

			itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{
				Expr:       influxql.MustParseExpr(`value`),
				Dimensions: []string{"host"},
				StartTime:  influxql.MinTime,
				EndTime:    influxql.MaxTime,
				Ascending:  false,
			})
			if err != nil {
				t.Fatal(err)
			}
			fitr := itr.(query.FloatIterator)

			if p, err := fitr.Next(); err != nil {
				t.Fatalf("unexpected error(0): %v", err)
			} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3}) {
				t.Fatalf("unexpected point(0): %v", p)
			}
			if p, err := fitr.Next(); err != nil {
				t.Fatalf("unexpected error(1): %v", err)
			} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 2000000000, Value: 1.2}) {
				t.Fatalf("unexpected point(1): %v", p)
			}
			if p, err := fitr.Next(); err != nil {
				t.Fatalf("unexpected error(2): %v", err)
			} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1}) {
				t.Fatalf("unexpected point(2): %v", p)
			}
			if p, err := fitr.Next(); err != nil {
				t.Fatalf("expected eof, got error: %v", err)
			} else if p != nil {
				t.Fatalf("expected eof: %v", p)
			}
		})
	}
}

// Ensure engine can create an ascending iterator for tsm values.
func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) {
	t.Parallel()

	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			e := MustOpenEngine(index)
			defer e.Close()

			e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
			e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))

			if err := e.WritePointsString(
				`cpu,host=A value=1.1 1000000000`,
				`cpu,host=A value=1.2 2000000000`,
				`cpu,host=A value=1.3 3000000000`,
			); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}
			e.MustWriteSnapshot()

			itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{
				Expr:       influxql.MustParseExpr(`value`),
				Dimensions: []string{"host"},
				StartTime:  1000000000,
				EndTime:    3000000000,
				Ascending:  true,
			})
			if err != nil {
				t.Fatal(err)
			}
			defer itr.Close()
			fitr := itr.(query.FloatIterator)

			if p, err := fitr.Next(); err != nil {
				t.Fatalf("unexpected error(0): %v", err)
			} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1}) {
				t.Fatalf("unexpected point(0): %v", p)
			}
			if p, err := fitr.Next(); err != nil {
				t.Fatalf("unexpected error(1): %v", err)
			} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 2000000000, Value: 1.2}) {
				t.Fatalf("unexpected point(1): %v", p)
			}
			if p, err := fitr.Next(); err != nil {
				t.Fatalf("unexpected error(2): %v", err)
			} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3}) {
				t.Fatalf("unexpected point(2): %v", p)
			}
			if p, err := fitr.Next(); err != nil {
				t.Fatalf("expected eof, got error: %v", err)
			} else if p != nil {
				t.Fatalf("expected eof: %v", p)
			}
		})
	}
}

// Ensure engine can create a descending iterator for tsm values.
func TestEngine_CreateIterator_TSM_Descending(t *testing.T) {
	t.Parallel()

	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			e := MustOpenEngine(index)
			defer e.Close()

			e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
			e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))

			if err := e.WritePointsString(
				`cpu,host=A value=1.1 1000000000`,
				`cpu,host=A value=1.2 2000000000`,
				`cpu,host=A value=1.3 3000000000`,
			); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}
			e.MustWriteSnapshot()

			itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{
				Expr:       influxql.MustParseExpr(`value`),
				Dimensions: []string{"host"},
				StartTime:  influxql.MinTime,
				EndTime:    influxql.MaxTime,
				Ascending:  false,
			})
			if err != nil {
				t.Fatal(err)
			}
			defer itr.Close()
			fitr := itr.(query.FloatIterator)

			if p, err := fitr.Next(); err != nil {
				t.Fatalf("unexpected error(0): %v", err)
			} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3}) {
				t.Fatalf("unexpected point(0): %v", p)
			}
			if p, err := fitr.Next(); err != nil {
				t.Fatalf("unexpected error(1): %v", err)
			} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 2000000000, Value: 1.2}) {
				t.Fatalf("unexpected point(1): %v", p)
			}
			if p, err := fitr.Next(); err != nil {
				t.Fatalf("unexpected error(2): %v", err)
			} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1}) {
				t.Fatalf("unexpected point(2): %v", p)
			}
			if p, err := fitr.Next(); err != nil {
				t.Fatalf("expected eof, got error: %v", err)
			} else if p != nil {
				t.Fatalf("expected eof: %v", p)
			}
		})
	}
}

// Ensure engine can create an iterator with auxiliary fields.
func TestEngine_CreateIterator_Aux(t *testing.T) {
	t.Parallel()

	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			e := MustOpenEngine(index)
			defer e.Close()

			e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
			e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("F"), influxql.Float)
			e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))

			if err := e.WritePointsString(
				`cpu,host=A value=1.1 1000000000`,
				`cpu,host=A F=100 1000000000`,
				`cpu,host=A value=1.2 2000000000`,
				`cpu,host=A value=1.3 3000000000`,
				`cpu,host=A F=200 3000000000`,
			); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}

			itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{
				Expr:       influxql.MustParseExpr(`value`),
				Aux:        []influxql.VarRef{{Val: "F"}},
				Dimensions: []string{"host"},
				StartTime:  influxql.MinTime,
				EndTime:    influxql.MaxTime,
				Ascending:  true,
			})
			if err != nil {
				t.Fatal(err)
			}
			fitr := itr.(query.FloatIterator)

			if p, err := fitr.Next(); err != nil {
				t.Fatalf("unexpected error(0): %v", err)
			} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1, Aux: []interface{}{float64(100)}}) {
				t.Fatalf("unexpected point(0): %v", p)
			}
			if p, err := fitr.Next(); err != nil {
				t.Fatalf("unexpected error(1): %v", err)
			} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 2000000000, Value: 1.2, Aux: []interface{}{(*float64)(nil)}}) {
				t.Fatalf("unexpected point(1): %v", p)
			}
			if p, err := fitr.Next(); err != nil {
				t.Fatalf("unexpected error(2): %v", err)
			} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3, Aux: []interface{}{float64(200)}}) {
				t.Fatalf("unexpected point(2): %v", p)
			}
			if p, err := fitr.Next(); err != nil {
				t.Fatalf("expected eof, got error: %v", err)
			} else if p != nil {
				t.Fatalf("expected eof: %v", p)
			}
		})
	}
}

// Ensure engine can create an iterator with a condition.
func TestEngine_CreateIterator_Condition(t *testing.T) {
	t.Parallel()

	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			e := MustOpenEngine(index)
			defer e.Close()

			e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
			e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("X"), influxql.Float)
			e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("Y"), influxql.Float)
			e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
			e.SetFieldName([]byte("cpu"), "X")
			e.SetFieldName([]byte("cpu"), "Y")

			if err := e.WritePointsString(
				`cpu,host=A value=1.1 1000000000`,
				`cpu,host=A X=10 1000000000`,
				`cpu,host=A Y=100 1000000000`,

				`cpu,host=A value=1.2 2000000000`,

				`cpu,host=A value=1.3 3000000000`,
				`cpu,host=A X=20 3000000000`,
				`cpu,host=A Y=200 3000000000`,
			); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}

			itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{
				Expr:       influxql.MustParseExpr(`value`),
				Dimensions: []string{"host"},
				Condition:  influxql.MustParseExpr(`X = 10 OR Y > 150`),
				StartTime:  influxql.MinTime,
				EndTime:    influxql.MaxTime,
				Ascending:  true,
			})
			if err != nil {
				t.Fatal(err)
			}
			fitr := itr.(query.FloatIterator)

			if p, err := fitr.Next(); err != nil {
				t.Fatalf("unexpected error(0): %v", err)
			} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1}) {
				t.Fatalf("unexpected point(0): %v", p)
			}
			if p, err := fitr.Next(); err != nil {
				t.Fatalf("unexpected error(1): %v", err)
			} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3}) {
				t.Fatalf("unexpected point(1): %v", p)
			}
			if p, err := fitr.Next(); err != nil {
				t.Fatalf("expected eof, got error: %v", err)
			} else if p != nil {
				t.Fatalf("expected eof: %v", p)
			}
		})
	}
}

// Test that series id set gets updated and returned appropriately.
func TestIndex_SeriesIDSet(t *testing.T) {
	test := func(index string) error {
		engine := MustOpenEngine(index)
		defer engine.Close()

		// Add some series.
		engine.MustAddSeries("cpu", map[string]string{"host": "a", "region": "west"})
		engine.MustAddSeries("cpu", map[string]string{"host": "b", "region": "west"})
		engine.MustAddSeries("cpu", map[string]string{"host": "b"})
		engine.MustAddSeries("gpu", nil)
		engine.MustAddSeries("gpu", map[string]string{"host": "b"})
		engine.MustAddSeries("mem", map[string]string{"host": "z"})

		// Collect series IDs.
		seriesIDMap := map[string]uint64{}
		var e tsdb.SeriesIDElem
		var err error

		itr := engine.sfile.SeriesIDIterator()
		for e, err = itr.Next(); ; e, err = itr.Next() {
			if err != nil {
				return err
			} else if e.SeriesID == 0 {
				break
			}

			name, tags := tsdb.ParseSeriesKey(engine.sfile.SeriesKey(e.SeriesID))
			key := fmt.Sprintf("%s%s", name, tags.HashKey())
			seriesIDMap[key] = e.SeriesID
		}

		for _, id := range seriesIDMap {
			if !engine.SeriesIDSet().Contains(id) {
				return fmt.Errorf("bitmap does not contain ID: %d", id)
			}
		}

		// Drop all the series for the gpu measurement and they should no longer
		// be in the series ID set.
		if err := engine.DeleteMeasurement([]byte("gpu")); err != nil {
			return err
		}

		if engine.SeriesIDSet().Contains(seriesIDMap["gpu"]) {
			return fmt.Errorf("bitmap still contains ID: %d for key %s, but should not", seriesIDMap["gpu"], "gpu")
		} else if engine.SeriesIDSet().Contains(seriesIDMap["gpu,host=b"]) {
			return fmt.Errorf("bitmap still contains ID: %d for key %s, but should not", seriesIDMap["gpu,host=b"], "gpu,host=b")
		}
		delete(seriesIDMap, "gpu")
		delete(seriesIDMap, "gpu,host=b")

		// Drop the specific mem series
		ditr := &seriesIterator{keys: [][]byte{[]byte("mem,host=z")}}
		if err := engine.DeleteSeriesRange(ditr, math.MinInt64, math.MaxInt64); err != nil {
			return err
		}

		if engine.SeriesIDSet().Contains(seriesIDMap["mem,host=z"]) {
			return fmt.Errorf("bitmap still contains ID: %d for key %s, but should not", seriesIDMap["mem,host=z"], "mem,host=z")
		}
		delete(seriesIDMap, "mem,host=z")

		// The rest of the keys should still be in the set.
		for key, id := range seriesIDMap {
			if !engine.SeriesIDSet().Contains(id) {
				return fmt.Errorf("bitmap does not contain ID: %d for key %s, but should", id, key)
			}
		}

		// Reopen the engine, and the series should be re-added to the bitmap.
		if err := engine.Reopen(); err != nil {
			return err
		}

		// Check bitset is expected.
		expected := tsdb.NewSeriesIDSet()
		for _, id := range seriesIDMap {
			expected.Add(id)
		}

		if !engine.SeriesIDSet().Equals(expected) {
			return fmt.Errorf("got bitset %s, expected %s", engine.SeriesIDSet().String(), expected.String())
		}
		return nil
	}

	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			if err := test(index); err != nil {
				t.Error(err)
			}
		})
	}
}

// Ensures that deleting series from TSM files with multiple fields removes all of
// the series.
func TestEngine_DeleteSeries(t *testing.T) {
	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			// Create a few points.
			p1 := MustParsePointString("cpu,host=A value=1.1 1000000000")
			p2 := MustParsePointString("cpu,host=B value=1.2 2000000000")
			p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")

			e, err := NewEngine(index)
			if err != nil {
				t.Fatal(err)
			}

			// mock the planner so compactions don't run during the test
			e.CompactionPlan = &mockPlanner{}
			if err := e.Open(); err != nil {
				t.Fatal(err)
			}
			defer e.Close()

			if err := e.writePoints(p1, p2, p3); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}
			if err := e.WriteSnapshot(); err != nil {
				t.Fatalf("failed to snapshot: %s", err.Error())
			}

			keys := e.FileStore.Keys()
			if exp, got := 3, len(keys); exp != got {
				t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
			}

			itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}}
			if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil {
				t.Fatalf("failed to delete series: %v", err)
			}

			keys = e.FileStore.Keys()
			if exp, got := 1, len(keys); exp != got {
				t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
			}

			exp := "cpu,host=B#!~#value"
			if _, ok := keys[exp]; !ok {
				t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys)
			}
		})
	}
}

func TestEngine_DeleteSeriesRange(t *testing.T) {
	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			// Create a few points.
			p1 := MustParsePointString("cpu,host=0 value=1.1 6000000000") // Should not be deleted
			p2 := MustParsePointString("cpu,host=A value=1.2 2000000000")
			p3 := MustParsePointString("cpu,host=A value=1.3 3000000000")
			p4 := MustParsePointString("cpu,host=B value=1.3 4000000000") // Should not be deleted
			p5 := MustParsePointString("cpu,host=B value=1.3 5000000000") // Should not be deleted
			p6 := MustParsePointString("cpu,host=C value=1.3 1000000000")
			p7 := MustParsePointString("mem,host=C value=1.3 1000000000")  // Should not be deleted
			p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted

			e, err := NewEngine(index)
			if err != nil {
				t.Fatal(err)
			}

			// mock the planner so compactions don't run during the test
			e.CompactionPlan = &mockPlanner{}
			if err := e.Open(); err != nil {
				t.Fatal(err)
			}
			defer e.Close()

			for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} {
				if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil {
					t.Fatalf("create series index error: %v", err)
				}
			}

			if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}
			if err := e.WriteSnapshot(); err != nil {
				t.Fatalf("failed to snapshot: %s", err.Error())
			}

			keys := e.FileStore.Keys()
			if exp, got := 6, len(keys); exp != got {
				t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
			}

			itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=0"), []byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C")}}
			if err := e.DeleteSeriesRange(itr, 0, 3000000000); err != nil {
				t.Fatalf("failed to delete series: %v", err)
			}

			keys = e.FileStore.Keys()
			if exp, got := 4, len(keys); exp != got {
				t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
			}

			exp := "cpu,host=B#!~#value"
			if _, ok := keys[exp]; !ok {
				t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys)
			}

			// Check that the series still exists in the index
			indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
			iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu"))
			if err != nil {
				t.Fatalf("iterator error: %v", err)
			}
			defer iter.Close()

			elem, err := iter.Next()
			if err != nil {
				t.Fatal(err)
			}
			if elem.SeriesID == 0 {
				t.Fatalf("series index mismatch: EOF, exp 2 series")
			}

			// Lookup series.
			name, tags := e.sfile.Series(elem.SeriesID)
			if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) {
				t.Fatalf("series mismatch: got %s, exp %s", got, exp)
			}

			if !tags.Equal(models.NewTags(map[string]string{"host": "0"})) && !tags.Equal(models.NewTags(map[string]string{"host": "B"})) {
				t.Fatalf(`series mismatch: got %s, exp either "host=0" or "host=B"`, tags)
			}
			iter.Close()

			// Deleting the remaining series should remove them from the index.
			itr = &seriesIterator{keys: [][]byte{[]byte("cpu,host=0"), []byte("cpu,host=B")}}
			if err := e.DeleteSeriesRange(itr, 0, 9000000000); err != nil {
				t.Fatalf("failed to delete series: %v", err)
			}

			indexSet = tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
			if iter, err = indexSet.MeasurementSeriesIDIterator([]byte("cpu")); err != nil {
				t.Fatalf("iterator error: %v", err)
			}
			if iter == nil {
				return
			}

			defer iter.Close()
			if elem, err = iter.Next(); err != nil {
				t.Fatal(err)
			}
			if elem.SeriesID != 0 {
				t.Fatalf("got an undeleted series id, but series should be dropped from index")
			}
		})
	}
}

func TestEngine_DeleteSeriesRangeWithPredicate(t *testing.T) {
	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			// Create a few points.
			p1 := MustParsePointString("cpu,host=A value=1.1 6000000000") // Should not be deleted
			p2 := MustParsePointString("cpu,host=A value=1.2 2000000000") // Should not be deleted
			p3 := MustParsePointString("cpu,host=B value=1.3 3000000000")
			p4 := MustParsePointString("cpu,host=B value=1.3 4000000000")
			p5 := MustParsePointString("cpu,host=C value=1.3 5000000000") // Should not be deleted
			p6 := MustParsePointString("mem,host=B value=1.3 1000000000")
			p7 := MustParsePointString("mem,host=C value=1.3 1000000000")
			p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted

			e, err := NewEngine(index)
			if err != nil {
				t.Fatal(err)
			}

			// mock the planner so compactions don't run during the test
			e.CompactionPlan = &mockPlanner{}
			if err := e.Open(); err != nil {
				t.Fatal(err)
			}
			defer e.Close()

			for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} {
				if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil {
					t.Fatalf("create series index error: %v", err)
				}
			}

			if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}
			if err := e.WriteSnapshot(); err != nil {
				t.Fatalf("failed to snapshot: %s", err.Error())
			}

			keys := e.FileStore.Keys()
			if exp, got := 6, len(keys); exp != got {
				t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
			}

			itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C"), []byte("mem,host=B"), []byte("mem,host=C")}}
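			// The predicate receives each series' measurement name and tags and
			// returns the time range to delete plus whether to delete at all. Here
			// it deletes everything for mem and for cpu series tagged host=B.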
			predicate := func(name []byte, tags models.Tags) (int64, int64, bool) {
				if bytes.Equal(name, []byte("mem")) {
					return math.MinInt64, math.MaxInt64, true
				}
				if bytes.Equal(name, []byte("cpu")) {
					for _, tag := range tags {
						if bytes.Equal(tag.Key, []byte("host")) && bytes.Equal(tag.Value, []byte("B")) {
							return math.MinInt64, math.MaxInt64, true
						}
					}
				}
				return math.MinInt64, math.MaxInt64, false
			}
			if err := e.DeleteSeriesRangeWithPredicate(itr, predicate); err != nil {
				t.Fatalf("failed to delete series: %v", err)
			}

			keys = e.FileStore.Keys()
			if exp, got := 3, len(keys); exp != got {
				t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
			}

			exps := []string{"cpu,host=A#!~#value", "cpu,host=C#!~#value", "disk,host=C#!~#value"}
			for _, exp := range exps {
				if _, ok := keys[exp]; !ok {
					t.Fatalf("wrong series deleted: exp %v, got %v", exps, keys)
				}
			}

			// Check that the series still exists in the index
			indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
			iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu"))
			if err != nil {
				t.Fatalf("iterator error: %v", err)
			}
			defer iter.Close()

			elem, err := iter.Next()
			if err != nil {
				t.Fatal(err)
			}
			if elem.SeriesID == 0 {
				t.Fatalf("series index mismatch: EOF, exp 2 series")
			}

			// Lookup series.
			name, tags := e.sfile.Series(elem.SeriesID)
			if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) {
				t.Fatalf("series mismatch: got %s, exp %s", got, exp)
			}

			if !tags.Equal(models.NewTags(map[string]string{"host": "A"})) && !tags.Equal(models.NewTags(map[string]string{"host": "C"})) {
				t.Fatalf(`series mismatch: got %s, exp either "host=A" or "host=C"`, tags)
			}
			iter.Close()

			// Deleting the remaining series should remove them from the index.
			itr = &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=C")}}
			if err := e.DeleteSeriesRange(itr, 0, 9000000000); err != nil {
				t.Fatalf("failed to delete series: %v", err)
			}

			indexSet = tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
			if iter, err = indexSet.MeasurementSeriesIDIterator([]byte("cpu")); err != nil {
				t.Fatalf("iterator error: %v", err)
			}
			if iter == nil {
				return
			}

			defer iter.Close()
			if elem, err = iter.Next(); err != nil {
				t.Fatal(err)
			}
			if elem.SeriesID != 0 {
				t.Fatalf("got an undeleted series id, but series should be dropped from index")
			}
		})
	}
}

// Tests that a nil predicate deletes all values returned from the series iterator.
func TestEngine_DeleteSeriesRangeWithPredicate_Nil(t *testing.T) {
	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			// Create a few points.
			p1 := MustParsePointString("cpu,host=A value=1.1 6000000000")
			p2 := MustParsePointString("cpu,host=A value=1.2 2000000000")
			p3 := MustParsePointString("cpu,host=B value=1.3 3000000000")
			p4 := MustParsePointString("cpu,host=B value=1.3 4000000000")
			p5 := MustParsePointString("cpu,host=C value=1.3 5000000000")
			p6 := MustParsePointString("mem,host=B value=1.3 1000000000")
			p7 := MustParsePointString("mem,host=C value=1.3 1000000000")
			p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted

			e, err := NewEngine(index)
			if err != nil {
				t.Fatal(err)
			}

			// mock the planner so compactions don't run during the test
			e.CompactionPlan = &mockPlanner{}
			if err := e.Open(); err != nil {
				t.Fatal(err)
			}
			defer e.Close()

			for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} {
				if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil {
					t.Fatalf("create series index error: %v", err)
				}
			}

			if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}
			if err := e.WriteSnapshot(); err != nil {
				t.Fatalf("failed to snapshot: %s", err.Error())
			}

			keys := e.FileStore.Keys()
			if exp, got := 6, len(keys); exp != got {
				t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
			}

			itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C"), []byte("mem,host=B"), []byte("mem,host=C")}}
			if err := e.DeleteSeriesRangeWithPredicate(itr, nil); err != nil {
				t.Fatalf("failed to delete series: %v", err)
			}

			keys = e.FileStore.Keys()
			if exp, got := 1, len(keys); exp != got {
				t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
			}

			// Check that the cpu series no longer exist in the index
			indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
			iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu"))
			if err != nil {
				t.Fatalf("iterator error: %v", err)
			} else if iter == nil {
				return
			}
			defer iter.Close()

			if elem, err := iter.Next(); err != nil {
				t.Fatal(err)
			} else if elem.SeriesID != 0 {
				t.Fatalf("got an undeleted series id, but series should be dropped from index")
			}

			// Check that disk series still exists
			iter, err = indexSet.MeasurementSeriesIDIterator([]byte("disk"))
			if err != nil {
				t.Fatalf("iterator error: %v", err)
			} else if iter == nil {
				return
			}
			defer iter.Close()

			if elem, err := iter.Next(); err != nil {
				t.Fatal(err)
			} else if elem.SeriesID == 0 {
				t.Fatalf("disk series should still exist in the index, but got EOF")
			}
		})
	}
}

func TestEngine_DeleteSeriesRangeWithPredicate_FlushBatch(t *testing.T) {
	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			// Create a few points.
			p1 := MustParsePointString("cpu,host=A value=1.1 6000000000") // Should not be deleted
			p2 := MustParsePointString("cpu,host=A value=1.2 2000000000") // Should not be deleted
			p3 := MustParsePointString("cpu,host=B value=1.3 3000000000")
			p4 := MustParsePointString("cpu,host=B value=1.3 4000000000")
			p5 := MustParsePointString("cpu,host=C value=1.3 5000000000") // Should not be deleted
			p6 := MustParsePointString("mem,host=B value=1.3 1000000000")
			p7 := MustParsePointString("mem,host=C value=1.3 1000000000")
			p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted

			e, err := NewEngine(index)
			if err != nil {
				t.Fatal(err)
			}

			// mock the planner so compactions don't run during the test
			e.CompactionPlan = &mockPlanner{}
			if err := e.Open(); err != nil {
				t.Fatal(err)
			}
			defer e.Close()

			for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} {
				if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil {
					t.Fatalf("create series index error: %v", err)
				}
			}

			if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}
			if err := e.WriteSnapshot(); err != nil {
				t.Fatalf("failed to snapshot: %s", err.Error())
			}

			keys := e.FileStore.Keys()
			if exp, got := 6, len(keys); exp != got {
				t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
			}

			itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C"), []byte("mem,host=B"), []byte("mem,host=C")}}
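			// Unlike the test above, this predicate returns bounded time ranges:
			// the mem values at exactly 1000000000 and the cpu,host=B values
			// between 3000000000 and 4000000000.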
			predicate := func(name []byte, tags models.Tags) (int64, int64, bool) {
				if bytes.Equal(name, []byte("mem")) {
					return 1000000000, 1000000000, true
				}

				if bytes.Equal(name, []byte("cpu")) {
					for _, tag := range tags {
						if bytes.Equal(tag.Key, []byte("host")) && bytes.Equal(tag.Value, []byte("B")) {
							return 3000000000, 4000000000, true
						}
					}
				}
				return math.MinInt64, math.MaxInt64, false
			}
			if err := e.DeleteSeriesRangeWithPredicate(itr, predicate); err != nil {
				t.Fatalf("failed to delete series: %v", err)
			}

			keys = e.FileStore.Keys()
			if exp, got := 3, len(keys); exp != got {
				t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
			}

			exps := []string{"cpu,host=A#!~#value", "cpu,host=C#!~#value", "disk,host=C#!~#value"}
			for _, exp := range exps {
				if _, ok := keys[exp]; !ok {
					t.Fatalf("wrong series deleted: exp %v, got %v", exps, keys)
				}
			}

			// Check that the series still exists in the index
			indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
			iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu"))
			if err != nil {
				t.Fatalf("iterator error: %v", err)
			}
			defer iter.Close()

			elem, err := iter.Next()
			if err != nil {
				t.Fatal(err)
			}
			if elem.SeriesID == 0 {
				t.Fatalf("series index mismatch: EOF, exp 2 series")
			}

			// Lookup series.
			name, tags := e.sfile.Series(elem.SeriesID)
			if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) {
				t.Fatalf("series mismatch: got %s, exp %s", got, exp)
			}

			if !tags.Equal(models.NewTags(map[string]string{"host": "A"})) && !tags.Equal(models.NewTags(map[string]string{"host": "C"})) {
				t.Fatalf(`series mismatch: got %s, exp either "host=A" or "host=C"`, tags)
			}
			iter.Close()

			// Deleting the remaining series should remove them from the index.
1670			itr = &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=C")}}
1671			if err := e.DeleteSeriesRange(itr, 0, 9000000000); err != nil {
1672				t.Fatalf("failed to delete series: %v", err)
1673			}

			indexSet = tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
			if iter, err = indexSet.MeasurementSeriesIDIterator([]byte("cpu")); err != nil {
				t.Fatalf("iterator error: %v", err)
			}
			if iter == nil {
				return
			}

			defer iter.Close()
			if elem, err = iter.Next(); err != nil {
				t.Fatal(err)
			}
			if elem.SeriesID != 0 {
				t.Fatalf("got an undeleted series id, but series should be dropped from index")
			}
		})
	}
}

func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) {
	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			// Create a few points.
			p1 := MustParsePointString("cpu,host=A value=1.1 1000000000") // Should not be deleted

			e, err := NewEngine(index)
			if err != nil {
				t.Fatal(err)
			}

			// mock the planner so compactions don't run during the test
			e.CompactionPlan = &mockPlanner{}
			if err := e.Open(); err != nil {
				t.Fatal(err)
			}
			defer e.Close()

			for _, p := range []models.Point{p1} {
				if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil {
					t.Fatalf("create series index error: %v", err)
				}
			}

			if err := e.WritePoints([]models.Point{p1}); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}
			if err := e.WriteSnapshot(); err != nil {
				t.Fatalf("failed to snapshot: %s", err.Error())
			}

			keys := e.FileStore.Keys()
			if exp, got := 1, len(keys); exp != got {
				t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
			}

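			// Delete with a time range that does not cover the point's timestamp
			// (1000000000), so nothing should actually be removed.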
			itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}}
			if err := e.DeleteSeriesRange(itr, 0, 0); err != nil {
				t.Fatalf("failed to delete series: %v", err)
			}

			keys = e.FileStore.Keys()
			if exp, got := 1, len(keys); exp != got {
				t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
			}

			exp := "cpu,host=A#!~#value"
			if _, ok := keys[exp]; !ok {
				t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys)
			}

			// Check that the series still exists in the index
			iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu"))
			if err != nil {
				t.Fatalf("iterator error: %v", err)
			}
			defer iter.Close()

			elem, err := iter.Next()
			if err != nil {
				t.Fatal(err)
			}
			if elem.SeriesID == 0 {
				t.Fatalf("series index mismatch: EOF, exp 1 series")
			}

			// Lookup series.
			name, tags := e.sfile.Series(elem.SeriesID)
			if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) {
				t.Fatalf("series mismatch: got %s, exp %s", got, exp)
			}

			if got, exp := tags, models.NewTags(map[string]string{"host": "A"}); !got.Equal(exp) {
				t.Fatalf("series mismatch: got %s, exp %s", got, exp)
			}
		})
	}
}

func TestEngine_LastModified(t *testing.T) {
	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			// Create a few points.
			p1 := MustParsePointString("cpu,host=A value=1.1 1000000000")
			p2 := MustParsePointString("cpu,host=B value=1.2 2000000000")
			p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")

			e, err := NewEngine(index)
			if err != nil {
				t.Fatal(err)
			}

			// mock the planner so compactions don't run during the test
			e.CompactionPlan = &mockPlanner{}
			e.SetEnabled(false)
			if err := e.Open(); err != nil {
				t.Fatal(err)
			}
			defer e.Close()

			if err := e.writePoints(p1, p2, p3); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}

			lm := e.LastModified()
			if lm.IsZero() {
				t.Fatalf("expected non-zero time, got %v", lm.UTC())
			}
			e.SetEnabled(true)

			// Artificial sleep added because filesystems cache the mod time of
			// files; without it, the WAL's last-modified time may not register
			// as newer than the filestore's mod time.
			time.Sleep(2 * time.Second) // Covers most filesystems.

			if err := e.WriteSnapshot(); err != nil {
				t.Fatalf("failed to snapshot: %s", err.Error())
			}

			lm2 := e.LastModified()

			if got, exp := lm.Equal(lm2), false; exp != got {
				t.Fatalf("expected time change, got %v, exp %v: %s == %s", got, exp, lm.String(), lm2.String())
			}

			itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}}
			if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil {
				t.Fatalf("failed to delete series: %v", err)
			}

			lm3 := e.LastModified()
			if got, exp := lm2.Equal(lm3), false; exp != got {
				t.Fatalf("expected time change, got %v, exp %v", got, exp)
			}
		})
	}
}

func TestEngine_SnapshotsDisabled(t *testing.T) {
	sfile := MustOpenSeriesFile()
	defer sfile.Close()

	// Generate a temporary directory for the engine and its WAL.
	dir, _ := ioutil.TempDir("", "tsm")
	walPath := filepath.Join(dir, "wal")
	os.MkdirAll(walPath, 0777)
	defer os.RemoveAll(dir)

	// Create a tsm1 engine.
	db := path.Base(dir)
	opt := tsdb.NewEngineOptions()
	opt.InmemIndex = inmem.NewIndex(db, sfile.SeriesFile)
	idx := tsdb.MustOpenIndex(1, db, filepath.Join(dir, "index"), tsdb.NewSeriesIDSet(), sfile.SeriesFile, opt)
	defer idx.Close()

	e := tsm1.NewEngine(1, idx, dir, walPath, sfile.SeriesFile, opt).(*tsm1.Engine)

	// mock the planner so compactions don't run during the test
	e.CompactionPlan = &mockPlanner{}

	e.SetEnabled(false)
	if err := e.Open(); err != nil {
		t.Fatalf("failed to open tsm1 engine: %s", err.Error())
	}

	// Make sure Snapshots are disabled.
	e.SetCompactionsEnabled(false)
	e.Compactor.DisableSnapshots()

	// Writing a snapshot should not fail when the snapshot is empty
	// even if snapshots are disabled.
	if err := e.WriteSnapshot(); err != nil {
		t.Fatalf("failed to snapshot: %s", err.Error())
	}
}

func TestEngine_ShouldCompactCache(t *testing.T) {
	nowTime := time.Now()

	e, err := NewEngine(inmem.IndexName)
	if err != nil {
		t.Fatal(err)
	}

	// mock the planner so compactions don't run during the test
	e.CompactionPlan = &mockPlanner{}
	e.SetEnabled(false)
	if err := e.Open(); err != nil {
		t.Fatalf("failed to open tsm1 engine: %s", err.Error())
	}
	defer e.Close()

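	// Use small thresholds so both the size-based and write-cold checks below
	// are easy to trigger.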
	e.CacheFlushMemorySizeThreshold = 1024
	e.CacheFlushWriteColdDuration = time.Minute

	if e.ShouldCompactCache(nowTime) {
		t.Fatal("nothing written to cache, so should not compact")
	}

	if err := e.WritePointsString("m,k=v f=3i"); err != nil {
		t.Fatal(err)
	}

	if e.ShouldCompactCache(nowTime) {
		t.Fatal("cache size < flush threshold and nothing written to FileStore, so should not compact")
	}

	if !e.ShouldCompactCache(nowTime.Add(time.Hour)) {
		t.Fatal("last compaction was longer than flush write cold threshold, so should compact")
	}

	e.CacheFlushMemorySizeThreshold = 1
	if !e.ShouldCompactCache(nowTime) {
		t.Fatal("cache size > flush threshold, so should compact")
	}
}

// Ensure engine can create an ascending cursor for cache and tsm values.
func TestEngine_CreateCursor_Ascending(t *testing.T) {
	t.Parallel()

	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {

			e := MustOpenEngine(index)
			defer e.Close()

			e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
			e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))

			if err := e.WritePointsString(
				`cpu,host=A value=1.1 1`,
				`cpu,host=A value=1.2 2`,
				`cpu,host=A value=1.3 3`,
			); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}
			e.MustWriteSnapshot()

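			// The first batch has been snapshotted to TSM; the batch below stays
			// in the cache, so the cursor must merge values from both.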
			if err := e.WritePointsString(
				`cpu,host=A value=10.1 10`,
				`cpu,host=A value=11.2 11`,
				`cpu,host=A value=12.3 12`,
			); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}

			q, err := e.CreateCursorIterator(context.Background())
			if err != nil {
				t.Fatal(err)
			}

			cur, err := q.Next(context.Background(), &tsdb.CursorRequest{
				Name:      []byte("cpu"),
				Tags:      models.ParseTags([]byte("cpu,host=A")),
				Field:     "value",
				Ascending: true,
				StartTime: 2,
				EndTime:   11,
			})
			if err != nil {
				t.Fatal(err)
			}
			defer cur.Close()

			fcur := cur.(tsdb.FloatArrayCursor)
			a := fcur.Next()
			if !cmp.Equal([]int64{2, 3, 10, 11}, a.Timestamps) {
				t.Fatal("unexpected timestamps")
			}
			if !cmp.Equal([]float64{1.2, 1.3, 10.1, 11.2}, a.Values) {
				t.Fatal("unexpected values")
			}
		})
	}
}

// Ensure engine can create a descending cursor for cache and tsm values.
func TestEngine_CreateCursor_Descending(t *testing.T) {
	t.Parallel()

	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {

			e := MustOpenEngine(index)
			defer e.Close()

			e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
			e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))

			if err := e.WritePointsString(
				`cpu,host=A value=1.1 1`,
				`cpu,host=A value=1.2 2`,
				`cpu,host=A value=1.3 3`,
			); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}
			e.MustWriteSnapshot()

			if err := e.WritePointsString(
				`cpu,host=A value=10.1 10`,
				`cpu,host=A value=11.2 11`,
				`cpu,host=A value=12.3 12`,
			); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}

			q, err := e.CreateCursorIterator(context.Background())
			if err != nil {
				t.Fatal(err)
			}

			cur, err := q.Next(context.Background(), &tsdb.CursorRequest{
				Name:      []byte("cpu"),
				Tags:      models.ParseTags([]byte("cpu,host=A")),
				Field:     "value",
				Ascending: false,
				StartTime: 2,
				EndTime:   11,
			})
			if err != nil {
				t.Fatal(err)
			}
			defer cur.Close()

			fcur := cur.(tsdb.FloatArrayCursor)
			a := fcur.Next()
			if !cmp.Equal([]int64{11, 10, 3, 2}, a.Timestamps) {
				t.Fatal("unexpected timestamps")
			}
			if !cmp.Equal([]float64{11.2, 10.1, 1.3, 1.2}, a.Values) {
				t.Fatal("unexpected values")
			}
		})
	}
}

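// makeBlockTypeSlice returns a slice of n TSM block type identifiers, cycling
// through the valid block types, for the conversion benchmark below.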
func makeBlockTypeSlice(n int) []byte {
	r := make([]byte, n)
	b := tsm1.BlockFloat64
	m := tsm1.BlockUnsigned + 1
	for i := 0; i < len(r); i++ {
		r[i] = b % m
		b++
	}
	return r
}

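// blockType is a package-level sink; assigning each benchmark result to it
// keeps the compiler from optimizing the conversion call away.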
var blockType = influxql.Unknown

func BenchmarkBlockTypeToInfluxQLDataType(b *testing.B) {
	t := makeBlockTypeSlice(1000)
	for i := 0; i < b.N; i++ {
		for j := 0; j < len(t); j++ {
			blockType = tsm1.BlockTypeToInfluxQLDataType(t[j])
		}
	}
}

// This test ensures that "sync: WaitGroup is reused before previous Wait has returned"
// is not raised.
func TestEngine_DisableEnableCompactions_Concurrent(t *testing.T) {
	t.Parallel()

	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {

			e := MustOpenEngine(index)
			defer e.Close()

			var wg sync.WaitGroup
			wg.Add(2)

			go func() {
				defer wg.Done()
				for i := 0; i < 1000; i++ {
					e.SetCompactionsEnabled(true)
					e.SetCompactionsEnabled(false)
				}
			}()

			go func() {
				defer wg.Done()
				for i := 0; i < 1000; i++ {
					e.SetCompactionsEnabled(false)
					e.SetCompactionsEnabled(true)
				}
			}()

			done := make(chan struct{})
			go func() {
				wg.Wait()
				close(done)
			}()

			// Wait for waitgroup or fail if it takes too long.
			select {
			case <-time.NewTimer(30 * time.Second).C:
				t.Fatalf("timed out after 30 seconds waiting for waitgroup")
			case <-done:
			}
		})
	}
}

func TestEngine_WritePointsWithContext(t *testing.T) {
	// Create a few points.
	points := []models.Point{
		MustParsePointString("cpu,host=A value=1.1 1000000000"),
		MustParsePointString("cpu,host=B value=1.2,value2=8 2000000000"),
	}

	expectedPoints, expectedValues := int64(2), int64(3)

	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			e := MustOpenEngine(index)
			defer e.Close()

			var numPoints, numValues int64

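			// Attach counters to the context so the engine reports how many
			// points and values it wrote.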
			ctx := context.WithValue(context.Background(), tsdb.StatPointsWritten, &numPoints)
			ctx = context.WithValue(ctx, tsdb.StatValuesWritten, &numValues)

			if err := e.WritePointsWithContext(ctx, points); err != nil {
				t.Fatalf("failed to write points: %v", err)
			}

			if got, expected := numPoints, expectedPoints; got != expected {
				t.Fatalf("Expected stats to return %d points; got %d", expected, got)
			}

			if got, expected := numValues, expectedValues; got != expected {
				t.Fatalf("Expected stats to return %d values; got %d", expected, got)
			}
		})
	}
}

func TestEngine_WritePoints_TypeConflict(t *testing.T) {
	os.Setenv("INFLUXDB_SERIES_TYPE_CHECK_ENABLED", "1")
	defer os.Unsetenv("INFLUXDB_SERIES_TYPE_CHECK_ENABLED")

	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {

			e := MustOpenEngine(index)
			defer e.Close()

			if err := e.WritePointsString(
				`cpu,host=A value=1.1 1`,
				`cpu,host=A value=1i 2`,
			); err == nil {
				t.Fatalf("expected field type conflict")
			} else if err != tsdb.ErrFieldTypeConflict {
				t.Fatalf("error mismatch: got %v, exp %v", err, tsdb.ErrFieldTypeConflict)
			}

			// Series type should be a float
			got, err := e.Type([]byte(tsm1.SeriesFieldKey("cpu,host=A", "value")))
			if err != nil {
				t.Fatalf("unexpected error getting field type: %v", err)
			}

			if exp := models.Float; got != exp {
				t.Fatalf("field type mismatch: got %v, exp %v", got, exp)
			}

			values := e.Cache.Values([]byte(tsm1.SeriesFieldKey("cpu,host=A", "value")))
			if got, exp := len(values), 1; got != exp {
				t.Fatalf("values len mismatch: got %v, exp %v", got, exp)
			}
		})
	}
}

func TestEngine_WritePoints_Reload(t *testing.T) {
	t.Skip("Disabled until INFLUXDB_SERIES_TYPE_CHECK_ENABLED is enabled by default")

	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {

			e := MustOpenEngine(index)
			defer e.Close()

			if err := e.WritePointsString(
				`cpu,host=A value=1.1 1`,
			); err != nil {
				t.Fatalf("failed to write points: %v", err)
			}

			// Series type should be a float
			got, err := e.Type([]byte(tsm1.SeriesFieldKey("cpu,host=A", "value")))
			if err != nil {
				t.Fatalf("unexpected error getting field type: %v", err)
			}

			if exp := models.Float; got != exp {
				t.Fatalf("field type mismatch: got %v, exp %v", got, exp)
			}

			if err := e.WriteSnapshot(); err != nil {
				t.Fatalf("unexpected error writing snapshot: %v", err)
			}

			if err := e.Reopen(); err != nil {
				t.Fatalf("unexpected error reopening engine: %v", err)
			}

			if err := e.WritePointsString(
				`cpu,host=A value=1i 1`,
			); err != tsdb.ErrFieldTypeConflict {
				t.Fatalf("expected field type conflict: got %v", err)
			}
		})
	}
}

func TestEngine_Invalid_UTF8(t *testing.T) {
	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			name := []byte{255, 112, 114, 111, 99} // A known invalid UTF-8 string
			field := []byte{255, 110, 101, 116}    // A known invalid UTF-8 string
			p := MustParsePointString(fmt.Sprintf("%s,host=A %s=1.1 6000000000", name, field))

			e, err := NewEngine(index)
			if err != nil {
				t.Fatal(err)
			}

			// mock the planner so compactions don't run during the test
			e.CompactionPlan = &mockPlanner{}
			if err := e.Open(); err != nil {
				t.Fatal(err)
			}
			defer e.Close()

			if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil {
				t.Fatalf("create series index error: %v", err)
			}

			if err := e.WritePoints([]models.Point{p}); err != nil {
				t.Fatalf("failed to write points: %s", err.Error())
			}

			// Re-open the engine
			if err := e.Reopen(); err != nil {
				t.Fatal(err)
			}
		})
	}
}

func BenchmarkEngine_WritePoints(b *testing.B) {
	batchSizes := []int{10, 100, 1000, 5000, 10000}
	for _, sz := range batchSizes {
		for _, index := range tsdb.RegisteredIndexes() {
			e := MustOpenEngine(index)
			e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
			pp := make([]models.Point, 0, sz)
			for i := 0; i < sz; i++ {
				p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2", i))
				pp = append(pp, p)
			}

			b.Run(fmt.Sprintf("%s_%d", index, sz), func(b *testing.B) {
				b.ReportAllocs()
				for i := 0; i < b.N; i++ {
					err := e.WritePoints(pp)
					if err != nil {
						b.Fatal(err)
					}
				}
			})
			e.Close()
		}
	}
}

func BenchmarkEngine_WritePoints_Parallel(b *testing.B) {
	batchSizes := []int{1000, 5000, 10000, 25000, 50000, 75000, 100000, 200000}
	for _, sz := range batchSizes {
		for _, index := range tsdb.RegisteredIndexes() {
			e := MustOpenEngine(index)
			e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)

			cpus := runtime.GOMAXPROCS(0)
			pp := make([]models.Point, 0, sz*cpus)
			for i := 0; i < sz*cpus; i++ {
				p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2,other=%di", i, i))
				pp = append(pp, p)
			}

			b.Run(fmt.Sprintf("%s_%d", index, sz), func(b *testing.B) {
				b.ReportAllocs()
				for i := 0; i < b.N; i++ {
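					// Fan the batch out across GOMAXPROCS goroutines, each
					// writing a disjoint slice of the prepared points.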
					var wg sync.WaitGroup
					errC := make(chan error)
					for i := 0; i < cpus; i++ {
						wg.Add(1)
						go func(i int) {
							defer wg.Done()
							from, to := i*sz, (i+1)*sz
							err := e.WritePoints(pp[from:to])
							if err != nil {
								errC <- err
								return
							}
						}(i)
					}

					go func() {
						wg.Wait()
						close(errC)
					}()

					for err := range errC {
						if err != nil {
							b.Error(err)
						}
					}
				}
			})
			e.Close()
		}
	}
}

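// benchmarks enumerates the iterator expressions exercised by
// BenchmarkEngine_CreateIterator; benchmarkVariants below layers time ranges
// and groupings on top of each expression.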
var benchmarks = []struct {
	name string
	opt  query.IteratorOptions
}{
	{
		name: "Count",
		opt: query.IteratorOptions{
			Expr:      influxql.MustParseExpr("count(value)"),
			Ascending: true,
			StartTime: influxql.MinTime,
			EndTime:   influxql.MaxTime,
		},
	},
	{
		name: "First",
		opt: query.IteratorOptions{
			Expr:      influxql.MustParseExpr("first(value)"),
			Ascending: true,
			StartTime: influxql.MinTime,
			EndTime:   influxql.MaxTime,
		},
	},
	{
		name: "Last",
		opt: query.IteratorOptions{
			Expr:      influxql.MustParseExpr("last(value)"),
			Ascending: true,
			StartTime: influxql.MinTime,
			EndTime:   influxql.MaxTime,
		},
	},
	{
		name: "Limit",
		opt: query.IteratorOptions{
			Expr:      influxql.MustParseExpr("value"),
			Ascending: true,
			StartTime: influxql.MinTime,
			EndTime:   influxql.MaxTime,
			Limit:     10,
		},
	},
}

var benchmarkVariants = []struct {
	name   string
	modify func(opt query.IteratorOptions) query.IteratorOptions
}{
	{
		name: "All",
		modify: func(opt query.IteratorOptions) query.IteratorOptions {
			return opt
		},
	},
	{
		name: "GroupByTime_1m-1h",
		modify: func(opt query.IteratorOptions) query.IteratorOptions {
			opt.StartTime = 0
			opt.EndTime = int64(time.Hour) - 1
			opt.Interval = query.Interval{
				Duration: time.Minute,
			}
			return opt
		},
	},
	{
		name: "GroupByTime_1h-1d",
		modify: func(opt query.IteratorOptions) query.IteratorOptions {
			opt.StartTime = 0
			opt.EndTime = int64(24*time.Hour) - 1
			opt.Interval = query.Interval{
				Duration: time.Hour,
			}
			return opt
		},
	},
	{
		name: "GroupByTime_1m-1d",
		modify: func(opt query.IteratorOptions) query.IteratorOptions {
			opt.StartTime = 0
			opt.EndTime = int64(24*time.Hour) - 1
			opt.Interval = query.Interval{
				Duration: time.Minute,
			}
			return opt
		},
	},
	{
		name: "GroupByHost",
		modify: func(opt query.IteratorOptions) query.IteratorOptions {
			opt.Dimensions = []string{"host"}
			return opt
		},
	},
	{
		name: "GroupByHostAndTime_1m-1h",
		modify: func(opt query.IteratorOptions) query.IteratorOptions {
			opt.Dimensions = []string{"host"}
			opt.StartTime = 0
			opt.EndTime = int64(time.Hour) - 1
			opt.Interval = query.Interval{
				Duration: time.Minute,
			}
			return opt
		},
	},
	{
		name: "GroupByHostAndTime_1h-1d",
		modify: func(opt query.IteratorOptions) query.IteratorOptions {
			opt.Dimensions = []string{"host"}
			opt.StartTime = 0
			opt.EndTime = int64(24*time.Hour) - 1
			opt.Interval = query.Interval{
				Duration: time.Hour,
			}
			return opt
		},
	},
	{
		name: "GroupByHostAndTime_1m-1d",
		modify: func(opt query.IteratorOptions) query.IteratorOptions {
			opt.Dimensions = []string{"host"}
			opt.StartTime = 0
			opt.EndTime = int64(24*time.Hour) - 1
			opt.Interval = query.Interval{
				Duration: time.Minute,
			}
			return opt
		},
	},
}

func BenchmarkEngine_CreateIterator(b *testing.B) {
	engines := make([]*benchmarkEngine, len(sizes))
	for i, size := range sizes {
		engines[i] = MustInitDefaultBenchmarkEngine(size.name, size.sz)
	}

	for _, tt := range benchmarks {
		for _, variant := range benchmarkVariants {
			name := tt.name + "_" + variant.name
			opt := variant.modify(tt.opt)
			b.Run(name, func(b *testing.B) {
				for _, e := range engines {
					b.Run(e.Name, func(b *testing.B) {
						b.ReportAllocs()
						for i := 0; i < b.N; i++ {
							itr, err := e.CreateIterator(context.Background(), "cpu", opt)
							if err != nil {
								b.Fatal(err)
							}
							query.DrainIterator(itr)
						}
					})
				}
			})
		}
	}
}

type benchmarkEngine struct {
	*Engine
	Name   string
	PointN int
}

var (
	hostNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J"}
	sizes     = []struct {
		name string
		sz   int
	}{
		{name: "1K", sz: 1000},
		{name: "100K", sz: 100000},
		{name: "1M", sz: 1000000},
	}
)

// MustInitDefaultBenchmarkEngine creates a new engine using the default index
// and fills it with points. Panic on error.
func MustInitDefaultBenchmarkEngine(name string, pointN int) *benchmarkEngine {
	const batchSize = 1000
	if pointN%batchSize != 0 {
		panic(fmt.Sprintf("point count (%d) must be a multiple of batch size (%d)", pointN, batchSize))
	}

	e := MustOpenEngine(tsdb.DefaultIndex)

	// Initialize metadata.
	e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
	e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))

	// Generate time ascending points with jittered time & value.
	rand := rand.New(rand.NewSource(0))
	for i := 0; i < pointN; i += batchSize {
		var buf bytes.Buffer
		for j := 0; j < batchSize; j++ {
			fmt.Fprintf(&buf, "cpu,host=%s value=%d %d",
				hostNames[j%len(hostNames)],
				100+rand.Intn(50)-25,
				(time.Duration(i+j)*time.Second)+(time.Duration(rand.Intn(500)-250)*time.Millisecond),
			)
			if j != batchSize-1 {
				fmt.Fprint(&buf, "\n")
			}
		}

		if err := e.WritePointsString(buf.String()); err != nil {
			panic(err)
		}
	}

	if err := e.WriteSnapshot(); err != nil {
		panic(err)
	}

	// Force garbage collection.
	runtime.GC()

	// Wrap the engine with its benchmark metadata and return it.
	return &benchmarkEngine{
		Engine: e,
		Name:   name,
		PointN: pointN,
	}
}

// Engine is a test wrapper for tsm1.Engine.
type Engine struct {
	*tsm1.Engine
	root      string
	indexPath string
	indexType string
	index     tsdb.Index
	sfile     *tsdb.SeriesFile
}

// NewEngine returns a new instance of Engine at a temporary location.
func NewEngine(index string) (*Engine, error) {
	root, err := ioutil.TempDir("", "tsm1-")
	if err != nil {
		panic(err)
	}

	db := "db0"
	dbPath := filepath.Join(root, "data", db)

	if err := os.MkdirAll(dbPath, os.ModePerm); err != nil {
		return nil, err
	}

	// Setup series file.
	sfile := tsdb.NewSeriesFile(filepath.Join(dbPath, tsdb.SeriesFileDirectory))
	sfile.Logger = logger.New(os.Stdout)
	if err = sfile.Open(); err != nil {
		return nil, err
	}

	opt := tsdb.NewEngineOptions()
	opt.IndexVersion = index
	if index == tsdb.InmemIndexName {
		opt.InmemIndex = inmem.NewIndex(db, sfile)
	}
	// Initialise series id sets. Need to do this as it's normally done at the
	// store level.
	seriesIDs := tsdb.NewSeriesIDSet()
	opt.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{seriesIDs})

	idxPath := filepath.Join(dbPath, "index")
	idx := tsdb.MustOpenIndex(1, db, idxPath, seriesIDs, sfile, opt)

	tsm1Engine := tsm1.NewEngine(1, idx, filepath.Join(root, "data"), filepath.Join(root, "wal"), sfile, opt).(*tsm1.Engine)

	return &Engine{
		Engine:    tsm1Engine,
		root:      root,
		indexPath: idxPath,
		indexType: index,
		index:     idx,
		sfile:     sfile,
	}, nil
}

// MustOpenEngine returns a new, open instance of Engine.
func MustOpenEngine(index string) *Engine {
	e, err := NewEngine(index)
	if err != nil {
		panic(err)
	}

	if err := e.Open(); err != nil {
		panic(err)
	}
	return e
}

// Close closes the engine and removes all underlying data.
func (e *Engine) Close() error {
	return e.close(true)
}

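// close shuts down the index, series file and engine, optionally removing
// all of their underlying data from disk.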
func (e *Engine) close(cleanup bool) error {
	if e.index != nil {
		e.index.Close()
	}

	if e.sfile != nil {
		e.sfile.Close()
	}

	defer func() {
		if cleanup {
			os.RemoveAll(e.root)
		}
	}()
	return e.Engine.Close()
}

// Reopen closes and reopens the engine.
func (e *Engine) Reopen() error {
	// Close engine without removing underlying engine data.
	if err := e.close(false); err != nil {
		return err
	}

	// Re-open series file. Must create a new series file using the same data.
	e.sfile = tsdb.NewSeriesFile(e.sfile.Path())
	if err := e.sfile.Open(); err != nil {
		return err
	}

	db := path.Base(e.root)
	opt := tsdb.NewEngineOptions()
	opt.InmemIndex = inmem.NewIndex(db, e.sfile)

	// Re-initialise the series id set
	seriesIDSet := tsdb.NewSeriesIDSet()
	opt.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{seriesIDSet})

	// Re-open index.
	e.index = tsdb.MustOpenIndex(1, db, e.indexPath, seriesIDSet, e.sfile, opt)

	// Re-initialize engine.
	e.Engine = tsm1.NewEngine(1, e.index, filepath.Join(e.root, "data"), filepath.Join(e.root, "wal"), e.sfile, opt).(*tsm1.Engine)

	// Reopen engine
	if err := e.Engine.Open(); err != nil {
		return err
	}

	// Reload series data into index (no-op on TSI).
	return e.LoadMetadataIndex(1, e.index)
}

// SeriesIDSet provides access to the underlying series id bitset in the engine's
// index. It will panic if the underlying index does not have a SeriesIDSet
// method.
func (e *Engine) SeriesIDSet() *tsdb.SeriesIDSet {
	return e.index.SeriesIDSet()
}

// AddSeries adds the provided series data to the index and writes a point to
// the engine with default values for a field and a time of now.
func (e *Engine) AddSeries(name string, tags map[string]string) error {
	point, err := models.NewPoint(name, models.NewTags(tags), models.Fields{"v": 1.0}, time.Now())
	if err != nil {
		return err
	}
	return e.writePoints(point)
}

// WritePointsString parses the provided line-protocol strings into points and
// writes them via writePoints, which also adds the associated series to the
// index.
func (e *Engine) WritePointsString(ptstr ...string) error {
	points, err := models.ParsePointsString(strings.Join(ptstr, "\n"))
	if err != nil {
		return err
	}
	return e.writePoints(points...)
}

// writePoints adds the series for the provided points to the index, and writes
// the point data to the engine.
func (e *Engine) writePoints(points ...models.Point) error {
	for _, point := range points {
		// Write into the index.
		if err := e.Engine.CreateSeriesIfNotExists(point.Key(), point.Name(), point.Tags()); err != nil {
			return err
		}
	}
	// Write the points into the cache/wal.
	return e.WritePoints(points)
}

// MustAddSeries calls AddSeries, panicking if there is an error.
func (e *Engine) MustAddSeries(name string, tags map[string]string) {
	if err := e.AddSeries(name, tags); err != nil {
		panic(err)
	}
}

// MustWriteSnapshot forces a snapshot of the engine. Panic on error.
func (e *Engine) MustWriteSnapshot() {
	if err := e.WriteSnapshot(); err != nil {
		panic(err)
	}
}

// SeriesFile is a test wrapper for tsdb.SeriesFile.
type SeriesFile struct {
	*tsdb.SeriesFile
}

// NewSeriesFile returns a new instance of SeriesFile with a temporary file path.
func NewSeriesFile() *SeriesFile {
	dir, err := ioutil.TempDir("", "tsdb-series-file-")
	if err != nil {
		panic(err)
	}
	return &SeriesFile{SeriesFile: tsdb.NewSeriesFile(dir)}
}

// MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error.
func MustOpenSeriesFile() *SeriesFile {
	f := NewSeriesFile()
	if err := f.Open(); err != nil {
		panic(err)
	}
	return f
}

// Close closes the series file and removes it from disk.
func (f *SeriesFile) Close() {
	defer os.RemoveAll(f.Path())
	if err := f.SeriesFile.Close(); err != nil {
		panic(err)
	}
}

// MustParsePointsString parses points from a string. Panic on error.
func MustParsePointsString(buf string) []models.Point {
	a, err := models.ParsePointsString(buf)
	if err != nil {
		panic(err)
	}
	return a
}

// MustParsePointString parses the first point from a string. Panic on error.
func MustParsePointString(buf string) models.Point { return MustParsePointsString(buf)[0] }

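// mockPlanner is a no-op compaction planner; assigning it to an engine's
// CompactionPlan prevents background compactions from running during a test.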
type mockPlanner struct{}

func (m *mockPlanner) Plan(lastWrite time.Time) []tsm1.CompactionGroup { return nil }
func (m *mockPlanner) PlanLevel(level int) []tsm1.CompactionGroup      { return nil }
func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup            { return nil }
func (m *mockPlanner) Release(groups []tsm1.CompactionGroup)           {}
func (m *mockPlanner) FullyCompacted() bool                            { return false }
func (m *mockPlanner) ForceFull()                                      {}
func (m *mockPlanner) SetFileStore(fs *tsm1.FileStore)                 {}

// ParseTags returns an instance of Tags for a comma-delimited list of key/values.
func ParseTags(s string) query.Tags {
	m := make(map[string]string)
	for _, kv := range strings.Split(s, ",") {
		a := strings.Split(kv, "=")
		m[a[0]] = a[1]
	}
	return query.NewTags(m)
}

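// seriesIterator implements tsdb.SeriesIterator over a fixed set of series
// keys, parsing and returning one key per call to Next.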
type seriesIterator struct {
	keys [][]byte
}

type series struct {
	name    []byte
	tags    models.Tags
	deleted bool
}

func (s series) Name() []byte        { return s.name }
func (s series) Tags() models.Tags   { return s.tags }
func (s series) Deleted() bool       { return s.deleted }
func (s series) Expr() influxql.Expr { return nil }

func (itr *seriesIterator) Close() error { return nil }

func (itr *seriesIterator) Next() (tsdb.SeriesElem, error) {
	if len(itr.keys) == 0 {
		return nil, nil
	}
	name, tags := models.ParseKeyBytes(itr.keys[0])
	s := series{name: name, tags: tags}
	itr.keys = itr.keys[1:]
	return s, nil
}

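// seriesIDSets implements tsdb.SeriesIDSets over a static slice of sets.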
type seriesIDSets []*tsdb.SeriesIDSet

func (a seriesIDSets) ForEach(f func(ids *tsdb.SeriesIDSet)) error {
	for _, v := range a {
		f(v)
	}
	return nil
}