1// Package graphite provides a service for InfluxDB to ingest data via the graphite protocol.
2package graphite // import "github.com/influxdata/influxdb/services/graphite"
3
4import (
5	"bufio"
6	"fmt"
7	"math"
8	"net"
9	"strings"
10	"sync"
11	"sync/atomic"
12	"time"
13
14	"github.com/influxdata/influxdb/logger"
15	"github.com/influxdata/influxdb/models"
16	"github.com/influxdata/influxdb/monitor/diagnostics"
17	"github.com/influxdata/influxdb/services/meta"
18	"github.com/influxdata/influxdb/tsdb"
19	"go.uber.org/zap"
20)
21
// udpBufferSize is the size, in bytes, of the buffer that openUDPServer
// allocates once and reads every incoming UDP datagram into.
const udpBufferSize = 65536
23
// statistics gathered by the graphite package.
//
// Each constant is the key under which the matching counter of the Statistics
// struct is published in the values map built by Service.Statistics.
const (
	statPointsReceived      = "pointsRx"        // lines read from TCP connections and UDP datagrams
	statBytesReceived       = "bytesRx"         // bytes read from TCP connections and UDP datagrams
	statPointsParseFail     = "pointsParseFail" // lines rejected by the parser (NaN values excluded)
	statPointsNaNFail       = "pointsNaNFail"   // lines dropped because their value was NaN
	statBatchesTransmitted  = "batchesTx"       // batches written successfully
	statPointsTransmitted   = "pointsTx"        // points contained in successfully written batches
	statBatchesTransmitFail = "batchesTxFail"   // batches whose write returned an error
	statConnectionsActive   = "connsActive"     // TCP connections currently being handled
	statConnectionsHandled  = "connsHandled"    // TCP connections handled since the counter was created
)
36
// tcpConnection pairs a client TCP connection with the time it was registered
// by trackConnection. Entries are listed by Diagnostics and closed by
// closeAllConnections.
type tcpConnection struct {
	conn        net.Conn  // the client connection
	connectTime time.Time // UTC time at which the connection was tracked
}
41
42func (c *tcpConnection) Close() {
43	c.conn.Close()
44}
45
// Service represents a Graphite service.
//
// A Service is built by NewService, started with Open and stopped with Close.
// Monitor is optional (it is nil-checked before use); PointsWriter and
// MetaClient are used without nil checks once batches are being processed.
type Service struct {
	// Settings copied from the Config by NewService. bindAddress is the
	// address listened on, database/retentionPolicy are the write target,
	// protocol selects the TCP or UDP listener in Open, the batch* values are
	// handed to the point batcher, and a non-zero udpReadBuffer is applied to
	// the UDP socket with SetReadBuffer.
	bindAddress     string
	database        string
	retentionPolicy string
	protocol        string
	batchSize       int
	batchPending    int
	batchTimeout    time.Duration
	udpReadBuffer   int

	// batcher receives parsed points on In() and yields batches on Out(); it
	// is created by Open. parser turns a received line into a point.
	batcher *tsdb.PointBatcher
	parser  *Parser

	// logger is a no-op logger until WithLogger is called. stats holds the
	// counters reported by Statistics, which are tagged with defaultTags.
	logger      *zap.Logger
	stats       *Statistics
	defaultTags models.StatisticTags

	// tcpConnections holds the TCP connections currently being handled,
	// keyed by remote address, and is guarded by tcpConnectionsMu. diagsKey
	// is the name used to register and deregister the service with Monitor.
	tcpConnectionsMu sync.Mutex
	tcpConnections   map[string]*tcpConnection
	diagsKey         string

	// ln is set by openTCPServer and udpConn by openUDPServer. addr is the
	// address returned by whichever of the two Open called.
	ln      net.Listener
	addr    net.Addr
	udpConn *net.UDPConn

	// wg counts the goroutines started by the service; Close waits on it.
	wg sync.WaitGroup

	// mu is held whenever ready or done is assigned.
	mu    sync.RWMutex
	ready bool          // Has the required database been created?
	done  chan struct{} // Is the service closing or closed?

	// Monitor, when set, is told about this service as a diagnostics client
	// in Open and told to forget it in Close.
	Monitor interface {
		RegisterDiagnosticsClient(name string, client diagnostics.Client)
		DeregisterDiagnosticsClient(name string)
	}
	// PointsWriter receives every batch produced by the batcher.
	PointsWriter interface {
		WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
	}
	// MetaClient is used by createInternalStorage to look up and, when
	// missing, create the target database and retention policy.
	MetaClient interface {
		CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
		CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
		Database(name string) *meta.DatabaseInfo
		RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error)
	}
}
92
93// NewService returns an instance of the Graphite service.
94func NewService(c Config) (*Service, error) {
95	// Use defaults where necessary.
96	d := c.WithDefaults()
97
98	s := Service{
99		bindAddress:     d.BindAddress,
100		database:        d.Database,
101		retentionPolicy: d.RetentionPolicy,
102		protocol:        d.Protocol,
103		batchSize:       d.BatchSize,
104		batchPending:    d.BatchPending,
105		udpReadBuffer:   d.UDPReadBuffer,
106		batchTimeout:    time.Duration(d.BatchTimeout),
107		logger:          zap.NewNop(),
108		stats:           &Statistics{},
109		defaultTags:     models.StatisticTags{"proto": d.Protocol, "bind": d.BindAddress},
110		tcpConnections:  make(map[string]*tcpConnection),
111		diagsKey:        strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"),
112	}
113
114	parser, err := NewParserWithOptions(Options{
115		Templates:   d.Templates,
116		DefaultTags: d.DefaultTags(),
117		Separator:   d.Separator})
118
119	if err != nil {
120		return nil, err
121	}
122	s.parser = parser
123
124	return &s, nil
125}
126
127// Open starts the Graphite input processing data.
128func (s *Service) Open() error {
129	s.mu.Lock()
130	defer s.mu.Unlock()
131
132	if s.done != nil {
133		return nil // Already open.
134	}
135	s.done = make(chan struct{})
136
137	s.logger.Info("Starting graphite service",
138		zap.Int("batch_size", s.batchSize),
139		logger.DurationLiteral("batch_timeout", s.batchTimeout))
140
141	// Register diagnostics if a Monitor service is available.
142	if s.Monitor != nil {
143		s.Monitor.RegisterDiagnosticsClient(s.diagsKey, s)
144	}
145
146	s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchPending, s.batchTimeout)
147	s.batcher.Start()
148
149	// Start processing batches.
150	s.wg.Add(1)
151	go s.processBatches(s.batcher)
152
153	var err error
154	if strings.ToLower(s.protocol) == "tcp" {
155		s.addr, err = s.openTCPServer()
156	} else if strings.ToLower(s.protocol) == "udp" {
157		s.addr, err = s.openUDPServer()
158	} else {
159		return fmt.Errorf("unrecognized Graphite input protocol %s", s.protocol)
160	}
161	if err != nil {
162		return err
163	}
164
165	s.logger.Info("Listening",
166		zap.String("protocol", s.protocol),
167		zap.Stringer("addr", s.addr))
168	return nil
169}
170
171func (s *Service) closeAllConnections() {
172	s.tcpConnectionsMu.Lock()
173	defer s.tcpConnectionsMu.Unlock()
174	for _, c := range s.tcpConnections {
175		c.Close()
176	}
177}
178
// Close stops all data processing on the Graphite input.
//
// Close is a no-op when the service was never opened or is already closing.
// Otherwise it signals shutdown, closes the tracked TCP connections and
// whichever of the TCP listener and UDP socket is set, stops the batcher,
// deregisters from Monitor, and then blocks until every goroutine counted by
// s.wg has returned. The returned error is always nil.
func (s *Service) Close() error {
	// The teardown runs under s.mu inside a closure so that the lock is
	// released before waiting on the goroutines below.
	if wait := func() bool {
		s.mu.Lock()
		defer s.mu.Unlock()

		if s.closed() {
			return false
		}
		// From here on closed() reports true and processBatches returns.
		close(s.done)

		// Closing the connections and listeners makes the blocked reads and
		// accepts in the handler goroutines return with an error.
		s.closeAllConnections()

		if s.ln != nil {
			s.ln.Close()
		}
		if s.udpConn != nil {
			s.udpConn.Close()
		}

		if s.batcher != nil {
			s.batcher.Stop()
		}

		if s.Monitor != nil {
			s.Monitor.DeregisterDiagnosticsClient(s.diagsKey)
		}
		return true
	}(); !wait {
		return nil // Already closed.
	}

	// Wait without holding s.mu: processBatches acquires s.mu in
	// createInternalStorage and could not finish if the lock were held here.
	s.wg.Wait()

	// A nil done channel is what Open checks to decide the service may be
	// opened again.
	s.mu.Lock()
	s.done = nil
	s.mu.Unlock()

	return nil
}
219
220// Closed returns true if the service is currently closed.
221func (s *Service) Closed() bool {
222	s.mu.Lock()
223	defer s.mu.Unlock()
224	return s.closed()
225}
226
227func (s *Service) closed() bool {
228	select {
229	case <-s.done:
230		// Service is closing.
231		return true
232	default:
233	}
234	return s.done == nil
235}
236
237// createInternalStorage ensures that the required database has been created.
238func (s *Service) createInternalStorage() error {
239	s.mu.RLock()
240	ready := s.ready
241	s.mu.RUnlock()
242	if ready {
243		return nil
244	}
245
246	if db := s.MetaClient.Database(s.database); db != nil {
247		if rp, _ := s.MetaClient.RetentionPolicy(s.database, s.retentionPolicy); rp == nil {
248			spec := meta.RetentionPolicySpec{Name: s.retentionPolicy}
249			if _, err := s.MetaClient.CreateRetentionPolicy(s.database, &spec, true); err != nil {
250				return err
251			}
252		}
253	} else {
254		spec := meta.RetentionPolicySpec{Name: s.retentionPolicy}
255		if _, err := s.MetaClient.CreateDatabaseWithRetentionPolicy(s.database, &spec); err != nil {
256			return err
257		}
258	}
259
260	// The service is now ready.
261	s.mu.Lock()
262	s.ready = true
263	s.mu.Unlock()
264	return nil
265}
266
267// WithLogger sets the logger on the service.
268func (s *Service) WithLogger(log *zap.Logger) {
269	s.logger = log.With(
270		zap.String("service", "graphite"),
271		zap.String("addr", s.bindAddress),
272	)
273}
274
// Statistics maintains statistics for the graphite service.
//
// Every field is a counter that the service updates and reads with the
// sync/atomic functions.
type Statistics struct {
	PointsReceived      int64 // lines read (one per TCP line, one per newline-separated segment of a UDP datagram)
	BytesReceived       int64 // bytes read from TCP connections and UDP datagrams
	PointsParseFail     int64 // lines the parser rejected, other than NaN values
	PointsNaNFail       int64 // lines dropped because their value was NaN
	BatchesTransmitted  int64 // batches written successfully
	PointsTransmitted   int64 // points contained in successfully written batches
	BatchesTransmitFail int64 // batches whose write returned an error
	ActiveConnections   int64 // TCP connections currently being handled
	HandledConnections  int64 // TCP connections handled in total
}
287
288// Statistics returns statistics for periodic monitoring.
289func (s *Service) Statistics(tags map[string]string) []models.Statistic {
290	return []models.Statistic{{
291		Name: "graphite",
292		Tags: s.defaultTags.Merge(tags),
293		Values: map[string]interface{}{
294			statPointsReceived:      atomic.LoadInt64(&s.stats.PointsReceived),
295			statBytesReceived:       atomic.LoadInt64(&s.stats.BytesReceived),
296			statPointsParseFail:     atomic.LoadInt64(&s.stats.PointsParseFail),
297			statPointsNaNFail:       atomic.LoadInt64(&s.stats.PointsNaNFail),
298			statBatchesTransmitted:  atomic.LoadInt64(&s.stats.BatchesTransmitted),
299			statPointsTransmitted:   atomic.LoadInt64(&s.stats.PointsTransmitted),
300			statBatchesTransmitFail: atomic.LoadInt64(&s.stats.BatchesTransmitFail),
301			statConnectionsActive:   atomic.LoadInt64(&s.stats.ActiveConnections),
302			statConnectionsHandled:  atomic.LoadInt64(&s.stats.HandledConnections),
303		},
304	}}
305}
306
307// Addr returns the address the Service binds to.
308func (s *Service) Addr() net.Addr {
309	return s.addr
310}
311
312// openTCPServer opens the Graphite input in TCP mode and starts processing data.
313func (s *Service) openTCPServer() (net.Addr, error) {
314	ln, err := net.Listen("tcp", s.bindAddress)
315	if err != nil {
316		return nil, err
317	}
318	s.ln = ln
319
320	s.wg.Add(1)
321	go func() {
322		defer s.wg.Done()
323		for {
324			conn, err := s.ln.Accept()
325			if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
326				s.logger.Info("Graphite TCP listener closed")
327				return
328			}
329			if err != nil {
330				s.logger.Info("Error accepting TCP connection", zap.Error(err))
331				continue
332			}
333
334			s.wg.Add(1)
335			go s.handleTCPConnection(conn)
336		}
337	}()
338	return ln.Addr(), nil
339}
340
341// handleTCPConnection services an individual TCP connection for the Graphite input.
342func (s *Service) handleTCPConnection(conn net.Conn) {
343	defer s.wg.Done()
344	defer conn.Close()
345	defer atomic.AddInt64(&s.stats.ActiveConnections, -1)
346	defer s.untrackConnection(conn)
347	atomic.AddInt64(&s.stats.ActiveConnections, 1)
348	atomic.AddInt64(&s.stats.HandledConnections, 1)
349	s.trackConnection(conn)
350
351	reader := bufio.NewReader(conn)
352
353	for {
354		// Read up to the next newline.
355		buf, err := reader.ReadBytes('\n')
356		if err != nil {
357			return
358		}
359
360		// Trim the buffer, even though there should be no padding
361		line := strings.TrimSpace(string(buf))
362
363		atomic.AddInt64(&s.stats.PointsReceived, 1)
364		atomic.AddInt64(&s.stats.BytesReceived, int64(len(buf)))
365		s.handleLine(line)
366	}
367}
368
369func (s *Service) trackConnection(c net.Conn) {
370	s.tcpConnectionsMu.Lock()
371	defer s.tcpConnectionsMu.Unlock()
372	s.tcpConnections[c.RemoteAddr().String()] = &tcpConnection{
373		conn:        c,
374		connectTime: time.Now().UTC(),
375	}
376}
377func (s *Service) untrackConnection(c net.Conn) {
378	s.tcpConnectionsMu.Lock()
379	defer s.tcpConnectionsMu.Unlock()
380	delete(s.tcpConnections, c.RemoteAddr().String())
381}
382
383// openUDPServer opens the Graphite input in UDP mode and starts processing incoming data.
384func (s *Service) openUDPServer() (net.Addr, error) {
385	addr, err := net.ResolveUDPAddr("udp", s.bindAddress)
386	if err != nil {
387		return nil, err
388	}
389
390	s.udpConn, err = net.ListenUDP("udp", addr)
391	if err != nil {
392		return nil, err
393	}
394
395	if s.udpReadBuffer != 0 {
396		err = s.udpConn.SetReadBuffer(s.udpReadBuffer)
397		if err != nil {
398			return nil, fmt.Errorf("unable to set UDP read buffer to %d: %s",
399				s.udpReadBuffer, err)
400		}
401	}
402
403	buf := make([]byte, udpBufferSize)
404	s.wg.Add(1)
405	go func() {
406		defer s.wg.Done()
407		for {
408			n, _, err := s.udpConn.ReadFromUDP(buf)
409			if err != nil {
410				s.udpConn.Close()
411				return
412			}
413
414			lines := strings.Split(string(buf[:n]), "\n")
415			for _, line := range lines {
416				s.handleLine(line)
417			}
418			atomic.AddInt64(&s.stats.PointsReceived, int64(len(lines)))
419			atomic.AddInt64(&s.stats.BytesReceived, int64(n))
420		}
421	}()
422	return s.udpConn.LocalAddr(), nil
423}
424
425func (s *Service) handleLine(line string) {
426	if line == "" {
427		return
428	}
429
430	// Parse it.
431	point, err := s.parser.Parse(line)
432	if err != nil {
433		switch err := err.(type) {
434		case *UnsupportedValueError:
435			// Graphite ignores NaN values with no error.
436			if math.IsNaN(err.Value) {
437				atomic.AddInt64(&s.stats.PointsNaNFail, 1)
438				return
439			}
440		}
441		s.logger.Info("Unable to parse line", zap.String("line", line), zap.Error(err))
442		atomic.AddInt64(&s.stats.PointsParseFail, 1)
443		return
444	}
445
446	s.batcher.In() <- point
447}
448
449// processBatches continually drains the given batcher and writes the batches to the database.
450func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
451	defer s.wg.Done()
452	for {
453		select {
454		case batch := <-batcher.Out():
455			// Will attempt to create database if not yet created.
456			if err := s.createInternalStorage(); err != nil {
457				s.logger.Info("Required database or retention policy do not yet exist", zap.Error(err))
458				continue
459			}
460
461			if err := s.PointsWriter.WritePointsPrivileged(s.database, s.retentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
462				atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
463				atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
464			} else {
465				s.logger.Info("Failed to write point batch to database",
466					logger.Database(s.database), zap.Error(err))
467				atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
468			}
469
470		case <-s.done:
471			return
472		}
473	}
474}
475
476// Diagnostics returns diagnostics of the graphite service.
477func (s *Service) Diagnostics() (*diagnostics.Diagnostics, error) {
478	s.tcpConnectionsMu.Lock()
479	defer s.tcpConnectionsMu.Unlock()
480
481	d := &diagnostics.Diagnostics{
482		Columns: []string{"local", "remote", "connect time"},
483		Rows:    make([][]interface{}, 0, len(s.tcpConnections)),
484	}
485	for _, v := range s.tcpConnections {
486		d.Rows = append(d.Rows, []interface{}{v.conn.LocalAddr().String(), v.conn.RemoteAddr().String(), v.connectTime})
487	}
488	return d, nil
489}
490