1package binary_test 2 3import ( 4 "bytes" 5 "io" 6 "testing" 7 "time" 8 9 "github.com/google/go-cmp/cmp" 10 "github.com/influxdata/influxdb/cmd/influx_tools/internal/format/binary" 11 "github.com/influxdata/influxdb/cmd/influx_tools/internal/tlv" 12 "github.com/influxdata/influxdb/models" 13 "github.com/influxdata/influxdb/tsdb" 14 "github.com/influxdata/influxql" 15) 16 17func TestWriter_WriteOneBucketOneSeries(t *testing.T) { 18 var buf bytes.Buffer 19 w := binary.NewWriter(&buf, "db", "rp", time.Second) 20 bw, _ := w.NewBucket(0, int64(time.Second)) 21 bw.BeginSeries([]byte("cpu"), []byte("idle"), influxql.Integer, models.NewTags(map[string]string{"host": "host1", "region": "us-west-1"})) 22 ts := []int64{0, 1, 2} 23 vs := []int64{10, 11, 12} 24 bw.WriteIntegerCursor(&intCursor{1, ts, vs}) 25 bw.EndSeries() 26 bw.Close() 27 w.Close() 28 29 // magic 30 var in [8]byte 31 buf.Read(in[:]) 32 assertEqual(t, in[:], binary.Magic[:]) 33 34 // header 35 var hdr binary.Header 36 assertTypeValue(t, &buf, binary.HeaderType, &hdr) 37 assertEqual(t, hdr, binary.Header{Version: binary.Version0, Database: "db", RetentionPolicy: "rp", ShardDuration: time.Second}) 38 39 // bucket header 40 var bh binary.BucketHeader 41 assertTypeValue(t, &buf, binary.BucketHeaderType, &bh) 42 assertEqual(t, bh, binary.BucketHeader{Start: 0, End: int64(time.Second)}) 43 44 // series 45 var sh binary.SeriesHeader 46 assertTypeValue(t, &buf, binary.SeriesHeaderType, &sh) 47 assertEqual(t, sh, binary.SeriesHeader{ 48 FieldType: binary.IntegerFieldType, 49 SeriesKey: []byte("cpu,host=host1,region=us-west-1"), 50 Field: []byte("idle"), 51 }) 52 53 // values 54 for i := 0; i < len(ts); i++ { 55 var ip binary.IntegerPoints 56 assertTypeValue(t, &buf, binary.IntegerPointsType, &ip) 57 assertEqual(t, ip, binary.IntegerPoints{Timestamps: ts[i : i+1], Values: vs[i : i+1]}) 58 } 59 60 // series footer 61 var sf binary.SeriesFooter 62 assertTypeValue(t, &buf, binary.SeriesFooterType, &sf) 63 64 // bucket footer 65 var bf binary.BucketFooter 66 assertTypeValue(t, &buf, binary.BucketFooterType, &bf) 67} 68 69type intCursor struct { 70 c int // number of values to return per call to Next 71 keys []int64 72 vals []int64 73} 74 75func (c *intCursor) Close() {} 76func (c *intCursor) Err() error { return nil } 77func (c *intCursor) Stats() tsdb.CursorStats { return tsdb.CursorStats{} } 78 79func (c *intCursor) Next() *tsdb.IntegerArray { 80 if c.c > len(c.keys) { 81 c.c = len(c.keys) 82 } 83 84 var a tsdb.IntegerArray 85 a.Timestamps, a.Values = c.keys[:c.c], c.vals[:c.c] 86 c.keys, c.vals = c.keys[c.c:], c.vals[c.c:] 87 return &a 88} 89 90func assertEqual(t *testing.T, got, exp interface{}) { 91 t.Helper() 92 if !cmp.Equal(got, exp) { 93 t.Fatalf("not equal: -got/+exp\n%s", cmp.Diff(got, exp)) 94 } 95} 96 97func assertNoError(t *testing.T, err error) { 98 t.Helper() 99 if err == nil { 100 return 101 } 102 t.Fatalf("unexpected error: %v", err) 103} 104 105type message interface { 106 Unmarshal([]byte) error 107} 108 109func assertTypeValue(t *testing.T, r io.Reader, expType binary.MessageType, m message) { 110 t.Helper() 111 typ, d, err := tlv.ReadTLV(r) 112 assertNoError(t, err) 113 assertEqual(t, typ, byte(expType)) 114 115 err = m.Unmarshal(d) 116 assertNoError(t, err) 117} 118