1package binary
2
3import (
4	"bufio"
5	"fmt"
6	"io"
7	"time"
8
9	"github.com/influxdata/influxdb/cmd/influx_tools/internal/format"
10	"github.com/influxdata/influxdb/cmd/influx_tools/internal/tlv"
11	"github.com/influxdata/influxdb/models"
12	"github.com/influxdata/influxdb/tsdb"
13	"github.com/influxdata/influxql"
14)
15
// Writer emits a stream consisting of a magic prefix and header followed by
// buckets, each of which contains series and their points. Messages are
// written as type-tagged records via the tlv package.
type Writer struct {
	w           *bufio.Writer // buffered destination for all output
	buf         []byte        // scratch buffer reused when marshalling messages
	db, rp      string        // database and retention policy recorded in the header
	duration    time.Duration // recorded in the header as ShardDuration
	err         error         // sticky error; once set, write helpers become no-ops
	bw          *bucketWriter // bucket created by the last NewBucket; cleared when it closes
	state       writeState    // current position in the write state machine
	wroteHeader bool          // set by writeHeader

	// msg holds message values that are reused across writes instead of
	// being allocated for every header or footer.
	msg struct {
		bucketHeader BucketHeader
		bucketFooter BucketFooter
		seriesHeader SeriesHeader
		seriesFooter SeriesFooter
	}

	// stats accumulates the counters reported by WriteStats.
	stats struct {
		// series is incremented for every series header written.
		series int
		// counts is indexed by the low three bits of the FieldType.
		counts [8]struct {
			series, values int
		}
	}
}
40
// writeState identifies which kind of record the Writer expects to emit next.
// Methods panic when called in a state they do not accept.
type writeState int

const (
	// writeHeader is the zero value: the stream header has not been written yet.
	writeHeader writeState = iota
	// writeBucket is entered after the stream header or a bucket footer is
	// written; NewBucket requires it.
	writeBucket
	// writeSeries is entered after a bucket header is written or a series
	// ends; BeginSeries requires it.
	writeSeries
	// writeSeriesHeader is entered by BeginSeries; the series header itself is
	// deferred until the first Write*Cursor call.
	writeSeriesHeader
	// writePoints is entered once the series header has been written.
	writePoints
)
50
51func NewWriter(w io.Writer, database, rp string, duration time.Duration) *Writer {
52	var wr *bufio.Writer
53	if wr, _ = w.(*bufio.Writer); wr == nil {
54		wr = bufio.NewWriter(w)
55	}
56	return &Writer{w: wr, db: database, rp: rp, duration: duration}
57}
58
59func (w *Writer) WriteStats(o io.Writer) {
60	fmt.Fprintf(o, "total series: %d\n", w.stats.series)
61
62	for i := 0; i < 5; i++ {
63		ft := FieldType(i)
64		fmt.Fprintf(o, "%s unique series: %d\n", ft, w.stats.counts[i].series)
65		fmt.Fprintf(o, "%s total values : %d\n", ft, w.stats.counts[i].values)
66	}
67}
68
69func (w *Writer) NewBucket(start, end int64) (format.BucketWriter, error) {
70	if w.state == writeHeader {
71		w.writeHeader()
72	}
73
74	if w.err != nil {
75		return nil, w.err
76	}
77
78	if w.state != writeBucket {
79		panic(fmt.Sprintf("writer state: got=%v, exp=%v", w.state, writeBucket))
80	}
81
82	w.bw = &bucketWriter{w: w, start: start, end: end}
83	w.writeBucketHeader(start, end)
84
85	return w.bw, w.err
86}
87
88func (w *Writer) Close() error {
89	if w.err == ErrWriteAfterClose {
90		return nil
91	}
92	if w.err != nil {
93		return w.err
94	}
95
96	w.err = ErrWriteAfterClose
97
98	return nil
99}
100
101func (w *Writer) writeHeader() {
102	w.state = writeBucket
103	w.wroteHeader = true
104
105	w.write(Magic[:])
106
107	h := Header{
108		Version:         Version0,
109		Database:        w.db,
110		RetentionPolicy: w.rp,
111		ShardDuration:   w.duration,
112	}
113	w.writeTypeMessage(HeaderType, &h)
114}
115
116func (w *Writer) writeBucketHeader(start, end int64) {
117	w.state = writeSeries
118	w.msg.bucketHeader.Start = start
119	w.msg.bucketHeader.End = end
120	w.writeTypeMessage(BucketHeaderType, &w.msg.bucketHeader)
121}
122
123func (w *Writer) writeBucketFooter() {
124	w.state = writeBucket
125	w.writeTypeMessage(BucketFooterType, &w.msg.bucketFooter)
126}
127
128func (w *Writer) writeSeriesHeader(key, field []byte, ft FieldType) {
129	w.state = writePoints
130	w.stats.series++
131	w.stats.counts[ft&7].series++
132
133	w.msg.seriesHeader.SeriesKey = key
134	w.msg.seriesHeader.Field = field
135	w.msg.seriesHeader.FieldType = ft
136	w.writeTypeMessage(SeriesHeaderType, &w.msg.seriesHeader)
137}
138
139func (w *Writer) writeSeriesFooter(ft FieldType, count int) {
140	w.stats.counts[ft&7].values += count
141	w.writeTypeMessage(SeriesFooterType, &w.msg.seriesFooter)
142}
143
144func (w *Writer) write(p []byte) {
145	if w.err != nil {
146		return
147	}
148	_, w.err = w.w.Write(p)
149}
150
151func (w *Writer) writeTypeMessage(typ MessageType, msg message) {
152	if w.err != nil {
153		return
154	}
155
156	// ensure size
157	n := msg.Size()
158	if n > cap(w.buf) {
159		w.buf = make([]byte, n)
160	} else {
161		w.buf = w.buf[:n]
162	}
163
164	_, w.err = msg.MarshalTo(w.buf)
165	w.writeTypeBytes(typ, w.buf)
166}
167
168func (w *Writer) writeTypeBytes(typ MessageType, b []byte) {
169	if w.err != nil {
170		return
171	}
172	w.err = tlv.WriteTLV(w.w, byte(typ), w.buf)
173}
174
// bucketWriter writes the series of a single bucket through its parent Writer.
// Instances are created by Writer.NewBucket.
type bucketWriter struct {
	w          *Writer // parent Writer that performs all encoding and output
	err        error   // bucket-local error; set to ErrWriteBucketAfterClose by Close
	start, end int64   // bucket bounds passed to NewBucket
	key        []byte  // series key built by BeginSeries; its backing array is reused
	field      []byte  // field name supplied to BeginSeries
	n          int     // running count of values written by the Write*Cursor methods
	closed     bool    // set by the first Close so that later calls are no-ops
}
184
185func (bw *bucketWriter) Err() error {
186	if bw.w.err != nil {
187		return bw.w.err
188	}
189	return bw.err
190}
191
192func (bw *bucketWriter) hasErr() bool {
193	return bw.w.err != nil || bw.err != nil
194}
195
196func (bw *bucketWriter) BeginSeries(name, field []byte, typ influxql.DataType, tags models.Tags) {
197	if bw.hasErr() {
198		return
199	}
200
201	if bw.w.state != writeSeries {
202		panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writeSeries))
203	}
204	bw.w.state = writeSeriesHeader
205
206	bw.key = models.AppendMakeKey(bw.key[:0], name, tags)
207	bw.field = field
208}
209
210func (bw *bucketWriter) EndSeries() {
211	if bw.hasErr() {
212		return
213	}
214
215	if bw.w.state != writePoints && bw.w.state != writeSeriesHeader {
216		panic(fmt.Sprintf("writer state: got=%v, exp=%v,%v", bw.w.state, writeSeriesHeader, writePoints))
217	}
218	if bw.w.state == writePoints {
219		bw.w.writeSeriesFooter(IntegerFieldType, bw.n)
220	}
221	bw.w.state = writeSeries
222}
223
224func (bw *bucketWriter) WriteIntegerCursor(cur tsdb.IntegerArrayCursor) {
225	if bw.hasErr() {
226		return
227	}
228
229	if bw.w.state == writeSeriesHeader {
230		bw.w.writeSeriesHeader(bw.key, bw.field, IntegerFieldType)
231	}
232
233	if bw.w.state != writePoints {
234		panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints))
235	}
236
237	var msg IntegerPoints
238	for {
239		a := cur.Next()
240		if a.Len() == 0 {
241			break
242		}
243
244		bw.n += a.Len()
245		msg.Timestamps = a.Timestamps
246		msg.Values = a.Values
247		bw.w.writeTypeMessage(IntegerPointsType, &msg)
248	}
249}
250
251func (bw *bucketWriter) WriteFloatCursor(cur tsdb.FloatArrayCursor) {
252	if bw.hasErr() {
253		return
254	}
255
256	if bw.w.state == writeSeriesHeader {
257		bw.w.writeSeriesHeader(bw.key, bw.field, FloatFieldType)
258	}
259
260	if bw.w.state != writePoints {
261		panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints))
262	}
263
264	var msg FloatPoints
265	for {
266		a := cur.Next()
267		if a.Len() == 0 {
268			break
269		}
270
271		bw.n += a.Len()
272		msg.Timestamps = a.Timestamps
273		msg.Values = a.Values
274		bw.w.writeTypeMessage(FloatPointsType, &msg)
275	}
276}
277
278func (bw *bucketWriter) WriteUnsignedCursor(cur tsdb.UnsignedArrayCursor) {
279	if bw.hasErr() {
280		return
281	}
282
283	if bw.w.state == writeSeriesHeader {
284		bw.w.writeSeriesHeader(bw.key, bw.field, UnsignedFieldType)
285	}
286
287	if bw.w.state != writePoints {
288		panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints))
289	}
290
291	var msg UnsignedPoints
292	for {
293		a := cur.Next()
294		if a.Len() == 0 {
295			break
296		}
297
298		bw.n += a.Len()
299		msg.Timestamps = a.Timestamps
300		msg.Values = a.Values
301		bw.w.writeTypeMessage(UnsignedPointsType, &msg)
302	}
303}
304
305func (bw *bucketWriter) WriteBooleanCursor(cur tsdb.BooleanArrayCursor) {
306	if bw.hasErr() {
307		return
308	}
309
310	if bw.w.state == writeSeriesHeader {
311		bw.w.writeSeriesHeader(bw.key, bw.field, BooleanFieldType)
312	}
313
314	if bw.w.state != writePoints {
315		panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints))
316	}
317
318	var msg BooleanPoints
319	for {
320		a := cur.Next()
321		if a.Len() == 0 {
322			break
323		}
324
325		bw.n += a.Len()
326		msg.Timestamps = a.Timestamps
327		msg.Values = a.Values
328		bw.w.writeTypeMessage(BooleanPointsType, &msg)
329	}
330}
331
332func (bw *bucketWriter) WriteStringCursor(cur tsdb.StringArrayCursor) {
333	if bw.hasErr() {
334		return
335	}
336
337	if bw.w.state == writeSeriesHeader {
338		bw.w.writeSeriesHeader(bw.key, bw.field, StringFieldType)
339	}
340
341	if bw.w.state != writePoints {
342		panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints))
343	}
344
345	var msg StringPoints
346	for {
347		a := cur.Next()
348		if a.Len() == 0 {
349			break
350		}
351
352		bw.n += a.Len()
353		msg.Timestamps = a.Timestamps
354		msg.Values = a.Values
355		bw.w.writeTypeMessage(StringPointsType, &msg)
356	}
357}
358
359func (bw *bucketWriter) Close() error {
360	if bw.closed {
361		return nil
362	}
363
364	bw.closed = true
365
366	if bw.hasErr() {
367		return bw.Err()
368	}
369
370	bw.w.bw = nil
371	bw.w.writeBucketFooter()
372	bw.err = ErrWriteBucketAfterClose
373
374	return bw.w.w.Flush()
375}
376