1package statsd
2
3import (
4	"sync/atomic"
5	"time"
6)
7
8// A statsdWriter offers a standard interface regardless of the underlying
9// protocol. For now UDS and UPD writers are available.
10// Attention: the underlying buffer of `data` is reused after a `statsdWriter.Write` call.
11// `statsdWriter.Write` must be synchronous.
12type statsdWriter interface {
13	Write(data []byte) (n int, err error)
14	SetWriteTimeout(time.Duration) error
15	Close() error
16}
17
18// SenderMetrics contains metrics about the health of the sender
19type SenderMetrics struct {
20	TotalSentBytes                uint64
21	TotalSentPayloads             uint64
22	TotalDroppedPayloads          uint64
23	TotalDroppedBytes             uint64
24	TotalDroppedPayloadsQueueFull uint64
25	TotalDroppedBytesQueueFull    uint64
26	TotalDroppedPayloadsWriter    uint64
27	TotalDroppedBytesWriter       uint64
28}
29
30type sender struct {
31	transport   statsdWriter
32	pool        *bufferPool
33	queue       chan *statsdBuffer
34	metrics     *SenderMetrics
35	stop        chan struct{}
36	flushSignal chan struct{}
37}
38
39func newSender(transport statsdWriter, queueSize int, pool *bufferPool) *sender {
40	sender := &sender{
41		transport:   transport,
42		pool:        pool,
43		queue:       make(chan *statsdBuffer, queueSize),
44		metrics:     &SenderMetrics{},
45		stop:        make(chan struct{}),
46		flushSignal: make(chan struct{}),
47	}
48
49	go sender.sendLoop()
50	return sender
51}
52
53func (s *sender) send(buffer *statsdBuffer) {
54	select {
55	case s.queue <- buffer:
56	default:
57		atomic.AddUint64(&s.metrics.TotalDroppedPayloads, 1)
58		atomic.AddUint64(&s.metrics.TotalDroppedBytes, uint64(len(buffer.bytes())))
59		atomic.AddUint64(&s.metrics.TotalDroppedPayloadsQueueFull, 1)
60		atomic.AddUint64(&s.metrics.TotalDroppedBytesQueueFull, uint64(len(buffer.bytes())))
61		s.pool.returnBuffer(buffer)
62	}
63}
64
65func (s *sender) write(buffer *statsdBuffer) {
66	_, err := s.transport.Write(buffer.bytes())
67	if err != nil {
68		atomic.AddUint64(&s.metrics.TotalDroppedPayloads, 1)
69		atomic.AddUint64(&s.metrics.TotalDroppedBytes, uint64(len(buffer.bytes())))
70		atomic.AddUint64(&s.metrics.TotalDroppedPayloadsWriter, 1)
71		atomic.AddUint64(&s.metrics.TotalDroppedBytesWriter, uint64(len(buffer.bytes())))
72	} else {
73		atomic.AddUint64(&s.metrics.TotalSentPayloads, 1)
74		atomic.AddUint64(&s.metrics.TotalSentBytes, uint64(len(buffer.bytes())))
75	}
76	s.pool.returnBuffer(buffer)
77}
78
79func (s *sender) flushTelemetryMetrics() SenderMetrics {
80	return SenderMetrics{
81		TotalSentBytes:                atomic.SwapUint64(&s.metrics.TotalSentBytes, 0),
82		TotalSentPayloads:             atomic.SwapUint64(&s.metrics.TotalSentPayloads, 0),
83		TotalDroppedPayloads:          atomic.SwapUint64(&s.metrics.TotalDroppedPayloads, 0),
84		TotalDroppedBytes:             atomic.SwapUint64(&s.metrics.TotalDroppedBytes, 0),
85		TotalDroppedPayloadsQueueFull: atomic.SwapUint64(&s.metrics.TotalDroppedPayloadsQueueFull, 0),
86		TotalDroppedBytesQueueFull:    atomic.SwapUint64(&s.metrics.TotalDroppedBytesQueueFull, 0),
87		TotalDroppedPayloadsWriter:    atomic.SwapUint64(&s.metrics.TotalDroppedPayloadsWriter, 0),
88		TotalDroppedBytesWriter:       atomic.SwapUint64(&s.metrics.TotalDroppedBytesWriter, 0),
89	}
90}
91
92func (s *sender) sendLoop() {
93	defer close(s.stop)
94	for {
95		select {
96		case buffer := <-s.queue:
97			s.write(buffer)
98		case <-s.stop:
99			return
100		case <-s.flushSignal:
101			// At that point we know that the workers are paused (the statsd client
102			// will pause them before calling sender.flush()).
103			// So we can fully flush the input queue
104			s.flushInputQueue()
105			s.flushSignal <- struct{}{}
106		}
107	}
108}
109
110func (s *sender) flushInputQueue() {
111	for {
112		select {
113		case buffer := <-s.queue:
114			s.write(buffer)
115		default:
116			return
117		}
118	}
119}
120func (s *sender) flush() {
121	s.flushSignal <- struct{}{}
122	<-s.flushSignal
123}
124
125func (s *sender) close() error {
126	s.stop <- struct{}{}
127	<-s.stop
128	s.flushInputQueue()
129	return s.transport.Close()
130}
131