1// Copyright 2013 Ooyala, Inc.
2
3/*
4Package statsd provides a Go dogstatsd client. Dogstatsd extends the popular statsd,
5adding tags and histograms and pushing upstream to Datadog.
6
7Refer to http://docs.datadoghq.com/guides/dogstatsd/ for information about DogStatsD.
8
9Example Usage:
10
11    // Create the client
12    c, err := statsd.New("127.0.0.1:8125")
13    if err != nil {
14        log.Fatal(err)
15    }
16    // Prefix every metric with the app name
17    c.Namespace = "flubber."
18    // Send the EC2 availability zone as a tag with every metric
19    c.Tags = append(c.Tags, "us-east-1a")
20    err = c.Gauge("request.duration", 1.2, nil, 1)
21
22statsd is based on go-statsd-client.
23*/
24package statsd
25
26import (
27	"fmt"
28	"math/rand"
29	"os"
30	"strings"
31	"sync"
32	"time"
33)
34
35/*
36OptimalUDPPayloadSize defines the optimal payload size for a UDP datagram, 1432 bytes
37is optimal for regular networks with an MTU of 1500 so datagrams don't get
38fragmented. It's generally recommended not to fragment UDP datagrams as losing
39a single fragment will cause the entire datagram to be lost.
40*/
41const OptimalUDPPayloadSize = 1432
42
43/*
44MaxUDPPayloadSize defines the maximum payload size for a UDP datagram.
45Its value comes from the calculation: 65535 bytes Max UDP datagram size -
468byte UDP header - 60byte max IP headers
47any number greater than that will see frames being cut out.
48*/
49const MaxUDPPayloadSize = 65467
50
51// DefaultUDPBufferPoolSize is the default size of the buffer pool for UDP clients.
52const DefaultUDPBufferPoolSize = 2048
53
54// DefaultUDSBufferPoolSize is the default size of the buffer pool for UDS clients.
55const DefaultUDSBufferPoolSize = 512
56
57/*
58DefaultMaxAgentPayloadSize is the default maximum payload size the agent
59can receive. This can be adjusted by changing dogstatsd_buffer_size in the
60agent configuration file datadog.yaml.
61*/
62const DefaultMaxAgentPayloadSize = 8192
63
const (
	// TelemetryInterval is the interval at which telemetry will be sent by the client.
	TelemetryInterval = 10 * time.Second

	// UnixAddressPrefix holds the prefix to use to enable Unix Domain Socket
	// traffic instead of UDP.
	UnixAddressPrefix = "unix://"

	// Client-side entity ID injection for container tagging: when the
	// environment variable named by entityIDEnvName is set, its value is
	// appended to the client's global tags under the entityIDTagName key.
	entityIDEnvName = "DD_ENTITY_ID"
	entityIDTagName = "dd.internal.entity_id"
)

// clientTelemetryTag is a tag identifying this specific client.
var clientTelemetryTag = "client:go"
85
// metricType identifies what kind of payload a metric value carries; it
// selects the statsdBuffer write routine used by writeMetricUnsafe.
type metricType int

// Supported metric kinds, numbered sequentially from zero by iota.
const (
	gauge        metricType = iota // point-in-time value (fvalue)
	count                          // counter increment (ivalue)
	histogram                      // per-host distribution sample (fvalue)
	distribution                   // global distribution sample (fvalue)
	set                            // unique-element set member (svalue)
	timing                         // duration in milliseconds (fvalue)
	event                          // event payload (evalue)
	serviceCheck                   // service check payload (scvalue)
)
98
// metric is one queued payload: a measurement, an event or a service check.
// Which value field is meaningful depends on metricType (see
// writeMetricUnsafe); the others are left at their zero value.
type metric struct {
	metricType metricType    // selects the buffer write routine
	namespace  string        // passed to the buffer writer; empty for events, service checks and telemetry
	globalTags []string      // client-wide tags, written alongside tags
	name       string        // metric name (unused for events and service checks)
	fvalue     float64       // value for gauge, histogram, distribution and timing
	ivalue     int64         // value for count
	svalue     string        // value for set
	evalue     *Event        // payload for event; dereferenced when written
	scvalue    *ServiceCheck // payload for serviceCheck; dereferenced when written
	tags       []string      // per-call tags
	rate       float64       // sample rate: addMetric may drop the metric when < 1; also passed to the writer
}
112
// noClientErr is a string-backed error type; being a string type lets
// ErrNoClient be declared as a constant rather than a variable.
type noClientErr string

// Error implements the error interface by returning the message itself.
func (msg noClientErr) Error() string { return string(msg) }

// ErrNoClient is returned if statsd reporting methods are invoked on
// a nil client.
const ErrNoClient = noClientErr("statsd client is nil")
122
123// ClientInterface is an interface that exposes the common client functions for the
124// purpose of being able to provide a no-op client or even mocking. This can aid
125// downstream users' with their testing.
126type ClientInterface interface {
127	// Gauge measures the value of a metric at a particular time.
128	Gauge(name string, value float64, tags []string, rate float64) error
129
130	// Count tracks how many times something happened per second.
131	Count(name string, value int64, tags []string, rate float64) error
132
133	// Histogram tracks the statistical distribution of a set of values on each host.
134	Histogram(name string, value float64, tags []string, rate float64) error
135
136	// Distribution tracks the statistical distribution of a set of values across your infrastructure.
137	Distribution(name string, value float64, tags []string, rate float64) error
138
139	// Decr is just Count of -1
140	Decr(name string, tags []string, rate float64) error
141
142	// Incr is just Count of 1
143	Incr(name string, tags []string, rate float64) error
144
145	// Set counts the number of unique elements in a group.
146	Set(name string, value string, tags []string, rate float64) error
147
148	// Timing sends timing information, it is an alias for TimeInMilliseconds
149	Timing(name string, value time.Duration, tags []string, rate float64) error
150
151	// TimeInMilliseconds sends timing information in milliseconds.
152	// It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing)
153	TimeInMilliseconds(name string, value float64, tags []string, rate float64) error
154
155	// Event sends the provided Event.
156	Event(e *Event) error
157
158	// SimpleEvent sends an event with the provided title and text.
159	SimpleEvent(title, text string) error
160
161	// ServiceCheck sends the provided ServiceCheck.
162	ServiceCheck(sc *ServiceCheck) error
163
164	// SimpleServiceCheck sends an serviceCheck with the provided name and status.
165	SimpleServiceCheck(name string, status ServiceCheckStatus) error
166
167	// Close the client connection.
168	Close() error
169
170	// Flush forces a flush of all the queued dogstatsd payloads.
171	Flush() error
172
173	// SetWriteTimeout allows the user to set a custom write timeout.
174	SetWriteTimeout(d time.Duration) error
175}
176
// A Client is a handle for sending messages to dogstatsd. It is safe to
// use one Client from multiple goroutines simultaneously: the embedded
// sync.Mutex serializes access to the buffer currently being filled.
type Client struct {
	// sender queues filled buffers and owns the underlying transport.
	sender *sender
	// Namespace to prepend to every metric name. It is not applied to
	// events, service checks or the client's own telemetry.
	Namespace string
	// Tags are global tags added to every metric, event and service check
	// (but not to the client's own telemetry).
	Tags []string
	// SkipErrors turns off error passing and allows UDS to emulate UDP behaviour.
	// NOTE(review): not referenced in this file; confirm it is still honored.
	SkipErrors    bool
	flushTime     time.Duration // interval at which watch flushes the current buffer
	bufferPool    *bufferPool   // source of replacement buffers after a flush
	buffer        *statsdBuffer // buffer currently being filled; guarded by the mutex
	telemetryTags []string      // tags attached to client telemetry (client and transport)
	stop          chan struct{} // used by Close to stop the watch and telemetry goroutines
	sync.Mutex
}
195
196// Verify that Client implements the ClientInterface.
197// https://golang.org/doc/faq#guarantee_satisfies_interface
198var _ ClientInterface = &Client{}
199
200// New returns a pointer to a new Client given an addr in the format "hostname:port" or
201// "unix:///path/to/socket".
202func New(addr string, options ...Option) (*Client, error) {
203	var w statsdWriter
204	o, err := resolveOptions(options)
205	if err != nil {
206		return nil, err
207	}
208
209	var writerType string
210	optimalPayloadSize := OptimalUDPPayloadSize
211	defaultBufferPoolSize := DefaultUDPBufferPoolSize
212	if !strings.HasPrefix(addr, UnixAddressPrefix) {
213		w, err = newUDPWriter(addr)
214		writerType = "udp"
215	} else {
216		// FIXME: The agent has a performance pitfall preventing us from using better defaults here.
217		// Once it's fixed, use `DefaultMaxAgentPayloadSize` and `DefaultUDSBufferPoolSize` instead.
218		optimalPayloadSize = OptimalUDPPayloadSize
219		defaultBufferPoolSize = DefaultUDPBufferPoolSize
220		w, err = newUDSWriter(addr[len(UnixAddressPrefix)-1:])
221		writerType = "uds"
222	}
223	if err != nil {
224		return nil, err
225	}
226
227	if o.MaxBytesPerPayload == 0 {
228		o.MaxBytesPerPayload = optimalPayloadSize
229	}
230	if o.BufferPoolSize == 0 {
231		o.BufferPoolSize = defaultBufferPoolSize
232	}
233	if o.SenderQueueSize == 0 {
234		o.SenderQueueSize = defaultBufferPoolSize
235	}
236	return newWithWriter(w, o, writerType)
237}
238
239// NewWithWriter creates a new Client with given writer. Writer is a
240// io.WriteCloser + SetWriteTimeout(time.Duration) error
241func NewWithWriter(w statsdWriter, options ...Option) (*Client, error) {
242	o, err := resolveOptions(options)
243	if err != nil {
244		return nil, err
245	}
246	return newWithWriter(w, o, "custom")
247}
248
249func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, error) {
250
251	w.SetWriteTimeout(o.WriteTimeoutUDS)
252
253	c := Client{
254		Namespace:     o.Namespace,
255		Tags:          o.Tags,
256		telemetryTags: []string{clientTelemetryTag, "transport:" + writerName},
257	}
258
259	// Inject DD_ENTITY_ID as a constant tag if found
260	entityID := os.Getenv(entityIDEnvName)
261	if entityID != "" {
262		entityTag := fmt.Sprintf("%s:%s", entityIDTagName, entityID)
263		c.Tags = append(c.Tags, entityTag)
264	}
265
266	if o.MaxBytesPerPayload == 0 {
267		o.MaxBytesPerPayload = OptimalUDPPayloadSize
268	}
269
270	c.bufferPool = newBufferPool(o.BufferPoolSize, o.MaxBytesPerPayload, o.MaxMessagesPerPayload)
271	c.buffer = c.bufferPool.borrowBuffer()
272	c.sender = newSender(w, o.SenderQueueSize, c.bufferPool)
273	c.flushTime = o.BufferFlushInterval
274	c.stop = make(chan struct{}, 1)
275	go c.watch()
276	go c.telemetry()
277
278	return &c, nil
279}
280
281// NewBuffered returns a Client that buffers its output and sends it in chunks.
282// Buflen is the length of the buffer in number of commands.
283//
284// When addr is empty, the client will default to a UDP client and use the DD_AGENT_HOST
285// and (optionally) the DD_DOGSTATSD_PORT environment variables to build the target address.
286func NewBuffered(addr string, buflen int) (*Client, error) {
287	return New(addr, WithMaxMessagesPerPayload(buflen))
288}
289
290// SetWriteTimeout allows the user to set a custom UDS write timeout. Not supported for UDP.
291func (c *Client) SetWriteTimeout(d time.Duration) error {
292	if c == nil {
293		return ErrNoClient
294	}
295	return c.sender.transport.SetWriteTimeout(d)
296}
297
298func (c *Client) watch() {
299	ticker := time.NewTicker(c.flushTime)
300
301	for {
302		select {
303		case <-ticker.C:
304			c.Lock()
305			c.flushUnsafe()
306			c.Unlock()
307		case <-c.stop:
308			ticker.Stop()
309			return
310		}
311	}
312}
313
314func (c *Client) telemetry() {
315	ticker := time.NewTicker(TelemetryInterval)
316	for {
317		select {
318		case <-ticker.C:
319			metrics := c.sender.flushMetrics()
320			c.telemetryCount("datadog.dogstatsd.client.packets_sent", int64(metrics.TotalSentPayloads), c.telemetryTags, 1)
321			c.telemetryCount("datadog.dogstatsd.client.bytes_sent", int64(metrics.TotalSentBytes), c.telemetryTags, 1)
322			c.telemetryCount("datadog.dogstatsd.client.packets_dropped", int64(metrics.TotalDroppedPayloads), c.telemetryTags, 1)
323			c.telemetryCount("datadog.dogstatsd.client.bytes_dropped", int64(metrics.TotalDroppedBytes), c.telemetryTags, 1)
324			c.telemetryCount("datadog.dogstatsd.client.packets_dropped_queue", int64(metrics.TotalDroppedPayloadsQueueFull), c.telemetryTags, 1)
325			c.telemetryCount("datadog.dogstatsd.client.bytes_dropped_queue", int64(metrics.TotalDroppedBytesQueueFull), c.telemetryTags, 1)
326			c.telemetryCount("datadog.dogstatsd.client.packets_dropped_writer", int64(metrics.TotalDroppedPayloadsWriter), c.telemetryTags, 1)
327			c.telemetryCount("datadog.dogstatsd.client.bytes_dropped_writer", int64(metrics.TotalDroppedBytesWriter), c.telemetryTags, 1)
328		case <-c.stop:
329			ticker.Stop()
330			return
331		}
332	}
333}
334
335// same as Count but without global namespace / tags
336func (c *Client) telemetryCount(name string, value int64, tags []string, rate float64) {
337	c.addMetric(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate})
338}
339
340// Flush forces a flush of all the queued dogstatsd payloads
341// This method is blocking and will not return until everything is sent
342// through the network
343func (c *Client) Flush() error {
344	if c == nil {
345		return ErrNoClient
346	}
347	c.Lock()
348	defer c.Unlock()
349	c.flushUnsafe()
350	c.sender.flush()
351	return nil
352}
353
354// flush the current buffer. Lock must be held by caller.
355// flushed buffer written to the network asynchronously.
356func (c *Client) flushUnsafe() {
357	if len(c.buffer.bytes()) > 0 {
358		c.sender.send(c.buffer)
359		c.buffer = c.bufferPool.borrowBuffer()
360	}
361}
362
363func (c *Client) shouldSample(rate float64) bool {
364	if rate < 1 && rand.Float64() > rate {
365		return true
366	}
367	return false
368}
369
370func (c *Client) globalTags() []string {
371	if c != nil {
372		return c.Tags
373	}
374	return nil
375}
376
377func (c *Client) namespace() string {
378	if c != nil {
379		return c.Namespace
380	}
381	return ""
382}
383
384func (c *Client) writeMetricUnsafe(m metric) error {
385	switch m.metricType {
386	case gauge:
387		return c.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
388	case count:
389		return c.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate)
390	case histogram:
391		return c.buffer.writeHistogram(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
392	case distribution:
393		return c.buffer.writeDistribution(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
394	case set:
395		return c.buffer.writeSet(m.namespace, m.globalTags, m.name, m.svalue, m.tags, m.rate)
396	case timing:
397		return c.buffer.writeTiming(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate)
398	case event:
399		return c.buffer.writeEvent(*m.evalue, m.globalTags)
400	case serviceCheck:
401		return c.buffer.writeServiceCheck(*m.scvalue, m.globalTags)
402	default:
403		return nil
404	}
405}
406
407func (c *Client) addMetric(m metric) error {
408	if c == nil {
409		return ErrNoClient
410	}
411	if c.shouldSample(m.rate) {
412		return nil
413	}
414	c.Lock()
415	var err error
416	if err = c.writeMetricUnsafe(m); err == errBufferFull {
417		c.flushUnsafe()
418		err = c.writeMetricUnsafe(m)
419	}
420	c.Unlock()
421	return err
422}
423
424// Gauge measures the value of a metric at a particular time.
425func (c *Client) Gauge(name string, value float64, tags []string, rate float64) error {
426	return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate})
427}
428
429// Count tracks how many times something happened per second.
430func (c *Client) Count(name string, value int64, tags []string, rate float64) error {
431	return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: count, name: name, ivalue: value, tags: tags, rate: rate})
432}
433
434// Histogram tracks the statistical distribution of a set of values on each host.
435func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error {
436	return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: histogram, name: name, fvalue: value, tags: tags, rate: rate})
437}
438
439// Distribution tracks the statistical distribution of a set of values across your infrastructure.
440func (c *Client) Distribution(name string, value float64, tags []string, rate float64) error {
441	return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: distribution, name: name, fvalue: value, tags: tags, rate: rate})
442}
443
444// Decr is just Count of -1
445func (c *Client) Decr(name string, tags []string, rate float64) error {
446	return c.Count(name, -1, tags, rate)
447}
448
449// Incr is just Count of 1
450func (c *Client) Incr(name string, tags []string, rate float64) error {
451	return c.Count(name, 1, tags, rate)
452}
453
454// Set counts the number of unique elements in a group.
455func (c *Client) Set(name string, value string, tags []string, rate float64) error {
456	return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: set, name: name, svalue: value, tags: tags, rate: rate})
457}
458
459// Timing sends timing information, it is an alias for TimeInMilliseconds
460func (c *Client) Timing(name string, value time.Duration, tags []string, rate float64) error {
461	return c.TimeInMilliseconds(name, value.Seconds()*1000, tags, rate)
462}
463
464// TimeInMilliseconds sends timing information in milliseconds.
465// It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing)
466func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, rate float64) error {
467	return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: timing, name: name, fvalue: value, tags: tags, rate: rate})
468}
469
470// Event sends the provided Event.
471func (c *Client) Event(e *Event) error {
472	return c.addMetric(metric{globalTags: c.globalTags(), metricType: event, evalue: e, rate: 1})
473}
474
475// SimpleEvent sends an event with the provided title and text.
476func (c *Client) SimpleEvent(title, text string) error {
477	e := NewEvent(title, text)
478	return c.Event(e)
479}
480
481// ServiceCheck sends the provided ServiceCheck.
482func (c *Client) ServiceCheck(sc *ServiceCheck) error {
483	return c.addMetric(metric{globalTags: c.globalTags(), metricType: serviceCheck, scvalue: sc, rate: 1})
484}
485
486// SimpleServiceCheck sends an serviceCheck with the provided name and status.
487func (c *Client) SimpleServiceCheck(name string, status ServiceCheckStatus) error {
488	sc := NewServiceCheck(name, status)
489	return c.ServiceCheck(sc)
490}
491
492// Close the client connection.
493func (c *Client) Close() error {
494	if c == nil {
495		return ErrNoClient
496	}
497	select {
498	case c.stop <- struct{}{}:
499	default:
500	}
501	c.Flush()
502	return c.sender.close()
503}
504