1package statsd 2 3import ( 4 "math" 5 "strings" 6 "time" 7) 8 9var ( 10 // DefaultNamespace is the default value for the Namespace option 11 DefaultNamespace = "" 12 // DefaultTags is the default value for the Tags option 13 DefaultTags = []string{} 14 // DefaultMaxBytesPerPayload is the default value for the MaxBytesPerPayload option 15 DefaultMaxBytesPerPayload = 0 16 // DefaultMaxMessagesPerPayload is the default value for the MaxMessagesPerPayload option 17 DefaultMaxMessagesPerPayload = math.MaxInt32 18 // DefaultBufferPoolSize is the default value for the DefaultBufferPoolSize option 19 DefaultBufferPoolSize = 0 20 // DefaultBufferFlushInterval is the default value for the BufferFlushInterval option 21 DefaultBufferFlushInterval = 100 * time.Millisecond 22 // DefaultBufferShardCount is the default value for the BufferShardCount option 23 DefaultBufferShardCount = 32 24 // DefaultSenderQueueSize is the default value for the DefaultSenderQueueSize option 25 DefaultSenderQueueSize = 0 26 // DefaultWriteTimeoutUDS is the default value for the WriteTimeoutUDS option 27 DefaultWriteTimeoutUDS = 1 * time.Millisecond 28 // DefaultTelemetry is the default value for the Telemetry option 29 DefaultTelemetry = true 30 // DefaultReceivingMode is the default behavior when sending metrics 31 DefaultReceivingMode = MutexMode 32 // DefaultChannelModeBufferSize is the default size of the channel holding incoming metrics 33 DefaultChannelModeBufferSize = 4096 34 // DefaultAggregationFlushInterval is the default interval for the aggregator to flush metrics. 35 DefaultAggregationFlushInterval = 3 * time.Second 36 // DefaultAggregation 37 DefaultAggregation = false 38 // DefaultExtendedAggregation 39 DefaultExtendedAggregation = false 40 // DefaultDevMode 41 DefaultDevMode = false 42) 43 44// Options contains the configuration options for a client. 45type Options struct { 46 // Namespace to prepend to all metrics, events and service checks name. 47 Namespace string 48 // Tags are global tags to be applied to every metrics, events and service checks. 49 Tags []string 50 // MaxBytesPerPayload is the maximum number of bytes a single payload will contain. 51 // The magic value 0 will set the option to the optimal size for the transport 52 // protocol used when creating the client: 1432 for UDP and 8192 for UDS. 53 MaxBytesPerPayload int 54 // MaxMessagesPerPayload is the maximum number of metrics, events and/or service checks a single payload will contain. 55 // This option can be set to `1` to create an unbuffered client. 56 MaxMessagesPerPayload int 57 // BufferPoolSize is the size of the pool of buffers in number of buffers. 58 // The magic value 0 will set the option to the optimal size for the transport 59 // protocol used when creating the client: 2048 for UDP and 512 for UDS. 60 BufferPoolSize int 61 // BufferFlushInterval is the interval after which the current buffer will get flushed. 62 BufferFlushInterval time.Duration 63 // BufferShardCount is the number of buffer "shards" that will be used. 64 // Those shards allows the use of multiple buffers at the same time to reduce 65 // lock contention. 66 BufferShardCount int 67 // SenderQueueSize is the size of the sender queue in number of buffers. 68 // The magic value 0 will set the option to the optimal size for the transport 69 // protocol used when creating the client: 2048 for UDP and 512 for UDS. 70 SenderQueueSize int 71 // WriteTimeoutUDS is the timeout after which a UDS packet is dropped. 72 WriteTimeoutUDS time.Duration 73 // Telemetry is a set of metrics automatically injected by the client in the 74 // dogstatsd stream to be able to monitor the client itself. 75 Telemetry bool 76 // ReceiveMode determins the behavior of the client when receiving to many 77 // metrics. The client will either drop the metrics if its buffers are 78 // full (ChannelMode mode) or block the caller until the metric can be 79 // handled (MutexMode mode). By default the client will MutexMode. This 80 // option should be set to ChannelMode only when use under very high 81 // load. 82 // 83 // MutexMode uses a mutex internally which is much faster than 84 // channel but causes some lock contention when used with a high number 85 // of threads. Mutex are sharded based on the metrics name which 86 // limit mutex contention when goroutines send different metrics. 87 // 88 // ChannelMode: uses channel (of ChannelModeBufferSize size) to send 89 // metrics and drop metrics if the channel is full. Sending metrics in 90 // this mode is slower that MutexMode (because of the channel), but 91 // will not block the application. This mode is made for application 92 // using many goroutines, sending the same metrics at a very high 93 // volume. The goal is to not slow down the application at the cost of 94 // dropping metrics and having a lower max throughput. 95 ReceiveMode ReceivingMode 96 // ChannelModeBufferSize is the size of the channel holding incoming metrics 97 ChannelModeBufferSize int 98 // AggregationFlushInterval is the interval for the aggregator to flush metrics 99 AggregationFlushInterval time.Duration 100 // [beta] Aggregation enables/disables client side aggregation for 101 // Gauges, Counts and Sets (compatible with every Agent's version). 102 Aggregation bool 103 // [beta] Extended aggregation enables/disables client side aggregation 104 // for all types. This feature is only compatible with Agent's versions 105 // >=7.25.0 or Agent's version >=6.25.0 && < 7.0.0. 106 ExtendedAggregation bool 107 // TelemetryAddr specify a different endpoint for telemetry metrics. 108 TelemetryAddr string 109 // DevMode enables the "dev" mode where the client sends much more 110 // telemetry metrics to help troubleshooting the client behavior. 111 DevMode bool 112} 113 114func resolveOptions(options []Option) (*Options, error) { 115 o := &Options{ 116 Namespace: DefaultNamespace, 117 Tags: DefaultTags, 118 MaxBytesPerPayload: DefaultMaxBytesPerPayload, 119 MaxMessagesPerPayload: DefaultMaxMessagesPerPayload, 120 BufferPoolSize: DefaultBufferPoolSize, 121 BufferFlushInterval: DefaultBufferFlushInterval, 122 BufferShardCount: DefaultBufferShardCount, 123 SenderQueueSize: DefaultSenderQueueSize, 124 WriteTimeoutUDS: DefaultWriteTimeoutUDS, 125 Telemetry: DefaultTelemetry, 126 ReceiveMode: DefaultReceivingMode, 127 ChannelModeBufferSize: DefaultChannelModeBufferSize, 128 AggregationFlushInterval: DefaultAggregationFlushInterval, 129 Aggregation: DefaultAggregation, 130 ExtendedAggregation: DefaultExtendedAggregation, 131 DevMode: DefaultDevMode, 132 } 133 134 for _, option := range options { 135 err := option(o) 136 if err != nil { 137 return nil, err 138 } 139 } 140 141 return o, nil 142} 143 144// Option is a client option. Can return an error if validation fails. 145type Option func(*Options) error 146 147// WithNamespace sets the Namespace option. 148func WithNamespace(namespace string) Option { 149 return func(o *Options) error { 150 if strings.HasSuffix(namespace, ".") { 151 o.Namespace = namespace 152 } else { 153 o.Namespace = namespace + "." 154 } 155 return nil 156 } 157} 158 159// WithTags sets the Tags option. 160func WithTags(tags []string) Option { 161 return func(o *Options) error { 162 o.Tags = tags 163 return nil 164 } 165} 166 167// WithMaxMessagesPerPayload sets the MaxMessagesPerPayload option. 168func WithMaxMessagesPerPayload(maxMessagesPerPayload int) Option { 169 return func(o *Options) error { 170 o.MaxMessagesPerPayload = maxMessagesPerPayload 171 return nil 172 } 173} 174 175// WithMaxBytesPerPayload sets the MaxBytesPerPayload option. 176func WithMaxBytesPerPayload(MaxBytesPerPayload int) Option { 177 return func(o *Options) error { 178 o.MaxBytesPerPayload = MaxBytesPerPayload 179 return nil 180 } 181} 182 183// WithBufferPoolSize sets the BufferPoolSize option. 184func WithBufferPoolSize(bufferPoolSize int) Option { 185 return func(o *Options) error { 186 o.BufferPoolSize = bufferPoolSize 187 return nil 188 } 189} 190 191// WithBufferFlushInterval sets the BufferFlushInterval option. 192func WithBufferFlushInterval(bufferFlushInterval time.Duration) Option { 193 return func(o *Options) error { 194 o.BufferFlushInterval = bufferFlushInterval 195 return nil 196 } 197} 198 199// WithBufferShardCount sets the BufferShardCount option. 200func WithBufferShardCount(bufferShardCount int) Option { 201 return func(o *Options) error { 202 o.BufferShardCount = bufferShardCount 203 return nil 204 } 205} 206 207// WithSenderQueueSize sets the SenderQueueSize option. 208func WithSenderQueueSize(senderQueueSize int) Option { 209 return func(o *Options) error { 210 o.SenderQueueSize = senderQueueSize 211 return nil 212 } 213} 214 215// WithWriteTimeoutUDS sets the WriteTimeoutUDS option. 216func WithWriteTimeoutUDS(writeTimeoutUDS time.Duration) Option { 217 return func(o *Options) error { 218 o.WriteTimeoutUDS = writeTimeoutUDS 219 return nil 220 } 221} 222 223// WithoutTelemetry disables the telemetry 224func WithoutTelemetry() Option { 225 return func(o *Options) error { 226 o.Telemetry = false 227 return nil 228 } 229} 230 231// WithChannelMode will use channel to receive metrics 232func WithChannelMode() Option { 233 return func(o *Options) error { 234 o.ReceiveMode = ChannelMode 235 return nil 236 } 237} 238 239// WithMutexMode will use mutex to receive metrics 240func WithMutexMode() Option { 241 return func(o *Options) error { 242 o.ReceiveMode = MutexMode 243 return nil 244 } 245} 246 247// WithChannelModeBufferSize the channel buffer size when using "drop mode" 248func WithChannelModeBufferSize(bufferSize int) Option { 249 return func(o *Options) error { 250 o.ChannelModeBufferSize = bufferSize 251 return nil 252 } 253} 254 255// WithAggregationInterval set the aggregation interval 256func WithAggregationInterval(interval time.Duration) Option { 257 return func(o *Options) error { 258 o.AggregationFlushInterval = interval 259 return nil 260 } 261} 262 263// WithClientSideAggregation enables client side aggregation for Gauges, Counts 264// and Sets. Client side aggregation is a beta feature. 265func WithClientSideAggregation() Option { 266 return func(o *Options) error { 267 o.Aggregation = true 268 return nil 269 } 270} 271 272// WithoutClientSideAggregation disables client side aggregation. 273func WithoutClientSideAggregation() Option { 274 return func(o *Options) error { 275 o.Aggregation = false 276 o.ExtendedAggregation = false 277 return nil 278 } 279} 280 281// WithExtendedClientSideAggregation enables client side aggregation for all 282// types. This feature is only compatible with Agent's version >=6.25.0 && 283// <7.0.0 or Agent's versions >=7.25.0. Client side aggregation is a beta 284// feature. 285func WithExtendedClientSideAggregation() Option { 286 return func(o *Options) error { 287 o.Aggregation = true 288 o.ExtendedAggregation = true 289 return nil 290 } 291} 292 293// WithTelemetryAddr specify a different address for telemetry metrics. 294func WithTelemetryAddr(addr string) Option { 295 return func(o *Options) error { 296 o.TelemetryAddr = addr 297 return nil 298 } 299} 300 301// WithDevMode enables client "dev" mode, sending more Telemetry metrics to 302// help troubleshoot client behavior. 303func WithDevMode() Option { 304 return func(o *Options) error { 305 o.DevMode = true 306 return nil 307 } 308} 309 310// WithoutDevMode disables client "dev" mode, sending more Telemetry metrics to 311// help troubleshoot client behavior. 312func WithoutDevMode() Option { 313 return func(o *Options) error { 314 o.DevMode = false 315 return nil 316 } 317} 318