package protocol

import (
	"errors"
	"io"
)

// Errors returned by the parser.
//
// The first four identify which part of a line failed to match the grammar.
// ErrParse is the generic error used when no more specific one applies.
// EOF is returned by Next once the input is exhausted. It is a distinct
// value from io.EOF.
var (
	ErrNameParse      = errors.New("expected measurement name")
	ErrFieldParse     = errors.New("expected field")
	ErrTagParse       = errors.New("expected tag")
	ErrTimestampParse = errors.New("expected timestamp")
	ErrParse          = errors.New("parse error")
	EOF               = errors.New("EOF")
)

%%{
machine LineProtocol;

# Mark the start of the token being scanned; text() returns the bytes from
# this mark up to the current position.
action begin {
	m.pb = m.p
}

# Error actions. Each one records the error, holds the current byte so the
# discard_line machine consumes it when scanning resumes, switches to
# discard_line, and stops scanning so exec returns the error to the caller.
action name_error {
	err = ErrNameParse
	fhold;
	fnext discard_line;
	fbreak;
}

action field_error {
	err = ErrFieldParse
	fhold;
	fnext discard_line;
	fbreak;
}

action tagset_error {
	err = ErrTagParse
	fhold;
	fnext discard_line;
	fbreak;
}

action timestamp_error {
	err = ErrTimestampParse
	fhold;
	fnext discard_line;
	fbreak;
}

action parse_error {
	err = ErrParse
	fhold;
	fnext discard_line;
	fbreak;
}

# Like parse_error but without fhold: the current byte is consumed.
action align_error {
	err = ErrParse
	fnext discard_line;
	fbreak;
}

# Used by align: a byte that is not whitespace, a comment or an empty line
# is handed (unconsumed) to the main machine.
action hold_recover {
	fhold;
	fgoto main;
}

action goto_align {
	fgoto align;
}

# Records that the main machine was entered during this exec call.
action begin_metric {
	m.beginMetric = true
}

# Value actions. Each passes the scanned token to the handler. A handler
# error aborts the line exactly like a grammar error does.
action name {
	err = m.handler.SetMeasurement(m.text())
	if err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action tagkey {
	m.key = m.text()
}

action tagvalue {
	err = m.handler.AddTag(m.key, m.text())
	if err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action fieldkey {
	m.key = m.text()
}

action integer {
	err = m.handler.AddInt(m.key, m.text())
	if err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action unsigned {
	err = m.handler.AddUint(m.key, m.text())
	if err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action float {
	err = m.handler.AddFloat(m.key, m.text())
	if err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action bool {
	err = m.handler.AddBool(m.key, m.text())
	if err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action string {
	err = m.handler.AddString(m.key, m.text())
	if err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action timestamp {
	err = m.handler.SetTimestamp(m.text())
	if err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

# Fires on '\n'. p still points at the newline, so the next line starts at
# p+1.
action incr_newline {
	m.lineno++
	m.sol = m.p
	m.sol++ // next char will be the first column in the line
}

# End of a terminated metric line: report completion, resume in align on
# the next call, and return to the caller now.
action eol {
	m.finishMetric = true
	fnext align;
	fbreak;
}

action finish_metric {
	m.finishMetric = true
}

# Horizontal whitespace: tab, vertical tab, form feed, space.
ws =
	[\t\v\f ];

# LF, optionally preceded by CR; bumps the line counter.
newline =
	'\r'? '\n' >incr_newline;

non_zero_digit =
	[1-9];

integer =
	'-'? ( digit | ( non_zero_digit digit* ) );

unsigned =
	( digit | ( non_zero_digit digit* ) );

number =
	'-'? (digit+ ('.' digit*)? | '.' digit+);

# The exponent sign is optional and may only be '-' or '+'. Inside a Ragel
# OR-expression a '"' is a literal character, so it must not appear here.
scientific =
	number 'e'i [\-+]? digit+;

# Optional sign followed by 1 to 19 digits.
timestamp =
	('-'? digit{1,19}) >begin %timestamp;

fieldkeychar =
	[^\t\n\v\f\r ,=\\] | ( '\\' [^\t\n\v\f\r] );

fieldkey =
	fieldkeychar+ >begin %fieldkey;

fieldfloat =
	(scientific | number) >begin %float;

fieldinteger =
	(integer 'i') >begin %integer;

fieldunsigned =
	(unsigned 'u') >begin %unsigned;

false =
	"false" | "FALSE" | "False" | "F" | "f";

true =
	"true" | "TRUE" | "True" | "T" | "t";

fieldbool =
	(true | false) >begin %bool;

# Inside quotes: anything but FF/CR/LF/backslash/quote, an escaped backslash
# or quote, or a newline (which still advances the line counter).
fieldstringchar =
	[^\f\r\n\\"] | '\\' [\\"] | newline;

fieldstring =
	fieldstringchar* >begin %string;

fieldstringquoted =
	'"' fieldstring '"';

fieldvalue = fieldinteger | fieldunsigned | fieldfloat | fieldstringquoted | fieldbool;

field =
	fieldkey '=' fieldvalue;

fieldset =
	field ( ',' field )*;

# The last alternative matches a pair of backslashes, then fhold makes the
# second backslash be scanned again as the start of the next tagchar.
tagchar =
	[^\t\n\v\f\r ,=\\] | ( '\\' [^\t\n\v\f\r\\] ) | '\\\\' %to{ fhold; };

tagkey =
	tagchar+ >begin %tagkey;

# %eof(tagvalue) emits the tag when the input ends inside a tag value.
tagvalue =
	tagchar+ >begin %eof(tagvalue) %tagvalue;

tagset =
	((',' tagkey '=' tagvalue) $err(tagset_error))*;

measurement_chars =
	[^\t\n\v\f\r ,\\] | ( '\\' [^\t\n\v\f\r] );

# A leading '#' starts a comment line instead (see commentline).
measurement_start =
	measurement_chars - '#';

measurement =
	(measurement_start measurement_chars*) >begin %eof(name) %name;

eol_break =
	newline %to(eol)
	;

metric =
	measurement >err(name_error)
	tagset
	ws+ fieldset $err(field_error)
	(ws+ timestamp)? $err(timestamp_error)
	;

line_with_term =
	ws* metric ws* eol_break
	;

line_without_term =
	ws* metric ws*
	;

# Because eol_break switches to align and breaks after each newline, a
# single exec call completes at most one metric line in main. The last line
# of the input may lack a terminator; %eof(finish_metric) completes it.
main :=
	(line_with_term*
	(line_with_term | line_without_term?)
	) >begin_metric %eof(finish_metric)
	;

# The discard_line machine discards the current line. Useful for recovering
# on the next line when an error occurs.
discard_line :=
	(any -- newline)* newline @goto_align;

commentline =
	ws* '#' (any -- newline)* newline;

emptyline =
	ws* newline;

# The align machine scans forward to the start of the next line. This machine
# is used to skip over whitespace and comments, keeping this logic out of the
# main machine.
#
# Skip valid lines that don't contain line protocol, any other data will move
# control to the main parser via the err action.
align :=
	(emptyline | commentline | ws+)* %err(hold_recover);

# Series is a machine for matching measurement+tagset
series :=
	(measurement >err(name_error) tagset eol_break?)
	>begin_metric
	;
}%%

%% write data;

// Handler receives the parts of each metric line as they are parsed.
//
// Every key and value is a subslice of the parser's input buffer. The
// streaming parser shifts and overwrites that buffer between calls, so
// implementations must copy any bytes they retain.
//
// Returning a non-nil error aborts the current line. The parser skips to
// the next line and Next returns that error.
type Handler interface {
	SetMeasurement(name []byte) error
	AddTag(key []byte, value []byte) error
	AddInt(key []byte, value []byte) error
	AddUint(key []byte, value []byte) error
	AddFloat(key []byte, value []byte) error
	AddString(key []byte, value []byte) error
	AddBool(key []byte, value []byte) error
	SetTimestamp(tm []byte) error
}

// machine holds the Ragel state for parsing an in-memory buffer.
type machine struct {
	data         []byte // input being parsed
	cs           int    // current Ragel state
	p, pe, eof   int    // position, end of data, end of input (-1 if not yet known)
	pb           int    // start offset of the token currently being scanned
	lineno       int    // current line number, starting at 1
	sol          int    // offset of the start of the current line
	handler      Handler
	initState    int    // state the machine enters when SetData is called
	key          []byte // most recently scanned tag or field key
	beginMetric  bool   // main machine entered during the current call
	finishMetric bool   // a complete metric line was parsed during the current call
}

// NewMachine returns a parser that reports each metric line to handler.
// Call SetData to supply input before calling Next.
func NewMachine(handler Handler) *machine {
	m := &machine{
		handler:   handler,
		initState: LineProtocol_en_align,
	}

	%% access m.;
	%% variable p m.p;
	%% variable cs m.cs;
	%% variable pe m.pe;
	%% variable eof m.eof;
	%% variable data m.data;
	%% write init;

	return m
}

// NewSeriesMachine returns a parser that matches only a measurement and its
// tag set, optionally followed by a newline. Call SetData to supply input
// before calling Next.
func NewSeriesMachine(handler Handler) *machine {
	m := &machine{
		handler:   handler,
		initState: LineProtocol_en_series,
	}

	%% access m.;
	%% variable p m.p;
	%% variable cs m.cs;
	%% variable pe m.pe;
	%% variable eof m.eof;
	%% variable data m.data;
	%% write init;

	return m
}

// SetData replaces the input with data, treats all of it as the complete
// input, and restarts the machine at its initial state.
func (m *machine) SetData(data []byte) {
	m.data = data
	m.p = 0
	m.pb = 0
	m.lineno = 1
	m.sol = 0
	m.pe = len(data)
	m.eof = len(data)
	m.key = nil
	m.beginMetric = false
	m.finishMetric = false

	%% write init;
	m.cs = m.initState
}

// Next parses the next metric line and returns nil if it was successfully
// processed. If the line contains a syntax error an error is returned,
// otherwise if the end of file is reached before finding a metric line then
// EOF is returned.
func (m *machine) Next() error {
	if m.p == m.pe && m.pe == m.eof {
		return EOF
	}

	m.key = nil
	m.beginMetric = false
	m.finishMetric = false

	return m.exec()
}

// exec runs the state machine from the current position until it breaks or
// runs out of data. It returns one of the following:
//   - a grammar or handler error;
//   - ErrParse if the error state was reached without a more specific error;
//   - EOF if the input ended before any metric line began;
//   - nil otherwise.
func (m *machine) exec() error {
	var err error
	%% write exec;

	if err != nil {
		return err
	}

	// This would indicate an error in the machine that was reported with a
	// more specific error. We return a generic error but this should
	// possibly be a panic.
	if m.cs == %%{ write error; }%% {
		m.cs = LineProtocol_en_discard_line
		return ErrParse
	}

	// If we haven't found a metric line yet and we reached the EOF, report it
	// now. This happens when the data ends with a comment or whitespace.
	//
	// Otherwise we have successfully parsed a metric line, so if we are at
	// the EOF we will report it the next call.
	if !m.beginMetric && m.p == m.pe && m.pe == m.eof {
		return EOF
	}

	return nil
}

// Position returns the current byte offset into the data.
func (m *machine) Position() int {
	return m.p
}

// LineOffset returns the byte offset of the current line.
func (m *machine) LineOffset() int {
	return m.sol
}

// LineNumber returns the current line number. Lines are counted based on the
// regular expression `\r?\n`.
func (m *machine) LineNumber() int {
	return m.lineno
}

// Column returns the current column.
449func (m *machine) Column() int { 450 lineOffset := m.p - m.sol 451 return lineOffset + 1 452} 453 454func (m *machine) text() []byte { 455 return m.data[m.pb:m.p] 456} 457 458type streamMachine struct { 459 machine *machine 460 reader io.Reader 461} 462 463func NewStreamMachine(r io.Reader, handler Handler) *streamMachine { 464 m := &streamMachine{ 465 machine: NewMachine(handler), 466 reader: r, 467 } 468 469 m.machine.SetData(make([]byte, 1024)) 470 m.machine.pe = 0 471 m.machine.eof = -1 472 return m 473} 474 475func (m *streamMachine) Next() error { 476 // Check if we are already at EOF, this should only happen if called again 477 // after already returning EOF. 478 if m.machine.p == m.machine.pe && m.machine.pe == m.machine.eof { 479 return EOF 480 } 481 482 copy(m.machine.data, m.machine.data[m.machine.p:]) 483 m.machine.pe = m.machine.pe - m.machine.p 484 m.machine.sol = m.machine.sol - m.machine.p 485 m.machine.pb = 0 486 m.machine.p = 0 487 m.machine.eof = -1 488 489 m.machine.key = nil 490 m.machine.beginMetric = false 491 m.machine.finishMetric = false 492 493 for { 494 // Expand the buffer if it is full 495 if m.machine.pe == len(m.machine.data) { 496 expanded := make([]byte, 2 * len(m.machine.data)) 497 copy(expanded, m.machine.data) 498 m.machine.data = expanded 499 } 500 501 n, err := m.reader.Read(m.machine.data[m.machine.pe:]) 502 if n == 0 && err == io.EOF { 503 m.machine.eof = m.machine.pe 504 } else if err != nil && err != io.EOF { 505 return err 506 } 507 508 m.machine.pe += n 509 510 err = m.machine.exec() 511 if err != nil { 512 return err 513 } 514 515 // If we have successfully parsed a full metric line break out 516 if m.machine.finishMetric { 517 break 518 } 519 520 } 521 522 return nil 523} 524 525// Position returns the current byte offset into the data. 526func (m *streamMachine) Position() int { 527 return m.machine.Position() 528} 529 530// LineOffset returns the byte offset of the current line. 
531func (m *streamMachine) LineOffset() int { 532 return m.machine.LineOffset() 533} 534 535// LineNumber returns the current line number. Lines are counted based on the 536// regular expression `\r?\n`. 537func (m *streamMachine) LineNumber() int { 538 return m.machine.LineNumber() 539} 540 541// Column returns the current column. 542func (m *streamMachine) Column() int { 543 return m.machine.Column() 544} 545 546// LineText returns the text of the current line that has been parsed so far. 547func (m *streamMachine) LineText() string { 548 return string(m.machine.data[0:m.machine.p]) 549} 550