1package statsd 2 3import ( 4 "sync/atomic" 5 "time" 6) 7 8// A statsdWriter offers a standard interface regardless of the underlying 9// protocol. For now UDS and UPD writers are available. 10// Attention: the underlying buffer of `data` is reused after a `statsdWriter.Write` call. 11// `statsdWriter.Write` must be synchronous. 12type statsdWriter interface { 13 Write(data []byte) (n int, err error) 14 SetWriteTimeout(time.Duration) error 15 Close() error 16} 17 18// SenderMetrics contains metrics about the health of the sender 19type SenderMetrics struct { 20 TotalSentBytes uint64 21 TotalSentPayloads uint64 22 TotalDroppedPayloads uint64 23 TotalDroppedBytes uint64 24 TotalDroppedPayloadsQueueFull uint64 25 TotalDroppedBytesQueueFull uint64 26 TotalDroppedPayloadsWriter uint64 27 TotalDroppedBytesWriter uint64 28} 29 30type sender struct { 31 transport statsdWriter 32 pool *bufferPool 33 queue chan *statsdBuffer 34 metrics *SenderMetrics 35 stop chan struct{} 36 flushSignal chan struct{} 37} 38 39func newSender(transport statsdWriter, queueSize int, pool *bufferPool) *sender { 40 sender := &sender{ 41 transport: transport, 42 pool: pool, 43 queue: make(chan *statsdBuffer, queueSize), 44 metrics: &SenderMetrics{}, 45 stop: make(chan struct{}), 46 flushSignal: make(chan struct{}), 47 } 48 49 go sender.sendLoop() 50 return sender 51} 52 53func (s *sender) send(buffer *statsdBuffer) { 54 select { 55 case s.queue <- buffer: 56 default: 57 atomic.AddUint64(&s.metrics.TotalDroppedPayloads, 1) 58 atomic.AddUint64(&s.metrics.TotalDroppedBytes, uint64(len(buffer.bytes()))) 59 atomic.AddUint64(&s.metrics.TotalDroppedPayloadsQueueFull, 1) 60 atomic.AddUint64(&s.metrics.TotalDroppedBytesQueueFull, uint64(len(buffer.bytes()))) 61 s.pool.returnBuffer(buffer) 62 } 63} 64 65func (s *sender) write(buffer *statsdBuffer) { 66 _, err := s.transport.Write(buffer.bytes()) 67 if err != nil { 68 atomic.AddUint64(&s.metrics.TotalDroppedPayloads, 1) 69 atomic.AddUint64(&s.metrics.TotalDroppedBytes, uint64(len(buffer.bytes()))) 70 atomic.AddUint64(&s.metrics.TotalDroppedPayloadsWriter, 1) 71 atomic.AddUint64(&s.metrics.TotalDroppedBytesWriter, uint64(len(buffer.bytes()))) 72 } else { 73 atomic.AddUint64(&s.metrics.TotalSentPayloads, 1) 74 atomic.AddUint64(&s.metrics.TotalSentBytes, uint64(len(buffer.bytes()))) 75 } 76 s.pool.returnBuffer(buffer) 77} 78 79func (s *sender) flushTelemetryMetrics() SenderMetrics { 80 return SenderMetrics{ 81 TotalSentBytes: atomic.SwapUint64(&s.metrics.TotalSentBytes, 0), 82 TotalSentPayloads: atomic.SwapUint64(&s.metrics.TotalSentPayloads, 0), 83 TotalDroppedPayloads: atomic.SwapUint64(&s.metrics.TotalDroppedPayloads, 0), 84 TotalDroppedBytes: atomic.SwapUint64(&s.metrics.TotalDroppedBytes, 0), 85 TotalDroppedPayloadsQueueFull: atomic.SwapUint64(&s.metrics.TotalDroppedPayloadsQueueFull, 0), 86 TotalDroppedBytesQueueFull: atomic.SwapUint64(&s.metrics.TotalDroppedBytesQueueFull, 0), 87 TotalDroppedPayloadsWriter: atomic.SwapUint64(&s.metrics.TotalDroppedPayloadsWriter, 0), 88 TotalDroppedBytesWriter: atomic.SwapUint64(&s.metrics.TotalDroppedBytesWriter, 0), 89 } 90} 91 92func (s *sender) sendLoop() { 93 defer close(s.stop) 94 for { 95 select { 96 case buffer := <-s.queue: 97 s.write(buffer) 98 case <-s.stop: 99 return 100 case <-s.flushSignal: 101 // At that point we know that the workers are paused (the statsd client 102 // will pause them before calling sender.flush()). 103 // So we can fully flush the input queue 104 s.flushInputQueue() 105 s.flushSignal <- struct{}{} 106 } 107 } 108} 109 110func (s *sender) flushInputQueue() { 111 for { 112 select { 113 case buffer := <-s.queue: 114 s.write(buffer) 115 default: 116 return 117 } 118 } 119} 120func (s *sender) flush() { 121 s.flushSignal <- struct{}{} 122 <-s.flushSignal 123} 124 125func (s *sender) close() error { 126 s.stop <- struct{}{} 127 <-s.stop 128 s.flushInputQueue() 129 return s.transport.Close() 130} 131