1package statsd
2
3import (
4	"math"
5	"strings"
6	"time"
7)
8
9var (
10	// DefaultNamespace is the default value for the Namespace option
11	DefaultNamespace = ""
12	// DefaultTags is the default value for the Tags option
13	DefaultTags = []string{}
14	// DefaultMaxBytesPerPayload is the default value for the MaxBytesPerPayload option
15	DefaultMaxBytesPerPayload = 0
16	// DefaultMaxMessagesPerPayload is the default value for the MaxMessagesPerPayload option
17	DefaultMaxMessagesPerPayload = math.MaxInt32
18	// DefaultBufferPoolSize is the default value for the DefaultBufferPoolSize option
19	DefaultBufferPoolSize = 0
20	// DefaultBufferFlushInterval is the default value for the BufferFlushInterval option
21	DefaultBufferFlushInterval = 100 * time.Millisecond
22	// DefaultBufferShardCount is the default value for the BufferShardCount option
23	DefaultBufferShardCount = 32
24	// DefaultSenderQueueSize is the default value for the DefaultSenderQueueSize option
25	DefaultSenderQueueSize = 0
26	// DefaultWriteTimeoutUDS is the default value for the WriteTimeoutUDS option
27	DefaultWriteTimeoutUDS = 1 * time.Millisecond
28	// DefaultTelemetry is the default value for the Telemetry option
29	DefaultTelemetry = true
30	// DefaultReceivingMode is the default behavior when sending metrics
31	DefaultReceivingMode = MutexMode
32	// DefaultChannelModeBufferSize is the default size of the channel holding incoming metrics
33	DefaultChannelModeBufferSize = 4096
34	// DefaultAggregationFlushInterval is the default interval for the aggregator to flush metrics.
35	DefaultAggregationFlushInterval = 3 * time.Second
36	// DefaultAggregation
37	DefaultAggregation = false
38	// DefaultExtendedAggregation
39	DefaultExtendedAggregation = false
40	// DefaultDevMode
41	DefaultDevMode = false
42)
43
44// Options contains the configuration options for a client.
45type Options struct {
46	// Namespace to prepend to all metrics, events and service checks name.
47	Namespace string
48	// Tags are global tags to be applied to every metrics, events and service checks.
49	Tags []string
50	// MaxBytesPerPayload is the maximum number of bytes a single payload will contain.
51	// The magic value 0 will set the option to the optimal size for the transport
52	// protocol used when creating the client: 1432 for UDP and 8192 for UDS.
53	MaxBytesPerPayload int
54	// MaxMessagesPerPayload is the maximum number of metrics, events and/or service checks a single payload will contain.
55	// This option can be set to `1` to create an unbuffered client.
56	MaxMessagesPerPayload int
57	// BufferPoolSize is the size of the pool of buffers in number of buffers.
58	// The magic value 0 will set the option to the optimal size for the transport
59	// protocol used when creating the client: 2048 for UDP and 512 for UDS.
60	BufferPoolSize int
61	// BufferFlushInterval is the interval after which the current buffer will get flushed.
62	BufferFlushInterval time.Duration
63	// BufferShardCount is the number of buffer "shards" that will be used.
64	// Those shards allows the use of multiple buffers at the same time to reduce
65	// lock contention.
66	BufferShardCount int
67	// SenderQueueSize is the size of the sender queue in number of buffers.
68	// The magic value 0 will set the option to the optimal size for the transport
69	// protocol used when creating the client: 2048 for UDP and 512 for UDS.
70	SenderQueueSize int
71	// WriteTimeoutUDS is the timeout after which a UDS packet is dropped.
72	WriteTimeoutUDS time.Duration
73	// Telemetry is a set of metrics automatically injected by the client in the
74	// dogstatsd stream to be able to monitor the client itself.
75	Telemetry bool
76	// ReceiveMode determins the behavior of the client when receiving to many
77	// metrics. The client will either drop the metrics if its buffers are
78	// full (ChannelMode mode) or block the caller until the metric can be
79	// handled (MutexMode mode). By default the client will MutexMode. This
80	// option should be set to ChannelMode only when use under very high
81	// load.
82	//
83	// MutexMode uses a mutex internally which is much faster than
84	// channel but causes some lock contention when used with a high number
85	// of threads. Mutex are sharded based on the metrics name which
86	// limit mutex contention when goroutines send different metrics.
87	//
88	// ChannelMode: uses channel (of ChannelModeBufferSize size) to send
89	// metrics and drop metrics if the channel is full. Sending metrics in
90	// this mode is slower that MutexMode (because of the channel), but
91	// will not block the application. This mode is made for application
92	// using many goroutines, sending the same metrics at a very high
93	// volume. The goal is to not slow down the application at the cost of
94	// dropping metrics and having a lower max throughput.
95	ReceiveMode ReceivingMode
96	// ChannelModeBufferSize is the size of the channel holding incoming metrics
97	ChannelModeBufferSize int
98	// AggregationFlushInterval is the interval for the aggregator to flush metrics
99	AggregationFlushInterval time.Duration
100	// [beta] Aggregation enables/disables client side aggregation for
101	// Gauges, Counts and Sets (compatible with every Agent's version).
102	Aggregation bool
103	// [beta] Extended aggregation enables/disables client side aggregation
104	// for all types. This feature is only compatible with Agent's versions
105	// >=7.25.0 or Agent's version >=6.25.0 && < 7.0.0.
106	ExtendedAggregation bool
107	// TelemetryAddr specify a different endpoint for telemetry metrics.
108	TelemetryAddr string
109	// DevMode enables the "dev" mode where the client sends much more
110	// telemetry metrics to help troubleshooting the client behavior.
111	DevMode bool
112}
113
114func resolveOptions(options []Option) (*Options, error) {
115	o := &Options{
116		Namespace:                DefaultNamespace,
117		Tags:                     DefaultTags,
118		MaxBytesPerPayload:       DefaultMaxBytesPerPayload,
119		MaxMessagesPerPayload:    DefaultMaxMessagesPerPayload,
120		BufferPoolSize:           DefaultBufferPoolSize,
121		BufferFlushInterval:      DefaultBufferFlushInterval,
122		BufferShardCount:         DefaultBufferShardCount,
123		SenderQueueSize:          DefaultSenderQueueSize,
124		WriteTimeoutUDS:          DefaultWriteTimeoutUDS,
125		Telemetry:                DefaultTelemetry,
126		ReceiveMode:              DefaultReceivingMode,
127		ChannelModeBufferSize:    DefaultChannelModeBufferSize,
128		AggregationFlushInterval: DefaultAggregationFlushInterval,
129		Aggregation:              DefaultAggregation,
130		ExtendedAggregation:      DefaultExtendedAggregation,
131		DevMode:                  DefaultDevMode,
132	}
133
134	for _, option := range options {
135		err := option(o)
136		if err != nil {
137			return nil, err
138		}
139	}
140
141	return o, nil
142}
143
144// Option is a client option. Can return an error if validation fails.
145type Option func(*Options) error
146
147// WithNamespace sets the Namespace option.
148func WithNamespace(namespace string) Option {
149	return func(o *Options) error {
150		if strings.HasSuffix(namespace, ".") {
151			o.Namespace = namespace
152		} else {
153			o.Namespace = namespace + "."
154		}
155		return nil
156	}
157}
158
159// WithTags sets the Tags option.
160func WithTags(tags []string) Option {
161	return func(o *Options) error {
162		o.Tags = tags
163		return nil
164	}
165}
166
167// WithMaxMessagesPerPayload sets the MaxMessagesPerPayload option.
168func WithMaxMessagesPerPayload(maxMessagesPerPayload int) Option {
169	return func(o *Options) error {
170		o.MaxMessagesPerPayload = maxMessagesPerPayload
171		return nil
172	}
173}
174
175// WithMaxBytesPerPayload sets the MaxBytesPerPayload option.
176func WithMaxBytesPerPayload(MaxBytesPerPayload int) Option {
177	return func(o *Options) error {
178		o.MaxBytesPerPayload = MaxBytesPerPayload
179		return nil
180	}
181}
182
183// WithBufferPoolSize sets the BufferPoolSize option.
184func WithBufferPoolSize(bufferPoolSize int) Option {
185	return func(o *Options) error {
186		o.BufferPoolSize = bufferPoolSize
187		return nil
188	}
189}
190
191// WithBufferFlushInterval sets the BufferFlushInterval option.
192func WithBufferFlushInterval(bufferFlushInterval time.Duration) Option {
193	return func(o *Options) error {
194		o.BufferFlushInterval = bufferFlushInterval
195		return nil
196	}
197}
198
199// WithBufferShardCount sets the BufferShardCount option.
200func WithBufferShardCount(bufferShardCount int) Option {
201	return func(o *Options) error {
202		o.BufferShardCount = bufferShardCount
203		return nil
204	}
205}
206
207// WithSenderQueueSize sets the SenderQueueSize option.
208func WithSenderQueueSize(senderQueueSize int) Option {
209	return func(o *Options) error {
210		o.SenderQueueSize = senderQueueSize
211		return nil
212	}
213}
214
215// WithWriteTimeoutUDS sets the WriteTimeoutUDS option.
216func WithWriteTimeoutUDS(writeTimeoutUDS time.Duration) Option {
217	return func(o *Options) error {
218		o.WriteTimeoutUDS = writeTimeoutUDS
219		return nil
220	}
221}
222
223// WithoutTelemetry disables the telemetry
224func WithoutTelemetry() Option {
225	return func(o *Options) error {
226		o.Telemetry = false
227		return nil
228	}
229}
230
231// WithChannelMode will use channel to receive metrics
232func WithChannelMode() Option {
233	return func(o *Options) error {
234		o.ReceiveMode = ChannelMode
235		return nil
236	}
237}
238
239// WithMutexMode will use mutex to receive metrics
240func WithMutexMode() Option {
241	return func(o *Options) error {
242		o.ReceiveMode = MutexMode
243		return nil
244	}
245}
246
247// WithChannelModeBufferSize the channel buffer size when using "drop mode"
248func WithChannelModeBufferSize(bufferSize int) Option {
249	return func(o *Options) error {
250		o.ChannelModeBufferSize = bufferSize
251		return nil
252	}
253}
254
255// WithAggregationInterval set the aggregation interval
256func WithAggregationInterval(interval time.Duration) Option {
257	return func(o *Options) error {
258		o.AggregationFlushInterval = interval
259		return nil
260	}
261}
262
263// WithClientSideAggregation enables client side aggregation for Gauges, Counts
264// and Sets. Client side aggregation is a beta feature.
265func WithClientSideAggregation() Option {
266	return func(o *Options) error {
267		o.Aggregation = true
268		return nil
269	}
270}
271
272// WithoutClientSideAggregation disables client side aggregation.
273func WithoutClientSideAggregation() Option {
274	return func(o *Options) error {
275		o.Aggregation = false
276		o.ExtendedAggregation = false
277		return nil
278	}
279}
280
281// WithExtendedClientSideAggregation enables client side aggregation for all
282// types. This feature is only compatible with Agent's version >=6.25.0 &&
283// <7.0.0 or Agent's versions >=7.25.0. Client side aggregation is a beta
284// feature.
285func WithExtendedClientSideAggregation() Option {
286	return func(o *Options) error {
287		o.Aggregation = true
288		o.ExtendedAggregation = true
289		return nil
290	}
291}
292
293// WithTelemetryAddr specify a different address for telemetry metrics.
294func WithTelemetryAddr(addr string) Option {
295	return func(o *Options) error {
296		o.TelemetryAddr = addr
297		return nil
298	}
299}
300
301// WithDevMode enables client "dev" mode, sending more Telemetry metrics to
302// help troubleshoot client behavior.
303func WithDevMode() Option {
304	return func(o *Options) error {
305		o.DevMode = true
306		return nil
307	}
308}
309
310// WithoutDevMode disables client "dev" mode, sending more Telemetry metrics to
311// help troubleshoot client behavior.
312func WithoutDevMode() Option {
313	return func(o *Options) error {
314		o.DevMode = false
315		return nil
316	}
317}
318