1package binary_test
2
3import (
4	"bytes"
5	"io"
6	"testing"
7	"time"
8
9	"github.com/google/go-cmp/cmp"
10	"github.com/influxdata/influxdb/cmd/influx_tools/internal/format/binary"
11	"github.com/influxdata/influxdb/cmd/influx_tools/internal/tlv"
12	"github.com/influxdata/influxdb/models"
13	"github.com/influxdata/influxdb/tsdb"
14	"github.com/influxdata/influxql"
15)
16
17func TestWriter_WriteOneBucketOneSeries(t *testing.T) {
18	var buf bytes.Buffer
19	w := binary.NewWriter(&buf, "db", "rp", time.Second)
20	bw, _ := w.NewBucket(0, int64(time.Second))
21	bw.BeginSeries([]byte("cpu"), []byte("idle"), influxql.Integer, models.NewTags(map[string]string{"host": "host1", "region": "us-west-1"}))
22	ts := []int64{0, 1, 2}
23	vs := []int64{10, 11, 12}
24	bw.WriteIntegerCursor(&intCursor{1, ts, vs})
25	bw.EndSeries()
26	bw.Close()
27	w.Close()
28
29	// magic
30	var in [8]byte
31	buf.Read(in[:])
32	assertEqual(t, in[:], binary.Magic[:])
33
34	// header
35	var hdr binary.Header
36	assertTypeValue(t, &buf, binary.HeaderType, &hdr)
37	assertEqual(t, hdr, binary.Header{Version: binary.Version0, Database: "db", RetentionPolicy: "rp", ShardDuration: time.Second})
38
39	// bucket header
40	var bh binary.BucketHeader
41	assertTypeValue(t, &buf, binary.BucketHeaderType, &bh)
42	assertEqual(t, bh, binary.BucketHeader{Start: 0, End: int64(time.Second)})
43
44	// series
45	var sh binary.SeriesHeader
46	assertTypeValue(t, &buf, binary.SeriesHeaderType, &sh)
47	assertEqual(t, sh, binary.SeriesHeader{
48		FieldType: binary.IntegerFieldType,
49		SeriesKey: []byte("cpu,host=host1,region=us-west-1"),
50		Field:     []byte("idle"),
51	})
52
53	// values
54	for i := 0; i < len(ts); i++ {
55		var ip binary.IntegerPoints
56		assertTypeValue(t, &buf, binary.IntegerPointsType, &ip)
57		assertEqual(t, ip, binary.IntegerPoints{Timestamps: ts[i : i+1], Values: vs[i : i+1]})
58	}
59
60	// series footer
61	var sf binary.SeriesFooter
62	assertTypeValue(t, &buf, binary.SeriesFooterType, &sf)
63
64	// bucket footer
65	var bf binary.BucketFooter
66	assertTypeValue(t, &buf, binary.BucketFooterType, &bf)
67}
68
// intCursor is a test cursor that hands out a fixed set of integer points in
// batches of at most c values per call to Next.
type intCursor struct {
	c    int     // number of values to return per call to Next
	keys []int64 // timestamps not yet returned by Next
	vals []int64 // values not yet returned by Next; sliced in step with keys
}
74
75func (c *intCursor) Close()                  {}
76func (c *intCursor) Err() error              { return nil }
77func (c *intCursor) Stats() tsdb.CursorStats { return tsdb.CursorStats{} }
78
79func (c *intCursor) Next() *tsdb.IntegerArray {
80	if c.c > len(c.keys) {
81		c.c = len(c.keys)
82	}
83
84	var a tsdb.IntegerArray
85	a.Timestamps, a.Values = c.keys[:c.c], c.vals[:c.c]
86	c.keys, c.vals = c.keys[c.c:], c.vals[c.c:]
87	return &a
88}
89
90func assertEqual(t *testing.T, got, exp interface{}) {
91	t.Helper()
92	if !cmp.Equal(got, exp) {
93		t.Fatalf("not equal: -got/+exp\n%s", cmp.Diff(got, exp))
94	}
95}
96
// assertNoError fails the test immediately when err is non-nil.
func assertNoError(t *testing.T, err error) {
	t.Helper()
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
}
104
// message is any value that can be decoded from a raw byte payload; it is
// the destination type accepted by assertTypeValue.
type message interface {
	// Unmarshal decodes the given bytes into the receiver.
	Unmarshal([]byte) error
}
108
109func assertTypeValue(t *testing.T, r io.Reader, expType binary.MessageType, m message) {
110	t.Helper()
111	typ, d, err := tlv.ReadTLV(r)
112	assertNoError(t, err)
113	assertEqual(t, typ, byte(expType))
114
115	err = m.Unmarshal(d)
116	assertNoError(t, err)
117}
118