1// Copyright 2013 Ooyala, Inc.
2
3package statsd_test
4
5import (
6	"io"
7	"net"
8	"os"
9	"reflect"
10	"sort"
11	"strings"
12	"sync"
13	"testing"
14	"time"
15
16	"github.com/DataDog/datadog-go/statsd"
17	"github.com/stretchr/testify/assert"
18)
19
20var dogstatsdTests = []struct {
21	GlobalNamespace string
22	GlobalTags      []string
23	Method          string
24	Metric          string
25	Value           interface{}
26	Tags            []string
27	Rate            float64
28	Expected        string
29}{
30	{"", nil, "Gauge", "test.gauge", 1.0, nil, 1.0, "test.gauge:1|g"},
31	{"", nil, "Gauge", "test.gauge", 1.0, nil, 0.999999, "test.gauge:1|g|@0.999999"},
32	{"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA"}, 1.0, "test.gauge:1|g|#tagA"},
33	{"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA", "tagB"}, 1.0, "test.gauge:1|g|#tagA,tagB"},
34	{"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA"}, 0.999999, "test.gauge:1|g|@0.999999|#tagA"},
35	{"", nil, "Count", "test.count", int64(1), []string{"tagA"}, 1.0, "test.count:1|c|#tagA"},
36	{"", nil, "Count", "test.count", int64(-1), []string{"tagA"}, 1.0, "test.count:-1|c|#tagA"},
37	{"", nil, "Histogram", "test.histogram", 2.3, []string{"tagA"}, 1.0, "test.histogram:2.3|h|#tagA"},
38	{"", nil, "Distribution", "test.distribution", 2.3, []string{"tagA"}, 1.0, "test.distribution:2.3|d|#tagA"},
39	{"", nil, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "test.set:uuid|s|#tagA"},
40	{"flubber.", nil, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "flubber.test.set:uuid|s|#tagA"},
41	{"", []string{"tagC"}, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "test.set:uuid|s|#tagC,tagA"},
42	{"", nil, "Count", "test.count", int64(1), []string{"hello\nworld"}, 1.0, "test.count:1|c|#helloworld"},
43}
44
45func assertNotPanics(t *testing.T, f func()) {
46	defer func() {
47		if r := recover(); r != nil {
48			t.Fatal(r)
49		}
50	}()
51	f()
52}
53
54func TestClientUDP(t *testing.T) {
55	addr := "localhost:1201"
56	udpAddr, err := net.ResolveUDPAddr("udp", addr)
57	if err != nil {
58		t.Fatal(err)
59	}
60
61	server, err := net.ListenUDP("udp", udpAddr)
62	if err != nil {
63		t.Fatal(err)
64	}
65	defer server.Close()
66
67	client, err := statsd.New(addr)
68	if err != nil {
69		t.Fatal(err)
70	}
71
72	clientTest(t, server, client)
73}
74
75type statsdWriterWrapper struct {
76	io.WriteCloser
77}
78
79func (statsdWriterWrapper) SetWriteTimeout(time.Duration) error {
80	return nil
81}
82
83func TestClientWithConn(t *testing.T) {
84	server, conn, err := os.Pipe()
85	if err != nil {
86		t.Fatal(err)
87	}
88
89	client, err := statsd.NewWithWriter(statsdWriterWrapper{conn})
90	if err != nil {
91		t.Fatal(err)
92	}
93
94	clientTest(t, server, client)
95}
96
97func clientTest(t *testing.T, server io.Reader, client *statsd.Client) {
98	for _, tt := range dogstatsdTests {
99		client.Namespace = tt.GlobalNamespace
100		client.Tags = tt.GlobalTags
101		method := reflect.ValueOf(client).MethodByName(tt.Method)
102		e := method.Call([]reflect.Value{
103			reflect.ValueOf(tt.Metric),
104			reflect.ValueOf(tt.Value),
105			reflect.ValueOf(tt.Tags),
106			reflect.ValueOf(tt.Rate)})[0]
107		errInter := e.Interface()
108		if errInter != nil {
109			t.Fatal(errInter.(error))
110		}
111
112		bytes := make([]byte, 1024)
113		n, err := server.Read(bytes)
114		if err != nil {
115			t.Fatal(err)
116		}
117		message := bytes[:n]
118		if string(message) != tt.Expected {
119			t.Errorf("Expected: %s. Actual: %s", tt.Expected, string(message))
120		}
121	}
122}
123
124func TestBufferedClient(t *testing.T) {
125	addr := "localhost:1201"
126	udpAddr, err := net.ResolveUDPAddr("udp", addr)
127	if err != nil {
128		t.Fatal(err)
129	}
130
131	server, err := net.ListenUDP("udp", udpAddr)
132	if err != nil {
133		t.Fatal(err)
134	}
135	defer server.Close()
136
137	bufferLength := 9
138	client, err := statsd.NewBuffered(addr, bufferLength)
139	if err != nil {
140		t.Fatal(err)
141	}
142
143	client.Namespace = "foo."
144	client.Tags = []string{"dd:2"}
145
146	dur, _ := time.ParseDuration("123us")
147
148	client.Incr("ab", nil, 1)
149	client.Decr("ab", nil, 1)
150	client.Count("ab", 1, nil, 1)
151	client.Gauge("ab", 10, nil, 1)
152	client.Histogram("ab", 1, nil, 1)
153	client.Distribution("ab", 1, nil, 1)
154	client.Timing("ab", dur, nil, 1)
155	client.Set("ab", "ss", nil, 1)
156
157	client.Set("ab", "xx", nil, 1)
158	client.Flush()
159	if err != nil {
160		t.Errorf("Error sending: %s", err)
161	}
162
163	buffer := make([]byte, 4096)
164	n, err := io.ReadAtLeast(server, buffer, 1)
165	result := string(buffer[:n])
166
167	if err != nil {
168		t.Error(err)
169	}
170
171	expected := []string{
172		`foo.ab:1|c|#dd:2`,
173		`foo.ab:-1|c|#dd:2`,
174		`foo.ab:1|c|#dd:2`,
175		`foo.ab:10|g|#dd:2`,
176		`foo.ab:1|h|#dd:2`,
177		`foo.ab:1|d|#dd:2`,
178		`foo.ab:0.123000|ms|#dd:2`,
179		`foo.ab:ss|s|#dd:2`,
180		`foo.ab:xx|s|#dd:2`,
181	}
182
183	for i, res := range strings.Split(result, "\n") {
184		if res != expected[i] {
185			t.Errorf("Got `%s`, expected `%s`", res, expected[i])
186		}
187	}
188
189	client.Event(&statsd.Event{Title: "title1", Text: "text1", Priority: statsd.Normal, AlertType: statsd.Success, Tags: []string{"tagg"}})
190	client.SimpleEvent("event1", "text1")
191	err = client.Flush()
192
193	if err != nil {
194		t.Errorf("Error sending: %s", err)
195	}
196
197	buffer = make([]byte, 1024)
198	n, err = io.ReadAtLeast(server, buffer, 1)
199	result = string(buffer[:n])
200
201	if err != nil {
202		t.Error(err)
203	}
204
205	if n == 0 {
206		t.Errorf("Read 0 bytes but expected more.")
207	}
208
209	expected = []string{
210		`_e{6,5}:title1|text1|p:normal|t:success|#dd:2,tagg`,
211		`_e{6,5}:event1|text1|#dd:2`,
212	}
213
214	for i, res := range strings.Split(result, "\n") {
215		if res != expected[i] {
216			t.Errorf("Got `%s`, expected `%s`", res, expected[i])
217		}
218	}
219
220}
221
222func stringsToBytes(ss []string) [][]byte {
223	bs := make([][]byte, len(ss))
224	for i, s := range ss {
225		bs[i] = []byte(s)
226	}
227	return bs
228}
229
230func TestNilError(t *testing.T) {
231	var c *statsd.Client
232	tests := []func() error{
233		func() error { return c.SetWriteTimeout(0) },
234		func() error { return c.Flush() },
235		func() error { return c.Close() },
236		func() error { return c.Count("", 0, nil, 1) },
237		func() error { return c.Incr("", nil, 1) },
238		func() error { return c.Decr("", nil, 1) },
239		func() error { return c.Histogram("", 0, nil, 1) },
240		func() error { return c.Distribution("", 0, nil, 1) },
241		func() error { return c.Gauge("", 0, nil, 1) },
242		func() error { return c.Set("", "", nil, 1) },
243		func() error { return c.Timing("", time.Second, nil, 1) },
244		func() error { return c.TimeInMilliseconds("", 1, nil, 1) },
245		func() error { return c.Event(statsd.NewEvent("", "")) },
246		func() error { return c.SimpleEvent("", "") },
247		func() error { return c.ServiceCheck(statsd.NewServiceCheck("", statsd.Ok)) },
248		func() error { return c.SimpleServiceCheck("", statsd.Ok) },
249		func() error {
250			_, err := statsd.CloneWithExtraOptions(nil, statsd.WithChannelMode())
251			return err
252		},
253	}
254	for i, f := range tests {
255		var err error
256		assertNotPanics(t, func() { err = f() })
257		if err != statsd.ErrNoClient {
258			t.Errorf("Test case %d: expected ErrNoClient, got %#v", i, err)
259		}
260	}
261}
262
263func TestEvents(t *testing.T) {
264	matrix := []struct {
265		event   *statsd.Event
266		encoded string
267	}{
268		{
269			statsd.NewEvent("Hello", "Something happened to my event"),
270			`_e{5,30}:Hello|Something happened to my event`,
271		}, {
272			&statsd.Event{Title: "hi", Text: "okay", AggregationKey: "foo"},
273			`_e{2,4}:hi|okay|k:foo`,
274		}, {
275			&statsd.Event{Title: "hi", Text: "okay", AggregationKey: "foo", AlertType: statsd.Info},
276			`_e{2,4}:hi|okay|k:foo|t:info`,
277		}, {
278			&statsd.Event{Title: "hi", Text: "w/e", AlertType: statsd.Error, Priority: statsd.Normal},
279			`_e{2,3}:hi|w/e|p:normal|t:error`,
280		}, {
281			&statsd.Event{Title: "hi", Text: "uh", Tags: []string{"host:foo", "app:bar"}},
282			`_e{2,2}:hi|uh|#host:foo,app:bar`,
283		}, {
284			&statsd.Event{Title: "hi", Text: "line1\nline2", Tags: []string{"hello\nworld"}},
285			`_e{2,12}:hi|line1\nline2|#helloworld`,
286		},
287	}
288
289	for _, m := range matrix {
290		r, err := m.event.Encode()
291		if err != nil {
292			t.Errorf("Error encoding: %s\n", err)
293			continue
294		}
295		if r != m.encoded {
296			t.Errorf("Expected `%s`, got `%s`\n", m.encoded, r)
297		}
298	}
299
300	e := statsd.NewEvent("", "hi")
301	if _, err := e.Encode(); err == nil {
302		t.Errorf("Expected error on empty Title.")
303	}
304
305	e = statsd.NewEvent("hi", "")
306	if _, err := e.Encode(); err == nil {
307		t.Errorf("Expected error on empty Text.")
308	}
309
310	e = statsd.NewEvent("hello", "world")
311	s, err := e.Encode("tag1", "tag2")
312	if err != nil {
313		t.Error(err)
314	}
315	expected := "_e{5,5}:hello|world|#tag1,tag2"
316	if s != expected {
317		t.Errorf("Expected %s, got %s", expected, s)
318	}
319	if len(e.Tags) != 0 {
320		t.Errorf("Modified event in place illegally.")
321	}
322}
323
324func TestServiceChecks(t *testing.T) {
325	matrix := []struct {
326		serviceCheck *statsd.ServiceCheck
327		encoded      string
328	}{
329		{
330			statsd.NewServiceCheck("DataCatService", statsd.Ok),
331			`_sc|DataCatService|0`,
332		}, {
333			statsd.NewServiceCheck("DataCatService", statsd.Warn),
334			`_sc|DataCatService|1`,
335		}, {
336			statsd.NewServiceCheck("DataCatService", statsd.Critical),
337			`_sc|DataCatService|2`,
338		}, {
339			statsd.NewServiceCheck("DataCatService", statsd.Unknown),
340			`_sc|DataCatService|3`,
341		}, {
342			&statsd.ServiceCheck{Name: "DataCatService", Status: statsd.Ok, Hostname: "DataStation.Cat"},
343			`_sc|DataCatService|0|h:DataStation.Cat`,
344		}, {
345			&statsd.ServiceCheck{Name: "DataCatService", Status: statsd.Ok, Hostname: "DataStation.Cat", Message: "Here goes valuable message"},
346			`_sc|DataCatService|0|h:DataStation.Cat|m:Here goes valuable message`,
347		}, {
348			&statsd.ServiceCheck{Name: "DataCatService", Status: statsd.Ok, Hostname: "DataStation.Cat", Message: "Here are some cyrillic chars: к л м н о п р с т у ф х ц ч ш"},
349			`_sc|DataCatService|0|h:DataStation.Cat|m:Here are some cyrillic chars: к л м н о п р с т у ф х ц ч ш`,
350		}, {
351			&statsd.ServiceCheck{Name: "DataCatService", Status: statsd.Ok, Hostname: "DataStation.Cat", Message: "Here goes valuable message", Tags: []string{"host:foo", "app:bar"}},
352			`_sc|DataCatService|0|h:DataStation.Cat|#host:foo,app:bar|m:Here goes valuable message`,
353		}, {
354			&statsd.ServiceCheck{Name: "DataCatService", Status: statsd.Ok, Hostname: "DataStation.Cat", Message: "Here goes \n that should be escaped", Tags: []string{"host:foo", "app:b\nar"}},
355			`_sc|DataCatService|0|h:DataStation.Cat|#host:foo,app:bar|m:Here goes \n that should be escaped`,
356		}, {
357			&statsd.ServiceCheck{Name: "DataCatService", Status: statsd.Ok, Hostname: "DataStation.Cat", Message: "Here goes m: that should be escaped", Tags: []string{"host:foo", "app:bar"}},
358			`_sc|DataCatService|0|h:DataStation.Cat|#host:foo,app:bar|m:Here goes m\: that should be escaped`,
359		},
360	}
361
362	for _, m := range matrix {
363		r, err := m.serviceCheck.Encode()
364		if err != nil {
365			t.Errorf("Error encoding: %s\n", err)
366			continue
367		}
368		if r != m.encoded {
369			t.Errorf("Expected `%s`, got `%s`\n", m.encoded, r)
370		}
371	}
372
373	sc := statsd.NewServiceCheck("", statsd.Ok)
374	if _, err := sc.Encode(); err == nil {
375		t.Errorf("Expected error on empty Name.")
376	}
377
378	sc = statsd.NewServiceCheck("sc", statsd.ServiceCheckStatus(5))
379	if _, err := sc.Encode(); err == nil {
380		t.Errorf("Expected error on invalid status value.")
381	}
382
383	sc = statsd.NewServiceCheck("hello", statsd.Warn)
384	s, err := sc.Encode("tag1", "tag2")
385	if err != nil {
386		t.Error(err)
387	}
388	expected := "_sc|hello|1|#tag1,tag2"
389	if s != expected {
390		t.Errorf("Expected %s, got %s", expected, s)
391	}
392	if len(sc.Tags) != 0 {
393		t.Errorf("Modified serviceCheck in place illegally.")
394	}
395}
396
397func TestEntityID(t *testing.T) {
398	entityIDEnvName := "DD_ENTITY_ID"
399	initialValue, initiallySet := os.LookupEnv(entityIDEnvName)
400	if initiallySet {
401		defer os.Setenv(entityIDEnvName, initialValue)
402	} else {
403		defer os.Unsetenv(entityIDEnvName)
404	}
405
406	// Set to a valid value
407	os.Setenv(entityIDEnvName, "testing")
408	client, err := statsd.New("localhost:8125")
409	if err != nil {
410		t.Fatal(err)
411	}
412	if len(client.Tags) != 1 {
413		t.Errorf("Expecting one tag, got %d", len(client.Tags))
414	}
415	if client.Tags[0] != "dd.internal.entity_id:testing" {
416		t.Errorf("Bad tag value, got %s", client.Tags[0])
417	}
418
419	// Set to empty string
420	os.Setenv(entityIDEnvName, "")
421	client, err = statsd.New("localhost:8125")
422	if err != nil {
423		t.Fatal(err)
424	}
425	if len(client.Tags) != 0 {
426		t.Errorf("Expecting empty default tags, got %v", client.Tags)
427	}
428
429	// Unset
430	os.Unsetenv(entityIDEnvName)
431	client, err = statsd.New("localhost:8125")
432	if err != nil {
433		t.Fatal(err)
434	}
435	if len(client.Tags) != 0 {
436		t.Errorf("Expecting empty default tags, got %v", client.Tags)
437	}
438}
439
440var (
441	ddEnvName     = "DD_ENV"
442	ddServiceName = "DD_SERVICE"
443	ddVersionName = "DD_VERSION"
444)
445
446func TestDDEnvServiceVersionSet(t *testing.T) {
447	for _, tt := range []struct {
448		DDEnv     string
449		DDService string
450		DDVersion string
451		Expected  []string
452	}{
453		{"", "", "", []string{}},
454		{"prod", "", "", []string{"env:prod"}},
455		{"prod", "dog", "", []string{"env:prod", "service:dog"}},
456		{"prod", "dog", "abc123", []string{"env:prod", "service:dog", "version:abc123"}},
457	} {
458		for _, t := range []string{ddEnvName, ddServiceName, ddVersionName} {
459			initialValue, initiallySet := os.LookupEnv(t)
460			if initiallySet {
461				defer os.Setenv(t, initialValue)
462			} else {
463				defer os.Unsetenv(t)
464			}
465		}
466		os.Setenv(ddEnvName, tt.DDEnv)
467		os.Setenv(ddServiceName, tt.DDService)
468		os.Setenv(ddVersionName, tt.DDVersion)
469		client, err := statsd.New("localhost:8125")
470		if err != nil {
471			t.Fatal(err)
472		}
473		// Keep the ordering of global tags consistent.
474		sort.Strings(client.Tags)
475		assert.Equal(t, tt.Expected, client.Tags)
476	}
477}
478
479func TestDDEnvServiceVersionTagsEmitted(t *testing.T) {
480	for _, t := range []string{ddEnvName, ddServiceName, ddVersionName} {
481		initialValue, initiallySet := os.LookupEnv(t)
482		if initiallySet {
483			defer os.Setenv(t, initialValue)
484		} else {
485			defer os.Unsetenv(t)
486		}
487	}
488	os.Setenv(ddEnvName, "prod")
489	os.Setenv(ddServiceName, "dog")
490	os.Setenv(ddVersionName, "abc123")
491	addr := "localhost:1201"
492	udpAddr, err := net.ResolveUDPAddr("udp", addr)
493	if err != nil {
494		t.Fatal(err)
495	}
496	server, err := net.ListenUDP("udp", udpAddr)
497	if err != nil {
498		t.Fatal(err)
499	}
500	defer server.Close()
501
502	for _, tt := range []struct {
503		Tags       []string
504		GlobalTags []string
505		Expected   string
506	}{
507		{nil, nil, "test.count:100|c|#env:prod,service:dog,version:abc123"},
508		{[]string{"env:staging", "service:cat", "custom_tag"}, nil, "test.count:100|c|#env:prod,service:dog,version:abc123,env:staging,service:cat,custom_tag"},
509		{nil, []string{"version:def456", "custom_tag_two"}, "test.count:100|c|#custom_tag_two,env:prod,service:dog,version:abc123,version:def456"},
510		{[]string{"env:staging", "service:cat", "custom_tag"}, []string{"version:def456", "custom_tag_two"}, "test.count:100|c|#custom_tag_two,env:prod,service:dog,version:abc123,version:def456,env:staging,service:cat,custom_tag"},
511	} {
512		client, err := statsd.New(addr, statsd.WithTags(tt.GlobalTags))
513		if err != nil {
514			t.Fatal(err)
515		}
516		// Keep the ordering of global tags consistent.
517		sort.Strings(client.Tags)
518		client.Count("test.count", 100, tt.Tags, 1.0)
519		err = client.Flush()
520		if err != nil {
521			t.Errorf("Error sending: %s", err)
522		}
523		buffer := make([]byte, 1024)
524		n, err := io.ReadAtLeast(server, buffer, 1)
525		if err != nil {
526			t.Errorf("ReadAtLeast: %s", err)
527		}
528		result := string(buffer[:n])
529		if result != tt.Expected {
530			t.Errorf("Flushed metric incorrect; expected %s but got %s", tt.Expected, result)
531		}
532	}
533}
534
535func TestClosePanic(t *testing.T) {
536	c, err := statsd.New("localhost:8125")
537	assert.NoError(t, err)
538	c.Close()
539	c.Close()
540}
541
542func TestCloseRace(t *testing.T) {
543	for i := 0; i < 100; i++ {
544		c, err := statsd.New("localhost:8125")
545		assert.NoError(t, err)
546		start := make(chan struct{})
547		var wg sync.WaitGroup
548		for j := 0; j < 100; j++ {
549			wg.Add(1)
550			go func() {
551				defer wg.Done()
552				<-start
553				c.Close()
554			}()
555		}
556		close(start)
557		wg.Wait()
558	}
559}
560