1package protocol 2 3import ( 4 "fmt" 5 "io" 6 "strings" 7 "sync" 8 "time" 9) 10 11const ( 12 maxErrorBufferSize = 1024 13) 14 15// TimeFunc is used to override the default time for a metric 16// with no specified timestamp. 17type TimeFunc func() time.Time 18 19// ParseError indicates a error in the parsing of the text. 20type ParseError struct { 21 Offset int 22 LineOffset int 23 LineNumber int 24 Column int 25 msg string 26 buf string 27} 28 29func (e *ParseError) Error() string { 30 buffer := e.buf[e.LineOffset:] 31 eol := strings.IndexAny(buffer, "\r\n") 32 if eol >= 0 { 33 buffer = buffer[:eol] 34 } 35 if len(buffer) > maxErrorBufferSize { 36 buffer = buffer[:maxErrorBufferSize] + "..." 37 } 38 return fmt.Sprintf("metric parse error: %s at %d:%d: %q", e.msg, e.LineNumber, e.Column, buffer) 39} 40 41// Parser is an InfluxDB Line Protocol parser that implements the 42// parsers.Parser interface. 43type Parser struct { 44 DefaultTags map[string]string 45 46 sync.Mutex 47 *machine 48 handler *MetricHandler 49} 50 51// NewParser returns a Parser than accepts line protocol 52func NewParser(handler *MetricHandler) *Parser { 53 return &Parser{ 54 machine: NewMachine(handler), 55 handler: handler, 56 } 57} 58 59// NewSeriesParser returns a Parser than accepts a measurement and tagset 60func NewSeriesParser(handler *MetricHandler) *Parser { 61 return &Parser{ 62 machine: NewSeriesMachine(handler), 63 handler: handler, 64 } 65} 66 67// SetTimeFunc allows default times to be set when no time is specified 68// for a metric in line-protocol. 69func (p *Parser) SetTimeFunc(f TimeFunc) { 70 p.handler.SetTimeFunc(f) 71} 72 73// Parse interprets line-protocol bytes as many metrics. 74func (p *Parser) Parse(input []byte) ([]Metric, error) { 75 p.Lock() 76 defer p.Unlock() 77 metrics := make([]Metric, 0) 78 p.machine.SetData(input) 79 80 for { 81 err := p.machine.Next() 82 if err == EOF { 83 break 84 } 85 86 if err != nil { 87 return nil, &ParseError{ 88 Offset: p.machine.Position(), 89 LineOffset: p.machine.LineOffset(), 90 LineNumber: p.machine.LineNumber(), 91 Column: p.machine.Column(), 92 msg: err.Error(), 93 buf: string(input), 94 } 95 } 96 97 metric, err := p.handler.Metric() 98 if err != nil { 99 return nil, err 100 } 101 102 if metric == nil { 103 continue 104 } 105 106 metrics = append(metrics, metric) 107 } 108 109 return metrics, nil 110} 111 112// StreamParser is an InfluxDB Line Protocol parser. It is not safe for 113// concurrent use in multiple goroutines. 114type StreamParser struct { 115 machine *streamMachine 116 handler *MetricHandler 117} 118 119// NewStreamParser parses from a reader and iterates the machine 120// metric by metric. Not safe for concurrent use in multiple goroutines. 121func NewStreamParser(r io.Reader) *StreamParser { 122 handler := NewMetricHandler() 123 return &StreamParser{ 124 machine: NewStreamMachine(r, handler), 125 handler: handler, 126 } 127} 128 129// SetTimeFunc changes the function used to determine the time of metrics 130// without a timestamp. The default TimeFunc is time.Now. Useful mostly for 131// testing, or perhaps if you want all metrics to have the same timestamp. 132func (p *StreamParser) SetTimeFunc(f TimeFunc) { 133 p.handler.SetTimeFunc(f) 134} 135 136// SetTimePrecision specifies units for the time stamp. 137func (p *StreamParser) SetTimePrecision(u time.Duration) { 138 p.handler.SetTimePrecision(u) 139} 140 141// Next parses the next item from the stream. You can repeat calls to this 142// function until it returns EOF. 143func (p *StreamParser) Next() (Metric, error) { 144 err := p.machine.Next() 145 if err == EOF { 146 return nil, EOF 147 } 148 149 if err != nil { 150 return nil, &ParseError{ 151 Offset: p.machine.Position(), 152 LineOffset: p.machine.LineOffset(), 153 LineNumber: p.machine.LineNumber(), 154 Column: p.machine.Column(), 155 msg: err.Error(), 156 buf: p.machine.LineText(), 157 } 158 } 159 160 metric, err := p.handler.Metric() 161 if err != nil { 162 return nil, err 163 } 164 165 return metric, nil 166} 167 168// Position returns the current byte offset into the data. 169func (p *StreamParser) Position() int { 170 return p.machine.Position() 171} 172 173// LineOffset returns the byte offset of the current line. 174func (p *StreamParser) LineOffset() int { 175 return p.machine.LineOffset() 176} 177 178// LineNumber returns the current line number. Lines are counted based on the 179// regular expression `\r?\n`. 180func (p *StreamParser) LineNumber() int { 181 return p.machine.LineNumber() 182} 183 184// Column returns the current column. 185func (p *StreamParser) Column() int { 186 return p.machine.Column() 187} 188 189// LineText returns the text of the current line that has been parsed so far. 190func (p *StreamParser) LineText() string { 191 return p.machine.LineText() 192} 193