1package statsd 2 3import ( 4 "fmt" 5 "io" 6 "os" 7 "sort" 8 "testing" 9 "time" 10 11 "github.com/stretchr/testify/assert" 12 "github.com/stretchr/testify/require" 13) 14 15var basicExpectedTags = []string{clientTelemetryTag, clientVersionTelemetryTag, "client_transport:test_transport"} 16 17func appendBasicMetrics(metrics []metric, tags []string) []metric { 18 basicExpectedMetrics := map[string]int64{ 19 "datadog.dogstatsd.client.metrics": 9, 20 "datadog.dogstatsd.client.events": 1, 21 "datadog.dogstatsd.client.service_checks": 1, 22 "datadog.dogstatsd.client.metric_dropped_on_receive": 0, 23 "datadog.dogstatsd.client.packets_sent": 0, 24 "datadog.dogstatsd.client.bytes_sent": 0, 25 "datadog.dogstatsd.client.packets_dropped": 0, 26 "datadog.dogstatsd.client.bytes_dropped": 0, 27 "datadog.dogstatsd.client.packets_dropped_queue": 0, 28 "datadog.dogstatsd.client.bytes_dropped_queue": 0, 29 "datadog.dogstatsd.client.packets_dropped_writer": 0, 30 "datadog.dogstatsd.client.bytes_dropped_writer": 0, 31 } 32 33 for name, value := range basicExpectedMetrics { 34 metrics = append(metrics, metric{ 35 name: name, 36 ivalue: value, 37 metricType: count, 38 tags: append(tags, basicExpectedTags...), 39 rate: float64(1), 40 }) 41 } 42 return metrics 43} 44 45func appendAggregationMetrics(metrics []metric, tags []string, devMode bool, extendedAggregation bool) []metric { 46 if extendedAggregation { 47 metrics = append(metrics, metric{ 48 name: "datadog.dogstatsd.client.aggregated_context", 49 ivalue: 9, 50 metricType: count, 51 tags: append(tags, basicExpectedTags...), 52 rate: float64(1), 53 }) 54 } else { 55 metrics = append(metrics, metric{ 56 name: "datadog.dogstatsd.client.aggregated_context", 57 ivalue: 5, 58 metricType: count, 59 tags: append(tags, basicExpectedTags...), 60 rate: float64(1), 61 }) 62 } 63 64 if devMode { 65 contextByTypeName := "datadog.dogstatsd.client.aggregated_context_by_type" 66 devModeAggregationExpectedMetrics := map[string]int64{ 67 "metrics_type:gauge": 1, 68 "metrics_type:set": 1, 69 "metrics_type:count": 3, 70 "metrics_type:histogram": 0, 71 "metrics_type:distribution": 0, 72 "metrics_type:timing": 0, 73 } 74 if extendedAggregation { 75 devModeAggregationExpectedMetrics["metrics_type:histogram"] = 1 76 devModeAggregationExpectedMetrics["metrics_type:distribution"] = 1 77 devModeAggregationExpectedMetrics["metrics_type:timing"] = 2 78 } 79 80 for typeTag, value := range devModeAggregationExpectedMetrics { 81 metrics = append(metrics, metric{ 82 name: contextByTypeName, 83 ivalue: value, 84 metricType: count, 85 tags: append(tags, append(basicExpectedTags, typeTag)...), 86 rate: float64(1), 87 }) 88 } 89 } 90 return metrics 91} 92 93func appendDevModeMetrics(metrics []metric, tags []string) []metric { 94 metricByTypeName := "datadog.dogstatsd.client.metrics_by_type" 95 devModeExpectedMetrics := map[string]int64{ 96 "metrics_type:gauge": 1, 97 "metrics_type:count": 3, 98 "metrics_type:set": 1, 99 "metrics_type:timing": 2, 100 "metrics_type:histogram": 1, 101 "metrics_type:distribution": 1, 102 } 103 104 for typeTag, value := range devModeExpectedMetrics { 105 metrics = append(metrics, metric{ 106 name: metricByTypeName, 107 ivalue: value, 108 metricType: count, 109 tags: append(tags, append(basicExpectedTags, typeTag)...), 110 rate: float64(1), 111 }) 112 } 113 return metrics 114} 115 116func TestNewTelemetry(t *testing.T) { 117 client, err := New("localhost:8125", WithoutTelemetry(), WithNamespace("test_namespace")) 118 require.Nil(t, err) 119 120 telemetry := newTelemetryClient(client, "test_transport", false) 121 assert.NotNil(t, telemetry) 122 123 assert.Equal(t, telemetry.c, client) 124 assert.Equal(t, telemetry.tags, basicExpectedTags) 125 assert.Nil(t, telemetry.sender) 126 assert.Nil(t, telemetry.worker) 127} 128 129func submitTestMetrics(c *Client) { 130 c.Gauge("Gauge", 21, nil, 1) 131 c.Count("Count", 21, nil, 1) 132 c.Histogram("Histogram", 21, nil, 1) 133 c.Distribution("Distribution", 21, nil, 1) 134 c.Decr("Decr", nil, 1) 135 c.Incr("Incr", nil, 1) 136 c.Set("Set", "value", nil, 1) 137 c.Timing("Timing", 21, nil, 1) 138 c.TimeInMilliseconds("TimeInMilliseconds", 21, nil, 1) 139 c.SimpleEvent("hello", "world") 140 c.SimpleServiceCheck("hello", Warn) 141} 142 143type metricSorted []metric 144 145func (s metricSorted) Len() int { return len(s) } 146func (s metricSorted) Swap(i, j int) { s[i], s[j] = s[j], s[i] } 147 148func (s metricSorted) Less(i, j int) bool { 149 if s[i].name == s[j].name { 150 if len(s[i].tags) != len(s[j].tags) { 151 return len(s[i].tags) < len(s[j].tags) 152 } 153 154 sort.Strings(s[i].tags) 155 sort.Strings(s[j].tags) 156 157 for idx := range s[i].tags { 158 if s[i].tags[idx] != s[j].tags[idx] { 159 return s[i].tags[idx] < s[j].tags[idx] 160 } 161 } 162 return false 163 } 164 return s[i].name < s[j].name 165} 166 167func testTelemetry(t *testing.T, telemetry *telemetryClient, expectedMetrics []metric) { 168 assert.NotNil(t, telemetry) 169 170 submitTestMetrics(telemetry.c) 171 if telemetry.c.agg != nil { 172 telemetry.c.agg.sendMetrics() 173 } 174 metrics := telemetry.flush() 175 176 require.Equal(t, len(expectedMetrics), len(metrics), fmt.Sprintf("expected:\n%v\nactual:\n%v", expectedMetrics, metrics)) 177 178 sort.Sort(metricSorted(metrics)) 179 sort.Sort(metricSorted(expectedMetrics)) 180 181 for idx := range metrics { 182 m := metrics[idx] 183 expected := expectedMetrics[idx] 184 185 assert.Equal(t, expected.ivalue, m.ivalue, fmt.Sprintf("wrong ivalue for '%s' with tags '%v'", m.name, m.tags)) 186 assert.Equal(t, expected.metricType, m.metricType, fmt.Sprintf("wrong metricType for '%s'", m.name)) 187 188 assert.Equal(t, expected.tags, m.tags, fmt.Sprintf("wrong tags for '%s'", m.name)) 189 assert.Equal(t, expected.rate, m.rate, fmt.Sprintf("wrong rate for '%s'", m.name)) 190 } 191} 192 193func TestTelemetry(t *testing.T) { 194 // disabling autoflush of the telemetry 195 client, err := New("localhost:8125", WithoutTelemetry()) 196 require.Nil(t, err) 197 198 expectedMetrics := []metric{} 199 expectedMetrics = appendBasicMetrics(expectedMetrics, nil) 200 201 telemetry := newTelemetryClient(client, "test_transport", false) 202 testTelemetry(t, telemetry, expectedMetrics) 203} 204 205func TestTelemetryDevMode(t *testing.T) { 206 // disabling autoflush of the telemetry 207 client, err := New("localhost:8125", WithoutTelemetry(), WithDevMode()) 208 require.Nil(t, err) 209 210 expectedMetrics := []metric{} 211 expectedMetrics = appendBasicMetrics(expectedMetrics, nil) 212 expectedMetrics = appendDevModeMetrics(expectedMetrics, nil) 213 214 telemetry := newTelemetryClient(client, "test_transport", true) 215 testTelemetry(t, telemetry, expectedMetrics) 216} 217 218func TestTelemetryChannelMode(t *testing.T) { 219 // disabling autoflush of the telemetry 220 client, err := New("localhost:8125", WithoutTelemetry(), WithChannelMode()) 221 require.Nil(t, err) 222 223 telemetry := newTelemetryClient(client, "test_transport", false) 224 225 expectedMetrics := []metric{} 226 expectedMetrics = appendBasicMetrics(expectedMetrics, nil) 227 228 testTelemetry(t, telemetry, expectedMetrics) 229} 230 231func TestTelemetryWithGlobalTags(t *testing.T) { 232 orig := os.Getenv("DD_ENV") 233 os.Setenv("DD_ENV", "test") 234 defer os.Setenv("DD_ENV", orig) 235 236 // disabling autoflush of the telemetry 237 client, err := New("localhost:8125", WithoutTelemetry(), WithTags([]string{"tag1", "tag2"})) 238 require.Nil(t, err) 239 240 telemetry := newTelemetryClient(client, "test_transport", false) 241 242 expectedMetrics := []metric{} 243 expectedMetrics = appendBasicMetrics(expectedMetrics, []string{"tag1", "tag2", "env:test"}) 244 245 testTelemetry(t, telemetry, expectedMetrics) 246} 247 248func TestTelemetryWithAggregationBasic(t *testing.T) { 249 // disabling autoflush of the telemetry 250 client, err := New("localhost:8125", WithoutTelemetry(), WithClientSideAggregation()) 251 require.Nil(t, err) 252 253 telemetry := newTelemetryClient(client, "test_transport", false) 254 255 expectedMetrics := []metric{} 256 expectedMetrics = appendBasicMetrics(expectedMetrics, nil) 257 expectedMetrics = appendAggregationMetrics(expectedMetrics, nil, false, false) 258 259 testTelemetry(t, telemetry, expectedMetrics) 260} 261 262func TestTelemetryWithAggregationAllType(t *testing.T) { 263 // disabling autoflush of the telemetry 264 client, err := New("localhost:8125", WithoutTelemetry(), WithExtendedClientSideAggregation()) 265 require.Nil(t, err) 266 267 telemetry := newTelemetryClient(client, "test_transport", false) 268 269 expectedMetrics := []metric{} 270 expectedMetrics = appendBasicMetrics(expectedMetrics, nil) 271 expectedMetrics = appendAggregationMetrics(expectedMetrics, nil, false, true) 272 273 testTelemetry(t, telemetry, expectedMetrics) 274} 275 276func TestTelemetryWithAggregationDevMode(t *testing.T) { 277 // disabling autoflush of the telemetry 278 client, err := New("localhost:8125", WithoutTelemetry(), WithExtendedClientSideAggregation(), WithDevMode()) 279 require.Nil(t, err) 280 281 telemetry := newTelemetryClient(client, "test_transport", true) 282 283 expectedMetrics := []metric{} 284 expectedMetrics = appendBasicMetrics(expectedMetrics, nil) 285 expectedMetrics = appendAggregationMetrics(expectedMetrics, nil, true, true) 286 expectedMetrics = appendDevModeMetrics(expectedMetrics, nil) 287 288 testTelemetry(t, telemetry, expectedMetrics) 289} 290 291func TestTelemetryWithAggregationDevModeWithGlobalTags(t *testing.T) { 292 orig := os.Getenv("DD_ENV") 293 os.Setenv("DD_ENV", "test") 294 defer os.Setenv("DD_ENV", orig) 295 296 // disabling autoflush of the telemetry 297 client, err := New("localhost:8125", WithoutTelemetry(), WithClientSideAggregation(), WithDevMode(), WithTags([]string{"tag1", "tag2"})) 298 require.Nil(t, err) 299 300 telemetry := newTelemetryClient(client, "test_transport", true) 301 302 expectedMetrics := []metric{} 303 expectedMetrics = appendBasicMetrics(expectedMetrics, []string{"tag1", "tag2", "env:test"}) 304 expectedMetrics = appendAggregationMetrics(expectedMetrics, []string{"tag1", "tag2", "env:test"}, true, false) 305 expectedMetrics = appendDevModeMetrics(expectedMetrics, []string{"tag1", "tag2", "env:test"}) 306 307 testTelemetry(t, telemetry, expectedMetrics) 308} 309 310func TestTelemetryCustomAddr(t *testing.T) { 311 buffer := make([]byte, 4096) 312 addr := "localhost:1201" 313 server := getTestServer(t, addr) 314 defer server.Close() 315 316 client, err := New("localhost:9876", WithTelemetryAddr(addr), WithNamespace("test_namespace")) 317 require.Nil(t, err, fmt.Sprintf("failed to create client: %s", err)) 318 readDone := make(chan struct{}) 319 n := 0 320 go func() { 321 n, _ = io.ReadAtLeast(server, buffer, 1) 322 close(readDone) 323 }() 324 325 submitTestMetrics(client) 326 client.telemetry.sendTelemetry() 327 328 select { 329 case <-readDone: 330 case <-time.After(2 * time.Second): 331 require.Fail(t, "No data was flush on Close") 332 } 333 334 result := string(buffer[:n]) 335 336 expectedPayload := "datadog.dogstatsd.client.metrics:9|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + 337 "datadog.dogstatsd.client.events:1|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + 338 "datadog.dogstatsd.client.service_checks:1|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + 339 "datadog.dogstatsd.client.metric_dropped_on_receive:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + 340 "datadog.dogstatsd.client.packets_sent:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + 341 "datadog.dogstatsd.client.bytes_sent:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + 342 "datadog.dogstatsd.client.packets_dropped:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + 343 "datadog.dogstatsd.client.bytes_dropped:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + 344 "datadog.dogstatsd.client.packets_dropped_queue:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + 345 "datadog.dogstatsd.client.bytes_dropped_queue:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + 346 "datadog.dogstatsd.client.packets_dropped_writer:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + 347 "datadog.dogstatsd.client.bytes_dropped_writer:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp" 348 assert.Equal(t, expectedPayload, result) 349} 350