1package protocol
2
3import (
4	"errors"
5	"io"
6)
7
// Sentinel errors produced while parsing line protocol.
var (
	// ErrNameParse reports that a measurement name was expected.
	ErrNameParse = errors.New("expected measurement name")

	// ErrFieldParse reports that a field was expected.
	ErrFieldParse = errors.New("expected field")

	// ErrTagParse reports that a tag was expected.
	ErrTagParse = errors.New("expected tag")

	// ErrTimestampParse reports that a timestamp was expected.
	ErrTimestampParse = errors.New("expected timestamp")

	// ErrParse is the generic error used when no more specific one applies.
	ErrParse = errors.New("parse error")

	// EOF is returned by Next when the input is exhausted.  It is created
	// with errors.New and is therefore a different value from io.EOF.
	EOF = errors.New("EOF")
)
16
%%{
machine LineProtocol;

# Mark the current position as the start of the token being matched.  The
# text helper later returns the bytes from this mark up to the current
# position.
action begin {
	m.pb = m.p
}

# The *_error actions below share one pattern: store the error that exec will
# return, hold the current character (fhold), select discard_line as the
# state to resume in (fnext) and leave the exec loop (fbreak).

# A measurement name was expected.
action name_error {
	err = ErrNameParse
	fhold;
	fnext discard_line;
	fbreak;
}

# A field was expected.
action field_error {
	err = ErrFieldParse
	fhold;
	fnext discard_line;
	fbreak;
}

# A tag was expected.
action tagset_error {
	err = ErrTagParse
	fhold;
	fnext discard_line;
	fbreak;
}

# A timestamp was expected.
action timestamp_error {
	err = ErrTimestampParse
	fhold;
	fnext discard_line;
	fbreak;
}

# Generic parse error.  Not referenced by any machine in this file.
action parse_error {
	err = ErrParse
	fhold;
	fnext discard_line;
	fbreak;
}

# Same as parse_error but without holding the current character.  Not
# referenced by any machine in this file.
action align_error {
	err = ErrParse
	fnext discard_line;
	fbreak;
}

# Hold the current character and jump to the main machine so that it is
# parsed as the start of a metric line.
action hold_recover {
	fhold;
	fgoto main;
}

# Jump to the align machine.
action goto_align {
	fgoto align;
}

# Record that a metric line has been started.  exec uses this flag to decide
# whether reaching the end of input should be reported as EOF.
action begin_metric {
	m.beginMetric = true
}

# The handler actions below pass the matched text to the Handler.  When the
# handler returns an error the machine holds the current character, selects
# discard_line as the state to resume in and leaves the exec loop so the
# error is returned to the caller.

action name {
	err = m.handler.SetMeasurement(m.text())
	if err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

# Remember the tag key until its value has been matched.
action tagkey {
	m.key = m.text()
}

action tagvalue {
	err = m.handler.AddTag(m.key, m.text())
	if err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

# Remember the field key until its value has been matched.
action fieldkey {
	m.key = m.text()
}

action integer {
	err = m.handler.AddInt(m.key, m.text())
	if err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action unsigned {
	err = m.handler.AddUint(m.key, m.text())
	if err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action float {
	err = m.handler.AddFloat(m.key, m.text())
	if err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action bool {
	err = m.handler.AddBool(m.key, m.text())
	if err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action string {
	err = m.handler.AddString(m.key, m.text())
	if err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action timestamp {
	err = m.handler.SetTimestamp(m.text())
	if err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

# Count the line and remember where the next one starts.
action incr_newline {
	m.lineno++
	m.sol = m.p
	m.sol++ // next char will be the first column in the line
}

# A metric line ended with a newline: flag the metric as finished, resume in
# the align machine on the next call and leave the exec loop.
action eol {
	m.finishMetric = true
	fnext align;
	fbreak;
}

# The input ended while inside main: flag the metric as finished.
action finish_metric {
	m.finishMetric = true
}

# Horizontal whitespace; line terminators are handled by newline.
ws =
	[\t\v\f ];

# A line terminator with an optional carriage return.  The line counter is
# updated on entering the line feed.
newline =
	'\r'? '\n' >incr_newline;

non_zero_digit =
	[1-9];

# Optional sign followed by a single digit or by digits without a leading
# zero.
integer =
	'-'? ( digit | ( non_zero_digit digit* ) );

unsigned =
	( digit | ( non_zero_digit digit* ) );

# Decimal number with an optional sign; either side of the decimal point may
# be omitted but not both.
number =
	'-'? (digit+ ('.' digit*)? | '.' digit+);

# Exponent form of number.  The exponent may carry an optional - or + sign.
# The sign class previously also contained the double quote character, which
# let input such as 1e"5 through to the handler as a float.
scientific =
	number 'e'i [\-+]? digit+;

# Optional sign followed by one to nineteen digits.
timestamp =
	('-'? digit{1,19}) >begin %timestamp;

# Any character other than whitespace, comma, equals or backslash, or a
# backslash followed by a character that is not whitespace.
fieldkeychar =
	[^\t\n\v\f\r ,=\\] | ( '\\' [^\t\n\v\f\r] );

fieldkey =
	fieldkeychar+ >begin %fieldkey;

fieldfloat =
	(scientific | number) >begin %float;

# The text passed to the handler includes the trailing i suffix.
fieldinteger =
	(integer 'i') >begin %integer;

# The text passed to the handler includes the trailing u suffix.
fieldunsigned =
	(unsigned 'u') >begin %unsigned;

false =
	"false" | "FALSE" | "False" | "F" | "f";

true =
	"true" | "TRUE" | "True" | "T" | "t";

fieldbool =
	(true | false) >begin %bool;

# Inside a quoted string a backslash may only be followed by a backslash or a
# double quote.  Newlines are allowed and are counted.
fieldstringchar =
	[^\f\r\n\\"] | '\\' [\\"] | newline;

# The text passed to the handler is the content between the quotes, with
# escape sequences left as written.
fieldstring =
	fieldstringchar* >begin %string;

fieldstringquoted =
	'"' fieldstring '"';

fieldvalue = fieldinteger | fieldunsigned | fieldfloat | fieldstringquoted | fieldbool;

field =
	fieldkey '=' fieldvalue;

fieldset =
	field ( ',' field )*;

# Like fieldkeychar, except that a backslash followed by a backslash is
# handled by a separate alternative which holds the current character in its
# to-state action.
tagchar =
	[^\t\n\v\f\r ,=\\] | ( '\\' [^\t\n\v\f\r\\] ) | '\\\\' %to{ fhold; };

tagkey =
	tagchar+ >begin %tagkey;

# The tagvalue action also runs when the input ends right after the value.
tagvalue =
	tagchar+ >begin %eof(tagvalue) %tagvalue;

# Zero or more comma-prefixed key=value pairs.
tagset =
	((',' tagkey '=' tagvalue) $err(tagset_error))*;

measurement_chars =
	[^\t\n\v\f\r ,\\] | ( '\\' [^\t\n\v\f\r] );

# A measurement may not start with the comment character.
measurement_start =
	measurement_chars - '#';

# The name action also runs when the input ends right after the name.
measurement =
	(measurement_start measurement_chars*) >begin %eof(name) %name;

eol_break =
	newline %to(eol)
	;

# measurement[,tagset] fieldset [timestamp]
metric =
	measurement >err(name_error)
	tagset
	ws+ fieldset $err(field_error)
	(ws+ timestamp)? $err(timestamp_error)
	;

line_with_term =
	ws* metric ws* eol_break
	;

line_without_term =
	ws* metric ws*
	;

# The main machine parses metric lines.  The final line may omit its line
# terminator.
main :=
	(line_with_term*
	(line_with_term | line_without_term?)
	) >begin_metric %eof(finish_metric)
	;

# The discard_line machine discards the current line.  Useful for recovering
# on the next line when an error occurs.
discard_line :=
	(any -- newline)* newline @goto_align;

commentline =
	ws* '#' (any -- newline)* newline;

emptyline =
	ws* newline;

# The align machine scans forward to the start of the next line.  This machine
# is used to skip over whitespace and comments, keeping this logic out of the
# main machine.
#
# Skip valid lines that don't contain line protocol, any other data will move
# control to the main parser via the err action.
align :=
	(emptyline | commentline | ws+)* %err(hold_recover);

# Series is a machine for matching measurement+tagset
series :=
	(measurement >err(name_error) tagset eol_break?)
	>begin_metric
	;
}%%
311
312%% write data;
313
// Handler receives the elements of a metric line as the machine matches
// them.  Each argument is a slice of the machine's input buffer holding the
// matched text.  When a method returns a non-nil error the machine stops,
// returns that error and arranges for the rest of the line to be discarded.
type Handler interface {
	// SetMeasurement is called with the measurement name.
	SetMeasurement(name []byte) error
	// AddTag is called once per tag with its key and value.
	AddTag(key, value []byte) error
	// AddInt is called for integer fields; value includes the trailing 'i'.
	AddInt(key, value []byte) error
	// AddUint is called for unsigned fields; value includes the trailing 'u'.
	AddUint(key, value []byte) error
	// AddFloat is called for plain and scientific-notation numbers.
	AddFloat(key, value []byte) error
	// AddString is called with the text found between the double quotes.
	AddString(key, value []byte) error
	// AddBool is called with the boolean literal as written in the input.
	AddBool(key, value []byte) error
	// SetTimestamp is called with the timestamp digits, including any sign.
	SetTimestamp(tm []byte) error
}
324
325type machine struct {
326	data         []byte
327	cs           int
328	p, pe, eof   int
329	pb           int
330	lineno       int
331	sol          int
332	handler      Handler
333	initState    int
334	key          []byte
335	beginMetric  bool
336	finishMetric bool
337}
338
339func NewMachine(handler Handler) *machine {
340	m := &machine{
341		handler: handler,
342		initState: LineProtocol_en_align,
343	}
344
345	%% access m.;
346	%% variable p m.p;
347	%% variable cs m.cs;
348	%% variable pe m.pe;
349	%% variable eof m.eof;
350	%% variable data m.data;
351	%% write init;
352
353	return m
354}
355
356func NewSeriesMachine(handler Handler) *machine {
357	m := &machine{
358		handler: handler,
359		initState: LineProtocol_en_series,
360	}
361
362	%% access m.;
363	%% variable p m.p;
364	%% variable pe m.pe;
365	%% variable eof m.eof;
366	%% variable data m.data;
367	%% write init;
368
369	return m
370}
371
372func (m *machine) SetData(data []byte) {
373	m.data = data
374	m.p = 0
375	m.pb = 0
376	m.lineno = 1
377	m.sol = 0
378	m.pe = len(data)
379	m.eof = len(data)
380	m.key = nil
381	m.beginMetric = false
382	m.finishMetric = false
383
384	%% write init;
385	m.cs = m.initState
386}
387
388// Next parses the next metric line and returns nil if it was successfully
389// processed.  If the line contains a syntax error an error is returned,
390// otherwise if the end of file is reached before finding a metric line then
391// EOF is returned.
392func (m *machine) Next() error {
393	if m.p == m.pe && m.pe == m.eof {
394		return EOF
395	}
396
397	m.key = nil
398	m.beginMetric = false
399	m.finishMetric = false
400
401	return m.exec()
402}
403
404func (m *machine) exec() error {
405	var err error
406	%% write exec;
407
408	if err != nil {
409		return err
410	}
411
412	// This would indicate an error in the machine that was reported with a
413	// more specific error.  We return a generic error but this should
414	// possibly be a panic.
415	if m.cs == %%{ write error; }%% {
416		m.cs = LineProtocol_en_discard_line
417		return ErrParse
418	}
419
420	// If we haven't found a metric line yet and we reached the EOF, report it
421	// now.  This happens when the data ends with a comment or whitespace.
422	//
423	// Otherwise we have successfully parsed a metric line, so if we are at
424	// the EOF we will report it the next call.
425	if !m.beginMetric && m.p == m.pe && m.pe == m.eof {
426		return EOF
427	}
428
429	return nil
430}
431
432// Position returns the current byte offset into the data.
433func (m *machine) Position() int {
434	return m.p
435}
436
437// LineOffset returns the byte offset of the current line.
438func (m *machine) LineOffset() int {
439	return m.sol
440}
441
442// LineNumber returns the current line number.  Lines are counted based on the
443// regular expression `\r?\n`.
444func (m *machine) LineNumber() int {
445	return m.lineno
446}
447
448// Column returns the current column.
449func (m *machine) Column() int {
450	lineOffset := m.p - m.sol
451	return lineOffset + 1
452}
453
454func (m *machine) text() []byte {
455	return m.data[m.pb:m.p]
456}
457
458type streamMachine struct {
459	machine *machine
460	reader  io.Reader
461}
462
463func NewStreamMachine(r io.Reader, handler Handler) *streamMachine {
464	m := &streamMachine{
465		machine: NewMachine(handler),
466		reader: r,
467	}
468
469	m.machine.SetData(make([]byte, 1024))
470	m.machine.pe = 0
471	m.machine.eof = -1
472	return m
473}
474
475func (m *streamMachine) Next() error {
476	// Check if we are already at EOF, this should only happen if called again
477	// after already returning EOF.
478	if m.machine.p == m.machine.pe && m.machine.pe == m.machine.eof {
479		return EOF
480	}
481
482	copy(m.machine.data, m.machine.data[m.machine.p:])
483	m.machine.pe = m.machine.pe - m.machine.p
484	m.machine.sol = m.machine.sol - m.machine.p
485	m.machine.pb = 0
486	m.machine.p = 0
487	m.machine.eof = -1
488
489	m.machine.key = nil
490	m.machine.beginMetric = false
491	m.machine.finishMetric = false
492
493	for {
494		// Expand the buffer if it is full
495		if m.machine.pe == len(m.machine.data) {
496			expanded := make([]byte, 2 * len(m.machine.data))
497			copy(expanded, m.machine.data)
498			m.machine.data = expanded
499		}
500
501		n, err := m.reader.Read(m.machine.data[m.machine.pe:])
502		if n == 0 && err == io.EOF {
503			m.machine.eof = m.machine.pe
504		} else if err != nil && err != io.EOF {
505			return err
506		}
507
508		m.machine.pe += n
509
510		err = m.machine.exec()
511		if err != nil {
512			return err
513		}
514
515		// If we have successfully parsed a full metric line break out
516		if m.machine.finishMetric {
517			break
518		}
519
520	}
521
522	return nil
523}
524
525// Position returns the current byte offset into the data.
526func (m *streamMachine) Position() int {
527	return m.machine.Position()
528}
529
530// LineOffset returns the byte offset of the current line.
531func (m *streamMachine) LineOffset() int {
532	return m.machine.LineOffset()
533}
534
535// LineNumber returns the current line number.  Lines are counted based on the
536// regular expression `\r?\n`.
537func (m *streamMachine) LineNumber() int {
538	return m.machine.LineNumber()
539}
540
541// Column returns the current column.
542func (m *streamMachine) Column() int {
543	return m.machine.Column()
544}
545
546// LineText returns the text of the current line that has been parsed so far.
547func (m *streamMachine) LineText() string {
548	return string(m.machine.data[0:m.machine.p])
549}
550