1// Copyright 2013 Ooyala, Inc. 2 3/* 4Package statsd provides a Go dogstatsd client. Dogstatsd extends the popular statsd, 5adding tags and histograms and pushing upstream to Datadog. 6 7Refer to http://docs.datadoghq.com/guides/dogstatsd/ for information about DogStatsD. 8 9Example Usage: 10 11 // Create the client 12 c, err := statsd.New("127.0.0.1:8125") 13 if err != nil { 14 log.Fatal(err) 15 } 16 // Prefix every metric with the app name 17 c.Namespace = "flubber." 18 // Send the EC2 availability zone as a tag with every metric 19 c.Tags = append(c.Tags, "us-east-1a") 20 err = c.Gauge("request.duration", 1.2, nil, 1) 21 22statsd is based on go-statsd-client. 23*/ 24package statsd 25 26import ( 27 "fmt" 28 "math/rand" 29 "os" 30 "strings" 31 "sync" 32 "time" 33) 34 35/* 36OptimalUDPPayloadSize defines the optimal payload size for a UDP datagram, 1432 bytes 37is optimal for regular networks with an MTU of 1500 so datagrams don't get 38fragmented. It's generally recommended not to fragment UDP datagrams as losing 39a single fragment will cause the entire datagram to be lost. 40*/ 41const OptimalUDPPayloadSize = 1432 42 43/* 44MaxUDPPayloadSize defines the maximum payload size for a UDP datagram. 45Its value comes from the calculation: 65535 bytes Max UDP datagram size - 468byte UDP header - 60byte max IP headers 47any number greater than that will see frames being cut out. 48*/ 49const MaxUDPPayloadSize = 65467 50 51// DefaultUDPBufferPoolSize is the default size of the buffer pool for UDP clients. 52const DefaultUDPBufferPoolSize = 2048 53 54// DefaultUDSBufferPoolSize is the default size of the buffer pool for UDS clients. 55const DefaultUDSBufferPoolSize = 512 56 57/* 58DefaultMaxAgentPayloadSize is the default maximum payload size the agent 59can receive. This can be adjusted by changing dogstatsd_buffer_size in the 60agent configuration file datadog.yaml. 61*/ 62const DefaultMaxAgentPayloadSize = 8192 63 64/* 65TelemetryInterval is the interval at which telemetry will be sent by the client. 66*/ 67const TelemetryInterval = 10 * time.Second 68 69/* 70clientTelemetryTag is a tag identifying this specific client. 71*/ 72var clientTelemetryTag = "client:go" 73 74/* 75UnixAddressPrefix holds the prefix to use to enable Unix Domain Socket 76traffic instead of UDP. 77*/ 78const UnixAddressPrefix = "unix://" 79 80// Client-side entity ID injection for container tagging 81const ( 82 entityIDEnvName = "DD_ENTITY_ID" 83 entityIDTagName = "dd.internal.entity_id" 84) 85 86type metricType int 87 88const ( 89 gauge metricType = iota 90 count 91 histogram 92 distribution 93 set 94 timing 95 event 96 serviceCheck 97) 98 99type metric struct { 100 metricType metricType 101 namespace string 102 globalTags []string 103 name string 104 fvalue float64 105 ivalue int64 106 svalue string 107 evalue *Event 108 scvalue *ServiceCheck 109 tags []string 110 rate float64 111} 112 113type noClientErr string 114 115// ErrNoClient is returned if statsd reporting methods are invoked on 116// a nil client. 117const ErrNoClient = noClientErr("statsd client is nil") 118 119func (e noClientErr) Error() string { 120 return string(e) 121} 122 123// ClientInterface is an interface that exposes the common client functions for the 124// purpose of being able to provide a no-op client or even mocking. This can aid 125// downstream users' with their testing. 126type ClientInterface interface { 127 // Gauge measures the value of a metric at a particular time. 128 Gauge(name string, value float64, tags []string, rate float64) error 129 130 // Count tracks how many times something happened per second. 131 Count(name string, value int64, tags []string, rate float64) error 132 133 // Histogram tracks the statistical distribution of a set of values on each host. 134 Histogram(name string, value float64, tags []string, rate float64) error 135 136 // Distribution tracks the statistical distribution of a set of values across your infrastructure. 137 Distribution(name string, value float64, tags []string, rate float64) error 138 139 // Decr is just Count of -1 140 Decr(name string, tags []string, rate float64) error 141 142 // Incr is just Count of 1 143 Incr(name string, tags []string, rate float64) error 144 145 // Set counts the number of unique elements in a group. 146 Set(name string, value string, tags []string, rate float64) error 147 148 // Timing sends timing information, it is an alias for TimeInMilliseconds 149 Timing(name string, value time.Duration, tags []string, rate float64) error 150 151 // TimeInMilliseconds sends timing information in milliseconds. 152 // It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing) 153 TimeInMilliseconds(name string, value float64, tags []string, rate float64) error 154 155 // Event sends the provided Event. 156 Event(e *Event) error 157 158 // SimpleEvent sends an event with the provided title and text. 159 SimpleEvent(title, text string) error 160 161 // ServiceCheck sends the provided ServiceCheck. 162 ServiceCheck(sc *ServiceCheck) error 163 164 // SimpleServiceCheck sends an serviceCheck with the provided name and status. 165 SimpleServiceCheck(name string, status ServiceCheckStatus) error 166 167 // Close the client connection. 168 Close() error 169 170 // Flush forces a flush of all the queued dogstatsd payloads. 171 Flush() error 172 173 // SetWriteTimeout allows the user to set a custom write timeout. 174 SetWriteTimeout(d time.Duration) error 175} 176 177// A Client is a handle for sending messages to dogstatsd. It is safe to 178// use one Client from multiple goroutines simultaneously. 179type Client struct { 180 // Sender handles the underlying networking protocol 181 sender *sender 182 // Namespace to prepend to all statsd calls 183 Namespace string 184 // Tags are global tags to be added to every statsd call 185 Tags []string 186 // skipErrors turns off error passing and allows UDS to emulate UDP behaviour 187 SkipErrors bool 188 flushTime time.Duration 189 bufferPool *bufferPool 190 buffer *statsdBuffer 191 telemetryTags []string 192 stop chan struct{} 193 sync.Mutex 194} 195 196// Verify that Client implements the ClientInterface. 197// https://golang.org/doc/faq#guarantee_satisfies_interface 198var _ ClientInterface = &Client{} 199 200// New returns a pointer to a new Client given an addr in the format "hostname:port" or 201// "unix:///path/to/socket". 202func New(addr string, options ...Option) (*Client, error) { 203 var w statsdWriter 204 o, err := resolveOptions(options) 205 if err != nil { 206 return nil, err 207 } 208 209 var writerType string 210 optimalPayloadSize := OptimalUDPPayloadSize 211 defaultBufferPoolSize := DefaultUDPBufferPoolSize 212 if !strings.HasPrefix(addr, UnixAddressPrefix) { 213 w, err = newUDPWriter(addr) 214 writerType = "udp" 215 } else { 216 // FIXME: The agent has a performance pitfall preventing us from using better defaults here. 217 // Once it's fixed, use `DefaultMaxAgentPayloadSize` and `DefaultUDSBufferPoolSize` instead. 218 optimalPayloadSize = OptimalUDPPayloadSize 219 defaultBufferPoolSize = DefaultUDPBufferPoolSize 220 w, err = newUDSWriter(addr[len(UnixAddressPrefix)-1:]) 221 writerType = "uds" 222 } 223 if err != nil { 224 return nil, err 225 } 226 227 if o.MaxBytesPerPayload == 0 { 228 o.MaxBytesPerPayload = optimalPayloadSize 229 } 230 if o.BufferPoolSize == 0 { 231 o.BufferPoolSize = defaultBufferPoolSize 232 } 233 if o.SenderQueueSize == 0 { 234 o.SenderQueueSize = defaultBufferPoolSize 235 } 236 return newWithWriter(w, o, writerType) 237} 238 239// NewWithWriter creates a new Client with given writer. Writer is a 240// io.WriteCloser + SetWriteTimeout(time.Duration) error 241func NewWithWriter(w statsdWriter, options ...Option) (*Client, error) { 242 o, err := resolveOptions(options) 243 if err != nil { 244 return nil, err 245 } 246 return newWithWriter(w, o, "custom") 247} 248 249func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, error) { 250 251 w.SetWriteTimeout(o.WriteTimeoutUDS) 252 253 c := Client{ 254 Namespace: o.Namespace, 255 Tags: o.Tags, 256 telemetryTags: []string{clientTelemetryTag, "transport:" + writerName}, 257 } 258 259 // Inject DD_ENTITY_ID as a constant tag if found 260 entityID := os.Getenv(entityIDEnvName) 261 if entityID != "" { 262 entityTag := fmt.Sprintf("%s:%s", entityIDTagName, entityID) 263 c.Tags = append(c.Tags, entityTag) 264 } 265 266 if o.MaxBytesPerPayload == 0 { 267 o.MaxBytesPerPayload = OptimalUDPPayloadSize 268 } 269 270 c.bufferPool = newBufferPool(o.BufferPoolSize, o.MaxBytesPerPayload, o.MaxMessagesPerPayload) 271 c.buffer = c.bufferPool.borrowBuffer() 272 c.sender = newSender(w, o.SenderQueueSize, c.bufferPool) 273 c.flushTime = o.BufferFlushInterval 274 c.stop = make(chan struct{}, 1) 275 go c.watch() 276 go c.telemetry() 277 278 return &c, nil 279} 280 281// NewBuffered returns a Client that buffers its output and sends it in chunks. 282// Buflen is the length of the buffer in number of commands. 283// 284// When addr is empty, the client will default to a UDP client and use the DD_AGENT_HOST 285// and (optionally) the DD_DOGSTATSD_PORT environment variables to build the target address. 286func NewBuffered(addr string, buflen int) (*Client, error) { 287 return New(addr, WithMaxMessagesPerPayload(buflen)) 288} 289 290// SetWriteTimeout allows the user to set a custom UDS write timeout. Not supported for UDP. 291func (c *Client) SetWriteTimeout(d time.Duration) error { 292 if c == nil { 293 return ErrNoClient 294 } 295 return c.sender.transport.SetWriteTimeout(d) 296} 297 298func (c *Client) watch() { 299 ticker := time.NewTicker(c.flushTime) 300 301 for { 302 select { 303 case <-ticker.C: 304 c.Lock() 305 c.flushUnsafe() 306 c.Unlock() 307 case <-c.stop: 308 ticker.Stop() 309 return 310 } 311 } 312} 313 314func (c *Client) telemetry() { 315 ticker := time.NewTicker(TelemetryInterval) 316 for { 317 select { 318 case <-ticker.C: 319 metrics := c.sender.flushMetrics() 320 c.telemetryCount("datadog.dogstatsd.client.packets_sent", int64(metrics.TotalSentPayloads), c.telemetryTags, 1) 321 c.telemetryCount("datadog.dogstatsd.client.bytes_sent", int64(metrics.TotalSentBytes), c.telemetryTags, 1) 322 c.telemetryCount("datadog.dogstatsd.client.packets_dropped", int64(metrics.TotalDroppedPayloads), c.telemetryTags, 1) 323 c.telemetryCount("datadog.dogstatsd.client.bytes_dropped", int64(metrics.TotalDroppedBytes), c.telemetryTags, 1) 324 c.telemetryCount("datadog.dogstatsd.client.packets_dropped_queue", int64(metrics.TotalDroppedPayloadsQueueFull), c.telemetryTags, 1) 325 c.telemetryCount("datadog.dogstatsd.client.bytes_dropped_queue", int64(metrics.TotalDroppedBytesQueueFull), c.telemetryTags, 1) 326 c.telemetryCount("datadog.dogstatsd.client.packets_dropped_writer", int64(metrics.TotalDroppedPayloadsWriter), c.telemetryTags, 1) 327 c.telemetryCount("datadog.dogstatsd.client.bytes_dropped_writer", int64(metrics.TotalDroppedBytesWriter), c.telemetryTags, 1) 328 case <-c.stop: 329 ticker.Stop() 330 return 331 } 332 } 333} 334 335// same as Count but without global namespace / tags 336func (c *Client) telemetryCount(name string, value int64, tags []string, rate float64) { 337 c.addMetric(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate}) 338} 339 340// Flush forces a flush of all the queued dogstatsd payloads 341// This method is blocking and will not return until everything is sent 342// through the network 343func (c *Client) Flush() error { 344 if c == nil { 345 return ErrNoClient 346 } 347 c.Lock() 348 defer c.Unlock() 349 c.flushUnsafe() 350 c.sender.flush() 351 return nil 352} 353 354// flush the current buffer. Lock must be held by caller. 355// flushed buffer written to the network asynchronously. 356func (c *Client) flushUnsafe() { 357 if len(c.buffer.bytes()) > 0 { 358 c.sender.send(c.buffer) 359 c.buffer = c.bufferPool.borrowBuffer() 360 } 361} 362 363func (c *Client) shouldSample(rate float64) bool { 364 if rate < 1 && rand.Float64() > rate { 365 return true 366 } 367 return false 368} 369 370func (c *Client) globalTags() []string { 371 if c != nil { 372 return c.Tags 373 } 374 return nil 375} 376 377func (c *Client) namespace() string { 378 if c != nil { 379 return c.Namespace 380 } 381 return "" 382} 383 384func (c *Client) writeMetricUnsafe(m metric) error { 385 switch m.metricType { 386 case gauge: 387 return c.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate) 388 case count: 389 return c.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate) 390 case histogram: 391 return c.buffer.writeHistogram(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate) 392 case distribution: 393 return c.buffer.writeDistribution(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate) 394 case set: 395 return c.buffer.writeSet(m.namespace, m.globalTags, m.name, m.svalue, m.tags, m.rate) 396 case timing: 397 return c.buffer.writeTiming(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate) 398 case event: 399 return c.buffer.writeEvent(*m.evalue, m.globalTags) 400 case serviceCheck: 401 return c.buffer.writeServiceCheck(*m.scvalue, m.globalTags) 402 default: 403 return nil 404 } 405} 406 407func (c *Client) addMetric(m metric) error { 408 if c == nil { 409 return ErrNoClient 410 } 411 if c.shouldSample(m.rate) { 412 return nil 413 } 414 c.Lock() 415 var err error 416 if err = c.writeMetricUnsafe(m); err == errBufferFull { 417 c.flushUnsafe() 418 err = c.writeMetricUnsafe(m) 419 } 420 c.Unlock() 421 return err 422} 423 424// Gauge measures the value of a metric at a particular time. 425func (c *Client) Gauge(name string, value float64, tags []string, rate float64) error { 426 return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate}) 427} 428 429// Count tracks how many times something happened per second. 430func (c *Client) Count(name string, value int64, tags []string, rate float64) error { 431 return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: count, name: name, ivalue: value, tags: tags, rate: rate}) 432} 433 434// Histogram tracks the statistical distribution of a set of values on each host. 435func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error { 436 return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: histogram, name: name, fvalue: value, tags: tags, rate: rate}) 437} 438 439// Distribution tracks the statistical distribution of a set of values across your infrastructure. 440func (c *Client) Distribution(name string, value float64, tags []string, rate float64) error { 441 return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: distribution, name: name, fvalue: value, tags: tags, rate: rate}) 442} 443 444// Decr is just Count of -1 445func (c *Client) Decr(name string, tags []string, rate float64) error { 446 return c.Count(name, -1, tags, rate) 447} 448 449// Incr is just Count of 1 450func (c *Client) Incr(name string, tags []string, rate float64) error { 451 return c.Count(name, 1, tags, rate) 452} 453 454// Set counts the number of unique elements in a group. 455func (c *Client) Set(name string, value string, tags []string, rate float64) error { 456 return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: set, name: name, svalue: value, tags: tags, rate: rate}) 457} 458 459// Timing sends timing information, it is an alias for TimeInMilliseconds 460func (c *Client) Timing(name string, value time.Duration, tags []string, rate float64) error { 461 return c.TimeInMilliseconds(name, value.Seconds()*1000, tags, rate) 462} 463 464// TimeInMilliseconds sends timing information in milliseconds. 465// It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing) 466func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, rate float64) error { 467 return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: timing, name: name, fvalue: value, tags: tags, rate: rate}) 468} 469 470// Event sends the provided Event. 471func (c *Client) Event(e *Event) error { 472 return c.addMetric(metric{globalTags: c.globalTags(), metricType: event, evalue: e, rate: 1}) 473} 474 475// SimpleEvent sends an event with the provided title and text. 476func (c *Client) SimpleEvent(title, text string) error { 477 e := NewEvent(title, text) 478 return c.Event(e) 479} 480 481// ServiceCheck sends the provided ServiceCheck. 482func (c *Client) ServiceCheck(sc *ServiceCheck) error { 483 return c.addMetric(metric{globalTags: c.globalTags(), metricType: serviceCheck, scvalue: sc, rate: 1}) 484} 485 486// SimpleServiceCheck sends an serviceCheck with the provided name and status. 487func (c *Client) SimpleServiceCheck(name string, status ServiceCheckStatus) error { 488 sc := NewServiceCheck(name, status) 489 return c.ServiceCheck(sc) 490} 491 492// Close the client connection. 493func (c *Client) Close() error { 494 if c == nil { 495 return ErrNoClient 496 } 497 select { 498 case c.stop <- struct{}{}: 499 default: 500 } 501 c.Flush() 502 return c.sender.close() 503} 504