1package statsd
2
3import (
4	"fmt"
5	"io"
6	"os"
7	"sort"
8	"testing"
9	"time"
10
11	"github.com/stretchr/testify/assert"
12	"github.com/stretchr/testify/require"
13)
14
15var basicExpectedTags = []string{clientTelemetryTag, clientVersionTelemetryTag, "client_transport:test_transport"}
16
17func appendBasicMetrics(metrics []metric, tags []string) []metric {
18	basicExpectedMetrics := map[string]int64{
19		"datadog.dogstatsd.client.metrics":                   9,
20		"datadog.dogstatsd.client.events":                    1,
21		"datadog.dogstatsd.client.service_checks":            1,
22		"datadog.dogstatsd.client.metric_dropped_on_receive": 0,
23		"datadog.dogstatsd.client.packets_sent":              0,
24		"datadog.dogstatsd.client.bytes_sent":                0,
25		"datadog.dogstatsd.client.packets_dropped":           0,
26		"datadog.dogstatsd.client.bytes_dropped":             0,
27		"datadog.dogstatsd.client.packets_dropped_queue":     0,
28		"datadog.dogstatsd.client.bytes_dropped_queue":       0,
29		"datadog.dogstatsd.client.packets_dropped_writer":    0,
30		"datadog.dogstatsd.client.bytes_dropped_writer":      0,
31	}
32
33	for name, value := range basicExpectedMetrics {
34		metrics = append(metrics, metric{
35			name:       name,
36			ivalue:     value,
37			metricType: count,
38			tags:       append(tags, basicExpectedTags...),
39			rate:       float64(1),
40		})
41	}
42	return metrics
43}
44
45func appendAggregationMetrics(metrics []metric, tags []string, devMode bool, extendedAggregation bool) []metric {
46	if extendedAggregation {
47		metrics = append(metrics, metric{
48			name:       "datadog.dogstatsd.client.aggregated_context",
49			ivalue:     9,
50			metricType: count,
51			tags:       append(tags, basicExpectedTags...),
52			rate:       float64(1),
53		})
54	} else {
55		metrics = append(metrics, metric{
56			name:       "datadog.dogstatsd.client.aggregated_context",
57			ivalue:     5,
58			metricType: count,
59			tags:       append(tags, basicExpectedTags...),
60			rate:       float64(1),
61		})
62	}
63
64	if devMode {
65		contextByTypeName := "datadog.dogstatsd.client.aggregated_context_by_type"
66		devModeAggregationExpectedMetrics := map[string]int64{
67			"metrics_type:gauge":        1,
68			"metrics_type:set":          1,
69			"metrics_type:count":        3,
70			"metrics_type:histogram":    0,
71			"metrics_type:distribution": 0,
72			"metrics_type:timing":       0,
73		}
74		if extendedAggregation {
75			devModeAggregationExpectedMetrics["metrics_type:histogram"] = 1
76			devModeAggregationExpectedMetrics["metrics_type:distribution"] = 1
77			devModeAggregationExpectedMetrics["metrics_type:timing"] = 2
78		}
79
80		for typeTag, value := range devModeAggregationExpectedMetrics {
81			metrics = append(metrics, metric{
82				name:       contextByTypeName,
83				ivalue:     value,
84				metricType: count,
85				tags:       append(tags, append(basicExpectedTags, typeTag)...),
86				rate:       float64(1),
87			})
88		}
89	}
90	return metrics
91}
92
93func appendDevModeMetrics(metrics []metric, tags []string) []metric {
94	metricByTypeName := "datadog.dogstatsd.client.metrics_by_type"
95	devModeExpectedMetrics := map[string]int64{
96		"metrics_type:gauge":        1,
97		"metrics_type:count":        3,
98		"metrics_type:set":          1,
99		"metrics_type:timing":       2,
100		"metrics_type:histogram":    1,
101		"metrics_type:distribution": 1,
102	}
103
104	for typeTag, value := range devModeExpectedMetrics {
105		metrics = append(metrics, metric{
106			name:       metricByTypeName,
107			ivalue:     value,
108			metricType: count,
109			tags:       append(tags, append(basicExpectedTags, typeTag)...),
110			rate:       float64(1),
111		})
112	}
113	return metrics
114}
115
116func TestNewTelemetry(t *testing.T) {
117	client, err := New("localhost:8125", WithoutTelemetry(), WithNamespace("test_namespace"))
118	require.Nil(t, err)
119
120	telemetry := newTelemetryClient(client, "test_transport", false)
121	assert.NotNil(t, telemetry)
122
123	assert.Equal(t, telemetry.c, client)
124	assert.Equal(t, telemetry.tags, basicExpectedTags)
125	assert.Nil(t, telemetry.sender)
126	assert.Nil(t, telemetry.worker)
127}
128
129func submitTestMetrics(c *Client) {
130	c.Gauge("Gauge", 21, nil, 1)
131	c.Count("Count", 21, nil, 1)
132	c.Histogram("Histogram", 21, nil, 1)
133	c.Distribution("Distribution", 21, nil, 1)
134	c.Decr("Decr", nil, 1)
135	c.Incr("Incr", nil, 1)
136	c.Set("Set", "value", nil, 1)
137	c.Timing("Timing", 21, nil, 1)
138	c.TimeInMilliseconds("TimeInMilliseconds", 21, nil, 1)
139	c.SimpleEvent("hello", "world")
140	c.SimpleServiceCheck("hello", Warn)
141}
142
143type metricSorted []metric
144
145func (s metricSorted) Len() int      { return len(s) }
146func (s metricSorted) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
147
148func (s metricSorted) Less(i, j int) bool {
149	if s[i].name == s[j].name {
150		if len(s[i].tags) != len(s[j].tags) {
151			return len(s[i].tags) < len(s[j].tags)
152		}
153
154		sort.Strings(s[i].tags)
155		sort.Strings(s[j].tags)
156
157		for idx := range s[i].tags {
158			if s[i].tags[idx] != s[j].tags[idx] {
159				return s[i].tags[idx] < s[j].tags[idx]
160			}
161		}
162		return false
163	}
164	return s[i].name < s[j].name
165}
166
167func testTelemetry(t *testing.T, telemetry *telemetryClient, expectedMetrics []metric) {
168	assert.NotNil(t, telemetry)
169
170	submitTestMetrics(telemetry.c)
171	if telemetry.c.agg != nil {
172		telemetry.c.agg.sendMetrics()
173	}
174	metrics := telemetry.flush()
175
176	require.Equal(t, len(expectedMetrics), len(metrics), fmt.Sprintf("expected:\n%v\nactual:\n%v", expectedMetrics, metrics))
177
178	sort.Sort(metricSorted(metrics))
179	sort.Sort(metricSorted(expectedMetrics))
180
181	for idx := range metrics {
182		m := metrics[idx]
183		expected := expectedMetrics[idx]
184
185		assert.Equal(t, expected.ivalue, m.ivalue, fmt.Sprintf("wrong ivalue for '%s' with tags '%v'", m.name, m.tags))
186		assert.Equal(t, expected.metricType, m.metricType, fmt.Sprintf("wrong metricType for '%s'", m.name))
187
188		assert.Equal(t, expected.tags, m.tags, fmt.Sprintf("wrong tags for '%s'", m.name))
189		assert.Equal(t, expected.rate, m.rate, fmt.Sprintf("wrong rate for '%s'", m.name))
190	}
191}
192
193func TestTelemetry(t *testing.T) {
194	// disabling autoflush of the telemetry
195	client, err := New("localhost:8125", WithoutTelemetry())
196	require.Nil(t, err)
197
198	expectedMetrics := []metric{}
199	expectedMetrics = appendBasicMetrics(expectedMetrics, nil)
200
201	telemetry := newTelemetryClient(client, "test_transport", false)
202	testTelemetry(t, telemetry, expectedMetrics)
203}
204
205func TestTelemetryDevMode(t *testing.T) {
206	// disabling autoflush of the telemetry
207	client, err := New("localhost:8125", WithoutTelemetry(), WithDevMode())
208	require.Nil(t, err)
209
210	expectedMetrics := []metric{}
211	expectedMetrics = appendBasicMetrics(expectedMetrics, nil)
212	expectedMetrics = appendDevModeMetrics(expectedMetrics, nil)
213
214	telemetry := newTelemetryClient(client, "test_transport", true)
215	testTelemetry(t, telemetry, expectedMetrics)
216}
217
218func TestTelemetryChannelMode(t *testing.T) {
219	// disabling autoflush of the telemetry
220	client, err := New("localhost:8125", WithoutTelemetry(), WithChannelMode())
221	require.Nil(t, err)
222
223	telemetry := newTelemetryClient(client, "test_transport", false)
224
225	expectedMetrics := []metric{}
226	expectedMetrics = appendBasicMetrics(expectedMetrics, nil)
227
228	testTelemetry(t, telemetry, expectedMetrics)
229}
230
231func TestTelemetryWithGlobalTags(t *testing.T) {
232	orig := os.Getenv("DD_ENV")
233	os.Setenv("DD_ENV", "test")
234	defer os.Setenv("DD_ENV", orig)
235
236	// disabling autoflush of the telemetry
237	client, err := New("localhost:8125", WithoutTelemetry(), WithTags([]string{"tag1", "tag2"}))
238	require.Nil(t, err)
239
240	telemetry := newTelemetryClient(client, "test_transport", false)
241
242	expectedMetrics := []metric{}
243	expectedMetrics = appendBasicMetrics(expectedMetrics, []string{"tag1", "tag2", "env:test"})
244
245	testTelemetry(t, telemetry, expectedMetrics)
246}
247
248func TestTelemetryWithAggregationBasic(t *testing.T) {
249	// disabling autoflush of the telemetry
250	client, err := New("localhost:8125", WithoutTelemetry(), WithClientSideAggregation())
251	require.Nil(t, err)
252
253	telemetry := newTelemetryClient(client, "test_transport", false)
254
255	expectedMetrics := []metric{}
256	expectedMetrics = appendBasicMetrics(expectedMetrics, nil)
257	expectedMetrics = appendAggregationMetrics(expectedMetrics, nil, false, false)
258
259	testTelemetry(t, telemetry, expectedMetrics)
260}
261
262func TestTelemetryWithAggregationAllType(t *testing.T) {
263	// disabling autoflush of the telemetry
264	client, err := New("localhost:8125", WithoutTelemetry(), WithExtendedClientSideAggregation())
265	require.Nil(t, err)
266
267	telemetry := newTelemetryClient(client, "test_transport", false)
268
269	expectedMetrics := []metric{}
270	expectedMetrics = appendBasicMetrics(expectedMetrics, nil)
271	expectedMetrics = appendAggregationMetrics(expectedMetrics, nil, false, true)
272
273	testTelemetry(t, telemetry, expectedMetrics)
274}
275
276func TestTelemetryWithAggregationDevMode(t *testing.T) {
277	// disabling autoflush of the telemetry
278	client, err := New("localhost:8125", WithoutTelemetry(), WithExtendedClientSideAggregation(), WithDevMode())
279	require.Nil(t, err)
280
281	telemetry := newTelemetryClient(client, "test_transport", true)
282
283	expectedMetrics := []metric{}
284	expectedMetrics = appendBasicMetrics(expectedMetrics, nil)
285	expectedMetrics = appendAggregationMetrics(expectedMetrics, nil, true, true)
286	expectedMetrics = appendDevModeMetrics(expectedMetrics, nil)
287
288	testTelemetry(t, telemetry, expectedMetrics)
289}
290
291func TestTelemetryWithAggregationDevModeWithGlobalTags(t *testing.T) {
292	orig := os.Getenv("DD_ENV")
293	os.Setenv("DD_ENV", "test")
294	defer os.Setenv("DD_ENV", orig)
295
296	// disabling autoflush of the telemetry
297	client, err := New("localhost:8125", WithoutTelemetry(), WithClientSideAggregation(), WithDevMode(), WithTags([]string{"tag1", "tag2"}))
298	require.Nil(t, err)
299
300	telemetry := newTelemetryClient(client, "test_transport", true)
301
302	expectedMetrics := []metric{}
303	expectedMetrics = appendBasicMetrics(expectedMetrics, []string{"tag1", "tag2", "env:test"})
304	expectedMetrics = appendAggregationMetrics(expectedMetrics, []string{"tag1", "tag2", "env:test"}, true, false)
305	expectedMetrics = appendDevModeMetrics(expectedMetrics, []string{"tag1", "tag2", "env:test"})
306
307	testTelemetry(t, telemetry, expectedMetrics)
308}
309
310func TestTelemetryCustomAddr(t *testing.T) {
311	buffer := make([]byte, 4096)
312	addr := "localhost:1201"
313	server := getTestServer(t, addr)
314	defer server.Close()
315
316	client, err := New("localhost:9876", WithTelemetryAddr(addr), WithNamespace("test_namespace"))
317	require.Nil(t, err, fmt.Sprintf("failed to create client: %s", err))
318	readDone := make(chan struct{})
319	n := 0
320	go func() {
321		n, _ = io.ReadAtLeast(server, buffer, 1)
322		close(readDone)
323	}()
324
325	submitTestMetrics(client)
326	client.telemetry.sendTelemetry()
327
328	select {
329	case <-readDone:
330	case <-time.After(2 * time.Second):
331		require.Fail(t, "No data was flush on Close")
332	}
333
334	result := string(buffer[:n])
335
336	expectedPayload := "datadog.dogstatsd.client.metrics:9|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" +
337		"datadog.dogstatsd.client.events:1|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" +
338		"datadog.dogstatsd.client.service_checks:1|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" +
339		"datadog.dogstatsd.client.metric_dropped_on_receive:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" +
340		"datadog.dogstatsd.client.packets_sent:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" +
341		"datadog.dogstatsd.client.bytes_sent:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" +
342		"datadog.dogstatsd.client.packets_dropped:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" +
343		"datadog.dogstatsd.client.bytes_dropped:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" +
344		"datadog.dogstatsd.client.packets_dropped_queue:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" +
345		"datadog.dogstatsd.client.bytes_dropped_queue:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" +
346		"datadog.dogstatsd.client.packets_dropped_writer:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" +
347		"datadog.dogstatsd.client.bytes_dropped_writer:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp"
348	assert.Equal(t, expectedPayload, result)
349}
350