1package tsi1_test
2
3import (
4	"fmt"
5	"reflect"
6	"sort"
7	"testing"
8
9	"github.com/influxdata/influxdb/models"
10	"github.com/influxdata/influxdb/tsdb"
11)
12
13// Ensure fileset can return an iterator over all series in the index.
14func TestFileSet_SeriesIDIterator(t *testing.T) {
15	idx := MustOpenIndex(1)
16	defer idx.Close()
17
18	// Create initial set of series.
19	if err := idx.CreateSeriesSliceIfNotExists([]Series{
20		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
21		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
22		{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
23	}); err != nil {
24		t.Fatal(err)
25	}
26
27	// Verify initial set of series.
28	idx.Run(t, func(t *testing.T) {
29		fs, err := idx.PartitionAt(0).RetainFileSet()
30		if err != nil {
31			t.Fatal(err)
32		}
33		defer fs.Release()
34
35		itr := fs.SeriesFile().SeriesIDIterator()
36		if itr == nil {
37			t.Fatal("expected iterator")
38		}
39		if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{
40			"cpu,[{region east}]",
41			"cpu,[{region west}]",
42			"mem,[{region east}]",
43		}) {
44			t.Fatalf("unexpected keys: %s", result)
45		}
46	})
47
48	// Add more series.
49	if err := idx.CreateSeriesSliceIfNotExists([]Series{
50		{Name: []byte("disk")},
51		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north"})},
52		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
53	}); err != nil {
54		t.Fatal(err)
55	}
56
57	// Verify additional series.
58	idx.Run(t, func(t *testing.T) {
59		fs, err := idx.PartitionAt(0).RetainFileSet()
60		if err != nil {
61			t.Fatal(err)
62		}
63		defer fs.Release()
64
65		itr := fs.SeriesFile().SeriesIDIterator()
66		if itr == nil {
67			t.Fatal("expected iterator")
68		}
69
70		if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{
71			"cpu,[{region east}]",
72			"cpu,[{region north}]",
73			"cpu,[{region west}]",
74			"disk,[]",
75			"mem,[{region east}]",
76		}) {
77			t.Fatalf("unexpected keys: %s", result)
78		}
79	})
80}
81
82// Ensure fileset can return an iterator over all series for one measurement.
83func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) {
84	idx := MustOpenIndex(1)
85	defer idx.Close()
86
87	// Create initial set of series.
88	if err := idx.CreateSeriesSliceIfNotExists([]Series{
89		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
90		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
91		{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
92	}); err != nil {
93		t.Fatal(err)
94	}
95
96	// Verify initial set of series.
97	idx.Run(t, func(t *testing.T) {
98		fs, err := idx.PartitionAt(0).RetainFileSet()
99		if err != nil {
100			t.Fatal(err)
101		}
102		defer fs.Release()
103
104		itr := fs.MeasurementSeriesIDIterator([]byte("cpu"))
105		if itr == nil {
106			t.Fatal("expected iterator")
107		}
108
109		if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{
110			"cpu,[{region east}]",
111			"cpu,[{region west}]",
112		}) {
113			t.Fatalf("unexpected keys: %s", result)
114		}
115	})
116
117	// Add more series.
118	if err := idx.CreateSeriesSliceIfNotExists([]Series{
119		{Name: []byte("disk")},
120		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north"})},
121	}); err != nil {
122		t.Fatal(err)
123	}
124
125	// Verify additional series.
126	idx.Run(t, func(t *testing.T) {
127		fs, err := idx.PartitionAt(0).RetainFileSet()
128		if err != nil {
129			t.Fatal(err)
130		}
131		defer fs.Release()
132
133		itr := fs.MeasurementSeriesIDIterator([]byte("cpu"))
134		if itr == nil {
135			t.Fatalf("expected iterator")
136		}
137
138		if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{
139			"cpu,[{region east}]",
140			"cpu,[{region north}]",
141			"cpu,[{region west}]",
142		}) {
143			t.Fatalf("unexpected keys: %s", result)
144		}
145	})
146}
147
148// Ensure fileset can return an iterator over all measurements for the index.
149func TestFileSet_MeasurementIterator(t *testing.T) {
150	idx := MustOpenIndex(1)
151	defer idx.Close()
152
153	// Create initial set of series.
154	if err := idx.CreateSeriesSliceIfNotExists([]Series{
155		{Name: []byte("cpu")},
156		{Name: []byte("mem")},
157	}); err != nil {
158		t.Fatal(err)
159	}
160
161	// Verify initial set of series.
162	idx.Run(t, func(t *testing.T) {
163		fs, err := idx.PartitionAt(0).RetainFileSet()
164		if err != nil {
165			t.Fatal(err)
166		}
167		defer fs.Release()
168
169		itr := fs.MeasurementIterator()
170		if itr == nil {
171			t.Fatal("expected iterator")
172		}
173
174		expectedNames := []string{"cpu", "mem", ""} // Empty string implies end
175		for _, name := range expectedNames {
176			e := itr.Next()
177			if name == "" && e != nil {
178				t.Errorf("got measurement %s, expected nil measurement", e.Name())
179			} else if e == nil && name != "" {
180				t.Errorf("got nil measurement, expected %s", name)
181			} else if e != nil && string(e.Name()) != name {
182				t.Errorf("got measurement %s, expected %s", e.Name(), name)
183			}
184		}
185	})
186
187	// Add more series.
188	if err := idx.CreateSeriesSliceIfNotExists([]Series{
189		{Name: []byte("disk"), Tags: models.NewTags(map[string]string{"foo": "bar"})},
190		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north", "x": "y"})},
191	}); err != nil {
192		t.Fatal(err)
193	}
194
195	// Verify additional series.
196	idx.Run(t, func(t *testing.T) {
197		fs, err := idx.PartitionAt(0).RetainFileSet()
198		if err != nil {
199			t.Fatal(err)
200		}
201		defer fs.Release()
202
203		itr := fs.MeasurementIterator()
204		if itr == nil {
205			t.Fatal("expected iterator")
206		}
207
208		expectedNames := []string{"cpu", "disk", "mem", ""} // Empty string implies end
209		for _, name := range expectedNames {
210			e := itr.Next()
211			if name == "" && e != nil {
212				t.Errorf("got measurement %s, expected nil measurement", e.Name())
213			} else if e == nil && name != "" {
214				t.Errorf("got nil measurement, expected %s", name)
215			} else if e != nil && string(e.Name()) != name {
216				t.Errorf("got measurement %s, expected %s", e.Name(), name)
217			}
218		}
219	})
220}
221
222// Ensure fileset can return an iterator over all keys for one measurement.
223func TestFileSet_TagKeyIterator(t *testing.T) {
224	idx := MustOpenIndex(1)
225	defer idx.Close()
226
227	// Create initial set of series.
228	if err := idx.CreateSeriesSliceIfNotExists([]Series{
229		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
230		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west", "type": "gpu"})},
231		{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east", "misc": "other"})},
232	}); err != nil {
233		t.Fatal(err)
234	}
235
236	// Verify initial set of series.
237	idx.Run(t, func(t *testing.T) {
238		fs, err := idx.PartitionAt(0).RetainFileSet()
239		if err != nil {
240			t.Fatal(err)
241		}
242		defer fs.Release()
243
244		itr := fs.TagKeyIterator([]byte("cpu"))
245		if itr == nil {
246			t.Fatalf("expected iterator")
247		}
248
249		if e := itr.Next(); string(e.Key()) != `region` {
250			t.Fatalf("unexpected key: %s", e.Key())
251		} else if e := itr.Next(); string(e.Key()) != `type` {
252			t.Fatalf("unexpected key: %s", e.Key())
253		} else if e := itr.Next(); e != nil {
254			t.Fatalf("expected nil key: %s", e.Key())
255		}
256	})
257
258	// Add more series.
259	if err := idx.CreateSeriesSliceIfNotExists([]Series{
260		{Name: []byte("disk"), Tags: models.NewTags(map[string]string{"foo": "bar"})},
261		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north", "x": "y"})},
262	}); err != nil {
263		t.Fatal(err)
264	}
265
266	// Verify additional series.
267	idx.Run(t, func(t *testing.T) {
268		fs, err := idx.PartitionAt(0).RetainFileSet()
269		if err != nil {
270			t.Fatal(err)
271		}
272		defer fs.Release()
273
274		itr := fs.TagKeyIterator([]byte("cpu"))
275		if itr == nil {
276			t.Fatal("expected iterator")
277		}
278
279		if e := itr.Next(); string(e.Key()) != `region` {
280			t.Fatalf("unexpected key: %s", e.Key())
281		} else if e := itr.Next(); string(e.Key()) != `type` {
282			t.Fatalf("unexpected key: %s", e.Key())
283		} else if e := itr.Next(); string(e.Key()) != `x` {
284			t.Fatalf("unexpected key: %s", e.Key())
285		} else if e := itr.Next(); e != nil {
286			t.Fatalf("expected nil key: %s", e.Key())
287		}
288	})
289}
290
291func MustReadAllSeriesIDIteratorString(sfile *tsdb.SeriesFile, itr tsdb.SeriesIDIterator) []string {
292	// Read all ids.
293	ids, err := tsdb.ReadAllSeriesIDIterator(itr)
294	if err != nil {
295		panic(err)
296	}
297
298	// Convert to keys and sort.
299	keys := sfile.SeriesKeys(ids)
300	sort.Slice(keys, func(i, j int) bool { return tsdb.CompareSeriesKeys(keys[i], keys[j]) == -1 })
301
302	// Convert to strings.
303	a := make([]string, len(keys))
304	for i := range a {
305		name, tags := tsdb.ParseSeriesKey(keys[i])
306		a[i] = fmt.Sprintf("%s,%s", name, tags.String())
307	}
308	return a
309}
310