1package protocol
2
3import (
4	"fmt"
5	"io"
6	"math"
7	"sort"
8	"strconv"
9	"time"
10)
11
12// ErrIsNaN is a field error for when a float field is NaN.
13var ErrIsNaN = &FieldError{"is NaN"}
14
15// ErrIsInf is a field error for when a float field is Inf.
16var ErrIsInf = &FieldError{"is Inf"}
17
// Encoder marshals Metrics into influxdb line protocol.
// It is not safe for concurrent use; make a new one for each goroutine.
// The default behavior when encountering a field error is to ignore the field and move on.
// If you wish it to error out on field errors, use Encoder.FailOnFieldErr(true).
type Encoder struct {
	// w is the destination Encode writes the encoded lines to.
	w                io.Writer
	// fieldSortOrder selects whether Encode sorts fields by key (SortFields).
	fieldSortOrder   FieldSortOrder
	// fieldTypeSupport holds optional type flags; UintSupport enables the 'u' suffix for uint64.
	fieldTypeSupport FieldTypeSupport
	// failOnFieldError makes Encode return a field error instead of skipping the field.
	failOnFieldError bool
	// maxLineBytes is the line length limit; it is only enforced when greater than zero.
	maxLineBytes     int
	// fieldList is a reusable copy of the metric's field pointers, sorted in place when requested.
	fieldList        []*Field
	// header is a reusable buffer holding the measurement name, tags and a trailing space.
	header           []byte
	// footer is a reusable buffer holding the optional timestamp and the terminating newline.
	footer           []byte
	// pair is a reusable buffer holding the "key=value" text of the field being encoded.
	pair             []byte
	// precision is the unit used when writing the timestamp.
	precision        time.Duration
}
34
// SetMaxLineBytes sets a maximum length, in bytes, for a generated line.
// With a positive limit, Encode starts a new line (repeating the header and
// footer) when the next field would exceed it, and returns ErrNeedMoreSpace
// when the header, a single field and the footer do not fit.
// A value of zero or less disables the limit.
func (e *Encoder) SetMaxLineBytes(i int) {
	e.maxLineBytes = i
}
39
// SetFieldSortOrder sets the order in which Encode writes a metric's fields.
// The options are:
// NoSortFields (doesn't sort the fields)
// SortFields (sorts the fields by key, in ascending byte-wise string order)
func (e *Encoder) SetFieldSortOrder(s FieldSortOrder) {
	e.fieldSortOrder = s
}
47
// SetFieldTypeSupport sets the flags that say which optional field types the
// encoder may emit. With UintSupport set, uint64 values are written with the
// 'u' suffix; without it they are written as signed integers ('i' suffix),
// capped at math.MaxInt64.
func (e *Encoder) SetFieldTypeSupport(s FieldTypeSupport) {
	e.fieldTypeSupport = s
}
52
// FailOnFieldErr sets whether Encode fails when a field cannot be encoded
// (true) or skips that field and moves on (false).
// The default behavior is to move on.
func (e *Encoder) FailOnFieldErr(s bool) {
	e.failOnFieldError = s
}
58
// SetPrecision sets the time precision used when writing timestamps.
// time.Microsecond, time.Millisecond and time.Second are honored; any other
// value results in nanosecond timestamps.
// The default is nanosecond precision.
func (e *Encoder) SetPrecision(p time.Duration) {
	e.precision = p
}
64
65// NewEncoder gives us an encoder that marshals to a writer in influxdb line protocol
66// as defined by:
67// https://docs.influxdata.com/influxdb/v1.5/write_protocols/line_protocol_reference/
68func NewEncoder(w io.Writer) *Encoder {
69	return &Encoder{
70		w:         w,
71		header:    make([]byte, 0, 128),
72		footer:    make([]byte, 0, 128),
73		pair:      make([]byte, 0, 128),
74		fieldList: make([]*Field, 0, 16),
75		precision: time.Nanosecond,
76	}
77}
78
// comma is the separator written between field pairs. It is kept as a
// package-level byte slice to significantly reduce allocations; Go has no
// constant/immutable keyword for slices, so treat it as read-only.
var comma = []byte(",")
82
83// Encode marshals a Metric to the io.Writer in the Encoder
84func (e *Encoder) Encode(m Metric) (int, error) {
85	err := e.buildHeader(m)
86	if err != nil {
87		return 0, err
88	}
89
90	e.buildFooter(m.Time())
91
92	// here we make a copy of the *fields so we can do an in-place sort
93	e.fieldList = append(e.fieldList[:0], m.FieldList()...)
94
95	if e.fieldSortOrder == SortFields {
96		sort.Slice(e.fieldList, func(i, j int) bool {
97			return e.fieldList[i].Key < e.fieldList[j].Key
98		})
99	}
100	i := 0
101	totalWritten := 0
102	pairsLen := 0
103	firstField := true
104	for _, field := range e.fieldList {
105		err = e.buildFieldPair(field.Key, field.Value)
106		if err != nil {
107			if e.failOnFieldError {
108				return 0, err
109			}
110			continue
111		}
112
113		bytesNeeded := len(e.header) + pairsLen + len(e.pair) + len(e.footer)
114
115		// Additional length needed for field separator `,`
116		if !firstField {
117			bytesNeeded++
118		}
119
120		if e.maxLineBytes > 0 && bytesNeeded > e.maxLineBytes {
121			// Need at least one field per line
122			if firstField {
123				return 0, ErrNeedMoreSpace
124			}
125
126			i, err = e.w.Write(e.footer)
127			if err != nil {
128				return 0, err
129			}
130			pairsLen = 0
131			totalWritten += i
132
133			bytesNeeded = len(e.header) + len(e.pair) + len(e.footer)
134
135			if e.maxLineBytes > 0 && bytesNeeded > e.maxLineBytes {
136				return 0, ErrNeedMoreSpace
137			}
138
139			i, err = e.w.Write(e.header)
140			if err != nil {
141				return 0, err
142			}
143			totalWritten += i
144
145			i, err = e.w.Write(e.pair)
146			if err != nil {
147				return 0, err
148			}
149			totalWritten += i
150
151			pairsLen += len(e.pair)
152			firstField = false
153			continue
154		}
155
156		if firstField {
157			i, err = e.w.Write(e.header)
158			if err != nil {
159				return 0, err
160			}
161			totalWritten += i
162
163		} else {
164			i, err = e.w.Write(comma)
165			if err != nil {
166				return 0, err
167			}
168			totalWritten += i
169
170		}
171
172		i, err = e.w.Write(e.pair)
173		if err != nil {
174			return 0, err
175		}
176		totalWritten += i
177
178		pairsLen += len(e.pair)
179		firstField = false
180	}
181
182	if firstField {
183		return 0, ErrNoFields
184	}
185	i, err = e.w.Write(e.footer)
186	if err != nil {
187		return 0, err
188	}
189	totalWritten += i
190	return totalWritten, nil
191
192}
193
194func (e *Encoder) buildHeader(m Metric) error {
195	e.header = e.header[:0]
196	name := nameEscape(m.Name())
197	if name == "" {
198		return ErrInvalidName
199	}
200	e.header = append(e.header, name...)
201
202	for _, tag := range m.TagList() {
203		key := escape(tag.Key)
204		value := escape(tag.Value)
205
206		// Some keys and values are not encodeable as line protocol, such as
207		// those with a trailing '\' or empty strings.
208		if key == "" || value == "" {
209			continue
210		}
211
212		e.header = append(e.header, ',')
213		e.header = append(e.header, key...)
214		e.header = append(e.header, '=')
215		e.header = append(e.header, value...)
216	}
217
218	e.header = append(e.header, ' ')
219	return nil
220}
221
222func (e *Encoder) buildFieldVal(value interface{}) error {
223	switch v := value.(type) {
224	case uint64:
225		if e.fieldTypeSupport&UintSupport != 0 {
226			e.pair = append(strconv.AppendUint(e.pair, v, 10), 'u')
227		} else if v <= uint64(math.MaxInt64) {
228			e.pair = append(strconv.AppendInt(e.pair, int64(v), 10), 'i')
229		} else {
230			e.pair = append(strconv.AppendInt(e.pair, math.MaxInt64, 10), 'i')
231		}
232	case int64:
233		e.pair = append(strconv.AppendInt(e.pair, v, 10), 'i')
234	case int:
235		e.pair = append(strconv.AppendInt(e.pair, int64(v), 10), 'i')
236	case float64:
237		if math.IsNaN(v) {
238			return ErrIsNaN
239		}
240
241		if math.IsInf(v, 0) {
242			return ErrIsInf
243		}
244
245		e.pair = strconv.AppendFloat(e.pair, v, 'f', -1, 64)
246	case float32:
247		v32 := float64(v)
248		if math.IsNaN(v32) {
249			return ErrIsNaN
250		}
251
252		if math.IsInf(v32, 0) {
253			return ErrIsInf
254		}
255
256		e.pair = strconv.AppendFloat(e.pair, v32, 'f', -1, 64)
257
258	case string:
259		e.pair = append(e.pair, '"')
260		e.pair = append(e.pair, stringFieldEscape(v)...)
261		e.pair = append(e.pair, '"')
262	case []byte:
263		e.pair = append(e.pair, '"')
264		stringFieldEscapeBytes(&e.pair, v)
265		e.pair = append(e.pair, '"')
266	case bool:
267		e.pair = strconv.AppendBool(e.pair, v)
268	default:
269		return &FieldError{fmt.Sprintf("invalid value type: %T", v)}
270	}
271	return nil
272}
273
274func (e *Encoder) buildFieldPair(key string, value interface{}) error {
275	e.pair = e.pair[:0]
276	key = escape(key)
277	// Some keys are not encodeable as line protocol, such as those with a
278	// trailing '\' or empty strings.
279	if key == "" || key[:len(key)-1] == "\\" {
280		return &FieldError{"invalid field key"}
281	}
282	e.pair = append(e.pair, key...)
283	e.pair = append(e.pair, '=')
284	return e.buildFieldVal(value)
285}
286
287func (e *Encoder) buildFooter(t time.Time) {
288	e.footer = e.footer[:0]
289	if !t.IsZero() {
290		e.footer = append(e.footer, ' ')
291		switch e.precision {
292		case time.Microsecond:
293			e.footer = strconv.AppendInt(e.footer, t.UnixNano()/1000, 10)
294		case time.Millisecond:
295			e.footer = strconv.AppendInt(e.footer, t.UnixNano()/1000000, 10)
296		case time.Second:
297			e.footer = strconv.AppendInt(e.footer, t.Unix(), 10)
298		default:
299			e.footer = strconv.AppendInt(e.footer, t.UnixNano(), 10)
300		}
301	}
302	e.footer = append(e.footer, '\n')
303}
304