package tsi1_test

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"math/rand"
	"os"
	"path/filepath"
	"reflect"
	"regexp"
	"runtime/pprof"
	"sort"
	"testing"
	"time"

	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/pkg/bloom"
	"github.com/influxdata/influxdb/pkg/slices"
	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxdb/tsdb/index/tsi1"
)

// Ensure log file can append series.
func TestLogFile_AddSeriesList(t *testing.T) {
	sfile := MustOpenSeriesFile()
	defer sfile.Close()

	f := MustOpenLogFile(sfile.SeriesFile)
	defer f.Close()
	seriesSet := tsdb.NewSeriesIDSet()

	// Add test data.
	ids, err := f.AddSeriesList(seriesSet,
		slices.StringsToBytes("cpu", "mem"),
		[]models.Tags{
			models.NewTags(map[string]string{"region": "us-east"}),
			models.NewTags(map[string]string{"host": "serverA"}),
		},
	)
	if err != nil {
		t.Fatal(err)
	}

	// Returned series ids should match those in the seriesSet.
	other := tsdb.NewSeriesIDSet(ids...)
	if !other.Equals(seriesSet) {
		t.Fatalf("got series ids %s, expected %s", other, seriesSet)
	}

	// Add a new series along with one that already exists.
	ids, err = f.AddSeriesList(seriesSet,
		slices.StringsToBytes("cpu", "mem"),
		[]models.Tags{
			models.NewTags(map[string]string{"region": "us-west"}),
			models.NewTags(map[string]string{"host": "serverA"}),
		},
	)
	if err != nil {
		t.Fatal(err)
	}

	// The new series should get a non-zero id; the existing series should be
	// reported with a zero id.
	if got, exp := len(ids), 2; got != exp {
		t.Fatalf("got %d series ids, expected %d", got, exp)
	} else if got := ids[0]; got == 0 {
		t.Error("series id was 0, expected it not to be")
	} else if got := ids[1]; got != 0 {
		t.Errorf("got series id %d, expected 0", got)
	}

	// Add only series that already exist.
	ids, err = f.AddSeriesList(seriesSet,
		slices.StringsToBytes("cpu", "mem"),
		[]models.Tags{
			models.NewTags(map[string]string{"region": "us-west"}),
			models.NewTags(map[string]string{"host": "serverA"}),
		},
	)
	if err != nil {
		t.Fatal(err)
	}

	// All returned ids should be zero since no new series were created.
	if got, exp := ids, make([]uint64, 2); !reflect.DeepEqual(got, exp) {
		t.Fatalf("got ids %v, expected %v", got, exp)
	}

	// Verify data.
	itr := f.MeasurementIterator()
	if e := itr.Next(); e == nil || string(e.Name()) != "cpu" {
		t.Fatalf("unexpected measurement: %#v", e)
	} else if e := itr.Next(); e == nil || string(e.Name()) != "mem" {
		t.Fatalf("unexpected measurement: %#v", e)
	} else if e := itr.Next(); e != nil {
		t.Fatalf("expected eof, got: %#v", e)
	}

	// Reopen file and re-verify.
	if err := f.Reopen(); err != nil {
		t.Fatal(err)
	}

	itr = f.MeasurementIterator()
	if e := itr.Next(); e == nil || string(e.Name()) != "cpu" {
		t.Fatalf("unexpected measurement: %#v", e)
	} else if e := itr.Next(); e == nil || string(e.Name()) != "mem" {
		t.Fatalf("unexpected measurement: %#v", e)
	} else if e := itr.Next(); e != nil {
		t.Fatalf("expected eof, got: %#v", e)
	}
}

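// Ensure the log file's series id iterator returns ids in ascending order.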
func TestLogFile_SeriesStoredInOrder(t *testing.T) {
	sfile := MustOpenSeriesFile()
	defer sfile.Close()

	f := MustOpenLogFile(sfile.SeriesFile)
	defer f.Close()
	seriesSet := tsdb.NewSeriesIDSet()

	// Generate and add test data.
	tvm := make(map[string]struct{})
	rand.Seed(time.Now().Unix())
	for i := 0; i < 100; i++ {
		tv := fmt.Sprintf("server-%d", rand.Intn(50)) // Encourage adding duplicate series.
		tvm[tv] = struct{}{}

		if _, err := f.AddSeriesList(seriesSet, [][]byte{
			[]byte("mem"),
			[]byte("cpu"),
		}, []models.Tags{
			{models.NewTag([]byte("host"), []byte(tv))},
			{models.NewTag([]byte("host"), []byte(tv))},
		}); err != nil {
			t.Fatal(err)
		}
	}

	// Sort the tag values so we know what order to expect.
	tvs := make([]string, 0, len(tvm))
	for tv := range tvm {
		tvs = append(tvs, tv)
	}
	sort.Strings(tvs)

	// Double the tag values since each one is added to two measurements.
	tvs = append(tvs, tvs...)

	// When we pull the series out via an iterator they should be in order.
	itr := f.SeriesIDIterator()
	if itr == nil {
		t.Fatal("nil iterator")
	}

	var prevSeriesID uint64
	for i := 0; i < len(tvs); i++ {
		elem, err := itr.Next()
		if err != nil {
			t.Fatal(err)
		} else if elem.SeriesID == 0 {
			t.Fatal("got zero series id, expected non-zero")
		} else if elem.SeriesID < prevSeriesID {
			t.Fatalf("series ids out of order: %d < %d", elem.SeriesID, prevSeriesID)
		}
		prevSeriesID = elem.SeriesID
	}
}

// Ensure log file can delete an existing measurement.
func TestLogFile_DeleteMeasurement(t *testing.T) {
	sfile := MustOpenSeriesFile()
	defer sfile.Close()

	f := MustOpenLogFile(sfile.SeriesFile)
	defer f.Close()
	seriesSet := tsdb.NewSeriesIDSet()

	// Add test data.
	if _, err := f.AddSeriesList(seriesSet, [][]byte{
		[]byte("mem"),
		[]byte("cpu"),
		[]byte("cpu"),
	}, []models.Tags{
		{{Key: []byte("host"), Value: []byte("serverA")}},
		{{Key: []byte("region"), Value: []byte("us-east")}},
		{{Key: []byte("region"), Value: []byte("us-west")}},
	}); err != nil {
		t.Fatal(err)
	}

	// Remove measurement.
	if err := f.DeleteMeasurement([]byte("cpu")); err != nil {
		t.Fatal(err)
	}

	// Verify the deleted measurement is still returned but marked as deleted.
	itr := f.MeasurementIterator()
	if e := itr.Next(); string(e.Name()) != "cpu" || !e.Deleted() {
		t.Fatalf("unexpected measurement: %s/%v", e.Name(), e.Deleted())
	} else if e := itr.Next(); string(e.Name()) != "mem" || e.Deleted() {
		t.Fatalf("unexpected measurement: %s/%v", e.Name(), e.Deleted())
	} else if e := itr.Next(); e != nil {
		t.Fatalf("expected eof, got: %#v", e)
	}
}

// Ensure log file can recover from truncated or corrupt log entries.
func TestLogFile_Open(t *testing.T) {
	t.Run("Truncate", func(t *testing.T) {
		sfile := MustOpenSeriesFile()
		defer sfile.Close()
		seriesSet := tsdb.NewSeriesIDSet()

		f := MustOpenLogFile(sfile.SeriesFile)
		defer f.Close()

		// Add test data & close.
		if _, err := f.AddSeriesList(seriesSet, [][]byte{[]byte("cpu"), []byte("mem")}, []models.Tags{{{}}, {{}}}); err != nil {
			t.Fatal(err)
		} else if err := f.LogFile.Close(); err != nil {
			t.Fatal(err)
		}

		// Truncate the last byte of the file & reopen; the truncated tail should be ignored.
		if fi, err := os.Stat(f.LogFile.Path()); err != nil {
			t.Fatal(err)
		} else if err := os.Truncate(f.LogFile.Path(), fi.Size()-1); err != nil {
			t.Fatal(err)
		} else if err := f.LogFile.Open(); err != nil {
			t.Fatal(err)
		}

		// Verify that only the "cpu" series remains.
		itr := f.SeriesIDIterator()
		if elem, err := itr.Next(); err != nil {
			t.Fatal(err)
		} else if name, tags := sfile.Series(elem.SeriesID); string(name) != `cpu` {
			t.Fatalf("unexpected series: %s,%s", name, tags.String())
		} else if elem, err := itr.Next(); err != nil {
			t.Fatal(err)
		} else if elem.SeriesID != 0 {
			t.Fatalf("expected eof, got: %#v", elem)
		}

		// Add more data & reopen.
		if _, err := f.AddSeriesList(seriesSet, [][]byte{[]byte("disk")}, []models.Tags{{{}}}); err != nil {
			t.Fatal(err)
		} else if err := f.Reopen(); err != nil {
			t.Fatal(err)
		}

		// Verify new data.
		itr = f.SeriesIDIterator()
		if elem, err := itr.Next(); err != nil {
			t.Fatal(err)
		} else if name, tags := sfile.Series(elem.SeriesID); string(name) != `cpu` {
			t.Fatalf("unexpected series: %s,%s", name, tags.String())
		} else if elem, err := itr.Next(); err != nil {
			t.Fatal(err)
		} else if name, tags := sfile.Series(elem.SeriesID); string(name) != `disk` {
			t.Fatalf("unexpected series: %s,%s", name, tags.String())
		} else if elem, err := itr.Next(); err != nil {
			t.Fatal(err)
		} else if elem.SeriesID != 0 {
			t.Fatalf("expected eof, got: %#v", elem)
		}
	})

	t.Run("ChecksumMismatch", func(t *testing.T) {
		sfile := MustOpenSeriesFile()
		defer sfile.Close()
		seriesSet := tsdb.NewSeriesIDSet()

		f := MustOpenLogFile(sfile.SeriesFile)
		defer f.Close()

		// Add test data & close.
		if _, err := f.AddSeriesList(seriesSet, [][]byte{[]byte("cpu"), []byte("mem")}, []models.Tags{{{}}, {{}}}); err != nil {
			t.Fatal(err)
		} else if err := f.LogFile.Close(); err != nil {
			t.Fatal(err)
		}

		// Corrupt the last entry by zeroing its final byte.
		buf, err := ioutil.ReadFile(f.LogFile.Path())
		if err != nil {
			t.Fatal(err)
		}
		buf[len(buf)-1] = 0

		// Overwrite file with the corrupt entry and reopen.
		if err := ioutil.WriteFile(f.LogFile.Path(), buf, 0666); err != nil {
			t.Fatal(err)
		} else if err := f.LogFile.Open(); err != nil {
			t.Fatal(err)
		}

		// Verify that only the uncorrupted "cpu" series remains.
		itr := f.SeriesIDIterator()
		if elem, err := itr.Next(); err != nil {
			t.Fatal(err)
		} else if name, tags := sfile.Series(elem.SeriesID); string(name) != `cpu` {
			t.Fatalf("unexpected series: %s,%s", name, tags.String())
		} else if elem, err := itr.Next(); err != nil {
			t.Fatal(err)
		} else if elem.SeriesID != 0 {
			t.Fatalf("expected eof, got: %#v", elem)
		}
	})
}

// LogFile is a test wrapper for tsi1.LogFile.
type LogFile struct {
	*tsi1.LogFile
}

// NewLogFile returns a new instance of LogFile with a temporary file path.
func NewLogFile(sfile *tsdb.SeriesFile) *LogFile {
	file, err := ioutil.TempFile("", "tsi1-log-file-")
	if err != nil {
		panic(err)
	}
	file.Close()

	return &LogFile{LogFile: tsi1.NewLogFile(sfile, file.Name())}
}

// MustOpenLogFile returns a new, open instance of LogFile. Panics on error.
func MustOpenLogFile(sfile *tsdb.SeriesFile) *LogFile {
	f := NewLogFile(sfile)
	if err := f.Open(); err != nil {
		panic(err)
	}
	return f
}

// Close closes the log file and removes it from disk.
func (f *LogFile) Close() error {
	defer os.Remove(f.Path())
	return f.LogFile.Close()
}

// Reopen closes and reopens the file.
func (f *LogFile) Reopen() error {
	if err := f.LogFile.Close(); err != nil {
		return err
	}
	if err := f.LogFile.Open(); err != nil {
		return err
	}
	return nil
}

// CreateLogFile creates a new temporary log file and adds a list of series.
func CreateLogFile(sfile *tsdb.SeriesFile, series []Series) (*LogFile, error) {
	f := MustOpenLogFile(sfile)
	seriesSet := tsdb.NewSeriesIDSet()
	for _, s := range series {
		if _, err := f.AddSeriesList(seriesSet, [][]byte{s.Name}, []models.Tags{s.Tags}); err != nil {
			return nil, err
		}
	}
	return f, nil
}

// GenerateLogFile generates a log file from a set of series based on the count arguments.
// Total series returned will equal measurementN * valueN^tagN.
func GenerateLogFile(sfile *tsdb.SeriesFile, measurementN, tagN, valueN int) (*LogFile, error) {
	tagValueN := pow(valueN, tagN)

	f := MustOpenLogFile(sfile)
	seriesSet := tsdb.NewSeriesIDSet()
	for i := 0; i < measurementN; i++ {
		name := []byte(fmt.Sprintf("measurement%d", i))

		// Generate every tag value combination by treating j as a base-valueN
		// number with tagN digits, one digit per tag key.
		for j := 0; j < tagValueN; j++ {
			var tags models.Tags
			for k := 0; k < tagN; k++ {
				key := []byte(fmt.Sprintf("key%d", k))
				value := []byte(fmt.Sprintf("value%d", (j / pow(valueN, k) % valueN)))
				tags = append(tags, models.NewTag(key, value))
			}
			if _, err := f.AddSeriesList(seriesSet, [][]byte{name}, []models.Tags{tags}); err != nil {
				return nil, err
			}
		}
	}
	return f, nil
}

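// benchmarkLogFile_AddSeries benchmarks adding pre-generated series to a log file.
// It generates measurementN measurements, each with seriesValueN^seriesKeyN tag combinations.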
func benchmarkLogFile_AddSeries(b *testing.B, measurementN, seriesKeyN, seriesValueN int) {
	sfile := MustOpenSeriesFile()
	defer sfile.Close()

	b.StopTimer()
	f := MustOpenLogFile(sfile.SeriesFile)
	seriesSet := tsdb.NewSeriesIDSet()

	type Datum struct {
		Name []byte
		Tags models.Tags
	}

	// Pre-generate everything.
	var (
		data   []Datum
		series int
	)

	tagValueN := pow(seriesValueN, seriesKeyN)

	for i := 0; i < measurementN; i++ {
		name := []byte(fmt.Sprintf("measurement%d", i))
		for j := 0; j < tagValueN; j++ {
			var tags models.Tags
			for k := 0; k < seriesKeyN; k++ {
				key := []byte(fmt.Sprintf("key%d", k))
				value := []byte(fmt.Sprintf("value%d", (j / pow(seriesValueN, k) % seriesValueN)))
				tags = append(tags, models.NewTag(key, value))
			}
			data = append(data, Datum{Name: name, Tags: tags})
			series += len(tags)
		}
	}
	b.StartTimer()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		for _, d := range data {
			if _, err := f.AddSeriesList(seriesSet, [][]byte{d.Name}, []models.Tags{d.Tags}); err != nil {
				b.Fatal(err)
			}
		}
	}
}

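// Benchmark names encode measurementN_seriesKeyN_seriesValueN. The series totals
// in the trailing comments correspond to the benchmark's internal series counter,
// which counts one entry per tag pair (measurementN * seriesValueN^seriesKeyN * seriesKeyN).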
func BenchmarkLogFile_AddSeries_100_1_1(b *testing.B)    { benchmarkLogFile_AddSeries(b, 100, 1, 1) }    // 100 series
func BenchmarkLogFile_AddSeries_1000_1_1(b *testing.B)   { benchmarkLogFile_AddSeries(b, 1000, 1, 1) }   // 1000 series
func BenchmarkLogFile_AddSeries_10000_1_1(b *testing.B)  { benchmarkLogFile_AddSeries(b, 10000, 1, 1) }  // 10000 series
func BenchmarkLogFile_AddSeries_100_2_10(b *testing.B)   { benchmarkLogFile_AddSeries(b, 100, 2, 10) }   // ~20K series
func BenchmarkLogFile_AddSeries_100000_1_1(b *testing.B) { benchmarkLogFile_AddSeries(b, 100000, 1, 1) } // ~100K series
func BenchmarkLogFile_AddSeries_100_3_7(b *testing.B)    { benchmarkLogFile_AddSeries(b, 100, 3, 7) }    // ~100K series
func BenchmarkLogFile_AddSeries_200_3_7(b *testing.B)    { benchmarkLogFile_AddSeries(b, 200, 3, 7) }    // ~200K series
func BenchmarkLogFile_AddSeries_200_4_7(b *testing.B)    { benchmarkLogFile_AddSeries(b, 200, 4, 7) }    // ~1.9M series

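// BenchmarkLogFile_WriteTo measures compacting log files of varying series
// counts to an in-memory buffer via CompactTo.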
func BenchmarkLogFile_WriteTo(b *testing.B) {
	for _, seriesN := range []int{1000, 10000, 100000, 1000000} {
		name := fmt.Sprintf("series=%d", seriesN)
		b.Run(name, func(b *testing.B) {
			sfile := MustOpenSeriesFile()
			defer sfile.Close()

			f := MustOpenLogFile(sfile.SeriesFile)
			defer f.Close()
			seriesSet := tsdb.NewSeriesIDSet()

			// Estimate bloom filter size.
			m, k := bloom.Estimate(uint64(seriesN), 0.02)

			// Initialize log file with series data.
			for i := 0; i < seriesN; i++ {
				if _, err := f.AddSeriesList(
					seriesSet,
					[][]byte{[]byte("cpu")},
					[]models.Tags{{
						{Key: []byte("host"), Value: []byte(fmt.Sprintf("server-%d", i))},
						{Key: []byte("location"), Value: []byte("us-west")},
					}},
				); err != nil {
					b.Fatal(err)
				}
			}
			b.ResetTimer()

			// Create a CPU profile for each subtest.
			MustStartCPUProfile(name)
			defer pprof.StopCPUProfile()

			// Compact log file.
			for i := 0; i < b.N; i++ {
				buf := bytes.NewBuffer(make([]byte, 0, 150*seriesN))
				if _, err := f.CompactTo(buf, m, k, nil); err != nil {
					b.Fatal(err)
				}
				b.Logf("sz=%db", buf.Len())
			}
		})
	}
}

// MustStartCPUProfile starts a CPU profile written to /tmp, named after the
// given benchmark name. The caller must stop it with pprof.StopCPUProfile.
// Panics on error.
func MustStartCPUProfile(name string) {
	name = regexp.MustCompile(`\W+`).ReplaceAllString(name, "-")

	// Open file and start pprof.
	f, err := os.Create(filepath.Join("/tmp", fmt.Sprintf("cpu-%s.pprof", name)))
	if err != nil {
		panic(err)
	}
	if err := pprof.StartCPUProfile(f); err != nil {
		panic(err)
	}
}