1package httpd
2
3import (
4	"bytes"
5	"compress/gzip"
6	"context"
7	"encoding/json"
8	"errors"
9	"expvar"
10	"fmt"
11	"io"
12	"io/ioutil"
13	"log"
14	"math"
15	"net/http"
16	"os"
17	"runtime/debug"
18	"strconv"
19	"strings"
20	"sync/atomic"
21	"time"
22
23	"github.com/bmizerany/pat"
24	"github.com/dgrijalva/jwt-go"
25	"github.com/gogo/protobuf/proto"
26	"github.com/golang/snappy"
27	"github.com/influxdata/flux"
28	"github.com/influxdata/flux/lang"
29	"github.com/influxdata/influxdb"
30	"github.com/influxdata/influxdb/logger"
31	"github.com/influxdata/influxdb/models"
32	"github.com/influxdata/influxdb/monitor"
33	"github.com/influxdata/influxdb/monitor/diagnostics"
34	"github.com/influxdata/influxdb/prometheus"
35	"github.com/influxdata/influxdb/prometheus/remote"
36	"github.com/influxdata/influxdb/query"
37	"github.com/influxdata/influxdb/services/meta"
38	"github.com/influxdata/influxdb/services/storage"
39	"github.com/influxdata/influxdb/storage/reads"
40	"github.com/influxdata/influxdb/storage/reads/datatypes"
41	"github.com/influxdata/influxdb/tsdb"
42	"github.com/influxdata/influxdb/uuid"
43	"github.com/influxdata/influxql"
44	prom "github.com/prometheus/client_golang/prometheus"
45	"github.com/prometheus/client_golang/prometheus/promhttp"
46	"go.uber.org/zap"
47)
48
const (
	// DefaultChunkSize specifies the maximum number of points that will
	// be read before sending results back to the engine.
	//
	// This has no relation to the number of bytes that are returned.
	//
	// serveQuery uses it as the ChunkSize execution option unless a chunked
	// request supplies a positive "chunk_size" form value.
	DefaultChunkSize = 10000

	// DefaultDebugRequestsInterval is a 10 second duration.
	// NOTE(review): presumably the default sampling window of the
	// /debug/requests endpoint — confirm against serveDebugRequests.
	DefaultDebugRequestsInterval = 10 * time.Second

	// MaxDebugRequestsInterval is a 6 hour duration.
	// NOTE(review): presumably the upper bound on the /debug/requests
	// sampling window — confirm against serveDebugRequests.
	MaxDebugRequestsInterval = 6 * time.Hour
)
60
// AuthenticationMethod defines the type of authentication used.
type AuthenticationMethod int

// Supported authentication methods.
const (
	// UserAuthentication authenticates using basic authentication.
	// It is the zero value of AuthenticationMethod.
	UserAuthentication AuthenticationMethod = iota

	// BearerAuthentication authenticates with jwt.
	BearerAuthentication
)
72
73// TODO: Check HTTP response codes: 400, 401, 403, 409.
74
// Route specifies how to handle a HTTP verb for a given endpoint.
//
// Name is passed to the logging and recovery wrappers installed by
// Handler.AddRoutes. Method and Pattern are the HTTP verb and URL pattern the
// handler is registered under. Gzipped wraps the handler in the gzip filter.
// LoggingEnabled enables the access-log wrapper for the route; it only takes
// effect when Config.LogEnabled is also set.
//
// HandlerFunc must be one of the two signatures recognized by AddRoutes:
//
//	func(http.ResponseWriter, *http.Request)            // served as-is
//	func(http.ResponseWriter, *http.Request, meta.User) // wrapped in authenticate
type Route struct {
	Name           string
	Method         string
	Pattern        string
	Gzipped        bool
	LoggingEnabled bool
	HandlerFunc    interface{}
}
84
// Handler represents an HTTP handler for the InfluxDB server.
type Handler struct {
	// mux holds the routes installed by AddRoutes; ServeHTTP dispatches to it
	// for every path that is not one of the /debug endpoints.
	// Version and BuildType are sent on every response in the
	// X-Influxdb-Version and X-Influxdb-Build headers.
	mux       *pat.PatternServeMux
	Version   string
	BuildType string

	// MetaClient provides database and user metadata. The write handler uses
	// Database to reject writes to databases that do not exist.
	MetaClient interface {
		Database(name string) *meta.DatabaseInfo
		Databases() []meta.DatabaseInfo
		Authenticate(username, password string) (ui meta.User, err error)
		User(username string) (meta.User, error)
		AdminUserExists() bool
	}

	// QueryAuthorizer is consulted by serveQuery when Config.AuthEnabled is set.
	QueryAuthorizer interface {
		AuthorizeQuery(u meta.User, query *influxql.Query, database string) error
	}

	// WriteAuthorizer is consulted by the write handlers when
	// Config.AuthEnabled is set.
	WriteAuthorizer interface {
		AuthorizeWrite(username, database string) error
	}

	// QueryExecutor runs the parsed InfluxQL queries for serveQuery.
	QueryExecutor *query.Executor

	// Monitor exposes server statistics and diagnostics.
	Monitor interface {
		Statistics(tags map[string]string) ([]*monitor.Statistic, error)
		Diagnostics() (map[string]*diagnostics.Diagnostics, error)
	}

	// PointsWriter persists the points parsed by the write handlers.
	PointsWriter interface {
		WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
	}

	Store Store

	// Flux services
	// registered records that Open registered the Controller's Prometheus
	// collectors, so that Close knows to unregister them.
	Controller       Controller
	CompilerMappings flux.CompilerMappings
	registered       bool

	// Config is the handler configuration copied in by NewHandler.
	// CLFLogger is the access logger; NewHandler points it at stderr and Open
	// redirects it to Config.AccessLogPath when that is set. accessLog is the
	// file opened for that purpose (nil otherwise) and is closed by Close.
	// accessLogFilters is populated from Config.AccessLogStatusFilters in Open.
	// stats holds the counters reported by Statistics.
	Config           *Config
	Logger           *zap.Logger
	CLFLogger        *log.Logger
	accessLog        *os.File
	accessLogFilters StatusFilters
	stats            *Statistics

	// requestTracker is told about every query and write request.
	// writeThrottler wraps the POST /write and /api/v1/prom/write routes.
	requestTracker *RequestTracker
	writeThrottler *Throttler
}
135
136// NewHandler returns a new instance of handler with routes.
137func NewHandler(c Config) *Handler {
138	h := &Handler{
139		mux:            pat.New(),
140		Config:         &c,
141		Logger:         zap.NewNop(),
142		CLFLogger:      log.New(os.Stderr, "[httpd] ", 0),
143		stats:          &Statistics{},
144		requestTracker: NewRequestTracker(),
145	}
146
147	// Limit the number of concurrent & enqueued write requests.
148	h.writeThrottler = NewThrottler(c.MaxConcurrentWriteLimit, c.MaxEnqueuedWriteLimit)
149	h.writeThrottler.EnqueueTimeout = c.EnqueuedWriteTimeout
150
151	// Disable the write log if they have been suppressed.
152	writeLogEnabled := c.LogEnabled
153	if c.SuppressWriteLog {
154		writeLogEnabled = false
155	}
156
157	h.AddRoutes([]Route{
158		Route{
159			"query-options", // Satisfy CORS checks.
160			"OPTIONS", "/query", false, true, h.serveOptions,
161		},
162		Route{
163			"query", // Query serving route.
164			"GET", "/query", true, true, h.serveQuery,
165		},
166		Route{
167			"query", // Query serving route.
168			"POST", "/query", true, true, h.serveQuery,
169		},
170		Route{
171			"write-options", // Satisfy CORS checks.
172			"OPTIONS", "/write", false, true, h.serveOptions,
173		},
174		Route{
175			"write", // Data-ingest route.
176			"POST", "/write", true, writeLogEnabled, h.serveWrite,
177		},
178		Route{
179			"prometheus-write", // Prometheus remote write
180			"POST", "/api/v1/prom/write", false, true, h.servePromWrite,
181		},
182		Route{
183			"prometheus-read", // Prometheus remote read
184			"POST", "/api/v1/prom/read", true, true, h.servePromRead,
185		},
186		Route{ // Ping
187			"ping",
188			"GET", "/ping", false, true, h.servePing,
189		},
190		Route{ // Ping
191			"ping-head",
192			"HEAD", "/ping", false, true, h.servePing,
193		},
194		Route{ // Ping w/ status
195			"status",
196			"GET", "/status", false, true, h.serveStatus,
197		},
198		Route{ // Ping w/ status
199			"status-head",
200			"HEAD", "/status", false, true, h.serveStatus,
201		},
202		Route{
203			"prometheus-metrics",
204			"GET", "/metrics", false, true, promhttp.Handler().ServeHTTP,
205		},
206	}...)
207
208	fluxRoute := Route{
209		"flux-read",
210		"POST", "/api/v2/query", true, true, nil,
211	}
212
213	if !c.FluxEnabled {
214		fluxRoute.HandlerFunc = func(w http.ResponseWriter, r *http.Request) {
215			http.Error(w, "Flux query service disabled. Verify flux-enabled=true in the [http] section of the InfluxDB config.", http.StatusForbidden)
216		}
217	} else {
218		fluxRoute.HandlerFunc = h.serveFluxQuery
219	}
220	h.AddRoutes(fluxRoute)
221
222	return h
223}
224
225func (h *Handler) Open() {
226	if h.Config.LogEnabled {
227		path := "stderr"
228
229		if h.Config.AccessLogPath != "" {
230			f, err := os.OpenFile(h.Config.AccessLogPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
231			if err != nil {
232				h.Logger.Error("unable to open access log, falling back to stderr", zap.Error(err), zap.String("path", h.Config.AccessLogPath))
233				return
234			}
235			h.CLFLogger = log.New(f, "", 0) // [httpd] prefix stripped when logging to a file
236			h.accessLog = f
237			path = h.Config.AccessLogPath
238		}
239		h.Logger.Info("opened HTTP access log", zap.String("path", path))
240	}
241	h.accessLogFilters = StatusFilters(h.Config.AccessLogStatusFilters)
242
243	if h.Config.AuthEnabled && h.Config.SharedSecret == "" {
244		h.Logger.Info("Auth is enabled but shared-secret is blank. BearerAuthentication is disabled.")
245	}
246
247	if h.Config.FluxEnabled {
248		h.registered = true
249		prom.MustRegister(h.Controller.PrometheusCollectors()...)
250	}
251}
252
253func (h *Handler) Close() {
254	if h.accessLog != nil {
255		h.accessLog.Close()
256		h.accessLog = nil
257		h.accessLogFilters = nil
258	}
259
260	if h.registered {
261		for _, col := range h.Controller.PrometheusCollectors() {
262			prom.Unregister(col)
263		}
264		h.registered = false
265	}
266}
267
// Statistics maintains statistics for the httpd service.
//
// All fields are plain int64 counters; the handler code visible in this file
// reads and updates them through sync/atomic.
//
//   - Requests, ActiveRequests and RequestDuration (nanoseconds) are
//     maintained by ServeHTTP for every request.
//   - QueryRequests, QueryRequestDuration (nanoseconds) and
//     QueryRequestBytesTransmitted are maintained by serveQuery.
//   - WriteRequests, ActiveWriteRequests, WriteRequestDuration (nanoseconds)
//     and WriteRequestBytesReceived are maintained by the write handlers;
//     PromWriteRequests additionally counts Prometheus remote writes.
//   - PointsWrittenOK, PointsWrittenDropped and PointsWrittenFail count points
//     by the outcome of the write.
//   - PingRequests and StatusRequests count hits on /ping and /status.
//   - ClientErrors and ServerErrors count 4xx and 5xx status codes written
//     through Handler.writeHeader.
//
// AuthenticationFailures, RecoveredPanics, PromReadRequests and the
// FluxQuery* counters are updated outside the code shown here.
// NOTE(review): CQRequests is neither updated nor exported by
// Handler.Statistics in the visible part of the file — confirm whether it is
// still needed.
type Statistics struct {
	Requests                     int64
	CQRequests                   int64
	QueryRequests                int64
	WriteRequests                int64
	PingRequests                 int64
	StatusRequests               int64
	WriteRequestBytesReceived    int64
	QueryRequestBytesTransmitted int64
	PointsWrittenOK              int64
	PointsWrittenDropped         int64
	PointsWrittenFail            int64
	AuthenticationFailures       int64
	RequestDuration              int64
	QueryRequestDuration         int64
	WriteRequestDuration         int64
	ActiveRequests               int64
	ActiveWriteRequests          int64
	ClientErrors                 int64
	ServerErrors                 int64
	RecoveredPanics              int64
	PromWriteRequests            int64
	PromReadRequests             int64
	FluxQueryRequests            int64
	FluxQueryRequestDuration     int64
}
295
296// Statistics returns statistics for periodic monitoring.
297func (h *Handler) Statistics(tags map[string]string) []models.Statistic {
298	return []models.Statistic{{
299		Name: "httpd",
300		Tags: tags,
301		Values: map[string]interface{}{
302			statRequest:                      atomic.LoadInt64(&h.stats.Requests),
303			statQueryRequest:                 atomic.LoadInt64(&h.stats.QueryRequests),
304			statWriteRequest:                 atomic.LoadInt64(&h.stats.WriteRequests),
305			statPingRequest:                  atomic.LoadInt64(&h.stats.PingRequests),
306			statStatusRequest:                atomic.LoadInt64(&h.stats.StatusRequests),
307			statWriteRequestBytesReceived:    atomic.LoadInt64(&h.stats.WriteRequestBytesReceived),
308			statQueryRequestBytesTransmitted: atomic.LoadInt64(&h.stats.QueryRequestBytesTransmitted),
309			statPointsWrittenOK:              atomic.LoadInt64(&h.stats.PointsWrittenOK),
310			statPointsWrittenDropped:         atomic.LoadInt64(&h.stats.PointsWrittenDropped),
311			statPointsWrittenFail:            atomic.LoadInt64(&h.stats.PointsWrittenFail),
312			statAuthFail:                     atomic.LoadInt64(&h.stats.AuthenticationFailures),
313			statRequestDuration:              atomic.LoadInt64(&h.stats.RequestDuration),
314			statQueryRequestDuration:         atomic.LoadInt64(&h.stats.QueryRequestDuration),
315			statWriteRequestDuration:         atomic.LoadInt64(&h.stats.WriteRequestDuration),
316			statRequestsActive:               atomic.LoadInt64(&h.stats.ActiveRequests),
317			statWriteRequestsActive:          atomic.LoadInt64(&h.stats.ActiveWriteRequests),
318			statClientError:                  atomic.LoadInt64(&h.stats.ClientErrors),
319			statServerError:                  atomic.LoadInt64(&h.stats.ServerErrors),
320			statRecoveredPanics:              atomic.LoadInt64(&h.stats.RecoveredPanics),
321			statPromWriteRequest:             atomic.LoadInt64(&h.stats.PromWriteRequests),
322			statPromReadRequest:              atomic.LoadInt64(&h.stats.PromReadRequests),
323			statFluxQueryRequests:            atomic.LoadInt64(&h.stats.FluxQueryRequests),
324			statFluxQueryRequestDuration:     atomic.LoadInt64(&h.stats.FluxQueryRequestDuration),
325		},
326	}}
327}
328
329// AddRoutes sets the provided routes on the handler.
330func (h *Handler) AddRoutes(routes ...Route) {
331	for _, r := range routes {
332		var handler http.Handler
333
334		// If it's a handler func that requires authorization, wrap it in authentication
335		if hf, ok := r.HandlerFunc.(func(http.ResponseWriter, *http.Request, meta.User)); ok {
336			handler = authenticate(hf, h, h.Config.AuthEnabled)
337		}
338
339		// This is a normal handler signature and does not require authentication
340		if hf, ok := r.HandlerFunc.(func(http.ResponseWriter, *http.Request)); ok {
341			handler = http.HandlerFunc(hf)
342		}
343
344		// Throttle route if this is a write endpoint.
345		if r.Method == http.MethodPost {
346			switch r.Pattern {
347			case "/write", "/api/v1/prom/write":
348				handler = h.writeThrottler.Handler(handler)
349			default:
350			}
351		}
352
353		handler = h.responseWriter(handler)
354		if r.Gzipped {
355			handler = gzipFilter(handler)
356		}
357		handler = cors(handler)
358		handler = requestID(handler)
359		if h.Config.LogEnabled && r.LoggingEnabled {
360			handler = h.logging(handler, r.Name)
361		}
362		handler = h.recovery(handler, r.Name) // make sure recovery is always last
363
364		h.mux.Add(r.Method, r.Pattern, handler)
365	}
366}
367
368// ServeHTTP responds to HTTP request to the handler.
369func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
370	atomic.AddInt64(&h.stats.Requests, 1)
371	atomic.AddInt64(&h.stats.ActiveRequests, 1)
372	defer atomic.AddInt64(&h.stats.ActiveRequests, -1)
373	start := time.Now()
374
375	// Add version and build header to all InfluxDB requests.
376	w.Header().Add("X-Influxdb-Version", h.Version)
377	w.Header().Add("X-Influxdb-Build", h.BuildType)
378
379	if strings.HasPrefix(r.URL.Path, "/debug/pprof") && h.Config.PprofEnabled {
380		h.handleProfiles(w, r)
381	} else if strings.HasPrefix(r.URL.Path, "/debug/vars") {
382		h.serveExpvar(w, r)
383	} else if strings.HasPrefix(r.URL.Path, "/debug/requests") {
384		h.serveDebugRequests(w, r)
385	} else {
386		h.mux.ServeHTTP(w, r)
387	}
388
389	atomic.AddInt64(&h.stats.RequestDuration, time.Since(start).Nanoseconds())
390}
391
392// writeHeader writes the provided status code in the response, and
393// updates relevant http error statistics.
394func (h *Handler) writeHeader(w http.ResponseWriter, code int) {
395	switch code / 100 {
396	case 4:
397		atomic.AddInt64(&h.stats.ClientErrors, 1)
398	case 5:
399		atomic.AddInt64(&h.stats.ServerErrors, 1)
400	}
401	w.WriteHeader(code)
402}
403
404// serveQuery parses an incoming query and, if valid, executes the query.
405func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User) {
406	atomic.AddInt64(&h.stats.QueryRequests, 1)
407	defer func(start time.Time) {
408		atomic.AddInt64(&h.stats.QueryRequestDuration, time.Since(start).Nanoseconds())
409	}(time.Now())
410	h.requestTracker.Add(r, user)
411
412	// Retrieve the underlying ResponseWriter or initialize our own.
413	rw, ok := w.(ResponseWriter)
414	if !ok {
415		rw = NewResponseWriter(w, r)
416	}
417
418	// Retrieve the node id the query should be executed on.
419	nodeID, _ := strconv.ParseUint(r.FormValue("node_id"), 10, 64)
420
421	var qr io.Reader
422	// Attempt to read the form value from the "q" form value.
423	if qp := strings.TrimSpace(r.FormValue("q")); qp != "" {
424		qr = strings.NewReader(qp)
425	} else if r.MultipartForm != nil && r.MultipartForm.File != nil {
426		// If we have a multipart/form-data, try to retrieve a file from 'q'.
427		if fhs := r.MultipartForm.File["q"]; len(fhs) > 0 {
428			f, err := fhs[0].Open()
429			if err != nil {
430				h.httpError(rw, err.Error(), http.StatusBadRequest)
431				return
432			}
433			defer f.Close()
434			qr = f
435		}
436	}
437
438	if qr == nil {
439		h.httpError(rw, `missing required parameter "q"`, http.StatusBadRequest)
440		return
441	}
442
443	epoch := strings.TrimSpace(r.FormValue("epoch"))
444
445	p := influxql.NewParser(qr)
446	db := r.FormValue("db")
447
448	// Sanitize the request query params so it doesn't show up in the response logger.
449	// Do this before anything else so a parsing error doesn't leak passwords.
450	sanitize(r)
451
452	// Parse the parameters
453	rawParams := r.FormValue("params")
454	if rawParams != "" {
455		var params map[string]interface{}
456		decoder := json.NewDecoder(strings.NewReader(rawParams))
457		decoder.UseNumber()
458		if err := decoder.Decode(&params); err != nil {
459			h.httpError(rw, "error parsing query parameters: "+err.Error(), http.StatusBadRequest)
460			return
461		}
462
463		// Convert json.Number into int64 and float64 values
464		for k, v := range params {
465			if v, ok := v.(json.Number); ok {
466				var err error
467				if strings.Contains(string(v), ".") {
468					params[k], err = v.Float64()
469				} else {
470					params[k], err = v.Int64()
471				}
472
473				if err != nil {
474					h.httpError(rw, "error parsing json value: "+err.Error(), http.StatusBadRequest)
475					return
476				}
477			}
478		}
479		p.SetParams(params)
480	}
481
482	// Parse query from query string.
483	q, err := p.ParseQuery()
484	if err != nil {
485		h.httpError(rw, "error parsing query: "+err.Error(), http.StatusBadRequest)
486		return
487	}
488
489	// Check authorization.
490	if h.Config.AuthEnabled {
491		if err := h.QueryAuthorizer.AuthorizeQuery(user, q, db); err != nil {
492			if err, ok := err.(meta.ErrAuthorize); ok {
493				h.Logger.Info("Unauthorized request",
494					zap.String("user", err.User),
495					zap.Stringer("query", err.Query),
496					logger.Database(err.Database))
497			}
498			h.httpError(rw, "error authorizing query: "+err.Error(), http.StatusForbidden)
499			return
500		}
501	}
502
503	// Parse chunk size. Use default if not provided or unparsable.
504	chunked := r.FormValue("chunked") == "true"
505	chunkSize := DefaultChunkSize
506	if chunked {
507		if n, err := strconv.ParseInt(r.FormValue("chunk_size"), 10, 64); err == nil && int(n) > 0 {
508			chunkSize = int(n)
509		}
510	}
511
512	// Parse whether this is an async command.
513	async := r.FormValue("async") == "true"
514
515	opts := query.ExecutionOptions{
516		Database:        db,
517		RetentionPolicy: r.FormValue("rp"),
518		ChunkSize:       chunkSize,
519		ReadOnly:        r.Method == "GET",
520		NodeID:          nodeID,
521	}
522
523	if h.Config.AuthEnabled {
524		if user != nil && user.AuthorizeUnrestricted() {
525			opts.Authorizer = query.OpenAuthorizer
526		} else {
527			// The current user determines the authorized actions.
528			opts.Authorizer = user
529		}
530	} else {
531		// Auth is disabled, so allow everything.
532		opts.Authorizer = query.OpenAuthorizer
533	}
534
535	// Make sure if the client disconnects we signal the query to abort
536	var closing chan struct{}
537	if !async {
538		closing = make(chan struct{})
539		if notifier, ok := w.(http.CloseNotifier); ok {
540			// CloseNotify() is not guaranteed to send a notification when the query
541			// is closed. Use this channel to signal that the query is finished to
542			// prevent lingering goroutines that may be stuck.
543			done := make(chan struct{})
544			defer close(done)
545
546			notify := notifier.CloseNotify()
547			go func() {
548				// Wait for either the request to finish
549				// or for the client to disconnect
550				select {
551				case <-done:
552				case <-notify:
553					close(closing)
554				}
555			}()
556			opts.AbortCh = done
557		} else {
558			defer close(closing)
559		}
560	}
561
562	// Execute query.
563	results := h.QueryExecutor.ExecuteQuery(q, opts, closing)
564
565	// If we are running in async mode, open a goroutine to drain the results
566	// and return with a StatusNoContent.
567	if async {
568		go h.async(q, results)
569		h.writeHeader(w, http.StatusNoContent)
570		return
571	}
572
573	// if we're not chunking, this will be the in memory buffer for all results before sending to client
574	resp := Response{Results: make([]*query.Result, 0)}
575
576	// Status header is OK once this point is reached.
577	// Attempt to flush the header immediately so the client gets the header information
578	// and knows the query was accepted.
579	h.writeHeader(rw, http.StatusOK)
580	if w, ok := w.(http.Flusher); ok {
581		w.Flush()
582	}
583
584	// pull all results from the channel
585	rows := 0
586	for r := range results {
587		// Ignore nil results.
588		if r == nil {
589			continue
590		}
591
592		// if requested, convert result timestamps to epoch
593		if epoch != "" {
594			convertToEpoch(r, epoch)
595		}
596
597		// Write out result immediately if chunked.
598		if chunked {
599			n, _ := rw.WriteResponse(Response{
600				Results: []*query.Result{r},
601			})
602			atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n))
603			w.(http.Flusher).Flush()
604			continue
605		}
606
607		// Limit the number of rows that can be returned in a non-chunked
608		// response.  This is to prevent the server from going OOM when
609		// returning a large response.  If you want to return more than the
610		// default chunk size, then use chunking to process multiple blobs.
611		// Iterate through the series in this result to count the rows and
612		// truncate any rows we shouldn't return.
613		if h.Config.MaxRowLimit > 0 {
614			for i, series := range r.Series {
615				n := h.Config.MaxRowLimit - rows
616				if n < len(series.Values) {
617					// We have reached the maximum number of values. Truncate
618					// the values within this row.
619					series.Values = series.Values[:n]
620					// Since this was truncated, it will always be a partial return.
621					// Add this so the client knows we truncated the response.
622					series.Partial = true
623				}
624				rows += len(series.Values)
625
626				if rows >= h.Config.MaxRowLimit {
627					// Drop any remaining series since we have already reached the row limit.
628					if i < len(r.Series) {
629						r.Series = r.Series[:i+1]
630					}
631					break
632				}
633			}
634		}
635
636		// It's not chunked so buffer results in memory.
637		// Results for statements need to be combined together.
638		// We need to check if this new result is for the same statement as
639		// the last result, or for the next statement
640		l := len(resp.Results)
641		if l == 0 {
642			resp.Results = append(resp.Results, r)
643		} else if resp.Results[l-1].StatementID == r.StatementID {
644			if r.Err != nil {
645				resp.Results[l-1] = r
646				continue
647			}
648
649			cr := resp.Results[l-1]
650			rowsMerged := 0
651			if len(cr.Series) > 0 {
652				lastSeries := cr.Series[len(cr.Series)-1]
653
654				for _, row := range r.Series {
655					if !lastSeries.SameSeries(row) {
656						// Next row is for a different series than last.
657						break
658					}
659					// Values are for the same series, so append them.
660					lastSeries.Values = append(lastSeries.Values, row.Values...)
661					rowsMerged++
662				}
663			}
664
665			// Append remaining rows as new rows.
666			r.Series = r.Series[rowsMerged:]
667			cr.Series = append(cr.Series, r.Series...)
668			cr.Messages = append(cr.Messages, r.Messages...)
669			cr.Partial = r.Partial
670		} else {
671			resp.Results = append(resp.Results, r)
672		}
673
674		// Drop out of this loop and do not process further results when we hit the row limit.
675		if h.Config.MaxRowLimit > 0 && rows >= h.Config.MaxRowLimit {
676			// If the result is marked as partial, remove that partial marking
677			// here. While the series is partial and we would normally have
678			// tried to return the rest in the next chunk, we are not using
679			// chunking and are truncating the series so we don't want to
680			// signal to the client that we plan on sending another JSON blob
681			// with another result.  The series, on the other hand, still
682			// returns partial true if it was truncated or had more data to
683			// send in a future chunk.
684			r.Partial = false
685			break
686		}
687	}
688
689	// If it's not chunked we buffered everything in memory, so write it out
690	if !chunked {
691		n, _ := rw.WriteResponse(resp)
692		atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n))
693	}
694}
695
696// async drains the results from an async query and logs a message if it fails.
697func (h *Handler) async(q *influxql.Query, results <-chan *query.Result) {
698	for r := range results {
699		// Drain the results and do nothing with them.
700		// If it fails, log the failure so there is at least a record of it.
701		if r.Err != nil {
702			// Do not log when a statement was not executed since there would
703			// have been an earlier error that was already logged.
704			if r.Err == query.ErrNotExecuted {
705				continue
706			}
707			h.Logger.Info("Error while running async query",
708				zap.Stringer("query", q),
709				zap.Error(r.Err))
710		}
711	}
712}
713
714// serveWrite receives incoming series data in line protocol format and writes it to the database.
715func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.User) {
716	atomic.AddInt64(&h.stats.WriteRequests, 1)
717	atomic.AddInt64(&h.stats.ActiveWriteRequests, 1)
718	defer func(start time.Time) {
719		atomic.AddInt64(&h.stats.ActiveWriteRequests, -1)
720		atomic.AddInt64(&h.stats.WriteRequestDuration, time.Since(start).Nanoseconds())
721	}(time.Now())
722	h.requestTracker.Add(r, user)
723
724	database := r.URL.Query().Get("db")
725	if database == "" {
726		h.httpError(w, "database is required", http.StatusBadRequest)
727		return
728	}
729
730	if di := h.MetaClient.Database(database); di == nil {
731		h.httpError(w, fmt.Sprintf("database not found: %q", database), http.StatusNotFound)
732		return
733	}
734
735	if h.Config.AuthEnabled {
736		if user == nil {
737			h.httpError(w, fmt.Sprintf("user is required to write to database %q", database), http.StatusForbidden)
738			return
739		}
740
741		if err := h.WriteAuthorizer.AuthorizeWrite(user.ID(), database); err != nil {
742			h.httpError(w, fmt.Sprintf("%q user is not authorized to write to database %q", user.ID(), database), http.StatusForbidden)
743			return
744		}
745	}
746
747	body := r.Body
748	if h.Config.MaxBodySize > 0 {
749		body = truncateReader(body, int64(h.Config.MaxBodySize))
750	}
751
752	// Handle gzip decoding of the body
753	if r.Header.Get("Content-Encoding") == "gzip" {
754		b, err := gzip.NewReader(r.Body)
755		if err != nil {
756			h.httpError(w, err.Error(), http.StatusBadRequest)
757			return
758		}
759		defer b.Close()
760		body = b
761	}
762
763	var bs []byte
764	if r.ContentLength > 0 {
765		if h.Config.MaxBodySize > 0 && r.ContentLength > int64(h.Config.MaxBodySize) {
766			h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge)
767			return
768		}
769
770		// This will just be an initial hint for the gzip reader, as the
771		// bytes.Buffer will grow as needed when ReadFrom is called
772		bs = make([]byte, 0, r.ContentLength)
773	}
774	buf := bytes.NewBuffer(bs)
775
776	_, err := buf.ReadFrom(body)
777	if err != nil {
778		if err == errTruncated {
779			h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge)
780			return
781		}
782
783		if h.Config.WriteTracing {
784			h.Logger.Info("Write handler unable to read bytes from request body")
785		}
786		h.httpError(w, err.Error(), http.StatusBadRequest)
787		return
788	}
789	atomic.AddInt64(&h.stats.WriteRequestBytesReceived, int64(buf.Len()))
790
791	if h.Config.WriteTracing {
792		h.Logger.Info("Write body received by handler", zap.ByteString("body", buf.Bytes()))
793	}
794
795	points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.URL.Query().Get("precision"))
796	// Not points parsed correctly so return the error now
797	if parseError != nil && len(points) == 0 {
798		if parseError.Error() == "EOF" {
799			h.writeHeader(w, http.StatusOK)
800			return
801		}
802		h.httpError(w, parseError.Error(), http.StatusBadRequest)
803		return
804	}
805
806	// Determine required consistency level.
807	level := r.URL.Query().Get("consistency")
808	consistency := models.ConsistencyLevelOne
809	if level != "" {
810		var err error
811		consistency, err = models.ParseConsistencyLevel(level)
812		if err != nil {
813			h.httpError(w, err.Error(), http.StatusBadRequest)
814			return
815		}
816	}
817
818	// Write points.
819	if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); influxdb.IsClientError(err) {
820		atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
821		h.httpError(w, err.Error(), http.StatusBadRequest)
822		return
823	} else if influxdb.IsAuthorizationError(err) {
824		atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
825		h.httpError(w, err.Error(), http.StatusForbidden)
826		return
827	} else if werr, ok := err.(tsdb.PartialWriteError); ok {
828		atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)-werr.Dropped))
829		atomic.AddInt64(&h.stats.PointsWrittenDropped, int64(werr.Dropped))
830		h.httpError(w, werr.Error(), http.StatusBadRequest)
831		return
832	} else if err != nil {
833		atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
834		h.httpError(w, err.Error(), http.StatusInternalServerError)
835		return
836	} else if parseError != nil {
837		// We wrote some of the points
838		atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)))
839		// The other points failed to parse which means the client sent invalid line protocol.  We return a 400
840		// response code as well as the lines that failed to parse.
841		h.httpError(w, tsdb.PartialWriteError{Reason: parseError.Error()}.Error(), http.StatusBadRequest)
842		return
843	}
844
845	atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)))
846	h.writeHeader(w, http.StatusNoContent)
847}
848
849// serveOptions returns an empty response to comply with OPTIONS pre-flight requests
850func (h *Handler) serveOptions(w http.ResponseWriter, r *http.Request) {
851	h.writeHeader(w, http.StatusNoContent)
852}
853
854// servePing returns a simple response to let the client know the server is running.
855func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
856	verbose := r.URL.Query().Get("verbose")
857	atomic.AddInt64(&h.stats.PingRequests, 1)
858
859	if verbose != "" && verbose != "0" && verbose != "false" {
860		h.writeHeader(w, http.StatusOK)
861		b, _ := json.Marshal(map[string]string{"version": h.Version})
862		w.Write(b)
863	} else {
864		h.writeHeader(w, http.StatusNoContent)
865	}
866}
867
868// serveStatus has been deprecated.
869func (h *Handler) serveStatus(w http.ResponseWriter, r *http.Request) {
870	h.Logger.Info("WARNING: /status has been deprecated.  Use /ping instead.")
871	atomic.AddInt64(&h.stats.StatusRequests, 1)
872	h.writeHeader(w, http.StatusNoContent)
873}
874
875// convertToEpoch converts result timestamps from time.Time to the specified epoch.
876func convertToEpoch(r *query.Result, epoch string) {
877	divisor := int64(1)
878
879	switch epoch {
880	case "u":
881		divisor = int64(time.Microsecond)
882	case "ms":
883		divisor = int64(time.Millisecond)
884	case "s":
885		divisor = int64(time.Second)
886	case "m":
887		divisor = int64(time.Minute)
888	case "h":
889		divisor = int64(time.Hour)
890	}
891
892	for _, s := range r.Series {
893		for _, v := range s.Values {
894			if ts, ok := v[0].(time.Time); ok {
895				v[0] = ts.UnixNano() / divisor
896			}
897		}
898	}
899}
900
901// servePromWrite receives data in the Prometheus remote write protocol and writes it
902// to the database
903func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user meta.User) {
904	atomic.AddInt64(&h.stats.WriteRequests, 1)
905	atomic.AddInt64(&h.stats.ActiveWriteRequests, 1)
906	atomic.AddInt64(&h.stats.PromWriteRequests, 1)
907	defer func(start time.Time) {
908		atomic.AddInt64(&h.stats.ActiveWriteRequests, -1)
909		atomic.AddInt64(&h.stats.WriteRequestDuration, time.Since(start).Nanoseconds())
910	}(time.Now())
911	h.requestTracker.Add(r, user)
912
913	database := r.URL.Query().Get("db")
914	if database == "" {
915		h.httpError(w, "database is required", http.StatusBadRequest)
916		return
917	}
918
919	if di := h.MetaClient.Database(database); di == nil {
920		h.httpError(w, fmt.Sprintf("database not found: %q", database), http.StatusNotFound)
921		return
922	}
923
924	if h.Config.AuthEnabled {
925		if user == nil {
926			h.httpError(w, fmt.Sprintf("user is required to write to database %q", database), http.StatusForbidden)
927			return
928		}
929
930		if err := h.WriteAuthorizer.AuthorizeWrite(user.ID(), database); err != nil {
931			h.httpError(w, fmt.Sprintf("%q user is not authorized to write to database %q", user.ID(), database), http.StatusForbidden)
932			return
933		}
934	}
935
936	body := r.Body
937	if h.Config.MaxBodySize > 0 {
938		body = truncateReader(body, int64(h.Config.MaxBodySize))
939	}
940
941	var bs []byte
942	if r.ContentLength > 0 {
943		if h.Config.MaxBodySize > 0 && r.ContentLength > int64(h.Config.MaxBodySize) {
944			h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge)
945			return
946		}
947
948		// This will just be an initial hint for the reader, as the
949		// bytes.Buffer will grow as needed when ReadFrom is called
950		bs = make([]byte, 0, r.ContentLength)
951	}
952	buf := bytes.NewBuffer(bs)
953
954	_, err := buf.ReadFrom(body)
955	if err != nil {
956		if err == errTruncated {
957			h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge)
958			return
959		}
960
961		if h.Config.WriteTracing {
962			h.Logger.Info("Prom write handler unable to read bytes from request body")
963		}
964		h.httpError(w, err.Error(), http.StatusBadRequest)
965		return
966	}
967	atomic.AddInt64(&h.stats.WriteRequestBytesReceived, int64(buf.Len()))
968
969	if h.Config.WriteTracing {
970		h.Logger.Info("Prom write body received by handler", zap.ByteString("body", buf.Bytes()))
971	}
972
973	reqBuf, err := snappy.Decode(nil, buf.Bytes())
974	if err != nil {
975		h.httpError(w, err.Error(), http.StatusBadRequest)
976		return
977	}
978
979	// Convert the Prometheus remote write request to Influx Points
980	var req remote.WriteRequest
981	if err := proto.Unmarshal(reqBuf, &req); err != nil {
982		h.httpError(w, err.Error(), http.StatusBadRequest)
983		return
984	}
985
986	points, err := prometheus.WriteRequestToPoints(&req)
987	if err != nil {
988		if h.Config.WriteTracing {
989			h.Logger.Info("Prom write handler", zap.Error(err))
990		}
991
992		// Check if the error was from something other than dropping invalid values.
993		if _, ok := err.(prometheus.DroppedValuesError); !ok {
994			h.httpError(w, err.Error(), http.StatusBadRequest)
995			return
996		}
997	}
998
999	// Determine required consistency level.
1000	level := r.URL.Query().Get("consistency")
1001	consistency := models.ConsistencyLevelOne
1002	if level != "" {
1003		consistency, err = models.ParseConsistencyLevel(level)
1004		if err != nil {
1005			h.httpError(w, err.Error(), http.StatusBadRequest)
1006			return
1007		}
1008	}
1009
1010	// Write points.
1011	if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); influxdb.IsClientError(err) {
1012		atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
1013		h.httpError(w, err.Error(), http.StatusBadRequest)
1014		return
1015	} else if influxdb.IsAuthorizationError(err) {
1016		atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
1017		h.httpError(w, err.Error(), http.StatusForbidden)
1018		return
1019	} else if werr, ok := err.(tsdb.PartialWriteError); ok {
1020		atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)-werr.Dropped))
1021		atomic.AddInt64(&h.stats.PointsWrittenDropped, int64(werr.Dropped))
1022		h.httpError(w, werr.Error(), http.StatusBadRequest)
1023		return
1024	} else if err != nil {
1025		atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
1026		h.httpError(w, err.Error(), http.StatusInternalServerError)
1027		return
1028	}
1029
1030	atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)))
1031	h.writeHeader(w, http.StatusNoContent)
1032}
1033
1034// servePromRead will convert a Prometheus remote read request into a storage
1035// query and returns data in Prometheus remote read protobuf format.
1036func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user meta.User) {
1037	compressed, err := ioutil.ReadAll(r.Body)
1038	if err != nil {
1039		h.httpError(w, err.Error(), http.StatusInternalServerError)
1040		return
1041	}
1042
1043	reqBuf, err := snappy.Decode(nil, compressed)
1044	if err != nil {
1045		h.httpError(w, err.Error(), http.StatusBadRequest)
1046		return
1047	}
1048
1049	var req remote.ReadRequest
1050	if err := proto.Unmarshal(reqBuf, &req); err != nil {
1051		h.httpError(w, err.Error(), http.StatusBadRequest)
1052		return
1053	}
1054
1055	// Query the DB and create a ReadResponse for Prometheus
1056	db := r.FormValue("db")
1057	rp := r.FormValue("rp")
1058
1059	readRequest, err := prometheus.ReadRequestToInfluxStorageRequest(&req, db, rp)
1060	if err != nil {
1061		h.httpError(w, err.Error(), http.StatusBadRequest)
1062		return
1063	}
1064
1065	respond := func(resp *remote.ReadResponse) {
1066		data, err := proto.Marshal(resp)
1067		if err != nil {
1068			h.httpError(w, err.Error(), http.StatusInternalServerError)
1069			return
1070		}
1071
1072		w.Header().Set("Content-Type", "application/x-protobuf")
1073		w.Header().Set("Content-Encoding", "snappy")
1074
1075		compressed = snappy.Encode(nil, data)
1076		if _, err := w.Write(compressed); err != nil {
1077			h.httpError(w, err.Error(), http.StatusInternalServerError)
1078			return
1079		}
1080
1081		atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(len(compressed)))
1082	}
1083
1084	ctx := context.Background()
1085	rs, err := h.Store.Read(ctx, readRequest)
1086	if err != nil {
1087		h.httpError(w, err.Error(), http.StatusBadRequest)
1088		return
1089	}
1090
1091	resp := &remote.ReadResponse{
1092		Results: []*remote.QueryResult{{}},
1093	}
1094
1095	if rs == nil {
1096		respond(resp)
1097		return
1098	}
1099	defer rs.Close()
1100
1101	for rs.Next() {
1102		cur := rs.Cursor()
1103		if cur == nil {
1104			// no data for series key + field combination
1105			continue
1106		}
1107
1108		tags := prometheus.RemoveInfluxSystemTags(rs.Tags())
1109		var unsupportedCursor string
1110		switch cur := cur.(type) {
1111		case tsdb.FloatArrayCursor:
1112			var series *remote.TimeSeries
1113			for {
1114				a := cur.Next()
1115				if a.Len() == 0 {
1116					break
1117				}
1118
1119				// We have some data for this series.
1120				if series == nil {
1121					series = &remote.TimeSeries{
1122						Labels: prometheus.ModelTagsToLabelPairs(tags),
1123					}
1124				}
1125
1126				for i, ts := range a.Timestamps {
1127					series.Samples = append(series.Samples, &remote.Sample{
1128						TimestampMs: ts / int64(time.Millisecond),
1129						Value:       a.Values[i],
1130					})
1131				}
1132			}
1133
1134			// There was data for the series.
1135			if series != nil {
1136				resp.Results[0].Timeseries = append(resp.Results[0].Timeseries, series)
1137			}
1138		case tsdb.IntegerArrayCursor:
1139			unsupportedCursor = "int64"
1140		case tsdb.UnsignedArrayCursor:
1141			unsupportedCursor = "uint"
1142		case tsdb.BooleanArrayCursor:
1143			unsupportedCursor = "bool"
1144		case tsdb.StringArrayCursor:
1145			unsupportedCursor = "string"
1146		default:
1147			panic(fmt.Sprintf("unreachable: %T", cur))
1148		}
1149		cur.Close()
1150
1151		if len(unsupportedCursor) > 0 {
1152			h.Logger.Info("Prometheus can't read cursor",
1153				zap.String("cursor_type", unsupportedCursor),
1154				zap.Stringer("series", tags),
1155			)
1156		}
1157	}
1158
1159	respond(resp)
1160}
1161
1162func (h *Handler) serveFluxQuery(w http.ResponseWriter, r *http.Request, user meta.User) {
1163	atomic.AddInt64(&h.stats.FluxQueryRequests, 1)
1164	defer func(start time.Time) {
1165		atomic.AddInt64(&h.stats.FluxQueryRequestDuration, time.Since(start).Nanoseconds())
1166	}(time.Now())
1167
1168	req, err := decodeQueryRequest(r)
1169	if err != nil {
1170		h.httpError(w, err.Error(), http.StatusBadRequest)
1171		return
1172	}
1173
1174	ctx := r.Context()
1175	if val := r.FormValue("node_id"); val != "" {
1176		if nodeID, err := strconv.ParseUint(val, 10, 64); err == nil {
1177			ctx = storage.NewContextWithReadOptions(ctx, &storage.ReadOptions{NodeID: nodeID})
1178		}
1179	}
1180
1181	if h.Config.AuthEnabled {
1182		ctx = meta.NewContextWithUser(ctx, user)
1183	}
1184
1185	pr := req.ProxyRequest()
1186
1187	// Logging
1188	var (
1189		stats flux.Statistics
1190		n     int64
1191	)
1192	if h.Config.FluxLogEnabled {
1193		defer func() {
1194			h.logFluxQuery(n, stats, pr.Compiler, err)
1195		}()
1196	}
1197
1198	q, err := h.Controller.Query(ctx, pr.Compiler)
1199	if err != nil {
1200		h.httpError(w, err.Error(), http.StatusInternalServerError)
1201		return
1202	}
1203	defer func() {
1204		q.Cancel()
1205		q.Done()
1206	}()
1207
1208	// NOTE: We do not write out the headers here.
1209	// It is possible that if the encoding step fails
1210	// that we can write an error header so long as
1211	// the encoder did not write anything.
1212	// As such we rely on the http.ResponseWriter behavior
1213	// to write an StatusOK header with the first write.
1214
1215	switch r.Header.Get("Accept") {
1216	case "text/csv":
1217		fallthrough
1218	default:
1219		if hd, ok := pr.Dialect.(httpDialect); !ok {
1220			h.httpError(w, fmt.Sprintf("unsupported dialect over HTTP %T", req.Dialect), http.StatusBadRequest)
1221			return
1222		} else {
1223			hd.SetHeaders(w)
1224		}
1225		encoder := pr.Dialect.Encoder()
1226		results := flux.NewResultIteratorFromQuery(q)
1227		if h.Config.FluxLogEnabled {
1228			defer func() {
1229				stats = results.Statistics()
1230			}()
1231		}
1232		defer results.Release()
1233
1234		n, err = encoder.Encode(w, results)
1235		if err != nil {
1236			if n == 0 {
1237				// If the encoder did not write anything, we can write an error header.
1238				h.httpError(w, err.Error(), http.StatusInternalServerError)
1239			}
1240		}
1241	}
1242}
1243
1244func (h *Handler) logFluxQuery(n int64, stats flux.Statistics, compiler flux.Compiler, err error) {
1245	var q string
1246	switch c := compiler.(type) {
1247	case lang.SpecCompiler:
1248		q = fmt.Sprint(flux.Formatted(c.Spec))
1249	case lang.FluxCompiler:
1250		q = c.Query
1251	}
1252
1253	h.Logger.Info("Executed Flux query",
1254		zap.String("compiler_type", string(compiler.CompilerType())),
1255		zap.Int64("response_size", n),
1256		zap.String("query", q),
1257		zap.Error(err),
1258		zap.Duration("stat_total_duration", stats.TotalDuration),
1259		zap.Duration("stat_compile_duration", stats.CompileDuration),
1260		zap.Duration("stat_queue_duration", stats.QueueDuration),
1261		zap.Duration("stat_plan_duration", stats.PlanDuration),
1262		zap.Duration("stat_requeue_duration", stats.RequeueDuration),
1263		zap.Duration("stat_execute_duration", stats.ExecuteDuration),
1264		zap.Int64("stat_max_allocated", stats.MaxAllocated),
1265		zap.Int("stat_concurrency", stats.Concurrency),
1266	)
1267}
1268
1269// serveExpvar serves internal metrics in /debug/vars format over HTTP.
1270func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
1271	// Retrieve statistics from the monitor.
1272	stats, err := h.Monitor.Statistics(nil)
1273	if err != nil {
1274		h.httpError(w, err.Error(), http.StatusInternalServerError)
1275		return
1276	}
1277
1278	// Retrieve diagnostics from the monitor.
1279	diags, err := h.Monitor.Diagnostics()
1280	if err != nil {
1281		h.httpError(w, err.Error(), http.StatusInternalServerError)
1282		return
1283	}
1284
1285	w.Header().Set("Content-Type", "application/json; charset=utf-8")
1286
1287	first := true
1288	if val := diags["system"]; val != nil {
1289		jv, err := parseSystemDiagnostics(val)
1290		if err != nil {
1291			h.httpError(w, err.Error(), http.StatusInternalServerError)
1292			return
1293		}
1294
1295		data, err := json.Marshal(jv)
1296		if err != nil {
1297			h.httpError(w, err.Error(), http.StatusInternalServerError)
1298			return
1299		}
1300
1301		first = false
1302		fmt.Fprintln(w, "{")
1303		fmt.Fprintf(w, "\"system\": %s", data)
1304	} else {
1305		fmt.Fprintln(w, "{")
1306	}
1307
1308	if val := expvar.Get("cmdline"); val != nil {
1309		if !first {
1310			fmt.Fprintln(w, ",")
1311		}
1312		first = false
1313		fmt.Fprintf(w, "\"cmdline\": %s", val)
1314	}
1315	if val := expvar.Get("memstats"); val != nil {
1316		if !first {
1317			fmt.Fprintln(w, ",")
1318		}
1319		first = false
1320		fmt.Fprintf(w, "\"memstats\": %s", val)
1321	}
1322
1323	for _, s := range stats {
1324		val, err := json.Marshal(s)
1325		if err != nil {
1326			continue
1327		}
1328
1329		// Very hackily create a unique key.
1330		buf := bytes.NewBufferString(s.Name)
1331		if path, ok := s.Tags["path"]; ok {
1332			fmt.Fprintf(buf, ":%s", path)
1333			if id, ok := s.Tags["id"]; ok {
1334				fmt.Fprintf(buf, ":%s", id)
1335			}
1336		} else if bind, ok := s.Tags["bind"]; ok {
1337			if proto, ok := s.Tags["proto"]; ok {
1338				fmt.Fprintf(buf, ":%s", proto)
1339			}
1340			fmt.Fprintf(buf, ":%s", bind)
1341		} else if database, ok := s.Tags["database"]; ok {
1342			fmt.Fprintf(buf, ":%s", database)
1343			if rp, ok := s.Tags["retention_policy"]; ok {
1344				fmt.Fprintf(buf, ":%s", rp)
1345				if name, ok := s.Tags["name"]; ok {
1346					fmt.Fprintf(buf, ":%s", name)
1347				}
1348				if dest, ok := s.Tags["destination"]; ok {
1349					fmt.Fprintf(buf, ":%s", dest)
1350				}
1351			}
1352		}
1353		key := buf.String()
1354
1355		if !first {
1356			fmt.Fprintln(w, ",")
1357		}
1358		first = false
1359		fmt.Fprintf(w, "%q: ", key)
1360		w.Write(bytes.TrimSpace(val))
1361	}
1362	fmt.Fprintln(w, "\n}")
1363}
1364
1365// serveDebugRequests will track requests for a period of time.
1366func (h *Handler) serveDebugRequests(w http.ResponseWriter, r *http.Request) {
1367	var d time.Duration
1368	if s := r.URL.Query().Get("seconds"); s == "" {
1369		d = DefaultDebugRequestsInterval
1370	} else if seconds, err := strconv.ParseInt(s, 10, 64); err != nil {
1371		h.httpError(w, err.Error(), http.StatusBadRequest)
1372		return
1373	} else {
1374		d = time.Duration(seconds) * time.Second
1375		if d > MaxDebugRequestsInterval {
1376			h.httpError(w, fmt.Sprintf("exceeded maximum interval time: %s > %s",
1377				influxql.FormatDuration(d),
1378				influxql.FormatDuration(MaxDebugRequestsInterval)),
1379				http.StatusBadRequest)
1380			return
1381		}
1382	}
1383
1384	var closing <-chan bool
1385	if notifier, ok := w.(http.CloseNotifier); ok {
1386		closing = notifier.CloseNotify()
1387	}
1388
1389	profile := h.requestTracker.TrackRequests()
1390
1391	timer := time.NewTimer(d)
1392	select {
1393	case <-timer.C:
1394		profile.Stop()
1395	case <-closing:
1396		// Connection was closed early.
1397		profile.Stop()
1398		timer.Stop()
1399		return
1400	}
1401
1402	w.Header().Set("Content-Type", "application/json; charset=utf-8")
1403	w.Header().Add("Connection", "close")
1404
1405	fmt.Fprintln(w, "{")
1406	first := true
1407	for req, st := range profile.Requests {
1408		val, err := json.Marshal(st)
1409		if err != nil {
1410			continue
1411		}
1412
1413		if !first {
1414			fmt.Fprintln(w, ",")
1415		}
1416		first = false
1417		fmt.Fprintf(w, "%q: ", req.String())
1418		w.Write(bytes.TrimSpace(val))
1419	}
1420	fmt.Fprintln(w, "\n}")
1421}
1422
1423// parseSystemDiagnostics converts the system diagnostics into an appropriate
1424// format for marshaling to JSON in the /debug/vars format.
1425func parseSystemDiagnostics(d *diagnostics.Diagnostics) (map[string]interface{}, error) {
1426	// We don't need PID in this case.
1427	m := map[string]interface{}{"currentTime": nil, "started": nil, "uptime": nil}
1428	for key := range m {
1429		// Find the associated column.
1430		ci := -1
1431		for i, col := range d.Columns {
1432			if col == key {
1433				ci = i
1434				break
1435			}
1436		}
1437
1438		if ci == -1 {
1439			return nil, fmt.Errorf("unable to find column %q", key)
1440		}
1441
1442		if len(d.Rows) < 1 || len(d.Rows[0]) <= ci {
1443			return nil, fmt.Errorf("no data for column %q", key)
1444		}
1445
1446		var res interface{}
1447		switch v := d.Rows[0][ci].(type) {
1448		case time.Time:
1449			res = v
1450		case string:
1451			// Should be a string representation of a time.Duration
1452			d, err := time.ParseDuration(v)
1453			if err != nil {
1454				return nil, err
1455			}
1456			res = int64(d.Seconds())
1457		default:
1458			return nil, fmt.Errorf("value for column %q is not parsable (got %T)", key, v)
1459		}
1460		m[key] = res
1461	}
1462	return m, nil
1463}
1464
1465// httpError writes an error to the client in a standard format.
1466func (h *Handler) httpError(w http.ResponseWriter, errmsg string, code int) {
1467	if code == http.StatusUnauthorized {
1468		// If an unauthorized header will be sent back, add a WWW-Authenticate header
1469		// as an authorization challenge.
1470		w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic realm=\"%s\"", h.Config.Realm))
1471	} else if code/100 != 2 {
1472		sz := math.Min(float64(len(errmsg)), 1024.0)
1473		w.Header().Set("X-InfluxDB-Error", errmsg[:int(sz)])
1474	}
1475
1476	response := Response{Err: errors.New(errmsg)}
1477	if rw, ok := w.(ResponseWriter); ok {
1478		h.writeHeader(w, code)
1479		rw.WriteResponse(response)
1480		return
1481	}
1482
1483	// Default implementation if the response writer hasn't been replaced
1484	// with our special response writer type.
1485	w.Header().Add("Content-Type", "application/json")
1486	h.writeHeader(w, code)
1487	b, _ := json.Marshal(response)
1488	w.Write(b)
1489}
1490
1491// Filters and filter helpers
1492
1493type credentials struct {
1494	Method   AuthenticationMethod
1495	Username string
1496	Password string
1497	Token    string
1498}
1499
// parseToken splits a "username:password" token at the first colon.
// ok is false, and user and pass are empty, when token contains no colon.
func parseToken(token string) (user, pass string, ok bool) {
	parts := strings.SplitN(token, ":", 2)
	if len(parts) != 2 {
		return "", "", false
	}
	return parts[0], parts[1], true
}
1507
1508// parseCredentials parses a request and returns the authentication credentials.
1509// The credentials may be present as URL query params, or as a Basic
1510// Authentication header.
1511// As params: http://127.0.0.1/query?u=username&p=password
1512// As basic auth: http://username:password@127.0.0.1
1513// As Bearer token in Authorization header: Bearer <JWT_TOKEN_BLOB>
1514// As Token in Authorization header: Token <username:password>
1515func parseCredentials(r *http.Request) (*credentials, error) {
1516	q := r.URL.Query()
1517
1518	// Check for username and password in URL params.
1519	if u, p := q.Get("u"), q.Get("p"); u != "" && p != "" {
1520		return &credentials{
1521			Method:   UserAuthentication,
1522			Username: u,
1523			Password: p,
1524		}, nil
1525	}
1526
1527	// Check for the HTTP Authorization header.
1528	if s := r.Header.Get("Authorization"); s != "" {
1529		// Check for Bearer token.
1530		strs := strings.Split(s, " ")
1531		if len(strs) == 2 {
1532			switch strs[0] {
1533			case "Bearer":
1534				return &credentials{
1535					Method: BearerAuthentication,
1536					Token:  strs[1],
1537				}, nil
1538			case "Token":
1539				if u, p, ok := parseToken(strs[1]); ok {
1540					return &credentials{
1541						Method:   UserAuthentication,
1542						Username: u,
1543						Password: p,
1544					}, nil
1545				}
1546			}
1547		}
1548
1549		// Check for basic auth.
1550		if u, p, ok := r.BasicAuth(); ok {
1551			return &credentials{
1552				Method:   UserAuthentication,
1553				Username: u,
1554				Password: p,
1555			}, nil
1556		}
1557	}
1558
1559	return nil, fmt.Errorf("unable to parse authentication credentials")
1560}
1561
1562// authenticate wraps a handler and ensures that if user credentials are passed in
1563// an attempt is made to authenticate that user. If authentication fails, an error is returned.
1564//
1565// There is one exception: if there are no users in the system, authentication is not required. This
1566// is to facilitate bootstrapping of a system with authentication enabled.
1567func authenticate(inner func(http.ResponseWriter, *http.Request, meta.User), h *Handler, requireAuthentication bool) http.Handler {
1568	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1569		// Return early if we are not authenticating
1570		if !requireAuthentication {
1571			inner(w, r, nil)
1572			return
1573		}
1574		var user meta.User
1575
1576		// TODO corylanou: never allow this in the future without users
1577		if requireAuthentication && h.MetaClient.AdminUserExists() {
1578			creds, err := parseCredentials(r)
1579			if err != nil {
1580				atomic.AddInt64(&h.stats.AuthenticationFailures, 1)
1581				h.httpError(w, err.Error(), http.StatusUnauthorized)
1582				return
1583			}
1584
1585			switch creds.Method {
1586			case UserAuthentication:
1587				if creds.Username == "" {
1588					atomic.AddInt64(&h.stats.AuthenticationFailures, 1)
1589					h.httpError(w, "username required", http.StatusUnauthorized)
1590					return
1591				}
1592
1593				user, err = h.MetaClient.Authenticate(creds.Username, creds.Password)
1594				if err != nil {
1595					atomic.AddInt64(&h.stats.AuthenticationFailures, 1)
1596					h.httpError(w, "authorization failed", http.StatusUnauthorized)
1597					return
1598				}
1599			case BearerAuthentication:
1600				if h.Config.SharedSecret == "" {
1601					atomic.AddInt64(&h.stats.AuthenticationFailures, 1)
1602					h.httpError(w, "bearer auth disabled", http.StatusUnauthorized)
1603					return
1604				}
1605				keyLookupFn := func(token *jwt.Token) (interface{}, error) {
1606					// Check for expected signing method.
1607					if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
1608						return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
1609					}
1610					return []byte(h.Config.SharedSecret), nil
1611				}
1612
1613				// Parse and validate the token.
1614				token, err := jwt.Parse(creds.Token, keyLookupFn)
1615				if err != nil {
1616					h.httpError(w, err.Error(), http.StatusUnauthorized)
1617					return
1618				} else if !token.Valid {
1619					h.httpError(w, "invalid token", http.StatusUnauthorized)
1620					return
1621				}
1622
1623				claims, ok := token.Claims.(jwt.MapClaims)
1624				if !ok {
1625					h.httpError(w, "problem authenticating token", http.StatusInternalServerError)
1626					h.Logger.Info("Could not assert JWT token claims as jwt.MapClaims")
1627					return
1628				}
1629
1630				// Make sure an expiration was set on the token.
1631				if exp, ok := claims["exp"].(float64); !ok || exp <= 0.0 {
1632					h.httpError(w, "token expiration required", http.StatusUnauthorized)
1633					return
1634				}
1635
1636				// Get the username from the token.
1637				username, ok := claims["username"].(string)
1638				if !ok {
1639					h.httpError(w, "username in token must be a string", http.StatusUnauthorized)
1640					return
1641				} else if username == "" {
1642					h.httpError(w, "token must contain a username", http.StatusUnauthorized)
1643					return
1644				}
1645
1646				// Lookup user in the metastore.
1647				if user, err = h.MetaClient.User(username); err != nil {
1648					h.httpError(w, err.Error(), http.StatusUnauthorized)
1649					return
1650				} else if user == nil {
1651					h.httpError(w, meta.ErrUserNotFound.Error(), http.StatusUnauthorized)
1652					return
1653				}
1654			default:
1655				h.httpError(w, "unsupported authentication", http.StatusUnauthorized)
1656			}
1657
1658		}
1659		inner(w, r, user)
1660	})
1661}
1662
// cors wraps inner so that requests carrying an Origin header receive the
// matching CORS response headers. OPTIONS requests are answered directly and
// never reach inner.
// TODO: corylanou: add the ability to configure this in our config
func cors(inner http.Handler) http.Handler {
	// The advertised lists never change, so they are joined once per wrapper.
	allowMethods := strings.Join([]string{
		`DELETE`,
		`GET`,
		`OPTIONS`,
		`POST`,
		`PUT`,
	}, ", ")
	allowHeaders := strings.Join([]string{
		`Accept`,
		`Accept-Encoding`,
		`Authorization`,
		`Content-Length`,
		`Content-Type`,
		`X-CSRF-Token`,
		`X-HTTP-Method-Override`,
	}, ", ")
	exposeHeaders := strings.Join([]string{
		`Date`,
		`X-InfluxDB-Version`,
		`X-InfluxDB-Build`,
	}, ", ")

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		origin := r.Header.Get("Origin")
		if origin != "" {
			hdr := w.Header()
			hdr.Set(`Access-Control-Allow-Origin`, origin)
			hdr.Set(`Access-Control-Allow-Methods`, allowMethods)
			hdr.Set(`Access-Control-Allow-Headers`, allowHeaders)
			hdr.Set(`Access-Control-Expose-Headers`, exposeHeaders)
		}

		if r.Method != "OPTIONS" {
			inner.ServeHTTP(w, r)
		}
	})
}
1701
1702func requestID(inner http.Handler) http.Handler {
1703	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1704		// X-Request-Id takes priority.
1705		rid := r.Header.Get("X-Request-Id")
1706
1707		// If X-Request-Id is empty, then check Request-Id
1708		if rid == "" {
1709			rid = r.Header.Get("Request-Id")
1710		}
1711
1712		// If Request-Id is empty then generate a v1 UUID.
1713		if rid == "" {
1714			rid = uuid.TimeUUID().String()
1715		}
1716
1717		// We read Request-Id in other handler code so we'll use that naming
1718		// convention from this point in the request cycle.
1719		r.Header.Set("Request-Id", rid)
1720
1721		// Set the request ID on the response headers.
1722		// X-Request-Id is the most common name for a request ID header.
1723		w.Header().Set("X-Request-Id", rid)
1724
1725		// We will also set Request-Id for backwards compatibility with previous
1726		// versions of InfluxDB.
1727		w.Header().Set("Request-Id", rid)
1728
1729		inner.ServeHTTP(w, r)
1730	})
1731}
1732
1733func (h *Handler) logging(inner http.Handler, name string) http.Handler {
1734	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1735		start := time.Now()
1736		l := &responseLogger{w: w}
1737		inner.ServeHTTP(l, r)
1738
1739		if h.accessLogFilters.Match(l.Status()) {
1740			h.CLFLogger.Println(buildLogLine(l, r, start))
1741		}
1742
1743		// Log server errors.
1744		if l.Status()/100 == 5 {
1745			errStr := l.Header().Get("X-InfluxDB-Error")
1746			if errStr != "" {
1747				h.Logger.Error(fmt.Sprintf("[%d] - %q", l.Status(), errStr))
1748			}
1749		}
1750	})
1751}
1752
1753func (h *Handler) responseWriter(inner http.Handler) http.Handler {
1754	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1755		w = NewResponseWriter(w, r)
1756		inner.ServeHTTP(w, r)
1757	})
1758}
1759
1760// if the env var is set, and the value is truthy, then we will *not*
1761// recover from a panic.
1762var willCrash bool
1763
1764func init() {
1765	var err error
1766	if willCrash, err = strconv.ParseBool(os.Getenv(query.PanicCrashEnv)); err != nil {
1767		willCrash = false
1768	}
1769}
1770
1771func (h *Handler) recovery(inner http.Handler, name string) http.Handler {
1772	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1773		start := time.Now()
1774		l := &responseLogger{w: w}
1775
1776		defer func() {
1777			if err := recover(); err != nil {
1778				logLine := buildLogLine(l, r, start)
1779				logLine = fmt.Sprintf("%s [panic:%s] %s", logLine, err, debug.Stack())
1780				h.CLFLogger.Println(logLine)
1781				http.Error(w, http.StatusText(http.StatusInternalServerError), 500)
1782				atomic.AddInt64(&h.stats.RecoveredPanics, 1) // Capture the panic in _internal stats.
1783
1784				if willCrash {
1785					h.CLFLogger.Println("\n\n=====\nAll goroutines now follow:")
1786					buf := debug.Stack()
1787					h.CLFLogger.Printf("%s\n", buf)
1788					os.Exit(1) // If we panic then the Go server will recover.
1789				}
1790			}
1791		}()
1792
1793		inner.ServeHTTP(l, r)
1794	})
1795}
1796
1797// Store describes the behaviour of the storage packages Store type.
1798type Store interface {
1799	Read(ctx context.Context, req *datatypes.ReadRequest) (reads.ResultSet, error)
1800}
1801
1802// Response represents a list of statement results.
1803type Response struct {
1804	Results []*query.Result
1805	Err     error
1806}
1807
1808// MarshalJSON encodes a Response struct into JSON.
1809func (r Response) MarshalJSON() ([]byte, error) {
1810	// Define a struct that outputs "error" as a string.
1811	var o struct {
1812		Results []*query.Result `json:"results,omitempty"`
1813		Err     string          `json:"error,omitempty"`
1814	}
1815
1816	// Copy fields to output struct.
1817	o.Results = r.Results
1818	if r.Err != nil {
1819		o.Err = r.Err.Error()
1820	}
1821
1822	return json.Marshal(&o)
1823}
1824
1825// UnmarshalJSON decodes the data into the Response struct.
1826func (r *Response) UnmarshalJSON(b []byte) error {
1827	var o struct {
1828		Results []*query.Result `json:"results,omitempty"`
1829		Err     string          `json:"error,omitempty"`
1830	}
1831
1832	err := json.Unmarshal(b, &o)
1833	if err != nil {
1834		return err
1835	}
1836	r.Results = o.Results
1837	if o.Err != "" {
1838		r.Err = errors.New(o.Err)
1839	}
1840	return nil
1841}
1842
1843// Error returns the first error from any statement.
1844// Returns nil if no errors occurred on any statements.
1845func (r *Response) Error() error {
1846	if r.Err != nil {
1847		return r.Err
1848	}
1849	for _, rr := range r.Results {
1850		if rr.Err != nil {
1851			return rr.Err
1852		}
1853	}
1854	return nil
1855}
1856
1857// Throttler represents an HTTP throttler that limits the number of concurrent
1858// requests being processed as well as the number of enqueued requests.
1859type Throttler struct {
1860	current  chan struct{}
1861	enqueued chan struct{}
1862
1863	// Maximum amount of time requests can wait in queue.
1864	// Must be set before adding middleware.
1865	EnqueueTimeout time.Duration
1866
1867	Logger *zap.Logger
1868}
1869
1870// NewThrottler returns a new instance of Throttler that limits to concurrentN.
1871// requests processed at a time and maxEnqueueN requests waiting to be processed.
1872func NewThrottler(concurrentN, maxEnqueueN int) *Throttler {
1873	return &Throttler{
1874		current:  make(chan struct{}, concurrentN),
1875		enqueued: make(chan struct{}, concurrentN+maxEnqueueN),
1876		Logger:   zap.NewNop(),
1877	}
1878}
1879
1880// Handler wraps h in a middleware handler that throttles requests.
1881func (t *Throttler) Handler(h http.Handler) http.Handler {
1882	timeout := t.EnqueueTimeout
1883
1884	// Return original handler if concurrent requests is zero.
1885	if cap(t.current) == 0 {
1886		return h
1887	}
1888
1889	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1890		// Start a timer to limit enqueued request times.
1891		var timerCh <-chan time.Time
1892		if timeout > 0 {
1893			timer := time.NewTimer(timeout)
1894			defer timer.Stop()
1895			timerCh = timer.C
1896		}
1897
1898		// Wait for a spot in the queue.
1899		if cap(t.enqueued) > cap(t.current) {
1900			select {
1901			case t.enqueued <- struct{}{}:
1902				defer func() { <-t.enqueued }()
1903			default:
1904				t.Logger.Warn("request throttled, queue full", zap.Duration("d", timeout))
1905				http.Error(w, "request throttled, queue full", http.StatusServiceUnavailable)
1906				return
1907			}
1908		}
1909
1910		// First check if we can immediately send in to current because there is
1911		// available capacity. This helps reduce racyness in tests.
1912		select {
1913		case t.current <- struct{}{}:
1914		default:
1915			// Wait for a spot in the list of concurrent requests, but allow checking the timeout.
1916			select {
1917			case t.current <- struct{}{}:
1918			case <-timerCh:
1919				t.Logger.Warn("request throttled, exceeds timeout", zap.Duration("d", timeout))
1920				http.Error(w, "request throttled, exceeds timeout", http.StatusServiceUnavailable)
1921				return
1922			}
1923		}
1924		defer func() { <-t.current }()
1925
1926		// Execute request.
1927		h.ServeHTTP(w, r)
1928	})
1929}
1930