1package reads_test
2
3import (
4	"context"
5	"strings"
6	"testing"
7
8	"github.com/google/go-cmp/cmp"
9	"github.com/influxdata/influxdb/models"
10	"github.com/influxdata/influxdb/pkg/data/gen"
11	"github.com/influxdata/influxdb/storage/reads"
12	"github.com/influxdata/influxdb/storage/reads/datatypes"
13)
14
15func TestGroupGroupResultSetSorting(t *testing.T) {
16	tests := []struct {
17		name  string
18		cur   reads.SeriesCursor
19		group datatypes.ReadRequest_Group
20		keys  []string
21		exp   string
22	}{
23		{
24			name: "group by tag1 in all series",
25			cur: &sliceSeriesCursor{
26				rows: newSeriesRows(
27					"cpu,tag0=val00,tag1=val10",
28					"cpu,tag0=val00,tag1=val11",
29					"cpu,tag0=val00,tag1=val12",
30					"cpu,tag0=val01,tag1=val10",
31					"cpu,tag0=val01,tag1=val11",
32					"cpu,tag0=val01,tag1=val12",
33				)},
34			group: datatypes.GroupBy,
35			keys:  []string{"tag1"},
36			exp: `group:
37  tag key      : _m,tag0,tag1
38  partition key: val10
39    series: _m=cpu,tag0=val00,tag1=val10
40    series: _m=cpu,tag0=val01,tag1=val10
41group:
42  tag key      : _m,tag0,tag1
43  partition key: val11
44    series: _m=cpu,tag0=val00,tag1=val11
45    series: _m=cpu,tag0=val01,tag1=val11
46group:
47  tag key      : _m,tag0,tag1
48  partition key: val12
49    series: _m=cpu,tag0=val00,tag1=val12
50    series: _m=cpu,tag0=val01,tag1=val12
51`,
52		},
53		{
54			name: "group by tag1 in partial series",
55			cur: &sliceSeriesCursor{
56				rows: newSeriesRows(
57					"aaa,tag0=val00",
58					"aaa,tag0=val01",
59					"cpu,tag0=val00,tag1=val10",
60					"cpu,tag0=val00,tag1=val11",
61					"cpu,tag0=val00,tag1=val12",
62					"cpu,tag0=val01,tag1=val10",
63					"cpu,tag0=val01,tag1=val11",
64					"cpu,tag0=val01,tag1=val12",
65				)},
66			group: datatypes.GroupBy,
67			keys:  []string{"tag1"},
68			exp: `group:
69  tag key      : _m,tag0,tag1
70  partition key: val10
71    series: _m=cpu,tag0=val00,tag1=val10
72    series: _m=cpu,tag0=val01,tag1=val10
73group:
74  tag key      : _m,tag0,tag1
75  partition key: val11
76    series: _m=cpu,tag0=val01,tag1=val11
77    series: _m=cpu,tag0=val00,tag1=val11
78group:
79  tag key      : _m,tag0,tag1
80  partition key: val12
81    series: _m=cpu,tag0=val01,tag1=val12
82    series: _m=cpu,tag0=val00,tag1=val12
83group:
84  tag key      : _m,tag0
85  partition key: <nil>
86    series: _m=aaa,tag0=val00
87    series: _m=aaa,tag0=val01
88`,
89		},
90		{
91			name: "group by tag2,tag1 with partial series",
92			cur: &sliceSeriesCursor{
93				rows: newSeriesRows(
94					"aaa,tag0=val00",
95					"aaa,tag0=val01",
96					"cpu,tag0=val00,tag1=val10",
97					"cpu,tag0=val00,tag1=val11",
98					"cpu,tag0=val00,tag1=val12",
99					"mem,tag1=val10,tag2=val20",
100					"mem,tag1=val11,tag2=val20",
101					"mem,tag1=val11,tag2=val21",
102				)},
103			group: datatypes.GroupBy,
104			keys:  []string{"tag2", "tag1"},
105			exp: `group:
106  tag key      : _m,tag1,tag2
107  partition key: val20,val10
108    series: _m=mem,tag1=val10,tag2=val20
109group:
110  tag key      : _m,tag1,tag2
111  partition key: val20,val11
112    series: _m=mem,tag1=val11,tag2=val20
113group:
114  tag key      : _m,tag1,tag2
115  partition key: val21,val11
116    series: _m=mem,tag1=val11,tag2=val21
117group:
118  tag key      : _m,tag0,tag1
119  partition key: <nil>,val10
120    series: _m=cpu,tag0=val00,tag1=val10
121group:
122  tag key      : _m,tag0,tag1
123  partition key: <nil>,val11
124    series: _m=cpu,tag0=val00,tag1=val11
125group:
126  tag key      : _m,tag0,tag1
127  partition key: <nil>,val12
128    series: _m=cpu,tag0=val00,tag1=val12
129group:
130  tag key      : _m,tag0
131  partition key: <nil>,<nil>
132    series: _m=aaa,tag0=val00
133    series: _m=aaa,tag0=val01
134`,
135		},
136		{
137			name: "group by tag0,tag2 with partial series",
138			cur: &sliceSeriesCursor{
139				rows: newSeriesRows(
140					"aaa,tag0=val00",
141					"aaa,tag0=val01",
142					"cpu,tag0=val00,tag1=val10",
143					"cpu,tag0=val00,tag1=val11",
144					"cpu,tag0=val00,tag1=val12",
145					"mem,tag1=val10,tag2=val20",
146					"mem,tag1=val11,tag2=val20",
147					"mem,tag1=val11,tag2=val21",
148				)},
149			group: datatypes.GroupBy,
150			keys:  []string{"tag0", "tag2"},
151			exp: `group:
152  tag key      : _m,tag0,tag1
153  partition key: val00,<nil>
154    series: _m=aaa,tag0=val00
155    series: _m=cpu,tag0=val00,tag1=val10
156    series: _m=cpu,tag0=val00,tag1=val11
157    series: _m=cpu,tag0=val00,tag1=val12
158group:
159  tag key      : _m,tag0
160  partition key: val01,<nil>
161    series: _m=aaa,tag0=val01
162group:
163  tag key      : _m,tag1,tag2
164  partition key: <nil>,val20
165    series: _m=mem,tag1=val10,tag2=val20
166    series: _m=mem,tag1=val11,tag2=val20
167group:
168  tag key      : _m,tag1,tag2
169  partition key: <nil>,val21
170    series: _m=mem,tag1=val11,tag2=val21
171`,
172		},
173	}
174
175	for _, tt := range tests {
176		t.Run(tt.name, func(t *testing.T) {
177
178			newCursor := func() (reads.SeriesCursor, error) {
179				return tt.cur, nil
180			}
181
182			var hints datatypes.HintFlags
183			hints.SetHintSchemaAllTime()
184			rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadRequest{Group: tt.group, GroupKeys: tt.keys, Hints: hints}, newCursor)
185
186			sb := new(strings.Builder)
187			GroupResultSetToString(sb, rs, SkipNilCursor())
188
189			if got := sb.String(); !cmp.Equal(got, tt.exp) {
190				t.Errorf("unexpected value; -got/+exp\n%s", cmp.Diff(strings.Split(got, "\n"), strings.Split(tt.exp, "\n")))
191			}
192		})
193	}
194}
195
196func TestNewGroupResultSet_GroupNone_NoDataReturnsNil(t *testing.T) {
197	newCursor := func() (reads.SeriesCursor, error) {
198		return &sliceSeriesCursor{
199			rows: newSeriesRows(
200				"aaa,tag0=val00",
201				"aaa,tag0=val01",
202			)}, nil
203	}
204
205	rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadRequest{Group: datatypes.GroupNone}, newCursor)
206	if rs != nil {
207		t.Errorf("expected nil cursor")
208	}
209}
210
211func TestNewGroupResultSet_GroupBy_NoDataReturnsNil(t *testing.T) {
212	newCursor := func() (reads.SeriesCursor, error) {
213		return &sliceSeriesCursor{
214			rows: newSeriesRows(
215				"aaa,tag0=val00",
216				"aaa,tag0=val01",
217			)}, nil
218	}
219
220	rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadRequest{Group: datatypes.GroupBy, GroupKeys: []string{"tag0"}}, newCursor)
221	if rs != nil {
222		t.Errorf("expected nil cursor")
223	}
224}
225
226func TestNewGroupResultSet_Sorting(t *testing.T) {
227	tests := []struct {
228		name string
229		keys []string
230		opts []reads.GroupOption
231		exp  string
232	}{
233		{
234			name: "nil hi",
235			keys: []string{"tag0", "tag2"},
236			exp: `group:
237  tag key      : _m,tag0,tag1
238  partition key: val00,<nil>
239    series: _m=aaa,tag0=val00
240    series: _m=cpu,tag0=val00,tag1=val10
241    series: _m=cpu,tag0=val00,tag1=val11
242    series: _m=cpu,tag0=val00,tag1=val12
243group:
244  tag key      : _m,tag0
245  partition key: val01,<nil>
246    series: _m=aaa,tag0=val01
247group:
248  tag key      : _m,tag1,tag2
249  partition key: <nil>,val20
250    series: _m=mem,tag1=val10,tag2=val20
251    series: _m=mem,tag1=val11,tag2=val20
252group:
253  tag key      : _m,tag1,tag2
254  partition key: <nil>,val21
255    series: _m=mem,tag1=val11,tag2=val21
256`,
257		},
258		{
259			name: "nil lo",
260			keys: []string{"tag0", "tag2"},
261			opts: []reads.GroupOption{reads.GroupOptionNilSortLo()},
262			exp: `group:
263  tag key      : _m,tag1,tag2
264  partition key: <nil>,val20
265    series: _m=mem,tag1=val11,tag2=val20
266    series: _m=mem,tag1=val10,tag2=val20
267group:
268  tag key      : _m,tag1,tag2
269  partition key: <nil>,val21
270    series: _m=mem,tag1=val11,tag2=val21
271group:
272  tag key      : _m,tag0,tag1
273  partition key: val00,<nil>
274    series: _m=cpu,tag0=val00,tag1=val10
275    series: _m=cpu,tag0=val00,tag1=val11
276    series: _m=cpu,tag0=val00,tag1=val12
277    series: _m=aaa,tag0=val00
278group:
279  tag key      : _m,tag0
280  partition key: val01,<nil>
281    series: _m=aaa,tag0=val01
282`,
283		},
284	}
285	for _, tt := range tests {
286		t.Run(tt.name, func(t *testing.T) {
287			newCursor := func() (reads.SeriesCursor, error) {
288				return &sliceSeriesCursor{
289					rows: newSeriesRows(
290						"aaa,tag0=val00",
291						"aaa,tag0=val01",
292						"cpu,tag0=val00,tag1=val10",
293						"cpu,tag0=val00,tag1=val11",
294						"cpu,tag0=val00,tag1=val12",
295						"mem,tag1=val10,tag2=val20",
296						"mem,tag1=val11,tag2=val20",
297						"mem,tag1=val11,tag2=val21",
298					)}, nil
299			}
300
301			var hints datatypes.HintFlags
302			hints.SetHintSchemaAllTime()
303			rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadRequest{Group: datatypes.GroupBy, GroupKeys: tt.keys, Hints: hints}, newCursor, tt.opts...)
304
305			sb := new(strings.Builder)
306			GroupResultSetToString(sb, rs, SkipNilCursor())
307
308			if got := sb.String(); !cmp.Equal(got, tt.exp) {
309				t.Errorf("unexpected value; -got/+exp\n%s", cmp.Diff(strings.Split(got, "\n"), strings.Split(tt.exp, "\n")))
310			}
311		})
312	}
313}
314
315type sliceSeriesCursor struct {
316	rows []reads.SeriesRow
317	i    int
318}
319
320func newSeriesRows(keys ...string) []reads.SeriesRow {
321	rows := make([]reads.SeriesRow, len(keys))
322	for i := range keys {
323		rows[i].Name, rows[i].SeriesTags = models.ParseKeyBytes([]byte(keys[i]))
324		rows[i].Tags = rows[i].SeriesTags.Clone()
325		rows[i].Tags.Set([]byte("_m"), rows[i].Name)
326	}
327	return rows
328}
329
330func (s *sliceSeriesCursor) Close()     {}
331func (s *sliceSeriesCursor) Err() error { return nil }
332
333func (s *sliceSeriesCursor) Next() *reads.SeriesRow {
334	if s.i < len(s.rows) {
335		s.i++
336		return &s.rows[s.i-1]
337	}
338	return nil
339}
340
341func BenchmarkNewGroupResultSet_GroupBy(b *testing.B) {
342	card := []int{10, 10, 10}
343	vals := make([]gen.CountableSequence, len(card))
344	for i := range card {
345		vals[i] = gen.NewCounterByteSequenceCount(card[i])
346	}
347
348	tags := gen.NewTagsValuesSequenceValues("tag", vals)
349	rows := make([]reads.SeriesRow, tags.Count())
350	for i := range rows {
351		tags.Next()
352		t := tags.Value().Clone()
353		rows[i].SeriesTags = t
354		rows[i].Tags = t
355		rows[i].Name = []byte("m0")
356	}
357
358	cur := &sliceSeriesCursor{rows: rows}
359	newCursor := func() (reads.SeriesCursor, error) {
360		cur.i = 0
361		return cur, nil
362	}
363
364	b.ResetTimer()
365	b.ReportAllocs()
366	for i := 0; i < b.N; i++ {
367		var hints datatypes.HintFlags
368		hints.SetHintSchemaAllTime()
369		rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadRequest{Group: datatypes.GroupBy, GroupKeys: []string{"tag2"}, Hints: hints}, newCursor)
370		rs.Close()
371	}
372}
373