1package protocol
2
3import (
4	"fmt"
5	"io"
6	"strings"
7	"sync"
8	"time"
9)
10
11const (
12	maxErrorBufferSize = 1024
13)
14
15// TimeFunc is used to override the default time for a metric
16// with no specified timestamp.
17type TimeFunc func() time.Time
18
19// ParseError indicates a error in the parsing of the text.
20type ParseError struct {
21	Offset     int
22	LineOffset int
23	LineNumber int
24	Column     int
25	msg        string
26	buf        string
27}
28
29func (e *ParseError) Error() string {
30	buffer := e.buf[e.LineOffset:]
31	eol := strings.IndexAny(buffer, "\r\n")
32	if eol >= 0 {
33		buffer = buffer[:eol]
34	}
35	if len(buffer) > maxErrorBufferSize {
36		buffer = buffer[:maxErrorBufferSize] + "..."
37	}
38	return fmt.Sprintf("metric parse error: %s at %d:%d: %q", e.msg, e.LineNumber, e.Column, buffer)
39}
40
41// Parser is an InfluxDB Line Protocol parser that implements the
42// parsers.Parser interface.
43type Parser struct {
44	DefaultTags map[string]string
45
46	sync.Mutex
47	*machine
48	handler *MetricHandler
49}
50
51// NewParser returns a Parser than accepts line protocol
52func NewParser(handler *MetricHandler) *Parser {
53	return &Parser{
54		machine: NewMachine(handler),
55		handler: handler,
56	}
57}
58
59// NewSeriesParser returns a Parser than accepts a measurement and tagset
60func NewSeriesParser(handler *MetricHandler) *Parser {
61	return &Parser{
62		machine: NewSeriesMachine(handler),
63		handler: handler,
64	}
65}
66
67// SetTimeFunc allows default times to be set when no time is specified
68// for a metric in line-protocol.
69func (p *Parser) SetTimeFunc(f TimeFunc) {
70	p.handler.SetTimeFunc(f)
71}
72
73// Parse interprets line-protocol bytes as many metrics.
74func (p *Parser) Parse(input []byte) ([]Metric, error) {
75	p.Lock()
76	defer p.Unlock()
77	metrics := make([]Metric, 0)
78	p.machine.SetData(input)
79
80	for {
81		err := p.machine.Next()
82		if err == EOF {
83			break
84		}
85
86		if err != nil {
87			return nil, &ParseError{
88				Offset:     p.machine.Position(),
89				LineOffset: p.machine.LineOffset(),
90				LineNumber: p.machine.LineNumber(),
91				Column:     p.machine.Column(),
92				msg:        err.Error(),
93				buf:        string(input),
94			}
95		}
96
97		metric, err := p.handler.Metric()
98		if err != nil {
99			return nil, err
100		}
101
102		if metric == nil {
103			continue
104		}
105
106		metrics = append(metrics, metric)
107	}
108
109	return metrics, nil
110}
111
112// StreamParser is an InfluxDB Line Protocol parser.  It is not safe for
113// concurrent use in multiple goroutines.
114type StreamParser struct {
115	machine *streamMachine
116	handler *MetricHandler
117}
118
119// NewStreamParser parses from a reader and iterates the machine
120// metric by metric.  Not safe for concurrent use in multiple goroutines.
121func NewStreamParser(r io.Reader) *StreamParser {
122	handler := NewMetricHandler()
123	return &StreamParser{
124		machine: NewStreamMachine(r, handler),
125		handler: handler,
126	}
127}
128
129// SetTimeFunc changes the function used to determine the time of metrics
130// without a timestamp.  The default TimeFunc is time.Now.  Useful mostly for
131// testing, or perhaps if you want all metrics to have the same timestamp.
132func (p *StreamParser) SetTimeFunc(f TimeFunc) {
133	p.handler.SetTimeFunc(f)
134}
135
136// SetTimePrecision specifies units for the time stamp.
137func (p *StreamParser) SetTimePrecision(u time.Duration) {
138	p.handler.SetTimePrecision(u)
139}
140
141// Next parses the next item from the stream.  You can repeat calls to this
142// function until it returns EOF.
143func (p *StreamParser) Next() (Metric, error) {
144	err := p.machine.Next()
145	if err == EOF {
146		return nil, EOF
147	}
148
149	if err != nil {
150		return nil, &ParseError{
151			Offset:     p.machine.Position(),
152			LineOffset: p.machine.LineOffset(),
153			LineNumber: p.machine.LineNumber(),
154			Column:     p.machine.Column(),
155			msg:        err.Error(),
156			buf:        p.machine.LineText(),
157		}
158	}
159
160	metric, err := p.handler.Metric()
161	if err != nil {
162		return nil, err
163	}
164
165	return metric, nil
166}
167
168// Position returns the current byte offset into the data.
169func (p *StreamParser) Position() int {
170	return p.machine.Position()
171}
172
173// LineOffset returns the byte offset of the current line.
174func (p *StreamParser) LineOffset() int {
175	return p.machine.LineOffset()
176}
177
178// LineNumber returns the current line number.  Lines are counted based on the
179// regular expression `\r?\n`.
180func (p *StreamParser) LineNumber() int {
181	return p.machine.LineNumber()
182}
183
184// Column returns the current column.
185func (p *StreamParser) Column() int {
186	return p.machine.Column()
187}
188
189// LineText returns the text of the current line that has been parsed so far.
190func (p *StreamParser) LineText() string {
191	return p.machine.LineText()
192}
193