1package protocol 2 3import ( 4 "bytes" 5 "math" 6 "sort" 7 "testing" 8 "time" 9) 10 11type MockMetric struct { 12 name string 13 tags []*Tag 14 fields []*Field 15 t time.Time 16} 17 18func (m *MockMetric) Name() string { 19 return m.name 20} 21func (m *MockMetric) TagList() []*Tag { 22 return m.tags 23} 24 25func (m *MockMetric) FieldList() []*Field { 26 return m.fields 27} 28 29func (m *MockMetric) Time() time.Time { 30 return m.t 31} 32 33func (m *MockMetric) AddTag(k, v string) { 34 for i, tag := range m.tags { 35 if k == tag.Key { 36 m.tags[i].Value = v 37 return 38 } 39 } 40 m.tags = append(m.tags, &Tag{Key: k, Value: v}) 41} 42 43func (m *MockMetric) AddField(k string, v interface{}) { 44 for i, field := range m.fields { 45 if k == field.Key { 46 m.fields[i].Value = v 47 return 48 } 49 } 50 m.fields = append(m.fields, &Field{Key: k, Value: convertField(v)}) 51 sort.Slice(m.fields, func(i, j int) bool { 52 return string(m.fields[i].Key) < string(m.fields[j].Key) 53 }) 54} 55 56func NewMockMetric(name string, 57 tags map[string]string, 58 fields map[string]interface{}, 59 tm time.Time, 60) Metric { 61 m := &MockMetric{ 62 name: name, 63 tags: nil, 64 fields: nil, 65 t: tm, 66 } 67 68 if len(tags) > 0 { 69 m.tags = make([]*Tag, 0, len(tags)) 70 for k, v := range tags { 71 m.tags = append(m.tags, 72 &Tag{Key: k, Value: v}) 73 } 74 sort.Slice(m.tags, func(i, j int) bool { return m.tags[i].Key < m.tags[j].Key }) 75 } 76 77 m.fields = make([]*Field, 0, len(fields)) 78 for k, v := range fields { 79 v := convertField(v) 80 if v == nil { 81 continue 82 } 83 m.AddField(k, v) 84 } 85 86 return m 87} 88 89var tests = []struct { 90 name string 91 maxBytes int 92 typeSupport FieldTypeSupport 93 input Metric 94 output []byte 95 failOnFieldErr bool 96 err error 97 precision time.Duration 98}{ 99 { 100 name: "minimal", 101 input: NewMockMetric( 102 "cpu", 103 map[string]string{}, 104 map[string]interface{}{ 105 "value": 42.0, 106 }, 107 time.Unix(0, 0), 108 ), 109 output: []byte("cpu value=42 0\n"), 110 }, 111 { 112 name: "multiple tags", 113 input: NewMockMetric( 114 "cpu", 115 map[string]string{ 116 "host": "localhost", 117 "cpu": "CPU0", 118 }, 119 map[string]interface{}{ 120 "value": 42.0, 121 }, 122 time.Unix(0, 0), 123 ), 124 output: []byte("cpu,cpu=CPU0,host=localhost value=42 0\n"), 125 }, 126 { 127 name: "multiple fields", 128 input: NewMockMetric( 129 "cpu", 130 map[string]string{}, 131 map[string]interface{}{ 132 "x": 42.0, 133 "y": 42.0, 134 }, 135 time.Unix(0, 0), 136 ), 137 output: []byte("cpu x=42,y=42 0\n"), 138 }, 139 { 140 name: "float NaN", 141 input: NewMockMetric( 142 "cpu", 143 map[string]string{}, 144 map[string]interface{}{ 145 "x": math.NaN(), 146 "y": 42, 147 }, 148 time.Unix(0, 0), 149 ), 150 output: []byte("cpu y=42i 0\n"), 151 }, 152 { 153 name: "float NaN only", 154 input: NewMockMetric( 155 "cpu", 156 map[string]string{}, 157 map[string]interface{}{ 158 "value": math.NaN(), 159 }, 160 time.Unix(0, 0), 161 ), 162 err: ErrNoFields, 163 }, 164 { 165 name: "float Inf", 166 input: NewMockMetric( 167 "cpu", 168 map[string]string{}, 169 map[string]interface{}{ 170 "value": math.Inf(1), 171 "y": 42, 172 }, 173 time.Unix(0, 0), 174 ), 175 output: []byte("cpu y=42i 0\n"), 176 }, 177 { 178 name: "integer field", 179 input: NewMockMetric( 180 "cpu", 181 map[string]string{}, 182 map[string]interface{}{ 183 "value": 42, 184 }, 185 time.Unix(0, 0), 186 ), 187 output: []byte("cpu value=42i 0\n"), 188 }, 189 { 190 name: "integer field 64-bit", 191 input: NewMockMetric( 192 "cpu", 193 map[string]string{}, 194 map[string]interface{}{ 195 "value": int64(123456789012345), 196 }, 197 time.Unix(0, 0), 198 ), 199 output: []byte("cpu value=123456789012345i 0\n"), 200 }, 201 { 202 name: "uint field", 203 input: NewMockMetric( 204 "cpu", 205 map[string]string{}, 206 map[string]interface{}{ 207 "value": uint64(42), 208 }, 209 time.Unix(0, 0), 210 ), 211 output: []byte("cpu value=42u 0\n"), 212 typeSupport: UintSupport, 213 }, 214 { 215 name: "uint field max value", 216 input: NewMockMetric( 217 "cpu", 218 map[string]string{}, 219 map[string]interface{}{ 220 "value": uint64(18446744073709551615), 221 }, 222 time.Unix(0, 0), 223 ), 224 output: []byte("cpu value=18446744073709551615u 0\n"), 225 typeSupport: UintSupport, 226 }, 227 { 228 name: "uint field no uint support", 229 input: NewMockMetric( 230 "cpu", 231 map[string]string{}, 232 map[string]interface{}{ 233 "value": uint64(42), 234 }, 235 time.Unix(0, 0), 236 ), 237 output: []byte("cpu value=42i 0\n"), 238 }, 239 { 240 name: "uint field no uint support overflow", 241 input: NewMockMetric( 242 "cpu", 243 map[string]string{}, 244 map[string]interface{}{ 245 "value": uint64(18446744073709551615), 246 }, 247 time.Unix(0, 0), 248 ), 249 output: []byte("cpu value=9223372036854775807i 0\n"), 250 }, 251 { 252 name: "bool field", 253 input: NewMockMetric( 254 "cpu", 255 map[string]string{}, 256 map[string]interface{}{ 257 "value": true, 258 }, 259 time.Unix(0, 0), 260 ), 261 output: []byte("cpu value=true 0\n"), 262 }, 263 { 264 name: "string field", 265 input: NewMockMetric( 266 "cpu", 267 map[string]string{}, 268 map[string]interface{}{ 269 "value": "howdy", 270 }, 271 time.Unix(0, 0), 272 ), 273 output: []byte("cpu value=\"howdy\" 0\n"), 274 }, 275 { 276 name: "timestamp", 277 input: NewMockMetric( 278 "cpu", 279 map[string]string{}, 280 map[string]interface{}{ 281 "value": 42.0, 282 }, 283 time.Unix(1519194109, 42), 284 ), 285 output: []byte("cpu value=42 1519194109000000042\n"), 286 }, 287 { 288 name: "split fields exact", 289 maxBytes: 33, 290 input: NewMockMetric( 291 "cpu", 292 map[string]string{}, 293 map[string]interface{}{ 294 "abc": 123, 295 "def": 456, 296 }, 297 time.Unix(1519194109, 42), 298 ), 299 output: []byte("cpu abc=123i 1519194109000000042\ncpu def=456i 1519194109000000042\n"), 300 }, 301 { 302 name: "split fields extra", 303 maxBytes: 34, 304 input: NewMockMetric( 305 "cpu", 306 map[string]string{}, 307 map[string]interface{}{ 308 "abc": 123, 309 "def": 456, 310 }, 311 time.Unix(1519194109, 42), 312 ), 313 output: []byte("cpu abc=123i 1519194109000000042\ncpu def=456i 1519194109000000042\n"), 314 }, 315 { 316 name: "split_fields_overflow", 317 maxBytes: 43, 318 input: NewMockMetric( 319 "cpu", 320 map[string]string{}, 321 map[string]interface{}{ 322 "abc": 123, 323 "def": 456, 324 "ghi": 789, 325 "jkl": 123, 326 }, 327 time.Unix(1519194109, 42), 328 ), 329 output: []byte("cpu abc=123i,def=456i 1519194109000000042\ncpu ghi=789i,jkl=123i 1519194109000000042\n"), 330 }, 331 { 332 name: "name newline", 333 input: NewMockMetric( 334 "c\npu", 335 map[string]string{}, 336 map[string]interface{}{ 337 "value": 42, 338 }, 339 time.Unix(0, 0), 340 ), 341 output: []byte("c\\npu value=42i 0\n"), 342 }, 343 { 344 name: "tag newline", 345 input: NewMockMetric( 346 "cpu", 347 map[string]string{ 348 "host": "x\ny", 349 }, 350 map[string]interface{}{ 351 "value": 42, 352 }, 353 time.Unix(0, 0), 354 ), 355 output: []byte("cpu,host=x\\ny value=42i 0\n"), 356 }, 357 { 358 name: "string newline", 359 input: NewMockMetric( 360 "cpu", 361 map[string]string{}, 362 map[string]interface{}{ 363 "value": "x\ny", 364 }, 365 time.Unix(0, 0), 366 ), 367 output: []byte("cpu value=\"x\\ny\" 0\n"), 368 }, 369 { 370 name: "need more space", 371 maxBytes: 32, 372 input: NewMockMetric( 373 "cpu", 374 map[string]string{}, 375 map[string]interface{}{ 376 "abc": 123, 377 "def": 456, 378 }, 379 time.Unix(1519194109, 42), 380 ), 381 output: nil, 382 err: ErrNeedMoreSpace, 383 }, 384 { 385 name: "no fields", 386 input: NewMockMetric( 387 "cpu", 388 map[string]string{}, 389 map[string]interface{}{}, 390 time.Unix(0, 0), 391 ), 392 err: ErrNoFields, 393 }, 394 { 395 name: "procstat", 396 input: NewMockMetric( 397 "procstat", 398 map[string]string{ 399 "exe": "bash", 400 "process_name": "bash", 401 }, 402 map[string]interface{}{ 403 "cpu_time": 0, 404 "cpu_time_guest": float64(0), 405 "cpu_time_guest_nice": float64(0), 406 "cpu_time_idle": float64(0), 407 "cpu_time_iowait": float64(0), 408 "cpu_time_irq": float64(0), 409 "cpu_time_nice": float64(0), 410 "cpu_time_soft_irq": float64(0), 411 "cpu_time_steal": float64(0), 412 "cpu_time_stolen": float64(0), 413 "cpu_time_system": float64(0), 414 "cpu_time_user": float64(0.02), 415 "cpu_usage": float64(0), 416 "involuntary_context_switches": 2, 417 "memory_data": 1576960, 418 "memory_locked": 0, 419 "memory_rss": 5103616, 420 "memory_stack": 139264, 421 "memory_swap": 0, 422 "memory_vms": 21659648, 423 "nice_priority": 20, 424 "num_fds": 4, 425 "num_threads": 1, 426 "pid": 29417, 427 "read_bytes": 0, 428 "read_count": 259, 429 "realtime_priority": 0, 430 "rlimit_cpu_time_hard": 2147483647, 431 "rlimit_cpu_time_soft": 2147483647, 432 "rlimit_file_locks_hard": 2147483647, 433 "rlimit_file_locks_soft": 2147483647, 434 "rlimit_memory_data_hard": 2147483647, 435 "rlimit_memory_data_soft": 2147483647, 436 "rlimit_memory_locked_hard": 65536, 437 "rlimit_memory_locked_soft": 65536, 438 "rlimit_memory_rss_hard": 2147483647, 439 "rlimit_memory_rss_soft": 2147483647, 440 "rlimit_memory_stack_hard": 2147483647, 441 "rlimit_memory_stack_soft": 8388608, 442 "rlimit_memory_vms_hard": 2147483647, 443 "rlimit_memory_vms_soft": 2147483647, 444 "rlimit_nice_priority_hard": 0, 445 "rlimit_nice_priority_soft": 0, 446 "rlimit_num_fds_hard": 4096, 447 "rlimit_num_fds_soft": 1024, 448 "rlimit_realtime_priority_hard": 0, 449 "rlimit_realtime_priority_soft": 0, 450 "rlimit_signals_pending_hard": 78994, 451 "rlimit_signals_pending_soft": 78994, 452 "signals_pending": 0, 453 "voluntary_context_switches": 42, 454 "write_bytes": 106496, 455 "write_count": 35, 456 }, 457 time.Unix(0, 1517620624000000000), 458 ), 459 output: []byte("procstat,exe=bash,process_name=bash cpu_time=0i,cpu_time_guest=0,cpu_time_guest_nice=0,cpu_time_idle=0,cpu_time_iowait=0,cpu_time_irq=0,cpu_time_nice=0,cpu_time_soft_irq=0,cpu_time_steal=0,cpu_time_stolen=0,cpu_time_system=0,cpu_time_user=0.02,cpu_usage=0,involuntary_context_switches=2i,memory_data=1576960i,memory_locked=0i,memory_rss=5103616i,memory_stack=139264i,memory_swap=0i,memory_vms=21659648i,nice_priority=20i,num_fds=4i,num_threads=1i,pid=29417i,read_bytes=0i,read_count=259i,realtime_priority=0i,rlimit_cpu_time_hard=2147483647i,rlimit_cpu_time_soft=2147483647i,rlimit_file_locks_hard=2147483647i,rlimit_file_locks_soft=2147483647i,rlimit_memory_data_hard=2147483647i,rlimit_memory_data_soft=2147483647i,rlimit_memory_locked_hard=65536i,rlimit_memory_locked_soft=65536i,rlimit_memory_rss_hard=2147483647i,rlimit_memory_rss_soft=2147483647i,rlimit_memory_stack_hard=2147483647i,rlimit_memory_stack_soft=8388608i,rlimit_memory_vms_hard=2147483647i,rlimit_memory_vms_soft=2147483647i,rlimit_nice_priority_hard=0i,rlimit_nice_priority_soft=0i,rlimit_num_fds_hard=4096i,rlimit_num_fds_soft=1024i,rlimit_realtime_priority_hard=0i,rlimit_realtime_priority_soft=0i,rlimit_signals_pending_hard=78994i,rlimit_signals_pending_soft=78994i,signals_pending=0i,voluntary_context_switches=42i,write_bytes=106496i,write_count=35i 1517620624000000000\n"), 460 }, 461 { 462 name: "error out on field error", 463 input: NewMockMetric( 464 "cpu", 465 map[string]string{}, 466 map[string]interface{}{ 467 "x": math.NaN(), 468 "y": 42, 469 }, 470 time.Unix(0, 0), 471 ), 472 failOnFieldErr: true, 473 err: ErrIsNaN, 474 }, 475 { 476 name: "explicit nanoseconds precision", 477 input: NewMockMetric( 478 "cpu", 479 map[string]string{}, 480 map[string]interface{}{ 481 "x": 3, 482 "y": 42.3, 483 }, 484 time.Unix(123456789, 123456789), 485 ), 486 precision: time.Nanosecond, 487 output: []byte("cpu x=3i,y=42.3 123456789123456789\n"), 488 }, 489 { 490 name: "microseconds precision", 491 input: NewMockMetric( 492 "cpu", 493 map[string]string{}, 494 map[string]interface{}{ 495 "x": 3, 496 "y": 42.3, 497 }, 498 time.Unix(123456789, 123456789), 499 ), 500 precision: time.Microsecond, 501 output: []byte("cpu x=3i,y=42.3 123456789123456\n"), 502 }, 503 { 504 name: "milliseconds precision", 505 input: NewMockMetric( 506 "cpu", 507 map[string]string{}, 508 map[string]interface{}{ 509 "x": 3, 510 "y": 42.3, 511 }, 512 time.Unix(123456789, 123456789), 513 ), 514 precision: time.Millisecond, 515 output: []byte("cpu x=3i,y=42.3 123456789123\n"), 516 }, 517 { 518 name: "seconds precision", 519 input: NewMockMetric( 520 "cpu", 521 map[string]string{}, 522 map[string]interface{}{ 523 "x": 3, 524 "y": 42.3, 525 }, 526 time.Unix(123456789, 123456789), 527 ), 528 precision: time.Second, 529 output: []byte("cpu x=3i,y=42.3 123456789\n"), 530 }, 531} 532 533func TestEncoder(t *testing.T) { 534 for _, tt := range tests { 535 t.Run(tt.name, func(t *testing.T) { 536 buf := &bytes.Buffer{} 537 serializer := NewEncoder(buf) 538 serializer.SetMaxLineBytes(tt.maxBytes) 539 serializer.SetFieldSortOrder(SortFields) 540 serializer.SetFieldTypeSupport(tt.typeSupport) 541 serializer.FailOnFieldErr(tt.failOnFieldErr) 542 serializer.SetPrecision(tt.precision) 543 i, err := serializer.Encode(tt.input) 544 if tt.err != err { 545 t.Fatalf("expected error %v, but got %v", tt.err, err) 546 } 547 if i != len(buf.Bytes()) { 548 t.Fatalf("expected i: %v, but got: %v", len(buf.Bytes()), i) 549 } 550 if string(tt.output) != buf.String() { 551 t.Fatalf("expected output %v, but got %v", tt.output, buf.String()) 552 } 553 }) 554 } 555} 556 557func TestWriter(t *testing.T) { 558 type args struct { 559 name []byte 560 ts time.Time 561 tagKeys, tagVals, fieldKeys [][]byte 562 fieldVals []interface{} 563 } 564 btests := make([]struct { 565 name string 566 maxBytes int 567 typeSupport FieldTypeSupport 568 failOnFieldErr bool 569 fields args 570 err error 571 output []byte 572 precision time.Duration 573 }, len(tests)) 574 for i, tt := range tests { 575 btests[i].name = tt.name 576 btests[i].maxBytes = tt.maxBytes 577 btests[i].typeSupport = tt.typeSupport 578 btests[i].failOnFieldErr = tt.failOnFieldErr 579 btests[i].err = tt.err 580 btests[i].output = tt.output 581 btests[i].precision = tt.precision 582 btests[i].fields.name = []byte(tt.input.Name()) 583 btests[i].fields.ts = tt.input.Time() 584 btests[i].fields.fieldKeys, btests[i].fields.fieldVals = fieldsToBytes(tt.input.FieldList()) 585 btests[i].fields.tagKeys, btests[i].fields.tagVals = tagsToBytes(tt.input.TagList()) 586 } 587 for _, tt := range btests { 588 t.Run(tt.name, func(t *testing.T) { 589 if t.Name() == "TestWriter/split_fields_overflow" { 590 t.Skip("https://github.com/influxdata/line-protocol/issues/9") 591 } 592 buf := &bytes.Buffer{} 593 serializer := NewEncoder(buf) 594 serializer.SetMaxLineBytes(tt.maxBytes) 595 serializer.SetFieldSortOrder(SortFields) 596 serializer.SetFieldTypeSupport(tt.typeSupport) 597 serializer.FailOnFieldErr(tt.failOnFieldErr) 598 serializer.SetPrecision(tt.precision) 599 _, err := serializer.Write(tt.fields.name, tt.fields.ts, tt.fields.tagKeys, tt.fields.tagVals, tt.fields.fieldKeys, tt.fields.fieldVals) 600 if tt.err != err { 601 t.Fatalf("expected error %v, but got %v", tt.err, err) 602 } 603 if string(tt.output) != buf.String() { 604 t.Fatalf("expected output %s, but got %s", tt.output, buf.String()) 605 } 606 }) 607 } 608} 609 610func fieldsToBytes(tg []*Field) ([][]byte, []interface{}) { 611 b := make([][]byte, len(tg)) 612 v := make([]interface{}, len(tg)) 613 for i := range tg { 614 b[i] = []byte(tg[i].Key) 615 v[i] = tg[i].Value 616 } 617 return b, v 618} 619 620func tagsToBytes(tg []*Tag) ([][]byte, [][]byte) { 621 b := make([][]byte, len(tg)) 622 v := make([][]byte, len(tg)) 623 for i := range tg { 624 b[i] = []byte(tg[i].Key) 625 v[i] = []byte(tg[i].Value) 626 } 627 return b, v 628 629} 630 631func BenchmarkSerializer(b *testing.B) { 632 for _, tt := range tests { 633 b.Run(tt.name, func(b *testing.B) { 634 buf := &bytes.Buffer{} 635 serializer := NewEncoder(buf) 636 serializer.SetMaxLineBytes(tt.maxBytes) 637 serializer.SetFieldTypeSupport(tt.typeSupport) 638 for n := 0; n < b.N; n++ { 639 output, err := serializer.Encode(tt.input) 640 _ = err 641 _ = output 642 } 643 }) 644 } 645} 646 647func BenchmarkWriter(b *testing.B) { 648 type fields struct { 649 name []byte 650 ts time.Time 651 tagKeys, tagVals, fieldKeys [][]byte 652 fieldVals []interface{} 653 } 654 benches := make([]struct { 655 name string 656 maxBytes int 657 typeSupport FieldTypeSupport 658 failOnFieldErr bool 659 fields fields 660 }, len(tests)) 661 for i, tt := range tests { 662 benches[i].name = tt.name 663 benches[i].maxBytes = tt.maxBytes 664 benches[i].typeSupport = tt.typeSupport 665 benches[i].failOnFieldErr = tt.failOnFieldErr 666 benches[i].fields.name = []byte(tt.input.Name()) 667 benches[i].fields.ts = tt.input.Time() 668 benches[i].fields.fieldKeys, benches[i].fields.fieldVals = fieldsToBytes(tt.input.FieldList()) 669 benches[i].fields.tagKeys, benches[i].fields.tagVals = tagsToBytes(tt.input.TagList()) 670 } 671 b.ResetTimer() 672 673 for _, tt := range benches { 674 b.Run(tt.name, func(b *testing.B) { 675 buf := &bytes.Buffer{} 676 serializer := NewEncoder(buf) 677 serializer.SetMaxLineBytes(tt.maxBytes) 678 serializer.SetFieldTypeSupport(tt.typeSupport) 679 var i int 680 var err error 681 for n := 0; n < b.N; n++ { 682 i, err = serializer.Write(tt.fields.name, tt.fields.ts, tt.fields.tagKeys, tt.fields.tagVals, tt.fields.fieldKeys, tt.fields.fieldVals) 683 _ = err 684 _ = i 685 } 686 _ = buf 687 }) 688 } 689} 690