1package binary
2
3import (
4	"bytes"
5	"errors"
6	"fmt"
7	"io"
8
9	"github.com/influxdata/influxdb/cmd/influx_tools/internal/tlv"
10	"github.com/influxdata/influxdb/tsdb"
11	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
12)
13
14type Reader struct {
15	r     io.Reader
16	pr    *PointsReader
17	state *readerState
18	stats *readerStats
19}
20
21type readerStats struct {
22	series int
23	counts [8]struct {
24		series, values int
25	}
26}
27type readerState byte
28
29const (
30	readHeader readerState = iota + 1
31	readBucket
32	readSeries
33	readPoints
34	done
35)
36
37func NewReader(reader io.Reader) *Reader {
38	state := readHeader
39	var stats readerStats
40	r := &Reader{r: reader, state: &state, stats: &stats,
41		pr: &PointsReader{r: reader, values: make(tsm1.Values, tsdb.DefaultMaxPointsPerBlock), state: &state, stats: &stats}}
42	return r
43}
44
45func (r *Reader) ReadHeader() (*Header, error) {
46	if *r.state != readHeader {
47		return nil, fmt.Errorf("expected reader in state %v, was in state %v\n", readHeader, *r.state)
48	}
49
50	var magic [len(Magic)]byte
51	n, err := r.r.Read(magic[:])
52	if err != nil {
53		return nil, err
54	}
55
56	if n < len(Magic) || !bytes.Equal(magic[:], Magic[:]) {
57		return nil, errors.New("IFLXDUMP header not present")
58	}
59
60	t, lv, err := tlv.ReadTLV(r.r)
61	if err != nil {
62		return nil, err
63	}
64	if t != byte(HeaderType) {
65		return nil, fmt.Errorf("expected header type, got %v", t)
66	}
67	h := &Header{}
68	err = h.Unmarshal(lv)
69	*r.state = readBucket
70
71	return h, err
72}
73
74func (r *Reader) Close() error {
75	return nil
76}
77
78func (r *Reader) NextBucket() (*BucketHeader, error) {
79	if *r.state != readBucket {
80		return nil, fmt.Errorf("expected reader in state %v, was in state %v", readBucket, *r.state)
81	}
82
83	t, lv, err := tlv.ReadTLV(r.r)
84	if err != nil {
85		if err == io.EOF {
86			*r.state = done
87			return nil, nil
88		}
89		return nil, err
90	}
91	if t != byte(BucketHeaderType) {
92		return nil, fmt.Errorf("expected bucket header type, got %v", t)
93	}
94
95	bh := &BucketHeader{}
96	err = bh.Unmarshal(lv)
97	if err != nil {
98		return nil, err
99	}
100	*r.state = readSeries
101
102	return bh, nil
103}
104
105func (r *Reader) NextSeries() (*SeriesHeader, error) {
106	if *r.state != readSeries {
107		return nil, fmt.Errorf("expected reader in state %v, was in state %v", readSeries, *r.state)
108	}
109
110	t, lv, err := tlv.ReadTLV(r.r)
111	if err != nil {
112		return nil, err
113	}
114	if t == byte(BucketFooterType) {
115		*r.state = readBucket
116		return nil, nil
117	}
118	if t != byte(SeriesHeaderType) {
119		return nil, fmt.Errorf("expected series header type, got %v", t)
120	}
121	sh := &SeriesHeader{}
122	err = sh.Unmarshal(lv)
123	if err != nil {
124		return nil, err
125	}
126	r.stats.series++
127	r.stats.counts[sh.FieldType&7].series++
128
129	var pointsType MessageType
130	switch sh.FieldType {
131	case FloatFieldType:
132		pointsType = FloatPointsType
133	case IntegerFieldType:
134		pointsType = IntegerPointsType
135	case UnsignedFieldType:
136		pointsType = UnsignedPointsType
137	case BooleanFieldType:
138		pointsType = BooleanPointsType
139	case StringFieldType:
140		pointsType = StringPointsType
141	default:
142		return nil, fmt.Errorf("unsupported series field type %v", sh.FieldType)
143	}
144
145	*r.state = readPoints
146	r.pr.Reset(pointsType)
147	return sh, nil
148}
149
150func (r *Reader) Points() *PointsReader {
151	return r.pr
152}
153
154type PointsReader struct {
155	pointsType MessageType
156	r          io.Reader
157	values     tsm1.Values
158	n          int
159	state      *readerState
160	stats      *readerStats
161}
162
163func (pr *PointsReader) Reset(pointsType MessageType) {
164	pr.pointsType = pointsType
165	pr.n = 0
166}
167
168func (pr *PointsReader) Next() (bool, error) {
169	if *pr.state != readPoints {
170		return false, fmt.Errorf("expected reader in state %v, was in state %v", readPoints, *pr.state)
171	}
172
173	t, lv, err := tlv.ReadTLV(pr.r)
174	if err != nil {
175		return false, err
176	}
177	if t == byte(SeriesFooterType) {
178		*pr.state = readSeries
179		return false, nil
180	}
181	if t != byte(pr.pointsType) {
182		return false, fmt.Errorf("expected message type %v, got %v", pr.pointsType, t)
183	}
184	err = pr.marshalValues(lv)
185	if err != nil {
186		return false, err
187	}
188
189	return true, nil
190}
191
192func (pr *PointsReader) Values() tsm1.Values {
193	return pr.values[:pr.n]
194}
195
196func (pr *PointsReader) marshalValues(lv []byte) error {
197	switch pr.pointsType {
198	case FloatPointsType:
199		return pr.marshalFloats(lv)
200	case IntegerPointsType:
201		return pr.marshalIntegers(lv)
202	case UnsignedPointsType:
203		return pr.marshalUnsigned(lv)
204	case BooleanPointsType:
205		return pr.marshalBooleans(lv)
206	case StringPointsType:
207		return pr.marshalStrings(lv)
208	default:
209		return fmt.Errorf("unsupported points type %v", pr.pointsType)
210	}
211}
212
213func (pr *PointsReader) marshalFloats(lv []byte) error {
214	fp := &FloatPoints{}
215	err := fp.Unmarshal(lv)
216	if err != nil {
217		return err
218	}
219	for i, t := range fp.Timestamps {
220		pr.values[i] = tsm1.NewFloatValue(t, fp.Values[i])
221	}
222	pr.stats.counts[0].values += len(fp.Timestamps)
223	pr.n = len(fp.Timestamps)
224	return nil
225}
226
227func (pr *PointsReader) marshalIntegers(lv []byte) error {
228	ip := &IntegerPoints{}
229	err := ip.Unmarshal(lv)
230	if err != nil {
231		return err
232	}
233	for i, t := range ip.Timestamps {
234		pr.values[i] = tsm1.NewIntegerValue(t, ip.Values[i])
235	}
236	pr.stats.counts[1].values += len(ip.Timestamps)
237	pr.n = len(ip.Timestamps)
238	return nil
239}
240
241func (pr *PointsReader) marshalUnsigned(lv []byte) error {
242	up := &UnsignedPoints{}
243	err := up.Unmarshal(lv)
244	if err != nil {
245		return err
246	}
247	for i, t := range up.Timestamps {
248		pr.values[i] = tsm1.NewUnsignedValue(t, up.Values[i])
249	}
250	pr.stats.counts[2].values += len(up.Timestamps)
251	pr.n = len(up.Timestamps)
252	return nil
253}
254
255func (pr *PointsReader) marshalBooleans(lv []byte) error {
256	bp := &BooleanPoints{}
257	err := bp.Unmarshal(lv)
258	if err != nil {
259		return err
260	}
261	for i, t := range bp.Timestamps {
262		pr.values[i] = tsm1.NewBooleanValue(t, bp.Values[i])
263	}
264	pr.stats.counts[3].values += len(bp.Timestamps)
265	pr.n = len(bp.Timestamps)
266	return nil
267}
268
269func (pr *PointsReader) marshalStrings(lv []byte) error {
270	sp := &StringPoints{}
271	err := sp.Unmarshal(lv)
272	if err != nil {
273		return err
274	}
275	for i, t := range sp.Timestamps {
276		pr.values[i] = tsm1.NewStringValue(t, sp.Values[i])
277	}
278	pr.stats.counts[4].values += len(sp.Timestamps)
279	pr.n = len(sp.Timestamps)
280	return nil
281}
282