1package binary_test
2
import (
	"bytes"
	"fmt"
	"math"
	"reflect"
	"testing"
	"time"

	"github.com/influxdata/influxdb/cmd/influx_tools/internal/format/binary"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxql"
)
15
16func TestReader_OneBucketOneIntegerSeries(t *testing.T) {
17	var buf bytes.Buffer
18	ts := []int64{0, 1, 2}
19	ints := []int64{10, 11, 12}
20	vs := make([]interface{}, len(ints))
21	for i, v := range ints {
22		vs[i] = v
23	}
24	s := &oneSeriesData{
25		db:          "database",
26		rp:          "default",
27		sd:          time.Hour * 24,
28		start:       int64(0),
29		end:         int64(time.Hour * 24),
30		seriesName:  []byte("series"),
31		seriesField: []byte("field"),
32		seriesTags:  models.NewTags(map[string]string{"k": "v"}),
33		fieldType:   binary.IntegerFieldType,
34		ts:          ts,
35		vs:          vs,
36	}
37
38	w := binary.NewWriter(&buf, s.db, s.rp, s.sd)
39	bw, _ := w.NewBucket(s.start, s.end)
40	bw.BeginSeries(s.seriesName, s.seriesField, influxql.Integer, s.seriesTags)
41	bw.WriteIntegerCursor(&intCursor{1, s.ts, ints})
42	bw.EndSeries()
43	bw.Close()
44	w.Close()
45
46	verifySingleSeries(t, buf, s)
47}
48
49func TestReader_OneBucketOneFloatSeries(t *testing.T) {
50	var buf bytes.Buffer
51	ts := []int64{0, 1, 2}
52	floats := []float64{0.1, 11.1, 1200.0}
53	vs := make([]interface{}, len(floats))
54	for i, v := range floats {
55		vs[i] = v
56	}
57	s := &oneSeriesData{
58		db:          "database",
59		rp:          "default",
60		sd:          time.Hour * 24,
61		start:       int64(0),
62		end:         int64(time.Hour * 24),
63		seriesName:  []byte("series"),
64		seriesField: []byte("field"),
65		seriesTags:  models.NewTags(map[string]string{"k": "v"}),
66		fieldType:   binary.FloatFieldType,
67		ts:          ts,
68		vs:          vs,
69	}
70
71	w := binary.NewWriter(&buf, s.db, s.rp, s.sd)
72	bw, _ := w.NewBucket(s.start, s.end)
73	bw.BeginSeries(s.seriesName, s.seriesField, influxql.Float, s.seriesTags)
74	bw.WriteFloatCursor(&floatCursor{1, s.ts, floats})
75	bw.EndSeries()
76	bw.Close()
77	w.Close()
78
79	verifySingleSeries(t, buf, s)
80}
81
82func TestReader_OneBucketOneUnsignedSeries(t *testing.T) {
83	var buf bytes.Buffer
84	ts := []int64{0, 1, 2}
85	uints := []uint64{0, 1, math.MaxUint64}
86	vs := make([]interface{}, len(uints))
87	for i, v := range uints {
88		vs[i] = v
89	}
90	s := &oneSeriesData{
91		db:          "database",
92		rp:          "default",
93		sd:          time.Hour * 24,
94		start:       int64(0),
95		end:         int64(time.Hour * 24),
96		seriesName:  []byte("series"),
97		seriesField: []byte("field"),
98		seriesTags:  models.NewTags(map[string]string{"k": "v"}),
99		fieldType:   binary.UnsignedFieldType,
100		ts:          ts,
101		vs:          vs,
102	}
103
104	w := binary.NewWriter(&buf, s.db, s.rp, s.sd)
105	bw, _ := w.NewBucket(s.start, s.end)
106	bw.BeginSeries(s.seriesName, s.seriesField, influxql.Unsigned, s.seriesTags)
107	bw.WriteUnsignedCursor(&unsignedCursor{1, s.ts, uints})
108	bw.EndSeries()
109	bw.Close()
110	w.Close()
111
112	verifySingleSeries(t, buf, s)
113}
114
115func TestReader_OneBucketOneBooleanSeries(t *testing.T) {
116	var buf bytes.Buffer
117	ts := []int64{0, 1, 2}
118	bools := []bool{true, true, false}
119	vs := make([]interface{}, len(bools))
120	for i, v := range bools {
121		vs[i] = v
122	}
123	s := &oneSeriesData{
124		db:          "database",
125		rp:          "default",
126		sd:          time.Hour * 24,
127		start:       int64(0),
128		end:         int64(time.Hour * 24),
129		seriesName:  []byte("series"),
130		seriesField: []byte("field"),
131		seriesTags:  models.NewTags(map[string]string{"k": "v"}),
132		fieldType:   binary.BooleanFieldType,
133		ts:          ts,
134		vs:          vs,
135	}
136
137	w := binary.NewWriter(&buf, s.db, s.rp, s.sd)
138	bw, _ := w.NewBucket(s.start, s.end)
139	bw.BeginSeries(s.seriesName, s.seriesField, influxql.Boolean, s.seriesTags)
140	bw.WriteBooleanCursor(&booleanCursor{1, s.ts, bools})
141	bw.EndSeries()
142	bw.Close()
143	w.Close()
144
145	verifySingleSeries(t, buf, s)
146}
147
148func TestReader_OneBucketOneStringSeries(t *testing.T) {
149	var buf bytes.Buffer
150	ts := []int64{0, 1, 2}
151	strings := []string{"", "a", "a《 》"}
152	vs := make([]interface{}, len(strings))
153	for i, v := range strings {
154		vs[i] = v
155	}
156	s := &oneSeriesData{
157		db:          "database",
158		rp:          "default",
159		sd:          time.Hour * 24,
160		start:       int64(0),
161		end:         int64(time.Hour * 24),
162		seriesName:  []byte("series"),
163		seriesField: []byte("field"),
164		seriesTags:  models.NewTags(map[string]string{"k": "v"}),
165		fieldType:   binary.StringFieldType,
166		ts:          ts,
167		vs:          vs,
168	}
169
170	w := binary.NewWriter(&buf, s.db, s.rp, s.sd)
171	bw, _ := w.NewBucket(s.start, s.end)
172	bw.BeginSeries(s.seriesName, s.seriesField, influxql.String, s.seriesTags)
173	bw.WriteStringCursor(&stringCursor{1, s.ts, strings})
174	bw.EndSeries()
175	bw.Close()
176	w.Close()
177
178	verifySingleSeries(t, buf, s)
179}
180
181type oneSeriesData struct {
182	db          string
183	rp          string
184	sd          time.Duration
185	start       int64
186	end         int64
187	seriesName  []byte
188	seriesField []byte
189	seriesTags  models.Tags
190	fieldType   binary.FieldType
191	ts          []int64
192	vs          []interface{}
193}
194
195func verifySingleSeries(t *testing.T, buf bytes.Buffer, s *oneSeriesData) {
196	t.Helper()
197	r := binary.NewReader(&buf)
198	h, err := r.ReadHeader()
199	assertNoError(t, err)
200	assertEqual(t, h, &binary.Header{Database: s.db, RetentionPolicy: s.rp, ShardDuration: s.sd})
201
202	bh, err := r.NextBucket()
203	assertNoError(t, err)
204	assertEqual(t, bh, &binary.BucketHeader{Start: s.start, End: s.end})
205
206	sh, err := r.NextSeries()
207	assertNoError(t, err)
208
209	seriesKey := make([]byte, 0)
210	seriesKey = models.AppendMakeKey(seriesKey[:0], s.seriesName, s.seriesTags)
211	assertEqual(t, sh, &binary.SeriesHeader{FieldType: s.fieldType, SeriesKey: seriesKey, Field: s.seriesField})
212
213	for i := 0; i < len(s.ts); i++ {
214		next, err := r.Points().Next()
215		assertNoError(t, err)
216		assertEqual(t, next, true)
217		values := r.Points().Values()
218		assertEqual(t, len(values), 1)
219		assertEqual(t, values[0].UnixNano(), s.ts[i])
220		assertEqual(t, values[0].Value(), s.vs[i])
221	}
222
223	next, err := r.Points().Next()
224	assertNoError(t, err)
225	assertEqual(t, next, false)
226
227	sh, err = r.NextSeries()
228	assertNoError(t, err)
229	assertNil(t, sh)
230
231	bh, err = r.NextBucket()
232	assertNoError(t, err)
233	assertNil(t, bh)
234}
235
236func TestReader_OneBucketMixedSeries(t *testing.T) {
237	var buf bytes.Buffer
238	db := "db"
239	rp := "rp"
240	start := int64(0)
241	end := int64(time.Hour * 24)
242	seriesName := []byte("cpu")
243	seriesField := []byte("idle")
244	seriesTags1 := models.NewTags(map[string]string{"host": "host1", "region": "us-west-1"})
245	seriesTags2 := models.NewTags(map[string]string{"host": "host2", "region": "us-west-1"})
246
247	w := binary.NewWriter(&buf, db, rp, time.Hour*24)
248	bw, _ := w.NewBucket(start, end)
249	bw.BeginSeries(seriesName, seriesField, influxql.Integer, seriesTags1)
250	t1s := []int64{0, 1, 2}
251	v1s := []int64{10, 11, 12}
252	bw.WriteIntegerCursor(&intCursor{1, t1s, v1s})
253	bw.EndSeries()
254	bw.BeginSeries(seriesName, seriesField, influxql.Integer, seriesTags2)
255	t2s := []int64{1, 2, 3}
256	v2s := []float64{7, 8, 9}
257	bw.WriteFloatCursor(&floatCursor{1, t2s, v2s})
258	bw.EndSeries()
259	bw.Close()
260	w.Close()
261
262	r := binary.NewReader(&buf)
263	h, err := r.ReadHeader()
264	assertNoError(t, err)
265	assertEqual(t, h, &binary.Header{Database: db, RetentionPolicy: rp, ShardDuration: time.Hour * 24})
266
267	bh, err := r.NextBucket()
268	assertNoError(t, err)
269	assertEqual(t, bh, &binary.BucketHeader{Start: start, End: end})
270
271	sh, err := r.NextSeries()
272	assertNoError(t, err)
273
274	seriesKey := make([]byte, 0)
275	seriesKey = models.AppendMakeKey(seriesKey[:0], seriesName, seriesTags1)
276	assertEqual(t, sh, &binary.SeriesHeader{FieldType: binary.IntegerFieldType, SeriesKey: seriesKey, Field: seriesField})
277
278	for i := 0; i < len(t1s); i++ {
279		next, err := r.Points().Next()
280		assertNoError(t, err)
281		assertEqual(t, next, true)
282		values := r.Points().Values()
283		assertEqual(t, len(values), 1)
284		assertEqual(t, values[0].UnixNano(), t1s[i])
285		assertEqual(t, values[0].Value(), v1s[i])
286	}
287
288	next, err := r.Points().Next()
289	assertNoError(t, err)
290	assertEqual(t, next, false)
291
292	sh, err = r.NextSeries()
293	assertNoError(t, err)
294
295	seriesKey = models.AppendMakeKey(seriesKey[:0], seriesName, seriesTags2)
296	assertEqual(t, sh, &binary.SeriesHeader{FieldType: binary.FloatFieldType, SeriesKey: seriesKey, Field: seriesField})
297
298	for i := 0; i < len(t2s); i++ {
299		next, err := r.Points().Next()
300		assertNoError(t, err)
301		assertEqual(t, next, true)
302		values := r.Points().Values()
303		assertEqual(t, len(values), 1)
304		assertEqual(t, values[0].UnixNano(), t2s[i])
305		assertEqual(t, values[0].Value(), v2s[i])
306	}
307
308	next, err = r.Points().Next()
309	assertNoError(t, err)
310	assertEqual(t, next, false)
311
312	sh, err = r.NextSeries()
313	assertNoError(t, err)
314	assertNil(t, sh)
315
316	bh, err = r.NextBucket()
317	assertNoError(t, err)
318	assertNil(t, bh)
319}
320
321func TestReader_EmptyBucket(t *testing.T) {
322	var buf bytes.Buffer
323	db := "db"
324	rp := "default"
325	start := int64(0)
326	end := int64(time.Hour * 24)
327
328	w := binary.NewWriter(&buf, db, rp, time.Hour*24)
329	bw, _ := w.NewBucket(start, end)
330	bw.Close()
331	w.Close()
332
333	r := binary.NewReader(&buf)
334	h, err := r.ReadHeader()
335	assertNoError(t, err)
336	assertEqual(t, h, &binary.Header{Database: db, RetentionPolicy: rp, ShardDuration: time.Hour * 24})
337
338	bh, err := r.NextBucket()
339	assertNoError(t, err)
340	assertEqual(t, bh, &binary.BucketHeader{Start: start, End: end})
341
342	sh, err := r.NextSeries()
343	assertNoError(t, err)
344	assertNil(t, sh)
345
346	bh, err = r.NextBucket()
347	assertNoError(t, err)
348	assertNil(t, bh)
349}
350
351func TestReader_States(t *testing.T) {
352	var buf bytes.Buffer
353	r := binary.NewReader(&buf)
354
355	next, err := r.Points().Next()
356	assertError(t, err, fmt.Errorf("expected reader in state %v, was in state %v", 4, 1))
357	assertEqual(t, next, false)
358
359	sh, err := r.NextSeries()
360	assertError(t, err, fmt.Errorf("expected reader in state %v, was in state %v", 3, 1))
361	assertNil(t, sh)
362
363	bh, err := r.NextBucket()
364	assertError(t, err, fmt.Errorf("expected reader in state %v, was in state %v", 2, 1))
365	assertNil(t, bh)
366}
367
368type floatCursor struct {
369	c    int // number of values to return per call to Next
370	keys []int64
371	vals []float64
372}
373
374func (c *floatCursor) Close()                  {}
375func (c *floatCursor) Err() error              { return nil }
376func (c *floatCursor) Stats() tsdb.CursorStats { return tsdb.CursorStats{} }
377
378func (c *floatCursor) Next() *tsdb.FloatArray {
379	if c.c > len(c.keys) {
380		c.c = len(c.keys)
381	}
382
383	var a tsdb.FloatArray
384	a.Timestamps, a.Values = c.keys[:c.c], c.vals[:c.c]
385	c.keys, c.vals = c.keys[c.c:], c.vals[c.c:]
386	return &a
387}
388
389type unsignedCursor struct {
390	c    int // number of values to return per call to Next
391	keys []int64
392	vals []uint64
393}
394
395func (c *unsignedCursor) Close()                  {}
396func (c *unsignedCursor) Err() error              { return nil }
397func (c *unsignedCursor) Stats() tsdb.CursorStats { return tsdb.CursorStats{} }
398
399func (c *unsignedCursor) Next() *tsdb.UnsignedArray {
400	if c.c > len(c.keys) {
401		c.c = len(c.keys)
402	}
403
404	var a tsdb.UnsignedArray
405	a.Timestamps, a.Values = c.keys[:c.c], c.vals[:c.c]
406	c.keys, c.vals = c.keys[c.c:], c.vals[c.c:]
407	return &a
408}
409
410type booleanCursor struct {
411	c    int // number of values to return per call to Next
412	keys []int64
413	vals []bool
414}
415
416func (c *booleanCursor) Close()                  {}
417func (c *booleanCursor) Err() error              { return nil }
418func (c *booleanCursor) Stats() tsdb.CursorStats { return tsdb.CursorStats{} }
419
420func (c *booleanCursor) Next() *tsdb.BooleanArray {
421	if c.c > len(c.keys) {
422		c.c = len(c.keys)
423	}
424
425	var a tsdb.BooleanArray
426	a.Timestamps, a.Values = c.keys[:c.c], c.vals[:c.c]
427	c.keys, c.vals = c.keys[c.c:], c.vals[c.c:]
428	return &a
429}
430
431type stringCursor struct {
432	c    int // number of values to return per call to Next
433	keys []int64
434	vals []string
435}
436
437func (c *stringCursor) Close()                  {}
438func (c *stringCursor) Err() error              { return nil }
439func (c *stringCursor) Stats() tsdb.CursorStats { return tsdb.CursorStats{} }
440
441func (c *stringCursor) Next() *tsdb.StringArray {
442	if c.c > len(c.keys) {
443		c.c = len(c.keys)
444	}
445
446	var a tsdb.StringArray
447	a.Timestamps, a.Values = c.keys[:c.c], c.vals[:c.c]
448	c.keys, c.vals = c.keys[c.c:], c.vals[c.c:]
449	return &a
450}
451
// assertNil fails the test immediately unless got is nil.
//
// got counts as nil when it is the untyped nil interface or when it holds a
// nil pointer, map, slice, channel, function or interface. The reflection
// check is needed because a typed nil pointer (for example a nil header
// pointer returned by the reader) stored in an interface{} does not compare
// equal to nil.
func assertNil(t *testing.T, got interface{}) {
	t.Helper()
	if got == nil {
		return
	}

	rv := reflect.ValueOf(got)
	switch rv.Kind() {
	case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map,
		reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
		// IsNil is only legal for these kinds; it panics for the others.
		if rv.IsNil() {
			return
		}
	}

	t.Fatalf("not nil: got:\n%v", got)
}
458
459func assertError(t *testing.T, got error, exp error) {
460	t.Helper()
461	if got == nil {
462		t.Fatalf("did not receive expected error: %s", exp)
463	} else {
464		assertEqual(t, got.Error(), exp.Error())
465	}
466}
467