1package inmem
2
3import (
4	"fmt"
5	"math/rand"
6	"strings"
7	"sync"
8	"testing"
9	"time"
10
11	"github.com/influxdata/influxdb/models"
12	"github.com/influxdata/influxdb/query"
13	"github.com/influxdata/influxdb/tsdb"
14	"github.com/influxdata/influxql"
15)
16
17// Test comparing SeriesIDs for equality.
18func TestSeriesIDs_Equals(t *testing.T) {
19	ids1 := seriesIDs([]uint64{1, 2, 3})
20	ids2 := seriesIDs([]uint64{1, 2, 3})
21	ids3 := seriesIDs([]uint64{4, 5, 6})
22
23	if !ids1.Equals(ids2) {
24		t.Fatal("expected ids1 == ids2")
25	} else if ids1.Equals(ids3) {
26		t.Fatal("expected ids1 != ids3")
27	}
28}
29
30// Test intersecting sets of SeriesIDs.
31func TestSeriesIDs_Intersect(t *testing.T) {
32	// Test swapping l & r, all branches of if-else, and exit loop when 'j < len(r)'
33	ids1 := seriesIDs([]uint64{1, 3, 4, 5, 6})
34	ids2 := seriesIDs([]uint64{1, 2, 3, 7})
35	exp := seriesIDs([]uint64{1, 3})
36	got := ids1.Intersect(ids2)
37
38	if !exp.Equals(got) {
39		t.Fatalf("exp=%v, got=%v", exp, got)
40	}
41
42	// Test exit for loop when 'i < len(l)'
43	ids1 = seriesIDs([]uint64{1})
44	ids2 = seriesIDs([]uint64{1, 2})
45	exp = seriesIDs([]uint64{1})
46	got = ids1.Intersect(ids2)
47
48	if !exp.Equals(got) {
49		t.Fatalf("exp=%v, got=%v", exp, got)
50	}
51}
52
53// Test union sets of SeriesIDs.
54func TestSeriesIDs_Union(t *testing.T) {
55	// Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left.
56	ids1 := seriesIDs([]uint64{1, 2, 3, 7})
57	ids2 := seriesIDs([]uint64{1, 3, 4, 5, 6})
58	exp := seriesIDs([]uint64{1, 2, 3, 4, 5, 6, 7})
59	got := ids1.Union(ids2)
60
61	if !exp.Equals(got) {
62		t.Fatalf("exp=%v, got=%v", exp, got)
63	}
64
65	// Test exit because of 'i < len(l)' and append remainder from right.
66	ids1 = seriesIDs([]uint64{1})
67	ids2 = seriesIDs([]uint64{1, 2})
68	exp = seriesIDs([]uint64{1, 2})
69	got = ids1.Union(ids2)
70
71	if !exp.Equals(got) {
72		t.Fatalf("exp=%v, got=%v", exp, got)
73	}
74}
75
76// Test removing one set of SeriesIDs from another.
77func TestSeriesIDs_Reject(t *testing.T) {
78	// Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left.
79	ids1 := seriesIDs([]uint64{1, 2, 3, 7})
80	ids2 := seriesIDs([]uint64{1, 3, 4, 5, 6})
81	exp := seriesIDs([]uint64{2, 7})
82	got := ids1.Reject(ids2)
83
84	if !exp.Equals(got) {
85		t.Fatalf("exp=%v, got=%v", exp, got)
86	}
87
88	// Test exit because of 'i < len(l)'.
89	ids1 = seriesIDs([]uint64{1})
90	ids2 = seriesIDs([]uint64{1, 2})
91	exp = seriesIDs{}
92	got = ids1.Reject(ids2)
93
94	if !exp.Equals(got) {
95		t.Fatalf("exp=%v, got=%v", exp, got)
96	}
97}
98
99func TestMeasurement_AddSeries_Nil(t *testing.T) {
100	m := newMeasurement("foo", "cpu")
101	if m.AddSeries(nil) {
102		t.Fatalf("AddSeries mismatch: exp false, got true")
103	}
104}
105
106func TestMeasurement_AppendSeriesKeysByID_Missing(t *testing.T) {
107	m := newMeasurement("foo", "cpu")
108	var dst []string
109	dst = m.AppendSeriesKeysByID(dst, []uint64{1})
110	if exp, got := 0, len(dst); exp != got {
111		t.Fatalf("series len mismatch: exp %v, got %v", exp, got)
112	}
113}
114
115func TestMeasurement_AppendSeriesKeysByID_Exists(t *testing.T) {
116	m := newMeasurement("foo", "cpu")
117	s := newSeries(1, m, "cpu,host=foo", models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
118	m.AddSeries(s)
119
120	var dst []string
121	dst = m.AppendSeriesKeysByID(dst, []uint64{1})
122	if exp, got := 1, len(dst); exp != got {
123		t.Fatalf("series len mismatch: exp %v, got %v", exp, got)
124	}
125
126	if exp, got := "cpu,host=foo", dst[0]; exp != got {
127		t.Fatalf("series mismatch: exp %v, got %v", exp, got)
128	}
129}
130
131func TestMeasurement_TagsSet_Deadlock(t *testing.T) {
132	m := newMeasurement("foo", "cpu")
133	s1 := newSeries(1, m, "cpu,host=foo", models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
134	m.AddSeries(s1)
135
136	s2 := newSeries(2, m, "cpu,host=bar", models.Tags{models.NewTag([]byte("host"), []byte("bar"))})
137	m.AddSeries(s2)
138
139	m.DropSeries(s1)
140
141	// This was deadlocking
142	s := tsdb.NewSeriesIDSet()
143	s.Add(1)
144	m.TagSets(s, query.IteratorOptions{})
145	if got, exp := len(m.SeriesIDs()), 1; got != exp {
146		t.Fatalf("series count mismatch: got %v, exp %v", got, exp)
147	}
148}
149
150// Ensures the tagKeyValue API contains no deadlocks or sync issues.
151func TestTagKeyValue_Concurrent(t *testing.T) {
152	var wg sync.WaitGroup
153	done := make(chan struct{})
154	time.AfterFunc(2*time.Second, func() { close(done) })
155
156	v := newTagKeyValue()
157	for i := 0; i < 4; i++ {
158		wg.Add(1)
159		go func(i int) {
160			defer wg.Done()
161
162			rand := rand.New(rand.NewSource(int64(i)))
163			for {
164				// Continue running until time limit.
165				select {
166				case <-done:
167					return
168				default:
169				}
170
171				// Randomly choose next API.
172				switch rand.Intn(7) {
173				case 0:
174					v.bytes()
175				case 1:
176					v.Cardinality()
177				case 2:
178					v.Contains(string([]rune{rune(rand.Intn(52) + 65)}))
179				case 3:
180					v.InsertSeriesIDByte([]byte(string([]rune{rune(rand.Intn(52) + 65)})), rand.Uint64()%1000)
181				case 4:
182					v.Load(string([]rune{rune(rand.Intn(52) + 65)}))
183				case 5:
184					v.Range(func(tagValue string, a seriesIDs) bool {
185						return rand.Intn(10) == 0
186					})
187				case 6:
188					v.RangeAll(func(k string, a seriesIDs) {})
189				}
190			}
191		}(i)
192	}
193	wg.Wait()
194}
195
196func BenchmarkMeasurement_SeriesIDForExp_EQRegex(b *testing.B) {
197	m := newMeasurement("foo", "cpu")
198	for i := 0; i < 100000; i++ {
199		s := newSeries(uint64(i), m, "cpu", models.Tags{models.NewTag(
200			[]byte("host"),
201			[]byte(fmt.Sprintf("host%d", i)))})
202		m.AddSeries(s)
203	}
204
205	if exp, got := 100000, len(m.SeriesKeys()); exp != got {
206		b.Fatalf("series count mismatch: exp %v got %v", exp, got)
207	}
208
209	stmt, err := influxql.NewParser(strings.NewReader(`SELECT * FROM cpu WHERE host =~ /host\d+/`)).ParseStatement()
210	if err != nil {
211		b.Fatalf("invalid statement: %s", err)
212	}
213
214	selectStmt := stmt.(*influxql.SelectStatement)
215
216	b.ResetTimer()
217	for i := 0; i < b.N; i++ {
218		ids := m.IDsForExpr(selectStmt.Condition.(*influxql.BinaryExpr))
219		if exp, got := 100000, len(ids); exp != got {
220			b.Fatalf("series count mismatch: exp %v got %v", exp, got)
221		}
222
223	}
224}
225
226func BenchmarkMeasurement_SeriesIDForExp_NERegex(b *testing.B) {
227	m := newMeasurement("foo", "cpu")
228	for i := 0; i < 100000; i++ {
229		s := newSeries(uint64(i), m, "cpu", models.Tags{models.Tag{
230			Key:   []byte("host"),
231			Value: []byte(fmt.Sprintf("host%d", i))}})
232		m.AddSeries(s)
233	}
234
235	if exp, got := 100000, len(m.SeriesKeys()); exp != got {
236		b.Fatalf("series count mismatch: exp %v got %v", exp, got)
237	}
238
239	stmt, err := influxql.NewParser(strings.NewReader(`SELECT * FROM cpu WHERE host !~ /foo\d+/`)).ParseStatement()
240	if err != nil {
241		b.Fatalf("invalid statement: %s", err)
242	}
243
244	selectStmt := stmt.(*influxql.SelectStatement)
245
246	b.ResetTimer()
247	for i := 0; i < b.N; i++ {
248		ids := m.IDsForExpr(selectStmt.Condition.(*influxql.BinaryExpr))
249		if exp, got := 100000, len(ids); exp != got {
250			b.Fatalf("series count mismatch: exp %v got %v", exp, got)
251		}
252
253	}
254
255}
256
257func benchmarkTagSets(b *testing.B, n int, opt query.IteratorOptions) {
258	m := newMeasurement("foo", "m")
259	ss := tsdb.NewSeriesIDSet()
260
261	for i := 0; i < n; i++ {
262		tags := map[string]string{"tag1": "value1", "tag2": "value2"}
263		s := newSeries(uint64(i), m, "m,tag1=value1,tag2=value2", models.NewTags(tags))
264		ss.Add(uint64(i))
265		m.AddSeries(s)
266	}
267
268	// warm caches
269	m.TagSets(ss, opt)
270
271	b.ReportAllocs()
272	b.ResetTimer()
273	for i := 0; i < b.N; i++ {
274		m.TagSets(ss, opt)
275	}
276}
277
278func BenchmarkMeasurement_TagSetsNoDimensions_1000(b *testing.B) {
279	benchmarkTagSets(b, 1000, query.IteratorOptions{})
280}
281
282func BenchmarkMeasurement_TagSetsDimensions_1000(b *testing.B) {
283	benchmarkTagSets(b, 1000, query.IteratorOptions{Dimensions: []string{"tag1", "tag2"}})
284}
285
286func BenchmarkMeasurement_TagSetsNoDimensions_100000(b *testing.B) {
287	benchmarkTagSets(b, 100000, query.IteratorOptions{})
288}
289
290func BenchmarkMeasurement_TagSetsDimensions_100000(b *testing.B) {
291	benchmarkTagSets(b, 100000, query.IteratorOptions{Dimensions: []string{"tag1", "tag2"}})
292}
293