1// Copyright 2013 Ooyala, Inc. 2 3package statsd_test 4 5import ( 6 "io" 7 "net" 8 "os" 9 "reflect" 10 "sort" 11 "strings" 12 "sync" 13 "testing" 14 "time" 15 16 "github.com/DataDog/datadog-go/statsd" 17 "github.com/stretchr/testify/assert" 18) 19 20var dogstatsdTests = []struct { 21 GlobalNamespace string 22 GlobalTags []string 23 Method string 24 Metric string 25 Value interface{} 26 Tags []string 27 Rate float64 28 Expected string 29}{ 30 {"", nil, "Gauge", "test.gauge", 1.0, nil, 1.0, "test.gauge:1|g"}, 31 {"", nil, "Gauge", "test.gauge", 1.0, nil, 0.999999, "test.gauge:1|g|@0.999999"}, 32 {"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA"}, 1.0, "test.gauge:1|g|#tagA"}, 33 {"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA", "tagB"}, 1.0, "test.gauge:1|g|#tagA,tagB"}, 34 {"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA"}, 0.999999, "test.gauge:1|g|@0.999999|#tagA"}, 35 {"", nil, "Count", "test.count", int64(1), []string{"tagA"}, 1.0, "test.count:1|c|#tagA"}, 36 {"", nil, "Count", "test.count", int64(-1), []string{"tagA"}, 1.0, "test.count:-1|c|#tagA"}, 37 {"", nil, "Histogram", "test.histogram", 2.3, []string{"tagA"}, 1.0, "test.histogram:2.3|h|#tagA"}, 38 {"", nil, "Distribution", "test.distribution", 2.3, []string{"tagA"}, 1.0, "test.distribution:2.3|d|#tagA"}, 39 {"", nil, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "test.set:uuid|s|#tagA"}, 40 {"flubber.", nil, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "flubber.test.set:uuid|s|#tagA"}, 41 {"", []string{"tagC"}, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "test.set:uuid|s|#tagC,tagA"}, 42 {"", nil, "Count", "test.count", int64(1), []string{"hello\nworld"}, 1.0, "test.count:1|c|#helloworld"}, 43} 44 45func assertNotPanics(t *testing.T, f func()) { 46 defer func() { 47 if r := recover(); r != nil { 48 t.Fatal(r) 49 } 50 }() 51 f() 52} 53 54func TestClientUDP(t *testing.T) { 55 addr := "localhost:1201" 56 udpAddr, err := net.ResolveUDPAddr("udp", addr) 57 if err != nil { 58 t.Fatal(err) 59 } 60 61 server, err := net.ListenUDP("udp", udpAddr) 62 if err != nil { 63 t.Fatal(err) 64 } 65 defer server.Close() 66 67 client, err := statsd.New(addr) 68 if err != nil { 69 t.Fatal(err) 70 } 71 72 clientTest(t, server, client) 73} 74 75type statsdWriterWrapper struct { 76 io.WriteCloser 77} 78 79func (statsdWriterWrapper) SetWriteTimeout(time.Duration) error { 80 return nil 81} 82 83func TestClientWithConn(t *testing.T) { 84 server, conn, err := os.Pipe() 85 if err != nil { 86 t.Fatal(err) 87 } 88 89 client, err := statsd.NewWithWriter(statsdWriterWrapper{conn}) 90 if err != nil { 91 t.Fatal(err) 92 } 93 94 clientTest(t, server, client) 95} 96 97func clientTest(t *testing.T, server io.Reader, client *statsd.Client) { 98 for _, tt := range dogstatsdTests { 99 client.Namespace = tt.GlobalNamespace 100 client.Tags = tt.GlobalTags 101 method := reflect.ValueOf(client).MethodByName(tt.Method) 102 e := method.Call([]reflect.Value{ 103 reflect.ValueOf(tt.Metric), 104 reflect.ValueOf(tt.Value), 105 reflect.ValueOf(tt.Tags), 106 reflect.ValueOf(tt.Rate)})[0] 107 errInter := e.Interface() 108 if errInter != nil { 109 t.Fatal(errInter.(error)) 110 } 111 112 bytes := make([]byte, 1024) 113 n, err := server.Read(bytes) 114 if err != nil { 115 t.Fatal(err) 116 } 117 message := bytes[:n] 118 if string(message) != tt.Expected { 119 t.Errorf("Expected: %s. Actual: %s", tt.Expected, string(message)) 120 } 121 } 122} 123 124func TestBufferedClient(t *testing.T) { 125 addr := "localhost:1201" 126 udpAddr, err := net.ResolveUDPAddr("udp", addr) 127 if err != nil { 128 t.Fatal(err) 129 } 130 131 server, err := net.ListenUDP("udp", udpAddr) 132 if err != nil { 133 t.Fatal(err) 134 } 135 defer server.Close() 136 137 bufferLength := 9 138 client, err := statsd.NewBuffered(addr, bufferLength) 139 if err != nil { 140 t.Fatal(err) 141 } 142 143 client.Namespace = "foo." 144 client.Tags = []string{"dd:2"} 145 146 dur, _ := time.ParseDuration("123us") 147 148 client.Incr("ab", nil, 1) 149 client.Decr("ab", nil, 1) 150 client.Count("ab", 1, nil, 1) 151 client.Gauge("ab", 10, nil, 1) 152 client.Histogram("ab", 1, nil, 1) 153 client.Distribution("ab", 1, nil, 1) 154 client.Timing("ab", dur, nil, 1) 155 client.Set("ab", "ss", nil, 1) 156 157 client.Set("ab", "xx", nil, 1) 158 client.Flush() 159 if err != nil { 160 t.Errorf("Error sending: %s", err) 161 } 162 163 buffer := make([]byte, 4096) 164 n, err := io.ReadAtLeast(server, buffer, 1) 165 result := string(buffer[:n]) 166 167 if err != nil { 168 t.Error(err) 169 } 170 171 expected := []string{ 172 `foo.ab:1|c|#dd:2`, 173 `foo.ab:-1|c|#dd:2`, 174 `foo.ab:1|c|#dd:2`, 175 `foo.ab:10|g|#dd:2`, 176 `foo.ab:1|h|#dd:2`, 177 `foo.ab:1|d|#dd:2`, 178 `foo.ab:0.123000|ms|#dd:2`, 179 `foo.ab:ss|s|#dd:2`, 180 `foo.ab:xx|s|#dd:2`, 181 } 182 183 for i, res := range strings.Split(result, "\n") { 184 if res != expected[i] { 185 t.Errorf("Got `%s`, expected `%s`", res, expected[i]) 186 } 187 } 188 189 client.Event(&statsd.Event{Title: "title1", Text: "text1", Priority: statsd.Normal, AlertType: statsd.Success, Tags: []string{"tagg"}}) 190 client.SimpleEvent("event1", "text1") 191 err = client.Flush() 192 193 if err != nil { 194 t.Errorf("Error sending: %s", err) 195 } 196 197 buffer = make([]byte, 1024) 198 n, err = io.ReadAtLeast(server, buffer, 1) 199 result = string(buffer[:n]) 200 201 if err != nil { 202 t.Error(err) 203 } 204 205 if n == 0 { 206 t.Errorf("Read 0 bytes but expected more.") 207 } 208 209 expected = []string{ 210 `_e{6,5}:title1|text1|p:normal|t:success|#dd:2,tagg`, 211 `_e{6,5}:event1|text1|#dd:2`, 212 } 213 214 for i, res := range strings.Split(result, "\n") { 215 if res != expected[i] { 216 t.Errorf("Got `%s`, expected `%s`", res, expected[i]) 217 } 218 } 219 220} 221 222func stringsToBytes(ss []string) [][]byte { 223 bs := make([][]byte, len(ss)) 224 for i, s := range ss { 225 bs[i] = []byte(s) 226 } 227 return bs 228} 229 230func TestNilError(t *testing.T) { 231 var c *statsd.Client 232 tests := []func() error{ 233 func() error { return c.SetWriteTimeout(0) }, 234 func() error { return c.Flush() }, 235 func() error { return c.Close() }, 236 func() error { return c.Count("", 0, nil, 1) }, 237 func() error { return c.Incr("", nil, 1) }, 238 func() error { return c.Decr("", nil, 1) }, 239 func() error { return c.Histogram("", 0, nil, 1) }, 240 func() error { return c.Distribution("", 0, nil, 1) }, 241 func() error { return c.Gauge("", 0, nil, 1) }, 242 func() error { return c.Set("", "", nil, 1) }, 243 func() error { return c.Timing("", time.Second, nil, 1) }, 244 func() error { return c.TimeInMilliseconds("", 1, nil, 1) }, 245 func() error { return c.Event(statsd.NewEvent("", "")) }, 246 func() error { return c.SimpleEvent("", "") }, 247 func() error { return c.ServiceCheck(statsd.NewServiceCheck("", statsd.Ok)) }, 248 func() error { return c.SimpleServiceCheck("", statsd.Ok) }, 249 func() error { 250 _, err := statsd.CloneWithExtraOptions(nil, statsd.WithChannelMode()) 251 return err 252 }, 253 } 254 for i, f := range tests { 255 var err error 256 assertNotPanics(t, func() { err = f() }) 257 if err != statsd.ErrNoClient { 258 t.Errorf("Test case %d: expected ErrNoClient, got %#v", i, err) 259 } 260 } 261} 262 263func TestEvents(t *testing.T) { 264 matrix := []struct { 265 event *statsd.Event 266 encoded string 267 }{ 268 { 269 statsd.NewEvent("Hello", "Something happened to my event"), 270 `_e{5,30}:Hello|Something happened to my event`, 271 }, { 272 &statsd.Event{Title: "hi", Text: "okay", AggregationKey: "foo"}, 273 `_e{2,4}:hi|okay|k:foo`, 274 }, { 275 &statsd.Event{Title: "hi", Text: "okay", AggregationKey: "foo", AlertType: statsd.Info}, 276 `_e{2,4}:hi|okay|k:foo|t:info`, 277 }, { 278 &statsd.Event{Title: "hi", Text: "w/e", AlertType: statsd.Error, Priority: statsd.Normal}, 279 `_e{2,3}:hi|w/e|p:normal|t:error`, 280 }, { 281 &statsd.Event{Title: "hi", Text: "uh", Tags: []string{"host:foo", "app:bar"}}, 282 `_e{2,2}:hi|uh|#host:foo,app:bar`, 283 }, { 284 &statsd.Event{Title: "hi", Text: "line1\nline2", Tags: []string{"hello\nworld"}}, 285 `_e{2,12}:hi|line1\nline2|#helloworld`, 286 }, 287 } 288 289 for _, m := range matrix { 290 r, err := m.event.Encode() 291 if err != nil { 292 t.Errorf("Error encoding: %s\n", err) 293 continue 294 } 295 if r != m.encoded { 296 t.Errorf("Expected `%s`, got `%s`\n", m.encoded, r) 297 } 298 } 299 300 e := statsd.NewEvent("", "hi") 301 if _, err := e.Encode(); err == nil { 302 t.Errorf("Expected error on empty Title.") 303 } 304 305 e = statsd.NewEvent("hi", "") 306 if _, err := e.Encode(); err == nil { 307 t.Errorf("Expected error on empty Text.") 308 } 309 310 e = statsd.NewEvent("hello", "world") 311 s, err := e.Encode("tag1", "tag2") 312 if err != nil { 313 t.Error(err) 314 } 315 expected := "_e{5,5}:hello|world|#tag1,tag2" 316 if s != expected { 317 t.Errorf("Expected %s, got %s", expected, s) 318 } 319 if len(e.Tags) != 0 { 320 t.Errorf("Modified event in place illegally.") 321 } 322} 323 324func TestServiceChecks(t *testing.T) { 325 matrix := []struct { 326 serviceCheck *statsd.ServiceCheck 327 encoded string 328 }{ 329 { 330 statsd.NewServiceCheck("DataCatService", statsd.Ok), 331 `_sc|DataCatService|0`, 332 }, { 333 statsd.NewServiceCheck("DataCatService", statsd.Warn), 334 `_sc|DataCatService|1`, 335 }, { 336 statsd.NewServiceCheck("DataCatService", statsd.Critical), 337 `_sc|DataCatService|2`, 338 }, { 339 statsd.NewServiceCheck("DataCatService", statsd.Unknown), 340 `_sc|DataCatService|3`, 341 }, { 342 &statsd.ServiceCheck{Name: "DataCatService", Status: statsd.Ok, Hostname: "DataStation.Cat"}, 343 `_sc|DataCatService|0|h:DataStation.Cat`, 344 }, { 345 &statsd.ServiceCheck{Name: "DataCatService", Status: statsd.Ok, Hostname: "DataStation.Cat", Message: "Here goes valuable message"}, 346 `_sc|DataCatService|0|h:DataStation.Cat|m:Here goes valuable message`, 347 }, { 348 &statsd.ServiceCheck{Name: "DataCatService", Status: statsd.Ok, Hostname: "DataStation.Cat", Message: "Here are some cyrillic chars: к л м н о п р с т у ф х ц ч ш"}, 349 `_sc|DataCatService|0|h:DataStation.Cat|m:Here are some cyrillic chars: к л м н о п р с т у ф х ц ч ш`, 350 }, { 351 &statsd.ServiceCheck{Name: "DataCatService", Status: statsd.Ok, Hostname: "DataStation.Cat", Message: "Here goes valuable message", Tags: []string{"host:foo", "app:bar"}}, 352 `_sc|DataCatService|0|h:DataStation.Cat|#host:foo,app:bar|m:Here goes valuable message`, 353 }, { 354 &statsd.ServiceCheck{Name: "DataCatService", Status: statsd.Ok, Hostname: "DataStation.Cat", Message: "Here goes \n that should be escaped", Tags: []string{"host:foo", "app:b\nar"}}, 355 `_sc|DataCatService|0|h:DataStation.Cat|#host:foo,app:bar|m:Here goes \n that should be escaped`, 356 }, { 357 &statsd.ServiceCheck{Name: "DataCatService", Status: statsd.Ok, Hostname: "DataStation.Cat", Message: "Here goes m: that should be escaped", Tags: []string{"host:foo", "app:bar"}}, 358 `_sc|DataCatService|0|h:DataStation.Cat|#host:foo,app:bar|m:Here goes m\: that should be escaped`, 359 }, 360 } 361 362 for _, m := range matrix { 363 r, err := m.serviceCheck.Encode() 364 if err != nil { 365 t.Errorf("Error encoding: %s\n", err) 366 continue 367 } 368 if r != m.encoded { 369 t.Errorf("Expected `%s`, got `%s`\n", m.encoded, r) 370 } 371 } 372 373 sc := statsd.NewServiceCheck("", statsd.Ok) 374 if _, err := sc.Encode(); err == nil { 375 t.Errorf("Expected error on empty Name.") 376 } 377 378 sc = statsd.NewServiceCheck("sc", statsd.ServiceCheckStatus(5)) 379 if _, err := sc.Encode(); err == nil { 380 t.Errorf("Expected error on invalid status value.") 381 } 382 383 sc = statsd.NewServiceCheck("hello", statsd.Warn) 384 s, err := sc.Encode("tag1", "tag2") 385 if err != nil { 386 t.Error(err) 387 } 388 expected := "_sc|hello|1|#tag1,tag2" 389 if s != expected { 390 t.Errorf("Expected %s, got %s", expected, s) 391 } 392 if len(sc.Tags) != 0 { 393 t.Errorf("Modified serviceCheck in place illegally.") 394 } 395} 396 397func TestEntityID(t *testing.T) { 398 entityIDEnvName := "DD_ENTITY_ID" 399 initialValue, initiallySet := os.LookupEnv(entityIDEnvName) 400 if initiallySet { 401 defer os.Setenv(entityIDEnvName, initialValue) 402 } else { 403 defer os.Unsetenv(entityIDEnvName) 404 } 405 406 // Set to a valid value 407 os.Setenv(entityIDEnvName, "testing") 408 client, err := statsd.New("localhost:8125") 409 if err != nil { 410 t.Fatal(err) 411 } 412 if len(client.Tags) != 1 { 413 t.Errorf("Expecting one tag, got %d", len(client.Tags)) 414 } 415 if client.Tags[0] != "dd.internal.entity_id:testing" { 416 t.Errorf("Bad tag value, got %s", client.Tags[0]) 417 } 418 419 // Set to empty string 420 os.Setenv(entityIDEnvName, "") 421 client, err = statsd.New("localhost:8125") 422 if err != nil { 423 t.Fatal(err) 424 } 425 if len(client.Tags) != 0 { 426 t.Errorf("Expecting empty default tags, got %v", client.Tags) 427 } 428 429 // Unset 430 os.Unsetenv(entityIDEnvName) 431 client, err = statsd.New("localhost:8125") 432 if err != nil { 433 t.Fatal(err) 434 } 435 if len(client.Tags) != 0 { 436 t.Errorf("Expecting empty default tags, got %v", client.Tags) 437 } 438} 439 440var ( 441 ddEnvName = "DD_ENV" 442 ddServiceName = "DD_SERVICE" 443 ddVersionName = "DD_VERSION" 444) 445 446func TestDDEnvServiceVersionSet(t *testing.T) { 447 for _, tt := range []struct { 448 DDEnv string 449 DDService string 450 DDVersion string 451 Expected []string 452 }{ 453 {"", "", "", []string{}}, 454 {"prod", "", "", []string{"env:prod"}}, 455 {"prod", "dog", "", []string{"env:prod", "service:dog"}}, 456 {"prod", "dog", "abc123", []string{"env:prod", "service:dog", "version:abc123"}}, 457 } { 458 for _, t := range []string{ddEnvName, ddServiceName, ddVersionName} { 459 initialValue, initiallySet := os.LookupEnv(t) 460 if initiallySet { 461 defer os.Setenv(t, initialValue) 462 } else { 463 defer os.Unsetenv(t) 464 } 465 } 466 os.Setenv(ddEnvName, tt.DDEnv) 467 os.Setenv(ddServiceName, tt.DDService) 468 os.Setenv(ddVersionName, tt.DDVersion) 469 client, err := statsd.New("localhost:8125") 470 if err != nil { 471 t.Fatal(err) 472 } 473 // Keep the ordering of global tags consistent. 474 sort.Strings(client.Tags) 475 assert.Equal(t, tt.Expected, client.Tags) 476 } 477} 478 479func TestDDEnvServiceVersionTagsEmitted(t *testing.T) { 480 for _, t := range []string{ddEnvName, ddServiceName, ddVersionName} { 481 initialValue, initiallySet := os.LookupEnv(t) 482 if initiallySet { 483 defer os.Setenv(t, initialValue) 484 } else { 485 defer os.Unsetenv(t) 486 } 487 } 488 os.Setenv(ddEnvName, "prod") 489 os.Setenv(ddServiceName, "dog") 490 os.Setenv(ddVersionName, "abc123") 491 addr := "localhost:1201" 492 udpAddr, err := net.ResolveUDPAddr("udp", addr) 493 if err != nil { 494 t.Fatal(err) 495 } 496 server, err := net.ListenUDP("udp", udpAddr) 497 if err != nil { 498 t.Fatal(err) 499 } 500 defer server.Close() 501 502 for _, tt := range []struct { 503 Tags []string 504 GlobalTags []string 505 Expected string 506 }{ 507 {nil, nil, "test.count:100|c|#env:prod,service:dog,version:abc123"}, 508 {[]string{"env:staging", "service:cat", "custom_tag"}, nil, "test.count:100|c|#env:prod,service:dog,version:abc123,env:staging,service:cat,custom_tag"}, 509 {nil, []string{"version:def456", "custom_tag_two"}, "test.count:100|c|#custom_tag_two,env:prod,service:dog,version:abc123,version:def456"}, 510 {[]string{"env:staging", "service:cat", "custom_tag"}, []string{"version:def456", "custom_tag_two"}, "test.count:100|c|#custom_tag_two,env:prod,service:dog,version:abc123,version:def456,env:staging,service:cat,custom_tag"}, 511 } { 512 client, err := statsd.New(addr, statsd.WithTags(tt.GlobalTags)) 513 if err != nil { 514 t.Fatal(err) 515 } 516 // Keep the ordering of global tags consistent. 517 sort.Strings(client.Tags) 518 client.Count("test.count", 100, tt.Tags, 1.0) 519 err = client.Flush() 520 if err != nil { 521 t.Errorf("Error sending: %s", err) 522 } 523 buffer := make([]byte, 1024) 524 n, err := io.ReadAtLeast(server, buffer, 1) 525 if err != nil { 526 t.Errorf("ReadAtLeast: %s", err) 527 } 528 result := string(buffer[:n]) 529 if result != tt.Expected { 530 t.Errorf("Flushed metric incorrect; expected %s but got %s", tt.Expected, result) 531 } 532 } 533} 534 535func TestClosePanic(t *testing.T) { 536 c, err := statsd.New("localhost:8125") 537 assert.NoError(t, err) 538 c.Close() 539 c.Close() 540} 541 542func TestCloseRace(t *testing.T) { 543 for i := 0; i < 100; i++ { 544 c, err := statsd.New("localhost:8125") 545 assert.NoError(t, err) 546 start := make(chan struct{}) 547 var wg sync.WaitGroup 548 for j := 0; j < 100; j++ { 549 wg.Add(1) 550 go func() { 551 defer wg.Done() 552 <-start 553 c.Close() 554 }() 555 } 556 close(start) 557 wg.Wait() 558 } 559} 560