1package tsm1_test
2
3import (
4	"fmt"
5	"io"
6	"os"
7	"path/filepath"
8	"reflect"
9	"sort"
10	"testing"
11
12	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
13)
14
15func TestDigest_None(t *testing.T) {
16	dir := MustTempDir()
17	dataDir := filepath.Join(dir, "data")
18	if err := os.Mkdir(dataDir, 0755); err != nil {
19		t.Fatalf("create data dir: %v", err)
20	}
21
22	df := MustTempFile(dir)
23
24	files := []string{}
25	if err := tsm1.Digest(dir, files, df); err != nil {
26		t.Fatalf("digest error: %v", err)
27	}
28
29	df, err := os.Open(df.Name())
30	if err != nil {
31		t.Fatalf("open error: %v", err)
32	}
33
34	r, err := tsm1.NewDigestReader(df)
35	if err != nil {
36		t.Fatalf("NewDigestReader error: %v", err)
37	}
38	defer r.Close()
39
40	mfest, err := r.ReadManifest()
41	if err != nil {
42		t.Fatal(err)
43	}
44
45	if len(mfest.Entries) != 0 {
46		t.Fatalf("exp: 0, got: %d", len(mfest.Entries))
47	}
48
49	var count int
50	for {
51		_, _, err := r.ReadTimeSpan()
52		if err == io.EOF {
53			break
54		}
55
56		count++
57	}
58
59	if got, exp := count, 0; got != exp {
60		t.Fatalf("count mismatch: got %v, exp %v", got, exp)
61	}
62}
63
64func TestDigest_One(t *testing.T) {
65	dir := MustTempDir()
66	dataDir := filepath.Join(dir, "data")
67	if err := os.Mkdir(dataDir, 0755); err != nil {
68		t.Fatalf("create data dir: %v", err)
69	}
70
71	a1 := tsm1.NewValue(1, 1.1)
72	writes := map[string][]tsm1.Value{
73		"cpu,host=A#!~#value": []tsm1.Value{a1},
74	}
75	MustWriteTSM(dir, 1, writes)
76
77	files, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("*.%s", tsm1.TSMFileExtension)))
78	if err != nil {
79		t.Fatal(err)
80	}
81
82	df := MustTempFile(dir)
83
84	if err := tsm1.Digest(dir, files, df); err != nil {
85		t.Fatalf("digest error: %v", err)
86	}
87
88	df, err = os.Open(df.Name())
89	if err != nil {
90		t.Fatalf("open error: %v", err)
91	}
92
93	r, err := tsm1.NewDigestReader(df)
94	if err != nil {
95		t.Fatalf("NewDigestReader error: %v", err)
96	}
97	defer r.Close()
98
99	mfest, err := r.ReadManifest()
100	if err != nil {
101		t.Fatal(err)
102	}
103
104	if len(mfest.Entries) != 1 {
105		t.Fatalf("exp: 1, got: %d", len(mfest.Entries))
106	}
107
108	var count int
109	for {
110		key, _, err := r.ReadTimeSpan()
111		if err == io.EOF {
112			break
113		}
114
115		if got, exp := key, "cpu,host=A#!~#value"; got != exp {
116			t.Fatalf("key mismatch: got %v, exp %v", got, exp)
117		}
118
119		count++
120	}
121
122	if got, exp := count, 1; got != exp {
123		t.Fatalf("count mismatch: got %v, exp %v", got, exp)
124	}
125}
126
127func TestDigest_TimeFilter(t *testing.T) {
128	dir := MustTempDir()
129	dataDir := filepath.Join(dir, "data")
130	if err := os.Mkdir(dataDir, 0755); err != nil {
131		t.Fatalf("create data dir: %v", err)
132	}
133
134	a1 := tsm1.NewValue(1, 1.1)
135	writes := map[string][]tsm1.Value{
136		"cpu,host=A#!~#value": []tsm1.Value{a1},
137	}
138	MustWriteTSM(dir, 1, writes)
139
140	a2 := tsm1.NewValue(2, 2.1)
141	writes = map[string][]tsm1.Value{
142		"cpu,host=A#!~#value": []tsm1.Value{a2},
143	}
144	MustWriteTSM(dir, 2, writes)
145
146	a3 := tsm1.NewValue(3, 3.1)
147	writes = map[string][]tsm1.Value{
148		"cpu,host=A#!~#value": []tsm1.Value{a3},
149	}
150	MustWriteTSM(dir, 3, writes)
151
152	files, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("*.%s", tsm1.TSMFileExtension)))
153	if err != nil {
154		t.Fatal(err)
155	}
156
157	df := MustTempFile(dir)
158
159	if err := tsm1.DigestWithOptions(dir, files, tsm1.DigestOptions{MinTime: 2, MaxTime: 2}, df); err != nil {
160		t.Fatalf("digest error: %v", err)
161	}
162
163	df, err = os.Open(df.Name())
164	if err != nil {
165		t.Fatalf("open error: %v", err)
166	}
167
168	r, err := tsm1.NewDigestReader(df)
169	if err != nil {
170		t.Fatalf("NewDigestReader error: %v", err)
171	}
172	defer r.Close()
173
174	mfest, err := r.ReadManifest()
175	if err != nil {
176		t.Fatal(err)
177	}
178
179	if len(mfest.Entries) != 3 {
180		t.Fatalf("exp: 3, got: %d", len(mfest.Entries))
181	}
182
183	var count int
184	for {
185		key, ts, err := r.ReadTimeSpan()
186		if err == io.EOF {
187			break
188		}
189
190		if got, exp := key, "cpu,host=A#!~#value"; got != exp {
191			t.Fatalf("key mismatch: got %v, exp %v", got, exp)
192		}
193
194		for _, tr := range ts.Ranges {
195			if got, exp := tr.Max, int64(2); got != exp {
196				t.Fatalf("min time not filtered: got %v, exp %v", got, exp)
197			}
198		}
199
200		count++
201	}
202
203	if got, exp := count, 1; got != exp {
204		t.Fatalf("count mismatch: got %v, exp %v", got, exp)
205	}
206}
207
208func TestDigest_KeyFilter(t *testing.T) {
209	dir := MustTempDir()
210	dataDir := filepath.Join(dir, "data")
211	if err := os.Mkdir(dataDir, 0755); err != nil {
212		t.Fatalf("create data dir: %v", err)
213	}
214
215	a1 := tsm1.NewValue(1, 1.1)
216	writes := map[string][]tsm1.Value{
217		"cpu,host=A#!~#value": []tsm1.Value{a1},
218	}
219	MustWriteTSM(dir, 1, writes)
220
221	a2 := tsm1.NewValue(2, 2.1)
222	writes = map[string][]tsm1.Value{
223		"cpu,host=B#!~#value": []tsm1.Value{a2},
224	}
225	MustWriteTSM(dir, 2, writes)
226
227	a3 := tsm1.NewValue(3, 3.1)
228	writes = map[string][]tsm1.Value{
229		"cpu,host=C#!~#value": []tsm1.Value{a3},
230	}
231	MustWriteTSM(dir, 3, writes)
232
233	files, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("*.%s", tsm1.TSMFileExtension)))
234	if err != nil {
235		t.Fatal(err)
236	}
237
238	df := MustTempFile(dir)
239
240	if err := tsm1.DigestWithOptions(dir, files, tsm1.DigestOptions{
241		MinKey: []byte("cpu,host=B#!~#value"),
242		MaxKey: []byte("cpu,host=B#!~#value")}, df); err != nil {
243		t.Fatalf("digest error: %v", err)
244	}
245
246	df, err = os.Open(df.Name())
247	if err != nil {
248		t.Fatalf("open error: %v", err)
249	}
250
251	r, err := tsm1.NewDigestReader(df)
252	if err != nil {
253		t.Fatalf("NewDigestReader error: %v", err)
254	}
255	defer r.Close()
256
257	mfest, err := r.ReadManifest()
258	if err != nil {
259		t.Fatal(err)
260	}
261
262	if len(mfest.Entries) != 3 {
263		t.Fatalf("exp: 3, got: %d", len(mfest.Entries))
264	}
265
266	var count int
267	for {
268		key, _, err := r.ReadTimeSpan()
269		if err == io.EOF {
270			break
271		}
272
273		if got, exp := key, "cpu,host=B#!~#value"; got != exp {
274			t.Fatalf("key mismatch: got %v, exp %v", got, exp)
275		}
276
277		count++
278	}
279
280	if got, exp := count, 1; got != exp {
281		t.Fatalf("count mismatch: got %v, exp %v", got, exp)
282	}
283}
284
285func TestDigest_Manifest(t *testing.T) {
286	// Create temp directory to hold test files.
287	dir := MustTempDir()
288	defer os.RemoveAll(dir)
289
290	digestFile := filepath.Join(dir, tsm1.DigestFilename)
291
292	// Create a point to write to the tsm files.
293	a1 := tsm1.NewValue(1, 1.1)
294	writes := map[string][]tsm1.Value{
295		"cpu,host=A#!~#value": []tsm1.Value{a1},
296	}
297
298	// Write a few tsm files.
299	var files []string
300	gen := 1
301	for ; gen < 4; gen++ {
302		name := MustWriteTSM(dir, gen, writes)
303		files = append(files, name)
304	}
305
306	// Generate a manifest.
307	mfest, err := tsm1.NewDigestManifest(dir, files)
308	if err != nil {
309		t.Fatal(err)
310	}
311
312	// Make sure manifest contains only the expected files.
313	var got []string
314	for _, e := range mfest.Entries {
315		got = append(got, e.Filename)
316	}
317
318	sort.StringSlice(files).Sort()
319	sort.StringSlice(got).Sort()
320
321	if !reflect.DeepEqual(files, got) {
322		t.Fatalf("exp: %v, got: %v", files, got)
323	}
324
325	// Write a digest of the files.
326	df := MustCreate(digestFile)
327	if err := tsm1.Digest(dir, files, df); err != nil {
328		t.Fatalf("digest error: %v", err)
329	}
330
331	// Helper func to read manifest from a digest.
332	readManifest := func(name string) *tsm1.DigestManifest {
333		t.Helper()
334
335		df, err = os.Open(df.Name())
336		if err != nil {
337			t.Fatal(err)
338		}
339
340		r, err := tsm1.NewDigestReader(df)
341		if err != nil {
342			t.Fatal(err)
343		}
344
345		mfest, err := r.ReadManifest()
346		if err != nil {
347			t.Fatal(err)
348		}
349
350		if err := r.Close(); err != nil {
351			t.Fatal(err)
352		}
353
354		return mfest
355	}
356
357	// Read the manifest from the digest.
358	mfest2 := readManifest(df.Name())
359
360	// Make sure the manifest read from the digest on disk is correct.
361	if !reflect.DeepEqual(mfest, mfest2) {
362		t.Fatalf("invalid manifest:\nexp: %v\ngot: %v", mfest, mfest2)
363	}
364
365	// Write an extra tsm file that shouldn't be included in the manifest.
366	extra := MustWriteTSM(dir, gen, writes)
367
368	// Re-generate manifest.
369	mfest, err = tsm1.NewDigestManifest(dir, files)
370	if err != nil {
371		t.Fatal(err)
372	}
373
374	// Make sure manifest contains only the expected files.
375	got = got[:0]
376	for _, e := range mfest.Entries {
377		if e.Filename == extra {
378			t.Fatal("extra file in shard directory should not be in digest manifest")
379		}
380		got = append(got, e.Filename)
381	}
382
383	sort.StringSlice(got).Sort()
384
385	if !reflect.DeepEqual(files, got) {
386		t.Fatalf("exp: %v, got: %v", files, got)
387	}
388
389	// Re-generate digest and make sure it does not include the extra tsm file.
390	df = MustCreate(digestFile)
391	if err := tsm1.Digest(dir, files, df); err != nil {
392		t.Fatalf("digest error: %v", err)
393	}
394
395	// Read the manifest from the new digest.
396	mfest2 = readManifest(df.Name())
397
398	// Make sure the manifest read from the digest on disk is correct.
399	if !reflect.DeepEqual(mfest, mfest2) {
400		t.Fatalf("invalid manifest:\nexp: %v\ngot: %v", mfest, mfest2)
401	}
402
403	// Make sure the digest is fresh.
404	digest, err := os.Stat(df.Name())
405	if err != nil {
406		t.Fatal(err)
407	}
408
409	fresh, reason := tsm1.DigestFresh(dir, files, digest.ModTime())
410	if !fresh {
411		t.Fatalf("digest is stale: reason=%s", reason)
412	}
413
414	// Test that digest is stale if shard time is newer than digest time.
415	fresh, _ = tsm1.DigestFresh(dir, files, digest.ModTime().Add(1))
416	if fresh {
417		t.Fatalf("digest is fresh")
418	}
419
420	// Test that digest is stale if a new tsm file has been written by the engine.
421	allfiles := append(files, extra)
422	fresh, _ = tsm1.DigestFresh(dir, allfiles, digest.ModTime())
423	if fresh {
424		t.Fatalf("digest is fresh")
425	}
426
427	// Open one of the tsm files and write data to it.
428	f, err := os.OpenFile(files[0], os.O_WRONLY|os.O_APPEND, 0666)
429	if err != nil {
430		t.Fatal(err)
431	}
432
433	if _, err := f.WriteString("some data"); err != nil {
434		t.Fatal(err)
435	}
436
437	if err := f.Close(); err != nil {
438		t.Fatal(err)
439	}
440
441	// Test that digest is stale if a tsm file is changed.
442	fresh, _ = tsm1.DigestFresh(dir, files, digest.ModTime())
443	if fresh {
444		t.Fatalf("digest is fresh")
445	}
446
447	// Delete a tsm file.
448	if err := os.Remove(files[0]); err != nil {
449		t.Fatal(err)
450	}
451
452	// Test that digest is stale if a tsm file is missing on disk.
453	fresh, _ = tsm1.DigestFresh(dir, files, digest.ModTime())
454	if fresh {
455		t.Fatalf("digest is fresh")
456	}
457
458	// Delete the entire shard directory
459	if err := os.RemoveAll(dir); err != nil {
460		t.Fatal(err)
461	}
462
463	// Test that digest is stale if the entire shard directory is missing.
464	fresh, _ = tsm1.DigestFresh(dir, files, digest.ModTime())
465	if fresh {
466		t.Fatalf("digest is fresh")
467	}
468}
469
// MustCreate creates the file at path with os.Create and returns the open
// handle. It panics with the underlying error if the file cannot be created.
func MustCreate(path string) *os.File {
	file, createErr := os.Create(path)
	if createErr == nil {
		return file
	}
	panic(createErr)
}
477