1package httpd_test
2
3import (
4	"bytes"
5	"errors"
6	"fmt"
7	"io"
8	"log"
9	"math"
10	"mime/multipart"
11	"net/http"
12	"net/http/httptest"
13	"net/url"
14	"os"
15	"reflect"
16	"strings"
17	"sync/atomic"
18	"testing"
19	"time"
20
21	"github.com/dgrijalva/jwt-go"
22	"github.com/gogo/protobuf/proto"
23	"github.com/golang/snappy"
24	"github.com/google/go-cmp/cmp"
25	"github.com/influxdata/influxdb/internal"
26	"github.com/influxdata/influxdb/logger"
27	"github.com/influxdata/influxdb/models"
28	"github.com/influxdata/influxdb/prometheus/remote"
29	"github.com/influxdata/influxdb/query"
30	"github.com/influxdata/influxdb/services/httpd"
31	"github.com/influxdata/influxdb/services/meta"
32	"github.com/influxdata/influxdb/tsdb"
33	"github.com/influxdata/influxql"
34)
35
36// Ensure the handler returns results from a query (including nil results).
37func TestHandler_Query(t *testing.T) {
38	h := NewHandler(false)
39	h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
40		if stmt.String() != `SELECT * FROM bar` {
41			t.Fatalf("unexpected query: %s", stmt.String())
42		} else if ctx.Database != `foo` {
43			t.Fatalf("unexpected db: %s", ctx.Database)
44		}
45		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
46		ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})}
47		return nil
48	}
49
50	w := httptest.NewRecorder()
51	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil))
52	if w.Code != http.StatusOK {
53		t.Fatalf("unexpected status: %d", w.Code)
54	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` {
55		t.Fatalf("unexpected body: %s", body)
56	}
57}
58
59// Ensure the handler returns results from a query passed as a file.
60func TestHandler_Query_File(t *testing.T) {
61	h := NewHandler(false)
62	h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
63		if stmt.String() != `SELECT * FROM bar` {
64			t.Fatalf("unexpected query: %s", stmt.String())
65		} else if ctx.Database != `foo` {
66			t.Fatalf("unexpected db: %s", ctx.Database)
67		}
68		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
69		ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})}
70		return nil
71	}
72
73	var body bytes.Buffer
74	writer := multipart.NewWriter(&body)
75	part, err := writer.CreateFormFile("q", "")
76	if err != nil {
77		t.Fatal(err)
78	}
79	io.WriteString(part, "SELECT * FROM bar")
80
81	if err := writer.Close(); err != nil {
82		t.Fatal(err)
83	}
84
85	r := MustNewJSONRequest("POST", "/query?db=foo", &body)
86	r.Header.Set("Content-Type", writer.FormDataContentType())
87
88	w := httptest.NewRecorder()
89	h.ServeHTTP(w, r)
90	if w.Code != http.StatusOK {
91		t.Fatalf("unexpected status: %d", w.Code)
92	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` {
93		t.Fatalf("unexpected body: %s", body)
94	}
95}
96
97// Test query with user authentication.
98func TestHandler_Query_Auth(t *testing.T) {
99	// Create the handler to be tested.
100	h := NewHandler(true)
101
102	// Set mock meta client functions for the handler to use.
103	h.MetaClient.AdminUserExistsFn = func() bool { return true }
104
105	h.MetaClient.UserFn = func(username string) (meta.User, error) {
106		if username != "user1" {
107			return nil, meta.ErrUserNotFound
108		}
109		return &meta.UserInfo{
110			Name:  "user1",
111			Hash:  "abcd",
112			Admin: true,
113		}, nil
114	}
115
116	h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) {
117		if u != "user1" {
118			return nil, fmt.Errorf("unexpected user: exp: user1, got: %s", u)
119		} else if p != "abcd" {
120			return nil, fmt.Errorf("unexpected password: exp: abcd, got: %s", p)
121		}
122		return h.MetaClient.User(u)
123	}
124
125	// Set mock query authorizer for handler to use.
126	h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, query *influxql.Query, database string) error {
127		return nil
128	}
129
130	// Set mock statement executor for handler to use.
131	h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
132		if stmt.String() != `SELECT * FROM bar` {
133			t.Fatalf("unexpected query: %s", stmt.String())
134		} else if ctx.Database != `foo` {
135			t.Fatalf("unexpected db: %s", ctx.Database)
136		}
137		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
138		ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})}
139		return nil
140	}
141
142	// Test the handler with valid user and password in the URL parameters.
143	w := httptest.NewRecorder()
144	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?u=user1&p=abcd&db=foo&q=SELECT+*+FROM+bar", nil))
145	if w.Code != http.StatusOK {
146		t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
147	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` {
148		t.Fatalf("unexpected body: %s", body)
149	}
150
151	// Test the handler with valid user and password using basic auth.
152	w = httptest.NewRecorder()
153	r := MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)
154	r.SetBasicAuth("user1", "abcd")
155	h.ServeHTTP(w, r)
156	if w.Code != http.StatusOK {
157		t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
158	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` {
159		t.Fatalf("unexpected body: %s", body)
160	}
161
162	// Test the handler with valid JWT bearer token.
163	req := MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)
164	// Create a signed JWT token string and add it to the request header.
165	_, signedToken := MustJWTToken("user1", h.Config.SharedSecret, false)
166	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken))
167
168	w = httptest.NewRecorder()
169	h.ServeHTTP(w, req)
170	if w.Code != http.StatusOK {
171		t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
172	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` {
173		t.Fatalf("unexpected body: %s", body)
174	}
175
176	// Test the handler with JWT token signed with invalid key.
177	req = MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)
178	// Create a signed JWT token string and add it to the request header.
179	_, signedToken = MustJWTToken("user1", "invalid key", false)
180	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken))
181
182	w = httptest.NewRecorder()
183	h.ServeHTTP(w, req)
184	if w.Code != http.StatusUnauthorized {
185		t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
186	} else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"signature is invalid"}` {
187		t.Fatalf("unexpected body: %s", body)
188	}
189
190	// Test handler with valid JWT token carrying non-existant user.
191	_, signedToken = MustJWTToken("bad_user", h.Config.SharedSecret, false)
192	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken))
193
194	w = httptest.NewRecorder()
195	h.ServeHTTP(w, req)
196	if w.Code != http.StatusUnauthorized {
197		t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
198	} else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"user not found"}` {
199		t.Fatalf("unexpected body: %s", body)
200	}
201
202	// Test handler with expired JWT token.
203	_, signedToken = MustJWTToken("user1", h.Config.SharedSecret, true)
204	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken))
205
206	w = httptest.NewRecorder()
207	h.ServeHTTP(w, req)
208	if w.Code != http.StatusUnauthorized {
209		t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
210	} else if !strings.Contains(w.Body.String(), `{"error":"Token is expired`) {
211		t.Fatalf("unexpected body: %s", w.Body.String())
212	}
213
214	// Test handler with JWT token that has no expiration set.
215	token, _ := MustJWTToken("user1", h.Config.SharedSecret, false)
216	delete(token.Claims.(jwt.MapClaims), "exp")
217	signedToken, err := token.SignedString([]byte(h.Config.SharedSecret))
218	if err != nil {
219		t.Fatal(err)
220	}
221	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken))
222	w = httptest.NewRecorder()
223	h.ServeHTTP(w, req)
224	if w.Code != http.StatusUnauthorized {
225		t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
226	} else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"token expiration required"}` {
227		t.Fatalf("unexpected body: %s", body)
228	}
229
230	// Test the handler with valid user and password in the url and invalid in
231	// basic auth (prioritize url).
232	w = httptest.NewRecorder()
233	r = MustNewJSONRequest("GET", "/query?u=user1&p=abcd&db=foo&q=SELECT+*+FROM+bar", nil)
234	r.SetBasicAuth("user1", "efgh")
235	h.ServeHTTP(w, r)
236	if w.Code != http.StatusOK {
237		t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
238	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` {
239		t.Fatalf("unexpected body: %s", body)
240	}
241}
242
243// Ensure the handler returns results from a query (including nil results).
244func TestHandler_QueryRegex(t *testing.T) {
245	h := NewHandler(false)
246	h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
247		if stmt.String() != `SELECT * FROM test WHERE url =~ /http\:\/\/www.akamai\.com/` {
248			t.Fatalf("unexpected query: %s", stmt.String())
249		} else if ctx.Database != `test` {
250			t.Fatalf("unexpected db: %s", ctx.Database)
251		}
252		ctx.Results <- nil
253		return nil
254	}
255
256	w := httptest.NewRecorder()
257	h.ServeHTTP(w, MustNewRequest("GET", "/query?db=test&q=SELECT%20%2A%20FROM%20test%20WHERE%20url%20%3D~%20%2Fhttp%5C%3A%5C%2F%5C%2Fwww.akamai%5C.com%2F", nil))
258}
259
260// Ensure the handler merges results from the same statement.
261func TestHandler_Query_MergeResults(t *testing.T) {
262	h := NewHandler(false)
263	h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
264		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
265		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
266		return nil
267	}
268
269	w := httptest.NewRecorder()
270	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil))
271	if w.Code != http.StatusOK {
272		t.Fatalf("unexpected status: %d", w.Code)
273	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"},{"name":"series1"}]}]}` {
274		t.Fatalf("unexpected body: %s", body)
275	}
276}
277
278// Ensure the handler merges results from the same statement.
279func TestHandler_Query_MergeEmptyResults(t *testing.T) {
280	h := NewHandler(false)
281	h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
282		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows{}}
283		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
284		return nil
285	}
286
287	w := httptest.NewRecorder()
288	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil))
289	if w.Code != http.StatusOK {
290		t.Fatalf("unexpected status: %d", w.Code)
291	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series1"}]}]}` {
292		t.Fatalf("unexpected body: %s", body)
293	}
294}
295
296// Ensure the handler can parse chunked and chunk size query parameters.
297func TestHandler_Query_Chunked(t *testing.T) {
298	h := NewHandler(false)
299	h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
300		if ctx.ChunkSize != 2 {
301			t.Fatalf("unexpected chunk size: %d", ctx.ChunkSize)
302		}
303		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
304		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
305		return nil
306	}
307
308	w := httptest.NewRecorder()
309	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar&chunked=true&chunk_size=2", nil))
310	if w.Code != http.StatusOK {
311		t.Fatalf("unexpected status: %d", w.Code)
312	} else if w.Body.String() != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]}]}
313{"results":[{"statement_id":1,"series":[{"name":"series1"}]}]}
314` {
315		t.Fatalf("unexpected body: %s", w.Body.String())
316	}
317}
318
319// Ensure the handler can accept an async query.
320func TestHandler_Query_Async(t *testing.T) {
321	done := make(chan struct{})
322	h := NewHandler(false)
323	h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
324		if stmt.String() != `SELECT * FROM bar` {
325			t.Fatalf("unexpected query: %s", stmt.String())
326		} else if ctx.Database != `foo` {
327			t.Fatalf("unexpected db: %s", ctx.Database)
328		}
329		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
330		ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})}
331		close(done)
332		return nil
333	}
334
335	w := httptest.NewRecorder()
336	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar&async=true", nil))
337	if w.Code != http.StatusNoContent {
338		t.Fatalf("unexpected status: %d", w.Code)
339	} else if body := strings.TrimSpace(w.Body.String()); body != `` {
340		t.Fatalf("unexpected body: %s", body)
341	}
342
343	// Wait to make sure the async query runs and completes.
344	timer := time.NewTimer(100 * time.Millisecond)
345	defer timer.Stop()
346
347	select {
348	case <-timer.C:
349		t.Fatal("timeout while waiting for async query to complete")
350	case <-done:
351	}
352}
353
354// Ensure the handler returns a status 400 if the query is not passed in.
355func TestHandler_Query_ErrQueryRequired(t *testing.T) {
356	h := NewHandler(false)
357	w := httptest.NewRecorder()
358	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query", nil))
359	if w.Code != http.StatusBadRequest {
360		t.Fatalf("unexpected status: %d", w.Code)
361	} else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"missing required parameter \"q\""}` {
362		t.Fatalf("unexpected body: %s", body)
363	}
364}
365
366// Ensure the handler returns a status 400 if the query cannot be parsed.
367func TestHandler_Query_ErrInvalidQuery(t *testing.T) {
368	h := NewHandler(false)
369	w := httptest.NewRecorder()
370	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?q=SELECT", nil))
371	if w.Code != http.StatusBadRequest {
372		t.Fatalf("unexpected status: %d", w.Code)
373	} else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"error parsing query: found EOF, expected identifier, string, number, bool at line 1, char 8"}` {
374		t.Fatalf("unexpected body: %s", body)
375	}
376}
377
378// Ensure the handler returns an appropriate 401 or 403 status when authentication or authorization fails.
379func TestHandler_Query_ErrAuthorize(t *testing.T) {
380	h := NewHandler(true)
381	h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, q *influxql.Query, db string) error {
382		return errors.New("marker")
383	}
384	h.MetaClient.AdminUserExistsFn = func() bool { return true }
385	h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) {
386
387		users := []meta.UserInfo{
388			{
389				Name:  "admin",
390				Hash:  "admin",
391				Admin: true,
392			},
393			{
394				Name: "user1",
395				Hash: "abcd",
396				Privileges: map[string]influxql.Privilege{
397					"db0": influxql.ReadPrivilege,
398				},
399			},
400		}
401
402		for _, user := range users {
403			if u == user.Name {
404				if p == user.Hash {
405					return &user, nil
406				}
407				return nil, meta.ErrAuthenticate
408			}
409		}
410		return nil, meta.ErrUserNotFound
411	}
412
413	for i, tt := range []struct {
414		user     string
415		password string
416		query    string
417		code     int
418	}{
419		{
420			query: "/query?q=SHOW+DATABASES",
421			code:  http.StatusUnauthorized,
422		},
423		{
424			user:     "user1",
425			password: "abcd",
426			query:    "/query?q=SHOW+DATABASES",
427			code:     http.StatusForbidden,
428		},
429		{
430			user:     "user2",
431			password: "abcd",
432			query:    "/query?q=SHOW+DATABASES",
433			code:     http.StatusUnauthorized,
434		},
435	} {
436		w := httptest.NewRecorder()
437		r := MustNewJSONRequest("GET", tt.query, nil)
438		params := r.URL.Query()
439		if tt.user != "" {
440			params.Set("u", tt.user)
441		}
442		if tt.password != "" {
443			params.Set("p", tt.password)
444		}
445		r.URL.RawQuery = params.Encode()
446
447		h.ServeHTTP(w, r)
448		if w.Code != tt.code {
449			t.Errorf("%d. unexpected status: got=%d exp=%d\noutput: %s", i, w.Code, tt.code, w.Body.String())
450		}
451	}
452}
453
454// Ensure the handler returns a status 200 if an error is returned in the result.
455func TestHandler_Query_ErrResult(t *testing.T) {
456	h := NewHandler(false)
457	h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
458		return errors.New("measurement not found")
459	}
460
461	w := httptest.NewRecorder()
462	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SHOW+SERIES+from+bin", nil))
463	if w.Code != http.StatusOK {
464		t.Fatalf("unexpected status: %d", w.Code)
465	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":0,"error":"measurement not found"}]}` {
466		t.Fatalf("unexpected body: %s", body)
467	}
468}
469
470// Ensure that closing the HTTP connection causes the query to be interrupted.
471func TestHandler_Query_CloseNotify(t *testing.T) {
472	// Avoid leaking a goroutine when this fails.
473	done := make(chan struct{})
474	defer close(done)
475
476	interrupted := make(chan struct{})
477	h := NewHandler(false)
478	h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
479		select {
480		case <-ctx.Done():
481		case <-done:
482		}
483		close(interrupted)
484		return nil
485	}
486
487	s := httptest.NewServer(h)
488	defer s.Close()
489
490	// Parse the URL and generate a query request.
491	u, err := url.Parse(s.URL)
492	if err != nil {
493		t.Fatal(err)
494	}
495	u.Path = "/query"
496
497	values := url.Values{}
498	values.Set("q", "SELECT * FROM cpu")
499	values.Set("db", "db0")
500	values.Set("rp", "rp0")
501	values.Set("chunked", "true")
502	u.RawQuery = values.Encode()
503
504	req, err := http.NewRequest("GET", u.String(), nil)
505	if err != nil {
506		t.Fatal(err)
507	}
508
509	// Perform the request and retrieve the response.
510	resp, err := http.DefaultClient.Do(req)
511	if err != nil {
512		t.Fatal(err)
513	}
514
515	// Validate that the interrupted channel has NOT been closed yet.
516	timer := time.NewTimer(100 * time.Millisecond)
517	select {
518	case <-interrupted:
519		timer.Stop()
520		t.Fatal("query interrupted unexpectedly")
521	case <-timer.C:
522	}
523
524	// Close the response body which should abort the query in the handler.
525	resp.Body.Close()
526
527	// The query should abort within 100 milliseconds.
528	timer.Reset(100 * time.Millisecond)
529	select {
530	case <-interrupted:
531		timer.Stop()
532	case <-timer.C:
533		t.Fatal("timeout while waiting for query to abort")
534	}
535}
536
537// Ensure the prometheus remote write works
538func TestHandler_PromWrite(t *testing.T) {
539	req := &remote.WriteRequest{
540		Timeseries: []*remote.TimeSeries{
541			{
542				Labels: []*remote.LabelPair{
543					{Name: "host", Value: "a"},
544					{Name: "region", Value: "west"},
545				},
546				Samples: []*remote.Sample{
547					{TimestampMs: 1, Value: 1.2},
548					{TimestampMs: 2, Value: math.NaN()},
549				},
550			},
551		},
552	}
553
554	data, err := proto.Marshal(req)
555	if err != nil {
556		t.Fatal("couldn't marshal prometheus request")
557	}
558	compressed := snappy.Encode(nil, data)
559
560	b := bytes.NewReader(compressed)
561	h := NewHandler(false)
562	h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
563		return &meta.DatabaseInfo{}
564	}
565	called := false
566	h.PointsWriter.WritePointsFn = func(db, rp string, _ models.ConsistencyLevel, _ meta.User, points []models.Point) error {
567		called = true
568		point := points[0]
569		if point.UnixNano() != int64(time.Millisecond) {
570			t.Fatalf("Exp point time %d but got %d", int64(time.Millisecond), point.UnixNano())
571		}
572		tags := point.Tags()
573		expectedTags := models.Tags{models.Tag{Key: []byte("host"), Value: []byte("a")}, models.Tag{Key: []byte("region"), Value: []byte("west")}}
574		if !reflect.DeepEqual(tags, expectedTags) {
575			t.Fatalf("tags don't match\n\texp: %v\n\tgot: %v", expectedTags, tags)
576		}
577
578		fields, err := point.Fields()
579		if err != nil {
580			t.Fatal(err.Error())
581		}
582		expFields := models.Fields{"value": 1.2}
583		if !reflect.DeepEqual(fields, expFields) {
584			t.Fatalf("fields don't match\n\texp: %v\n\tgot: %v", expFields, fields)
585		}
586		return nil
587	}
588
589	w := httptest.NewRecorder()
590	h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/write?db=foo", b))
591	if !called {
592		t.Fatal("WritePoints: expected call")
593	}
594	if w.Code != http.StatusNoContent {
595		t.Fatalf("unexpected status: %d", w.Code)
596	}
597}
598
599// Ensure Prometheus remote read requests are converted to the correct InfluxQL query and
600// data is returned
601func TestHandler_PromRead(t *testing.T) {
602	req := &remote.ReadRequest{
603		Queries: []*remote.Query{{
604			Matchers: []*remote.LabelMatcher{
605				{
606					Type:  remote.MatchType_EQUAL,
607					Name:  "__name__",
608					Value: "value",
609				},
610			},
611			StartTimestampMs: 1,
612			EndTimestampMs:   2,
613		}},
614	}
615	data, err := proto.Marshal(req)
616	if err != nil {
617		t.Fatal("couldn't marshal prometheus request")
618	}
619	compressed := snappy.Encode(nil, data)
620	b := bytes.NewReader(compressed)
621	h := NewHandler(false)
622	w := httptest.NewRecorder()
623
624	// Number of results in the result set
625	var i int64
626	h.Store.ResultSet.NextFn = func() bool {
627		i++
628		return i <= 2
629	}
630
631	// data for each cursor.
632	h.Store.ResultSet.CursorFn = func() tsdb.Cursor {
633		cursor := internal.NewFloatBatchCursorMock()
634
635		var i int64
636		cursor.NextFn = func() ([]int64, []float64) {
637			i++
638			ts := []int64{22000000 * i, 10000000000 * i}
639			vs := []float64{2.3, 2992.33}
640			if i > 2 {
641				ts, vs = nil, nil
642			}
643			return ts, vs
644		}
645
646		return cursor
647	}
648
649	// Tags for each cursor.
650	h.Store.ResultSet.TagsFn = func() models.Tags {
651		return models.NewTags(map[string]string{
652			"host":         fmt.Sprintf("server-%d", i),
653			"_measurement": "mem",
654		})
655	}
656
657	h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/read?db=foo&rp=bar", b))
658	if w.Code != http.StatusOK {
659		t.Fatalf("unexpected status: %d", w.Code)
660	}
661
662	reqBuf, err := snappy.Decode(nil, w.Body.Bytes())
663	if err != nil {
664		t.Fatal(err)
665	}
666
667	var resp remote.ReadResponse
668	if err := proto.Unmarshal(reqBuf, &resp); err != nil {
669		t.Fatal(err)
670	}
671
672	expResults := []*remote.QueryResult{
673		{
674			Timeseries: []*remote.TimeSeries{
675				{
676					Labels: []*remote.LabelPair{
677						{Name: "host", Value: "server-1"},
678					},
679					Samples: []*remote.Sample{
680						{TimestampMs: 22, Value: 2.3},
681						{TimestampMs: 10000, Value: 2992.33},
682						{TimestampMs: 44, Value: 2.3},
683						{TimestampMs: 20000, Value: 2992.33},
684					},
685				},
686				{
687					Labels: []*remote.LabelPair{
688						{Name: "host", Value: "server-2"},
689					},
690					Samples: []*remote.Sample{
691						{TimestampMs: 22, Value: 2.3},
692						{TimestampMs: 10000, Value: 2992.33},
693						{TimestampMs: 44, Value: 2.3},
694						{TimestampMs: 20000, Value: 2992.33},
695					},
696				},
697			},
698		},
699	}
700
701	if !reflect.DeepEqual(resp.Results, expResults) {
702		t.Fatalf("Results differ:\n%v", cmp.Diff(resp.Results, expResults))
703	}
704}
705
706func TestHandler_PromRead_NoResults(t *testing.T) {
707	req := &remote.ReadRequest{Queries: []*remote.Query{&remote.Query{
708		Matchers: []*remote.LabelMatcher{
709			{
710				Type:  remote.MatchType_EQUAL,
711				Name:  "__name__",
712				Value: "value",
713			},
714		},
715		StartTimestampMs: 0,
716		EndTimestampMs:   models.MaxNanoTime / int64(time.Millisecond),
717	}}}
718	data, err := proto.Marshal(req)
719	if err != nil {
720		t.Fatal("couldn't marshal prometheus request")
721	}
722	compressed := snappy.Encode(nil, data)
723	h := NewHandler(false)
724	w := httptest.NewRecorder()
725
726	b := bytes.NewReader(compressed)
727	h.ServeHTTP(w, MustNewJSONRequest("POST", "/api/v1/prom/read?db=foo", b))
728	if w.Code != http.StatusOK {
729		t.Fatalf("unexpected status: %d", w.Code)
730	}
731	reqBuf, err := snappy.Decode(nil, w.Body.Bytes())
732	if err != nil {
733		t.Fatal(err.Error())
734	}
735
736	var resp remote.ReadResponse
737	if err := proto.Unmarshal(reqBuf, &resp); err != nil {
738		t.Fatal(err.Error())
739	}
740}
741
742func TestHandler_PromRead_UnsupportedCursors(t *testing.T) {
743	req := &remote.ReadRequest{Queries: []*remote.Query{&remote.Query{
744		Matchers: []*remote.LabelMatcher{
745			{
746				Type:  remote.MatchType_EQUAL,
747				Name:  "__name__",
748				Value: "value",
749			},
750		},
751		StartTimestampMs: 0,
752		EndTimestampMs:   models.MaxNanoTime / int64(time.Millisecond),
753	}}}
754	data, err := proto.Marshal(req)
755	if err != nil {
756		t.Fatal("couldn't marshal prometheus request")
757	}
758	compressed := snappy.Encode(nil, data)
759
760	unsupported := []tsdb.Cursor{
761		internal.NewIntegerBatchCursorMock(),
762		internal.NewBooleanBatchCursorMock(),
763		internal.NewUnsignedBatchCursorMock(),
764		internal.NewStringBatchCursorMock(),
765	}
766
767	for _, cursor := range unsupported {
768		h := NewHandler(false)
769		w := httptest.NewRecorder()
770		var lb bytes.Buffer
771		h.Logger = logger.New(&lb)
772
773		more := true
774		h.Store.ResultSet.NextFn = func() bool { defer func() { more = false }(); return more }
775
776		// Set the cursor type that will be returned while iterating over
777		// the mock store.
778		h.Store.ResultSet.CursorFn = func() tsdb.Cursor {
779			return cursor
780		}
781
782		b := bytes.NewReader(compressed)
783		h.ServeHTTP(w, MustNewJSONRequest("POST", "/api/v1/prom/read?db=foo", b))
784		if w.Code != http.StatusOK {
785			t.Fatalf("unexpected status: %d", w.Code)
786		}
787		reqBuf, err := snappy.Decode(nil, w.Body.Bytes())
788		if err != nil {
789			t.Fatal(err.Error())
790		}
791
792		var resp remote.ReadResponse
793		if err := proto.Unmarshal(reqBuf, &resp); err != nil {
794			t.Fatal(err.Error())
795		}
796
797		if !strings.Contains(lb.String(), "cursor_type=") {
798			t.Fatalf("got log message %q, expected to contain \"cursor_type\"", lb.String())
799		}
800	}
801}
802
803// Ensure the handler handles ping requests correctly.
804// TODO: This should be expanded to verify the MetaClient check in servePing is working correctly
805func TestHandler_Ping(t *testing.T) {
806	h := NewHandler(false)
807	w := httptest.NewRecorder()
808	h.ServeHTTP(w, MustNewRequest("GET", "/ping", nil))
809	if w.Code != http.StatusNoContent {
810		t.Fatalf("unexpected status: %d", w.Code)
811	}
812	h.ServeHTTP(w, MustNewRequest("HEAD", "/ping", nil))
813	if w.Code != http.StatusNoContent {
814		t.Fatalf("unexpected status: %d", w.Code)
815	}
816}
817
818// Ensure the handler returns the version correctly from the different endpoints.
819func TestHandler_Version(t *testing.T) {
820	h := NewHandler(false)
821	h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
822		return nil
823	}
824	tests := []struct {
825		method   string
826		endpoint string
827		body     io.Reader
828	}{
829		{
830			method:   "GET",
831			endpoint: "/ping",
832			body:     nil,
833		},
834		{
835			method:   "GET",
836			endpoint: "/query?db=foo&q=SELECT+*+FROM+bar",
837			body:     nil,
838		},
839		{
840			method:   "POST",
841			endpoint: "/write",
842			body:     bytes.NewReader(make([]byte, 10)),
843		},
844		{
845			method:   "GET",
846			endpoint: "/notfound",
847			body:     nil,
848		},
849	}
850
851	for _, test := range tests {
852		w := httptest.NewRecorder()
853		h.ServeHTTP(w, MustNewRequest(test.method, test.endpoint, test.body))
854		if v := w.HeaderMap["X-Influxdb-Version"]; len(v) > 0 {
855			if v[0] != "0.0.0" {
856				t.Fatalf("unexpected version: %s", v)
857			}
858		} else {
859			t.Fatalf("Header entry 'X-Influxdb-Version' not present")
860		}
861
862		if v := w.HeaderMap["X-Influxdb-Build"]; len(v) > 0 {
863			if v[0] != "OSS" {
864				t.Fatalf("unexpected BuildType: %s", v)
865			}
866		} else {
867			t.Fatalf("Header entry 'X-Influxdb-Build' not present")
868		}
869	}
870}
871
872// Ensure the handler handles status requests correctly.
873func TestHandler_Status(t *testing.T) {
874	h := NewHandler(false)
875	w := httptest.NewRecorder()
876	h.ServeHTTP(w, MustNewRequest("GET", "/status", nil))
877	if w.Code != http.StatusNoContent {
878		t.Fatalf("unexpected status: %d", w.Code)
879	}
880	h.ServeHTTP(w, MustNewRequest("HEAD", "/status", nil))
881	if w.Code != http.StatusNoContent {
882		t.Fatalf("unexpected status: %d", w.Code)
883	}
884}
885
886// Ensure write endpoint can handle bad requests
887func TestHandler_HandleBadRequestBody(t *testing.T) {
888	b := bytes.NewReader(make([]byte, 10))
889	h := NewHandler(false)
890	w := httptest.NewRecorder()
891	h.ServeHTTP(w, MustNewRequest("POST", "/write", b))
892	if w.Code != http.StatusBadRequest {
893		t.Fatalf("unexpected status: %d", w.Code)
894	}
895}
896
897func TestHandler_Write_EntityTooLarge_ContentLength(t *testing.T) {
898	b := bytes.NewReader(make([]byte, 100))
899	h := NewHandler(false)
900	h.Config.MaxBodySize = 5
901	h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
902		return &meta.DatabaseInfo{}
903	}
904
905	w := httptest.NewRecorder()
906	h.ServeHTTP(w, MustNewRequest("POST", "/write?db=foo", b))
907	if w.Code != http.StatusRequestEntityTooLarge {
908		t.Fatalf("unexpected status: %d", w.Code)
909	}
910}
911
912func TestHandler_Write_SuppressLog(t *testing.T) {
913	var buf bytes.Buffer
914	c := httpd.NewConfig()
915	c.SuppressWriteLog = true
916	h := NewHandlerWithConfig(c)
917	h.CLFLogger = log.New(&buf, "", log.LstdFlags)
918	h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
919		return &meta.DatabaseInfo{}
920	}
921	h.PointsWriter.WritePointsFn = func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error {
922		return nil
923	}
924
925	b := strings.NewReader("cpu,host=server01 value=2\n")
926	w := httptest.NewRecorder()
927	h.ServeHTTP(w, MustNewRequest("POST", "/write?db=foo", b))
928	if w.Code != http.StatusNoContent {
929		t.Fatalf("unexpected status: %d", w.Code)
930	}
931
932	// If the log has anything in it, this failed.
933	if buf.Len() > 0 {
934		t.Fatalf("expected no bytes to be written to the log, got %d", buf.Len())
935	}
936}
937
938// onlyReader implements io.Reader only to ensure Request.ContentLength is not set
939type onlyReader struct {
940	r io.Reader
941}
942
943func (o onlyReader) Read(p []byte) (n int, err error) {
944	return o.r.Read(p)
945}
946
947func TestHandler_Write_EntityTooLarge_NoContentLength(t *testing.T) {
948	b := onlyReader{bytes.NewReader(make([]byte, 100))}
949	h := NewHandler(false)
950	h.Config.MaxBodySize = 5
951	h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
952		return &meta.DatabaseInfo{}
953	}
954
955	w := httptest.NewRecorder()
956	h.ServeHTTP(w, MustNewRequest("POST", "/write?db=foo", b))
957	if w.Code != http.StatusRequestEntityTooLarge {
958		t.Fatalf("unexpected status: %d", w.Code)
959	}
960}
961
962// TestHandler_Write_NegativeMaxBodySize verifies no error occurs if MaxBodySize is < 0
963func TestHandler_Write_NegativeMaxBodySize(t *testing.T) {
964	b := bytes.NewReader([]byte(`foo n=1`))
965	h := NewHandler(false)
966	h.Config.MaxBodySize = -1
967	h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo {
968		return &meta.DatabaseInfo{}
969	}
970	called := false
971	h.PointsWriter.WritePointsFn = func(_, _ string, _ models.ConsistencyLevel, _ meta.User, _ []models.Point) error {
972		called = true
973		return nil
974	}
975
976	w := httptest.NewRecorder()
977	h.ServeHTTP(w, MustNewRequest("POST", "/write?db=foo", b))
978	if !called {
979		t.Fatal("WritePoints: expected call")
980	}
981	if w.Code != http.StatusNoContent {
982		t.Fatalf("unexpected status: %d", w.Code)
983	}
984}
985
986// Ensure X-Forwarded-For header writes the correct log message.
987func TestHandler_XForwardedFor(t *testing.T) {
988	var buf bytes.Buffer
989	h := NewHandler(false)
990	h.CLFLogger = log.New(&buf, "", 0)
991
992	req := MustNewRequest("GET", "/query", nil)
993	req.Header.Set("X-Forwarded-For", "192.168.0.1")
994	req.RemoteAddr = "127.0.0.1"
995	h.ServeHTTP(httptest.NewRecorder(), req)
996
997	parts := strings.Split(buf.String(), " ")
998	if parts[0] != "192.168.0.1,127.0.0.1" {
999		t.Errorf("unexpected host ip address: %s", parts[0])
1000	}
1001}
1002
1003func TestHandler_XRequestId(t *testing.T) {
1004	var buf bytes.Buffer
1005	h := NewHandler(false)
1006	h.CLFLogger = log.New(&buf, "", 0)
1007
1008	cases := []map[string]string{
1009		{"X-Request-Id": "abc123", "Request-Id": ""},          // X-Request-Id is used.
1010		{"X-REQUEST-ID": "cde", "Request-Id": ""},             // X-REQUEST-ID is used.
1011		{"X-Request-Id": "", "Request-Id": "foobarzoo"},       // Request-Id is used.
1012		{"X-Request-Id": "abc123", "Request-Id": "foobarzoo"}, // X-Request-Id takes precedence.
1013		{"X-Request-Id": "", "Request-Id": ""},                // v1 UUID generated.
1014	}
1015
1016	for _, c := range cases {
1017		t.Run(fmt.Sprint(c), func(t *testing.T) {
1018			buf.Reset()
1019			req := MustNewRequest("GET", "/ping", nil)
1020			req.RemoteAddr = "127.0.0.1"
1021
1022			// Set the relevant request ID headers
1023			var allEmpty = true
1024			for k, v := range c {
1025				req.Header.Set(k, v)
1026				if v != "" {
1027					allEmpty = false
1028				}
1029			}
1030
1031			w := httptest.NewRecorder()
1032			h.ServeHTTP(w, req)
1033
1034			// Split up the HTTP log line. The request ID is currently located in
1035			// index 12. If the log line gets changed in the future, this test
1036			// will likely break and the index will need to be updated.
1037			parts := strings.Split(buf.String(), " ")
1038			i := 12
1039
1040			// If neither header is set then we expect a v1 UUID to be generated.
1041			if allEmpty {
1042				if got, exp := len(parts[i]), 36; got != exp {
1043					t.Fatalf("got ID of length %d, expected one of length %d", got, exp)
1044				}
1045			} else if c["X-Request-Id"] != "" {
1046				if got, exp := parts[i], c["X-Request-Id"]; got != exp {
1047					t.Fatalf("got ID of %q, expected %q", got, exp)
1048				}
1049			} else if c["X-REQUEST-ID"] != "" {
1050				if got, exp := parts[i], c["X-REQUEST-ID"]; got != exp {
1051					t.Fatalf("got ID of %q, expected %q", got, exp)
1052				}
1053			} else {
1054				if got, exp := parts[i], c["Request-Id"]; got != exp {
1055					t.Fatalf("got ID of %q, expected %q", got, exp)
1056				}
1057			}
1058
1059			// Check response headers
1060			if got, exp := w.Header().Get("Request-Id"), parts[i]; got != exp {
1061				t.Fatalf("Request-Id header was %s, expected %s", got, exp)
1062			} else if got, exp := w.Header().Get("X-Request-Id"), parts[i]; got != exp {
1063				t.Fatalf("X-Request-Id header was %s, expected %s", got, exp)
1064			}
1065		})
1066	}
1067}
1068
1069func TestThrottler_Handler(t *testing.T) {
1070	t.Run("OK", func(t *testing.T) {
1071		throttler := httpd.NewThrottler(2, 98)
1072
1073		// Send the total number of concurrent requests to the channel.
1074		var concurrentN int32
1075		concurrentCh := make(chan int)
1076
1077		h := throttler.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1078			atomic.AddInt32(&concurrentN, 1)
1079			concurrentCh <- int(atomic.LoadInt32(&concurrentN))
1080			time.Sleep(1 * time.Millisecond)
1081			atomic.AddInt32(&concurrentN, -1)
1082		}))
1083
1084		// Execute requests concurrently.
1085		const n = 100
1086		for i := 0; i < n; i++ {
1087			go func() { h.ServeHTTP(nil, nil) }()
1088		}
1089
1090		// Read the number of concurrent requests for every execution.
1091		for i := 0; i < n; i++ {
1092			if v := <-concurrentCh; v > 2 {
1093				t.Fatalf("concurrent requests exceed maximum: %d", v)
1094			}
1095		}
1096	})
1097
1098	t.Run("ErrTimeout", func(t *testing.T) {
1099		throttler := httpd.NewThrottler(2, 1)
1100		throttler.EnqueueTimeout = 1 * time.Millisecond
1101
1102		begin, end := make(chan struct{}), make(chan struct{})
1103		h := throttler.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1104			begin <- struct{}{}
1105			end <- struct{}{}
1106		}))
1107
1108		// First two requests should execute immediately.
1109		go func() { h.ServeHTTP(nil, nil) }()
1110		go func() { h.ServeHTTP(nil, nil) }()
1111
1112		<-begin
1113		<-begin
1114
1115		// Third request should be enqueued but timeout.
1116		w := httptest.NewRecorder()
1117		h.ServeHTTP(w, nil)
1118		if w.Code != http.StatusServiceUnavailable {
1119			t.Fatalf("unexpected status code: %d", w.Code)
1120		} else if body := w.Body.String(); body != "request throttled, exceeds timeout\n" {
1121			t.Fatalf("unexpected response body: %q", body)
1122		}
1123
1124		// Allow 2 existing requests to complete.
1125		<-end
1126		<-end
1127	})
1128
1129	t.Run("ErrFull", func(t *testing.T) {
1130		delay := 100 * time.Millisecond
1131		if os.Getenv("CI") != "" {
1132			delay = 2 * time.Second
1133		}
1134
1135		throttler := httpd.NewThrottler(2, 1)
1136
1137		resp := make(chan struct{})
1138		h := throttler.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1139			resp <- struct{}{}
1140		}))
1141
1142		// First two requests should execute immediately and third should be queued.
1143		go func() { h.ServeHTTP(nil, nil) }()
1144		go func() { h.ServeHTTP(nil, nil) }()
1145		go func() { h.ServeHTTP(nil, nil) }()
1146		time.Sleep(delay)
1147
1148		// Fourth request should fail when trying to enqueue.
1149		w := httptest.NewRecorder()
1150		h.ServeHTTP(w, nil)
1151		if w.Code != http.StatusServiceUnavailable {
1152			t.Fatalf("unexpected status code: %d", w.Code)
1153		} else if body := w.Body.String(); body != "request throttled, queue full\n" {
1154			t.Fatalf("unexpected response body: %q", body)
1155		}
1156
1157		// Allow 3 existing requests to complete.
1158		<-resp
1159		<-resp
1160		<-resp
1161	})
1162}
1163
1164// NewHandler represents a test wrapper for httpd.Handler.
1165type Handler struct {
1166	*httpd.Handler
1167	MetaClient        *internal.MetaClientMock
1168	StatementExecutor HandlerStatementExecutor
1169	QueryAuthorizer   HandlerQueryAuthorizer
1170	PointsWriter      HandlerPointsWriter
1171	Store             *internal.StorageStoreMock
1172}
1173
1174// NewHandler returns a new instance of Handler.
1175func NewHandler(requireAuthentication bool) *Handler {
1176	config := httpd.NewConfig()
1177	config.AuthEnabled = requireAuthentication
1178	config.SharedSecret = "super secret key"
1179	return NewHandlerWithConfig(config)
1180}
1181
1182func NewHandlerWithConfig(config httpd.Config) *Handler {
1183	h := &Handler{
1184		Handler: httpd.NewHandler(config),
1185	}
1186
1187	h.MetaClient = &internal.MetaClientMock{}
1188	h.Store = internal.NewStorageStoreMock()
1189
1190	h.Handler.MetaClient = h.MetaClient
1191	h.Handler.Store = h.Store
1192	h.Handler.QueryExecutor = query.NewExecutor()
1193	h.Handler.QueryExecutor.StatementExecutor = &h.StatementExecutor
1194	h.Handler.QueryAuthorizer = &h.QueryAuthorizer
1195	h.Handler.PointsWriter = &h.PointsWriter
1196	h.Handler.Version = "0.0.0"
1197	h.Handler.BuildType = "OSS"
1198
1199	if testing.Verbose() {
1200		l := logger.New(os.Stdout)
1201		h.Handler.Logger = l
1202		h.Handler.Store.WithLogger(l)
1203	}
1204
1205	return h
1206}
1207
1208// HandlerStatementExecutor is a mock implementation of Handler.StatementExecutor.
1209type HandlerStatementExecutor struct {
1210	ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error
1211}
1212
1213func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
1214	return e.ExecuteStatementFn(stmt, ctx)
1215}
1216
1217// HandlerQueryAuthorizer is a mock implementation of Handler.QueryAuthorizer.
1218type HandlerQueryAuthorizer struct {
1219	AuthorizeQueryFn func(u meta.User, query *influxql.Query, database string) error
1220}
1221
1222func (a *HandlerQueryAuthorizer) AuthorizeQuery(u meta.User, query *influxql.Query, database string) error {
1223	return a.AuthorizeQueryFn(u, query, database)
1224}
1225
1226type HandlerPointsWriter struct {
1227	WritePointsFn func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
1228}
1229
1230func (h *HandlerPointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error {
1231	return h.WritePointsFn(database, retentionPolicy, consistencyLevel, user, points)
1232}
1233
1234// MustNewRequest returns a new HTTP request. Panic on error.
1235func MustNewRequest(method, urlStr string, body io.Reader) *http.Request {
1236	r, err := http.NewRequest(method, urlStr, body)
1237	if err != nil {
1238		panic(err.Error())
1239	}
1240	return r
1241}
1242
1243// MustNewRequest returns a new HTTP request with the content type set. Panic on error.
1244func MustNewJSONRequest(method, urlStr string, body io.Reader) *http.Request {
1245	r := MustNewRequest(method, urlStr, body)
1246	r.Header.Set("Accept", "application/json")
1247	return r
1248}
1249
1250// MustJWTToken returns a new JWT token and signed string or panics trying.
1251func MustJWTToken(username, secret string, expired bool) (*jwt.Token, string) {
1252	token := jwt.New(jwt.GetSigningMethod("HS512"))
1253	token.Claims.(jwt.MapClaims)["username"] = username
1254	if expired {
1255		token.Claims.(jwt.MapClaims)["exp"] = time.Now().Add(-time.Second).Unix()
1256	} else {
1257		token.Claims.(jwt.MapClaims)["exp"] = time.Now().Add(time.Minute * 10).Unix()
1258	}
1259	signed, err := token.SignedString([]byte(secret))
1260	if err != nil {
1261		panic(err)
1262	}
1263	return token, signed
1264}
1265