1package protocol 2 3import ( 4 "fmt" 5 "io" 6 "math" 7 "sort" 8 "strconv" 9 "time" 10) 11 12// ErrIsNaN is a field error for when a float field is NaN. 13var ErrIsNaN = &FieldError{"is NaN"} 14 15// ErrIsInf is a field error for when a float field is Inf. 16var ErrIsInf = &FieldError{"is Inf"} 17 18// Encoder marshals Metrics into influxdb line protocol. 19// It is not safe for concurrent use, make a new one! 20// The default behavior when encountering a field error is to ignore the field and move on. 21// If you wish it to error out on field errors, use Encoder.FailOnFieldErr(true) 22type Encoder struct { 23 w io.Writer 24 fieldSortOrder FieldSortOrder 25 fieldTypeSupport FieldTypeSupport 26 failOnFieldError bool 27 maxLineBytes int 28 fieldList []*Field 29 header []byte 30 footer []byte 31 pair []byte 32 precision time.Duration 33} 34 35// SetMaxLineBytes sets a maximum length for a line, Encode will error if the generated line is longer 36func (e *Encoder) SetMaxLineBytes(i int) { 37 e.maxLineBytes = i 38} 39 40// SetFieldSortOrder sets a sort order for the data. 41// The options are: 42// NoSortFields (doesn't sort the fields) 43// SortFields (sorts the keys in alphabetical order) 44func (e *Encoder) SetFieldSortOrder(s FieldSortOrder) { 45 e.fieldSortOrder = s 46} 47 48// SetFieldTypeSupport sets flags for if the encoder supports certain optional field types such as uint64 49func (e *Encoder) SetFieldTypeSupport(s FieldTypeSupport) { 50 e.fieldTypeSupport = s 51} 52 53// FailOnFieldErr whether or not to fail on a field error or just move on. 54// The default behavior to move on 55func (e *Encoder) FailOnFieldErr(s bool) { 56 e.failOnFieldError = s 57} 58 59// SetPrecision sets time precision for writes 60// Default is nanoseconds precision 61func (e *Encoder) SetPrecision(p time.Duration) { 62 e.precision = p 63} 64 65// NewEncoder gives us an encoder that marshals to a writer in influxdb line protocol 66// as defined by: 67// https://docs.influxdata.com/influxdb/v1.5/write_protocols/line_protocol_reference/ 68func NewEncoder(w io.Writer) *Encoder { 69 return &Encoder{ 70 w: w, 71 header: make([]byte, 0, 128), 72 footer: make([]byte, 0, 128), 73 pair: make([]byte, 0, 128), 74 fieldList: make([]*Field, 0, 16), 75 precision: time.Nanosecond, 76 } 77} 78 79// This is here to significantly reduce allocations, wish that we had constant/immutable keyword that applied to 80// more complex objects 81var comma = []byte(",") 82 83// Encode marshals a Metric to the io.Writer in the Encoder 84func (e *Encoder) Encode(m Metric) (int, error) { 85 err := e.buildHeader(m) 86 if err != nil { 87 return 0, err 88 } 89 90 e.buildFooter(m.Time()) 91 92 // here we make a copy of the *fields so we can do an in-place sort 93 e.fieldList = append(e.fieldList[:0], m.FieldList()...) 94 95 if e.fieldSortOrder == SortFields { 96 sort.Slice(e.fieldList, func(i, j int) bool { 97 return e.fieldList[i].Key < e.fieldList[j].Key 98 }) 99 } 100 i := 0 101 totalWritten := 0 102 pairsLen := 0 103 firstField := true 104 for _, field := range e.fieldList { 105 err = e.buildFieldPair(field.Key, field.Value) 106 if err != nil { 107 if e.failOnFieldError { 108 return 0, err 109 } 110 continue 111 } 112 113 bytesNeeded := len(e.header) + pairsLen + len(e.pair) + len(e.footer) 114 115 // Additional length needed for field separator `,` 116 if !firstField { 117 bytesNeeded++ 118 } 119 120 if e.maxLineBytes > 0 && bytesNeeded > e.maxLineBytes { 121 // Need at least one field per line 122 if firstField { 123 return 0, ErrNeedMoreSpace 124 } 125 126 i, err = e.w.Write(e.footer) 127 if err != nil { 128 return 0, err 129 } 130 pairsLen = 0 131 totalWritten += i 132 133 bytesNeeded = len(e.header) + len(e.pair) + len(e.footer) 134 135 if e.maxLineBytes > 0 && bytesNeeded > e.maxLineBytes { 136 return 0, ErrNeedMoreSpace 137 } 138 139 i, err = e.w.Write(e.header) 140 if err != nil { 141 return 0, err 142 } 143 totalWritten += i 144 145 i, err = e.w.Write(e.pair) 146 if err != nil { 147 return 0, err 148 } 149 totalWritten += i 150 151 pairsLen += len(e.pair) 152 firstField = false 153 continue 154 } 155 156 if firstField { 157 i, err = e.w.Write(e.header) 158 if err != nil { 159 return 0, err 160 } 161 totalWritten += i 162 163 } else { 164 i, err = e.w.Write(comma) 165 if err != nil { 166 return 0, err 167 } 168 totalWritten += i 169 170 } 171 172 i, err = e.w.Write(e.pair) 173 if err != nil { 174 return 0, err 175 } 176 totalWritten += i 177 178 pairsLen += len(e.pair) 179 firstField = false 180 } 181 182 if firstField { 183 return 0, ErrNoFields 184 } 185 i, err = e.w.Write(e.footer) 186 if err != nil { 187 return 0, err 188 } 189 totalWritten += i 190 return totalWritten, nil 191 192} 193 194func (e *Encoder) buildHeader(m Metric) error { 195 e.header = e.header[:0] 196 name := nameEscape(m.Name()) 197 if name == "" { 198 return ErrInvalidName 199 } 200 e.header = append(e.header, name...) 201 202 for _, tag := range m.TagList() { 203 key := escape(tag.Key) 204 value := escape(tag.Value) 205 206 // Some keys and values are not encodeable as line protocol, such as 207 // those with a trailing '\' or empty strings. 208 if key == "" || value == "" { 209 continue 210 } 211 212 e.header = append(e.header, ',') 213 e.header = append(e.header, key...) 214 e.header = append(e.header, '=') 215 e.header = append(e.header, value...) 216 } 217 218 e.header = append(e.header, ' ') 219 return nil 220} 221 222func (e *Encoder) buildFieldVal(value interface{}) error { 223 switch v := value.(type) { 224 case uint64: 225 if e.fieldTypeSupport&UintSupport != 0 { 226 e.pair = append(strconv.AppendUint(e.pair, v, 10), 'u') 227 } else if v <= uint64(math.MaxInt64) { 228 e.pair = append(strconv.AppendInt(e.pair, int64(v), 10), 'i') 229 } else { 230 e.pair = append(strconv.AppendInt(e.pair, math.MaxInt64, 10), 'i') 231 } 232 case int64: 233 e.pair = append(strconv.AppendInt(e.pair, v, 10), 'i') 234 case int: 235 e.pair = append(strconv.AppendInt(e.pair, int64(v), 10), 'i') 236 case float64: 237 if math.IsNaN(v) { 238 return ErrIsNaN 239 } 240 241 if math.IsInf(v, 0) { 242 return ErrIsInf 243 } 244 245 e.pair = strconv.AppendFloat(e.pair, v, 'f', -1, 64) 246 case float32: 247 v32 := float64(v) 248 if math.IsNaN(v32) { 249 return ErrIsNaN 250 } 251 252 if math.IsInf(v32, 0) { 253 return ErrIsInf 254 } 255 256 e.pair = strconv.AppendFloat(e.pair, v32, 'f', -1, 64) 257 258 case string: 259 e.pair = append(e.pair, '"') 260 e.pair = append(e.pair, stringFieldEscape(v)...) 261 e.pair = append(e.pair, '"') 262 case []byte: 263 e.pair = append(e.pair, '"') 264 stringFieldEscapeBytes(&e.pair, v) 265 e.pair = append(e.pair, '"') 266 case bool: 267 e.pair = strconv.AppendBool(e.pair, v) 268 default: 269 return &FieldError{fmt.Sprintf("invalid value type: %T", v)} 270 } 271 return nil 272} 273 274func (e *Encoder) buildFieldPair(key string, value interface{}) error { 275 e.pair = e.pair[:0] 276 key = escape(key) 277 // Some keys are not encodeable as line protocol, such as those with a 278 // trailing '\' or empty strings. 279 if key == "" || key[:len(key)-1] == "\\" { 280 return &FieldError{"invalid field key"} 281 } 282 e.pair = append(e.pair, key...) 283 e.pair = append(e.pair, '=') 284 return e.buildFieldVal(value) 285} 286 287func (e *Encoder) buildFooter(t time.Time) { 288 e.footer = e.footer[:0] 289 if !t.IsZero() { 290 e.footer = append(e.footer, ' ') 291 switch e.precision { 292 case time.Microsecond: 293 e.footer = strconv.AppendInt(e.footer, t.UnixNano()/1000, 10) 294 case time.Millisecond: 295 e.footer = strconv.AppendInt(e.footer, t.UnixNano()/1000000, 10) 296 case time.Second: 297 e.footer = strconv.AppendInt(e.footer, t.Unix(), 10) 298 default: 299 e.footer = strconv.AppendInt(e.footer, t.UnixNano(), 10) 300 } 301 } 302 e.footer = append(e.footer, '\n') 303} 304