1// Package opentsdb provides a service for InfluxDB to ingest data via the opentsdb protocol.
2package opentsdb // import "github.com/influxdata/influxdb/services/opentsdb"
3
4import (
5	"bufio"
6	"bytes"
7	"crypto/tls"
8	"io"
9	"net"
10	"net/http"
11	"net/textproto"
12	"strconv"
13	"strings"
14	"sync"
15	"sync/atomic"
16	"time"
17
18	"github.com/influxdata/influxdb/logger"
19	"github.com/influxdata/influxdb/models"
20	"github.com/influxdata/influxdb/services/meta"
21	"github.com/influxdata/influxdb/tsdb"
22	"go.uber.org/zap"
23)
24
// Statistics gathered by the openTSDB package.
//
// Each constant is the key under which the corresponding counter of the
// Statistics struct is reported in the Values map built by Service.Statistics.
const (
	statHTTPConnectionsHandled   = "httpConnsHandled"
	statTelnetConnectionsActive  = "tlConnsActive"
	statTelnetConnectionsHandled = "tlConnsHandled"
	statTelnetPointsReceived     = "tlPointsRx"
	statTelnetBytesReceived      = "tlBytesRx"
	statTelnetReadError          = "tlReadErr"
	statTelnetBadLine            = "tlBadLine"
	statTelnetBadTime            = "tlBadTime"
	statTelnetBadTag             = "tlBadTag"
	statTelnetBadFloat           = "tlBadFloat"
	statBatchesTransmitted       = "batchesTx"
	statPointsTransmitted        = "pointsTx"
	statBatchesTransmitFail      = "batchesTxFail"
	statConnectionsActive        = "connsActive"
	statConnectionsHandled       = "connsHandled"
	statDroppedPointsInvalid     = "droppedPointsInvalid"
)
44
// Service manages the TCP listener for the OpenTSDB endpoint. Each accepted
// connection is served either as HTTP (handed to an internal channel-based
// listener) or as OpenTSDB's line-based telnet protocol, depending on whether
// its first bytes parse as an HTTP request.
type Service struct {
	ln     net.Listener  // main listener
	httpln *chanListener // http channel-based listener

	// wg tracks the goroutines started by Open and the telnet connection
	// handlers; Close waits on it.
	wg sync.WaitGroup
	// tls selects a TLS listener instead of a plain TCP one in Open.
	tls bool
	// tlsConfig is the base TLS configuration; Open clones it before
	// attaching the certificate.
	tlsConfig *tls.Config
	// cert is passed to tls.LoadX509KeyPair as both the certificate file and
	// the key file.
	cert string

	// mu guards ready and done.
	mu    sync.RWMutex
	ready bool          // Has the required database been created?
	done  chan struct{} // Is the service closing or closed?

	// BindAddress is the TCP address the listener is bound to.
	BindAddress string
	// Database and RetentionPolicy are the destination passed to PointsWriter
	// for every batch; Database is also created on demand via MetaClient.
	Database        string
	RetentionPolicy string

	// PointsWriter receives the batches assembled from telnet points and is
	// also handed to the HTTP handler.
	PointsWriter interface {
		WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
	}
	// MetaClient is used to create Database before the first batch is written.
	MetaClient interface {
		CreateDatabase(name string) (*meta.DatabaseInfo, error)
	}

	// Points received over the telnet protocol are batched.
	batchSize    int
	batchPending int
	batchTimeout time.Duration
	batcher      *tsdb.PointBatcher

	// LogPointErrors enables a log entry for every rejected telnet line.
	LogPointErrors bool
	Logger         *zap.Logger

	// stats holds the counters reported by Statistics; defaultTags are merged
	// into the tags of every reported statistic.
	stats       *Statistics
	defaultTags models.StatisticTags
}
82
83// NewService returns a new instance of Service.
84func NewService(c Config) (*Service, error) {
85	// Use defaults where necessary.
86	d := c.WithDefaults()
87
88	s := &Service{
89		tls:             d.TLSEnabled,
90		tlsConfig:       d.TLS,
91		cert:            d.Certificate,
92		BindAddress:     d.BindAddress,
93		Database:        d.Database,
94		RetentionPolicy: d.RetentionPolicy,
95		batchSize:       d.BatchSize,
96		batchPending:    d.BatchPending,
97		batchTimeout:    time.Duration(d.BatchTimeout),
98		Logger:          zap.NewNop(),
99		LogPointErrors:  d.LogPointErrors,
100		stats:           &Statistics{},
101		defaultTags:     models.StatisticTags{"bind": d.BindAddress},
102	}
103	if s.tlsConfig == nil {
104		s.tlsConfig = new(tls.Config)
105	}
106
107	return s, nil
108}
109
110// Open starts the service.
111func (s *Service) Open() error {
112	s.mu.Lock()
113	defer s.mu.Unlock()
114
115	if s.done != nil {
116		return nil // Already open.
117	}
118	s.done = make(chan struct{})
119
120	s.Logger.Info("Starting OpenTSDB service")
121
122	s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchPending, s.batchTimeout)
123	s.batcher.Start()
124
125	// Start processing batches.
126	s.wg.Add(1)
127	go func() { defer s.wg.Done(); s.processBatches(s.batcher) }()
128
129	// Open listener.
130	if s.tls {
131		cert, err := tls.LoadX509KeyPair(s.cert, s.cert)
132		if err != nil {
133			return err
134		}
135
136		tlsConfig := s.tlsConfig.Clone()
137		tlsConfig.Certificates = []tls.Certificate{cert}
138
139		listener, err := tls.Listen("tcp", s.BindAddress, tlsConfig)
140		if err != nil {
141			return err
142		}
143
144		s.ln = listener
145	} else {
146		listener, err := net.Listen("tcp", s.BindAddress)
147		if err != nil {
148			return err
149		}
150
151		s.ln = listener
152	}
153	s.Logger.Info("Listening on TCP",
154		zap.Stringer("addr", s.ln.Addr()),
155		zap.Bool("tls", s.tls))
156	s.httpln = newChanListener(s.ln.Addr())
157
158	// Begin listening for connections.
159	s.wg.Add(2)
160	go func() { defer s.wg.Done(); s.serve() }()
161	go func() { defer s.wg.Done(); s.serveHTTP() }()
162
163	return nil
164}
165
166// Close closes the openTSDB service.
167func (s *Service) Close() error {
168	if wait, err := func() (bool, error) {
169		s.mu.Lock()
170		defer s.mu.Unlock()
171
172		if s.closed() {
173			return false, nil // Already closed.
174		}
175		close(s.done)
176
177		// Close the listeners.
178		if err := s.ln.Close(); err != nil {
179			return false, err
180		}
181		if err := s.httpln.Close(); err != nil {
182			return false, err
183		}
184
185		if s.batcher != nil {
186			s.batcher.Stop()
187		}
188		return true, nil
189	}(); err != nil {
190		return err
191	} else if !wait {
192		return nil
193	}
194	s.wg.Wait()
195
196	s.mu.Lock()
197	s.done = nil
198	s.mu.Unlock()
199
200	return nil
201}
202
203// Closed returns true if the service is currently closed.
204func (s *Service) Closed() bool {
205	s.mu.Lock()
206	defer s.mu.Unlock()
207	return s.closed()
208}
209
210func (s *Service) closed() bool {
211	select {
212	case <-s.done:
213		// Service is closing.
214		return true
215	default:
216		return s.done == nil
217	}
218}
219
220// createInternalStorage ensures that the required database has been created.
221func (s *Service) createInternalStorage() error {
222	s.mu.RLock()
223	ready := s.ready
224	s.mu.RUnlock()
225	if ready {
226		return nil
227	}
228
229	if _, err := s.MetaClient.CreateDatabase(s.Database); err != nil {
230		return err
231	}
232
233	// The service is now ready.
234	s.mu.Lock()
235	s.ready = true
236	s.mu.Unlock()
237	return nil
238}
239
240// WithLogger sets the logger for the service.
241func (s *Service) WithLogger(log *zap.Logger) {
242	s.Logger = log.With(zap.String("service", "opentsdb"))
243}
244
// Statistics maintains statistics for the OpenTSDB service.
//
// Within this file every counter is updated with atomic.AddInt64 and read
// with atomic.LoadInt64; the same struct is also shared with the HTTP handler.
type Statistics struct {
	HTTPConnectionsHandled   int64
	ActiveTelnetConnections  int64
	HandledTelnetConnections int64
	TelnetPointsReceived     int64
	TelnetBytesReceived      int64
	TelnetReadError          int64
	TelnetBadLine            int64
	TelnetBadTime            int64
	TelnetBadTag             int64
	TelnetBadFloat           int64
	BatchesTransmitted       int64
	PointsTransmitted        int64
	BatchesTransmitFail      int64
	ActiveConnections        int64
	HandledConnections       int64
	InvalidDroppedPoints     int64
}
264
265// Statistics returns statistics for periodic monitoring.
266func (s *Service) Statistics(tags map[string]string) []models.Statistic {
267	return []models.Statistic{{
268		Name: "opentsdb",
269		Tags: s.defaultTags.Merge(tags),
270		Values: map[string]interface{}{
271			statHTTPConnectionsHandled:   atomic.LoadInt64(&s.stats.HTTPConnectionsHandled),
272			statTelnetConnectionsActive:  atomic.LoadInt64(&s.stats.ActiveTelnetConnections),
273			statTelnetConnectionsHandled: atomic.LoadInt64(&s.stats.HandledTelnetConnections),
274			statTelnetPointsReceived:     atomic.LoadInt64(&s.stats.TelnetPointsReceived),
275			statTelnetBytesReceived:      atomic.LoadInt64(&s.stats.TelnetBytesReceived),
276			statTelnetReadError:          atomic.LoadInt64(&s.stats.TelnetReadError),
277			statTelnetBadLine:            atomic.LoadInt64(&s.stats.TelnetBadLine),
278			statTelnetBadTime:            atomic.LoadInt64(&s.stats.TelnetBadTime),
279			statTelnetBadTag:             atomic.LoadInt64(&s.stats.TelnetBadTag),
280			statTelnetBadFloat:           atomic.LoadInt64(&s.stats.TelnetBadFloat),
281			statBatchesTransmitted:       atomic.LoadInt64(&s.stats.BatchesTransmitted),
282			statPointsTransmitted:        atomic.LoadInt64(&s.stats.PointsTransmitted),
283			statBatchesTransmitFail:      atomic.LoadInt64(&s.stats.BatchesTransmitFail),
284			statConnectionsActive:        atomic.LoadInt64(&s.stats.ActiveConnections),
285			statConnectionsHandled:       atomic.LoadInt64(&s.stats.HandledConnections),
286			statDroppedPointsInvalid:     atomic.LoadInt64(&s.stats.InvalidDroppedPoints),
287		},
288	}}
289}
290
291// Addr returns the listener's address. Returns nil if listener is closed.
292func (s *Service) Addr() net.Addr {
293	if s.ln == nil {
294		return nil
295	}
296	return s.ln.Addr()
297}
298
299// serve serves the handler from the listener.
300func (s *Service) serve() {
301	for {
302		// Wait for next connection.
303		conn, err := s.ln.Accept()
304		if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
305			s.Logger.Info("OpenTSDB TCP listener closed")
306			return
307		} else if err != nil {
308			s.Logger.Info("Error accepting OpenTSDB", zap.Error(err))
309			continue
310		}
311
312		// Handle connection in separate goroutine.
313		go s.handleConn(conn)
314	}
315}
316
317// handleConn processes conn. This is run in a separate goroutine.
318func (s *Service) handleConn(conn net.Conn) {
319	defer atomic.AddInt64(&s.stats.ActiveConnections, -1)
320	atomic.AddInt64(&s.stats.ActiveConnections, 1)
321	atomic.AddInt64(&s.stats.HandledConnections, 1)
322
323	// Read header into buffer to check if it's HTTP.
324	var buf bytes.Buffer
325	r := bufio.NewReader(io.TeeReader(conn, &buf))
326
327	// Attempt to parse connection as HTTP.
328	_, err := http.ReadRequest(r)
329
330	// Rebuild connection from buffer and remaining connection data.
331	bufr := bufio.NewReader(io.MultiReader(&buf, conn))
332	conn = &readerConn{Conn: conn, r: bufr}
333
334	// If no HTTP parsing error occurred then process as HTTP.
335	if err == nil {
336		atomic.AddInt64(&s.stats.HTTPConnectionsHandled, 1)
337		s.httpln.ch <- conn
338		return
339	}
340
341	// Otherwise handle in telnet format.
342	s.wg.Add(1)
343	s.handleTelnetConn(conn)
344	s.wg.Done()
345}
346
347// handleTelnetConn accepts OpenTSDB's telnet protocol.
348// Each telnet command consists of a line of the form:
349//   put sys.cpu.user 1356998400 42.5 host=webserver01 cpu=0
350func (s *Service) handleTelnetConn(conn net.Conn) {
351	defer conn.Close()
352	defer atomic.AddInt64(&s.stats.ActiveTelnetConnections, -1)
353	atomic.AddInt64(&s.stats.ActiveTelnetConnections, 1)
354	atomic.AddInt64(&s.stats.HandledTelnetConnections, 1)
355
356	// Get connection details.
357	remoteAddr := conn.RemoteAddr().String()
358
359	// Wrap connection in a text protocol reader.
360	r := textproto.NewReader(bufio.NewReader(conn))
361	for {
362		line, err := r.ReadLine()
363		if err != nil {
364			if err != io.EOF {
365				atomic.AddInt64(&s.stats.TelnetReadError, 1)
366				s.Logger.Info("Error reading from OpenTSDB connection", zap.Error(err))
367			}
368			return
369		}
370		atomic.AddInt64(&s.stats.TelnetPointsReceived, 1)
371		atomic.AddInt64(&s.stats.TelnetBytesReceived, int64(len(line)))
372
373		inputStrs := strings.Fields(line)
374
375		if len(inputStrs) == 1 && inputStrs[0] == "version" {
376			conn.Write([]byte("InfluxDB TSDB proxy"))
377			continue
378		}
379
380		if len(inputStrs) < 4 || inputStrs[0] != "put" {
381			atomic.AddInt64(&s.stats.TelnetBadLine, 1)
382			if s.LogPointErrors {
383				s.Logger.Info("Malformed line", zap.String("line", line), zap.String("remote_addr", remoteAddr))
384			}
385			continue
386		}
387
388		measurement := inputStrs[1]
389		tsStr := inputStrs[2]
390		valueStr := inputStrs[3]
391		tagStrs := inputStrs[4:]
392
393		var t time.Time
394		ts, err := strconv.ParseInt(tsStr, 10, 64)
395		if err != nil {
396			atomic.AddInt64(&s.stats.TelnetBadTime, 1)
397			if s.LogPointErrors {
398				s.Logger.Info("Malformed time", zap.String("time", tsStr), zap.String("remote_addr", remoteAddr))
399			}
400		}
401
402		switch len(tsStr) {
403		case 10:
404			t = time.Unix(ts, 0)
405		case 13:
406			t = time.Unix(ts/1000, (ts%1000)*1000)
407		default:
408			atomic.AddInt64(&s.stats.TelnetBadTime, 1)
409			if s.LogPointErrors {
410				s.Logger.Info("Time must be 10 or 13 chars", zap.String("time", tsStr), zap.String("remote_addr", remoteAddr))
411			}
412			continue
413		}
414
415		tags := make(map[string]string)
416		for t := range tagStrs {
417			parts := strings.SplitN(tagStrs[t], "=", 2)
418			if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
419				atomic.AddInt64(&s.stats.TelnetBadTag, 1)
420				if s.LogPointErrors {
421					s.Logger.Info("Malformed tag data", zap.String("tag", tagStrs[t]), zap.String("remote_addr", remoteAddr))
422				}
423				continue
424			}
425			k := parts[0]
426
427			tags[k] = parts[1]
428		}
429
430		fields := make(map[string]interface{})
431		fv, err := strconv.ParseFloat(valueStr, 64)
432		if err != nil {
433			atomic.AddInt64(&s.stats.TelnetBadFloat, 1)
434			if s.LogPointErrors {
435				s.Logger.Info("Bad float", zap.String("value", valueStr), zap.String("remote_addr", remoteAddr))
436			}
437			continue
438		}
439		fields["value"] = fv
440
441		pt, err := models.NewPoint(measurement, models.NewTags(tags), fields, t)
442		if err != nil {
443			atomic.AddInt64(&s.stats.TelnetBadFloat, 1)
444			if s.LogPointErrors {
445				s.Logger.Info("Bad float", zap.String("value", valueStr), zap.String("remote_addr", remoteAddr))
446			}
447			continue
448		}
449		s.batcher.In() <- pt
450	}
451}
452
453// serveHTTP handles connections in HTTP format.
454func (s *Service) serveHTTP() {
455	handler := &Handler{
456		Database:        s.Database,
457		RetentionPolicy: s.RetentionPolicy,
458		PointsWriter:    s.PointsWriter,
459		Logger:          s.Logger,
460		stats:           s.stats,
461	}
462	srv := &http.Server{Handler: handler}
463	srv.Serve(s.httpln)
464}
465
466// processBatches continually drains the given batcher and writes the batches to the database.
467func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
468	for {
469		select {
470		case <-s.done:
471			return
472		case batch := <-batcher.Out():
473			// Will attempt to create database if not yet created.
474			if err := s.createInternalStorage(); err != nil {
475				s.Logger.Info("Required database does not yet exist", logger.Database(s.Database), zap.Error(err))
476				continue
477			}
478
479			if err := s.PointsWriter.WritePointsPrivileged(s.Database, s.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
480				atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
481				atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
482			} else {
483				s.Logger.Info("Failed to write point batch to database",
484					logger.Database(s.Database), zap.Error(err))
485				atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
486			}
487		}
488	}
489}
490