1package opentsdb
2
3import (
4	"bufio"
5	"compress/gzip"
6	"encoding/json"
7	"errors"
8	"io"
9	"net"
10	"net/http"
11	"sync"
12	"sync/atomic"
13	"time"
14
15	"github.com/influxdata/influxdb"
16	"github.com/influxdata/influxdb/models"
17	"go.uber.org/zap"
18)
19
20// Handler is an http.Handler for the OpenTSDB service.
21type Handler struct {
22	Database        string
23	RetentionPolicy string
24
25	PointsWriter interface {
26		WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
27	}
28
29	Logger *zap.Logger
30
31	stats *Statistics
32}
33
34// ServeHTTP handles an HTTP request of the OpenTSDB REST API.
35func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
36	switch r.URL.Path {
37	case "/api/metadata/put":
38		w.WriteHeader(http.StatusNoContent)
39	case "/api/put":
40		h.servePut(w, r)
41	default:
42		http.NotFound(w, r)
43	}
44}
45
46// servePut implements OpenTSDB's HTTP /api/put endpoint.
47func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) {
48	defer r.Body.Close()
49
50	// Require POST method.
51	if r.Method != "POST" {
52		http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
53		return
54	}
55
56	// Wrap reader if it's gzip encoded.
57	var br *bufio.Reader
58	if r.Header.Get("Content-Encoding") == "gzip" {
59		zr, err := gzip.NewReader(r.Body)
60		if err != nil {
61			http.Error(w, "could not read gzip, "+err.Error(), http.StatusBadRequest)
62			return
63		}
64
65		br = bufio.NewReader(zr)
66	} else {
67		br = bufio.NewReader(r.Body)
68	}
69
70	// Lookahead at the first byte.
71	f, err := br.Peek(1)
72	if err != nil || len(f) != 1 {
73		http.Error(w, "peek error: "+err.Error(), http.StatusBadRequest)
74		return
75	}
76
77	// Peek to see if this is a JSON array.
78	var multi bool
79	switch f[0] {
80	case '{':
81	case '[':
82		multi = true
83	default:
84		http.Error(w, "expected JSON array or hash", http.StatusBadRequest)
85		return
86	}
87
88	// Decode JSON data into slice of points.
89	dps := make([]point, 1)
90	if dec := json.NewDecoder(br); multi {
91		if err = dec.Decode(&dps); err != nil {
92			http.Error(w, "json array decode error", http.StatusBadRequest)
93			return
94		}
95	} else {
96		if err = dec.Decode(&dps[0]); err != nil {
97			http.Error(w, "json object decode error", http.StatusBadRequest)
98			return
99		}
100	}
101
102	// Convert points into TSDB points.
103	points := make([]models.Point, 0, len(dps))
104	for i := range dps {
105		p := dps[i]
106
107		// Convert timestamp to Go time.
108		// If time value is over a billion then it's microseconds.
109		var ts time.Time
110		if p.Time < 10000000000 {
111			ts = time.Unix(p.Time, 0)
112		} else {
113			ts = time.Unix(p.Time/1000, (p.Time%1000)*1000)
114		}
115
116		pt, err := models.NewPoint(p.Metric, models.NewTags(p.Tags), map[string]interface{}{"value": p.Value}, ts)
117		if err != nil {
118			h.Logger.Info("Dropping point", zap.String("name", p.Metric), zap.Error(err))
119			if h.stats != nil {
120				atomic.AddInt64(&h.stats.InvalidDroppedPoints, 1)
121			}
122			continue
123		}
124		points = append(points, pt)
125	}
126
127	// Write points.
128	if err := h.PointsWriter.WritePointsPrivileged(h.Database, h.RetentionPolicy, models.ConsistencyLevelAny, points); influxdb.IsClientError(err) {
129		h.Logger.Info("Write series error", zap.Error(err))
130		http.Error(w, "write series error: "+err.Error(), http.StatusBadRequest)
131		return
132	} else if err != nil {
133		h.Logger.Info("Write series error", zap.Error(err))
134		http.Error(w, "write series error: "+err.Error(), http.StatusInternalServerError)
135		return
136	}
137
138	w.WriteHeader(http.StatusNoContent)
139}
140
141// chanListener represents a listener that receives connections through a channel.
142type chanListener struct {
143	addr   net.Addr
144	ch     chan net.Conn
145	done   chan struct{}
146	closer sync.Once // closer ensures that Close is idempotent.
147}
148
149// newChanListener returns a new instance of chanListener.
150func newChanListener(addr net.Addr) *chanListener {
151	return &chanListener{
152		addr: addr,
153		ch:   make(chan net.Conn),
154		done: make(chan struct{}),
155	}
156}
157
158func (ln *chanListener) Accept() (net.Conn, error) {
159	errClosed := errors.New("network connection closed")
160	select {
161	case <-ln.done:
162		return nil, errClosed
163	case conn, ok := <-ln.ch:
164		if !ok {
165			return nil, errClosed
166		}
167		return conn, nil
168	}
169}
170
171// Close closes the connection channel.
172func (ln *chanListener) Close() error {
173	ln.closer.Do(func() {
174		close(ln.done)
175	})
176	return nil
177}
178
179// Addr returns the network address of the listener.
180func (ln *chanListener) Addr() net.Addr { return ln.addr }
181
182// readerConn represents a net.Conn with an assignable reader.
183type readerConn struct {
184	net.Conn
185	r io.Reader
186}
187
188// Read implements the io.Reader interface.
189func (conn *readerConn) Read(b []byte) (n int, err error) { return conn.r.Read(b) }
190
191// point represents an incoming JSON data point.
192type point struct {
193	Metric string            `json:"metric"`
194	Time   int64             `json:"timestamp"`
195	Value  float64           `json:"value"`
196	Tags   map[string]string `json:"tags,omitempty"`
197}
198