1package httpd_test 2 3import ( 4 "bytes" 5 "errors" 6 "fmt" 7 "io" 8 "log" 9 "math" 10 "mime/multipart" 11 "net/http" 12 "net/http/httptest" 13 "net/url" 14 "os" 15 "reflect" 16 "strings" 17 "sync/atomic" 18 "testing" 19 "time" 20 21 "github.com/dgrijalva/jwt-go" 22 "github.com/gogo/protobuf/proto" 23 "github.com/golang/snappy" 24 "github.com/google/go-cmp/cmp" 25 "github.com/influxdata/influxdb/internal" 26 "github.com/influxdata/influxdb/logger" 27 "github.com/influxdata/influxdb/models" 28 "github.com/influxdata/influxdb/prometheus/remote" 29 "github.com/influxdata/influxdb/query" 30 "github.com/influxdata/influxdb/services/httpd" 31 "github.com/influxdata/influxdb/services/meta" 32 "github.com/influxdata/influxdb/tsdb" 33 "github.com/influxdata/influxql" 34) 35 36// Ensure the handler returns results from a query (including nil results). 37func TestHandler_Query(t *testing.T) { 38 h := NewHandler(false) 39 h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 40 if stmt.String() != `SELECT * FROM bar` { 41 t.Fatalf("unexpected query: %s", stmt.String()) 42 } else if ctx.Database != `foo` { 43 t.Fatalf("unexpected db: %s", ctx.Database) 44 } 45 ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} 46 ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})} 47 return nil 48 } 49 50 w := httptest.NewRecorder() 51 h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)) 52 if w.Code != http.StatusOK { 53 t.Fatalf("unexpected status: %d", w.Code) 54 } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` { 55 t.Fatalf("unexpected body: %s", body) 56 } 57} 58 59// Ensure the handler returns results from a query passed as a file. 60func TestHandler_Query_File(t *testing.T) { 61 h := NewHandler(false) 62 h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 63 if stmt.String() != `SELECT * FROM bar` { 64 t.Fatalf("unexpected query: %s", stmt.String()) 65 } else if ctx.Database != `foo` { 66 t.Fatalf("unexpected db: %s", ctx.Database) 67 } 68 ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} 69 ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})} 70 return nil 71 } 72 73 var body bytes.Buffer 74 writer := multipart.NewWriter(&body) 75 part, err := writer.CreateFormFile("q", "") 76 if err != nil { 77 t.Fatal(err) 78 } 79 io.WriteString(part, "SELECT * FROM bar") 80 81 if err := writer.Close(); err != nil { 82 t.Fatal(err) 83 } 84 85 r := MustNewJSONRequest("POST", "/query?db=foo", &body) 86 r.Header.Set("Content-Type", writer.FormDataContentType()) 87 88 w := httptest.NewRecorder() 89 h.ServeHTTP(w, r) 90 if w.Code != http.StatusOK { 91 t.Fatalf("unexpected status: %d", w.Code) 92 } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` { 93 t.Fatalf("unexpected body: %s", body) 94 } 95} 96 97// Test query with user authentication. 98func TestHandler_Query_Auth(t *testing.T) { 99 // Create the handler to be tested. 100 h := NewHandler(true) 101 102 // Set mock meta client functions for the handler to use. 103 h.MetaClient.AdminUserExistsFn = func() bool { return true } 104 105 h.MetaClient.UserFn = func(username string) (meta.User, error) { 106 if username != "user1" { 107 return nil, meta.ErrUserNotFound 108 } 109 return &meta.UserInfo{ 110 Name: "user1", 111 Hash: "abcd", 112 Admin: true, 113 }, nil 114 } 115 116 h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) { 117 if u != "user1" { 118 return nil, fmt.Errorf("unexpected user: exp: user1, got: %s", u) 119 } else if p != "abcd" { 120 return nil, fmt.Errorf("unexpected password: exp: abcd, got: %s", p) 121 } 122 return h.MetaClient.User(u) 123 } 124 125 // Set mock query authorizer for handler to use. 126 h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, query *influxql.Query, database string) error { 127 return nil 128 } 129 130 // Set mock statement executor for handler to use. 131 h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 132 if stmt.String() != `SELECT * FROM bar` { 133 t.Fatalf("unexpected query: %s", stmt.String()) 134 } else if ctx.Database != `foo` { 135 t.Fatalf("unexpected db: %s", ctx.Database) 136 } 137 ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} 138 ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})} 139 return nil 140 } 141 142 // Test the handler with valid user and password in the URL parameters. 143 w := httptest.NewRecorder() 144 h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?u=user1&p=abcd&db=foo&q=SELECT+*+FROM+bar", nil)) 145 if w.Code != http.StatusOK { 146 t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String()) 147 } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` { 148 t.Fatalf("unexpected body: %s", body) 149 } 150 151 // Test the handler with valid user and password using basic auth. 152 w = httptest.NewRecorder() 153 r := MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil) 154 r.SetBasicAuth("user1", "abcd") 155 h.ServeHTTP(w, r) 156 if w.Code != http.StatusOK { 157 t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String()) 158 } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` { 159 t.Fatalf("unexpected body: %s", body) 160 } 161 162 // Test the handler with valid JWT bearer token. 163 req := MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil) 164 // Create a signed JWT token string and add it to the request header. 165 _, signedToken := MustJWTToken("user1", h.Config.SharedSecret, false) 166 req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken)) 167 168 w = httptest.NewRecorder() 169 h.ServeHTTP(w, req) 170 if w.Code != http.StatusOK { 171 t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String()) 172 } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` { 173 t.Fatalf("unexpected body: %s", body) 174 } 175 176 // Test the handler with JWT token signed with invalid key. 177 req = MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil) 178 // Create a signed JWT token string and add it to the request header. 179 _, signedToken = MustJWTToken("user1", "invalid key", false) 180 req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken)) 181 182 w = httptest.NewRecorder() 183 h.ServeHTTP(w, req) 184 if w.Code != http.StatusUnauthorized { 185 t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String()) 186 } else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"signature is invalid"}` { 187 t.Fatalf("unexpected body: %s", body) 188 } 189 190 // Test handler with valid JWT token carrying non-existant user. 191 _, signedToken = MustJWTToken("bad_user", h.Config.SharedSecret, false) 192 req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken)) 193 194 w = httptest.NewRecorder() 195 h.ServeHTTP(w, req) 196 if w.Code != http.StatusUnauthorized { 197 t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String()) 198 } else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"user not found"}` { 199 t.Fatalf("unexpected body: %s", body) 200 } 201 202 // Test handler with expired JWT token. 203 _, signedToken = MustJWTToken("user1", h.Config.SharedSecret, true) 204 req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken)) 205 206 w = httptest.NewRecorder() 207 h.ServeHTTP(w, req) 208 if w.Code != http.StatusUnauthorized { 209 t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String()) 210 } else if !strings.Contains(w.Body.String(), `{"error":"Token is expired`) { 211 t.Fatalf("unexpected body: %s", w.Body.String()) 212 } 213 214 // Test handler with JWT token that has no expiration set. 215 token, _ := MustJWTToken("user1", h.Config.SharedSecret, false) 216 delete(token.Claims.(jwt.MapClaims), "exp") 217 signedToken, err := token.SignedString([]byte(h.Config.SharedSecret)) 218 if err != nil { 219 t.Fatal(err) 220 } 221 req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", signedToken)) 222 w = httptest.NewRecorder() 223 h.ServeHTTP(w, req) 224 if w.Code != http.StatusUnauthorized { 225 t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String()) 226 } else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"token expiration required"}` { 227 t.Fatalf("unexpected body: %s", body) 228 } 229 230 // Test the handler with valid user and password in the url and invalid in 231 // basic auth (prioritize url). 232 w = httptest.NewRecorder() 233 r = MustNewJSONRequest("GET", "/query?u=user1&p=abcd&db=foo&q=SELECT+*+FROM+bar", nil) 234 r.SetBasicAuth("user1", "efgh") 235 h.ServeHTTP(w, r) 236 if w.Code != http.StatusOK { 237 t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String()) 238 } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` { 239 t.Fatalf("unexpected body: %s", body) 240 } 241} 242 243// Ensure the handler returns results from a query (including nil results). 244func TestHandler_QueryRegex(t *testing.T) { 245 h := NewHandler(false) 246 h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 247 if stmt.String() != `SELECT * FROM test WHERE url =~ /http\:\/\/www.akamai\.com/` { 248 t.Fatalf("unexpected query: %s", stmt.String()) 249 } else if ctx.Database != `test` { 250 t.Fatalf("unexpected db: %s", ctx.Database) 251 } 252 ctx.Results <- nil 253 return nil 254 } 255 256 w := httptest.NewRecorder() 257 h.ServeHTTP(w, MustNewRequest("GET", "/query?db=test&q=SELECT%20%2A%20FROM%20test%20WHERE%20url%20%3D~%20%2Fhttp%5C%3A%5C%2F%5C%2Fwww.akamai%5C.com%2F", nil)) 258} 259 260// Ensure the handler merges results from the same statement. 261func TestHandler_Query_MergeResults(t *testing.T) { 262 h := NewHandler(false) 263 h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 264 ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} 265 ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} 266 return nil 267 } 268 269 w := httptest.NewRecorder() 270 h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)) 271 if w.Code != http.StatusOK { 272 t.Fatalf("unexpected status: %d", w.Code) 273 } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"},{"name":"series1"}]}]}` { 274 t.Fatalf("unexpected body: %s", body) 275 } 276} 277 278// Ensure the handler merges results from the same statement. 279func TestHandler_Query_MergeEmptyResults(t *testing.T) { 280 h := NewHandler(false) 281 h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 282 ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows{}} 283 ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} 284 return nil 285 } 286 287 w := httptest.NewRecorder() 288 h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)) 289 if w.Code != http.StatusOK { 290 t.Fatalf("unexpected status: %d", w.Code) 291 } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series1"}]}]}` { 292 t.Fatalf("unexpected body: %s", body) 293 } 294} 295 296// Ensure the handler can parse chunked and chunk size query parameters. 297func TestHandler_Query_Chunked(t *testing.T) { 298 h := NewHandler(false) 299 h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 300 if ctx.ChunkSize != 2 { 301 t.Fatalf("unexpected chunk size: %d", ctx.ChunkSize) 302 } 303 ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} 304 ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} 305 return nil 306 } 307 308 w := httptest.NewRecorder() 309 h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar&chunked=true&chunk_size=2", nil)) 310 if w.Code != http.StatusOK { 311 t.Fatalf("unexpected status: %d", w.Code) 312 } else if w.Body.String() != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]}]} 313{"results":[{"statement_id":1,"series":[{"name":"series1"}]}]} 314` { 315 t.Fatalf("unexpected body: %s", w.Body.String()) 316 } 317} 318 319// Ensure the handler can accept an async query. 320func TestHandler_Query_Async(t *testing.T) { 321 done := make(chan struct{}) 322 h := NewHandler(false) 323 h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 324 if stmt.String() != `SELECT * FROM bar` { 325 t.Fatalf("unexpected query: %s", stmt.String()) 326 } else if ctx.Database != `foo` { 327 t.Fatalf("unexpected db: %s", ctx.Database) 328 } 329 ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} 330 ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})} 331 close(done) 332 return nil 333 } 334 335 w := httptest.NewRecorder() 336 h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar&async=true", nil)) 337 if w.Code != http.StatusNoContent { 338 t.Fatalf("unexpected status: %d", w.Code) 339 } else if body := strings.TrimSpace(w.Body.String()); body != `` { 340 t.Fatalf("unexpected body: %s", body) 341 } 342 343 // Wait to make sure the async query runs and completes. 344 timer := time.NewTimer(100 * time.Millisecond) 345 defer timer.Stop() 346 347 select { 348 case <-timer.C: 349 t.Fatal("timeout while waiting for async query to complete") 350 case <-done: 351 } 352} 353 354// Ensure the handler returns a status 400 if the query is not passed in. 355func TestHandler_Query_ErrQueryRequired(t *testing.T) { 356 h := NewHandler(false) 357 w := httptest.NewRecorder() 358 h.ServeHTTP(w, MustNewJSONRequest("GET", "/query", nil)) 359 if w.Code != http.StatusBadRequest { 360 t.Fatalf("unexpected status: %d", w.Code) 361 } else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"missing required parameter \"q\""}` { 362 t.Fatalf("unexpected body: %s", body) 363 } 364} 365 366// Ensure the handler returns a status 400 if the query cannot be parsed. 367func TestHandler_Query_ErrInvalidQuery(t *testing.T) { 368 h := NewHandler(false) 369 w := httptest.NewRecorder() 370 h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?q=SELECT", nil)) 371 if w.Code != http.StatusBadRequest { 372 t.Fatalf("unexpected status: %d", w.Code) 373 } else if body := strings.TrimSpace(w.Body.String()); body != `{"error":"error parsing query: found EOF, expected identifier, string, number, bool at line 1, char 8"}` { 374 t.Fatalf("unexpected body: %s", body) 375 } 376} 377 378// Ensure the handler returns an appropriate 401 or 403 status when authentication or authorization fails. 379func TestHandler_Query_ErrAuthorize(t *testing.T) { 380 h := NewHandler(true) 381 h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, q *influxql.Query, db string) error { 382 return errors.New("marker") 383 } 384 h.MetaClient.AdminUserExistsFn = func() bool { return true } 385 h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) { 386 387 users := []meta.UserInfo{ 388 { 389 Name: "admin", 390 Hash: "admin", 391 Admin: true, 392 }, 393 { 394 Name: "user1", 395 Hash: "abcd", 396 Privileges: map[string]influxql.Privilege{ 397 "db0": influxql.ReadPrivilege, 398 }, 399 }, 400 } 401 402 for _, user := range users { 403 if u == user.Name { 404 if p == user.Hash { 405 return &user, nil 406 } 407 return nil, meta.ErrAuthenticate 408 } 409 } 410 return nil, meta.ErrUserNotFound 411 } 412 413 for i, tt := range []struct { 414 user string 415 password string 416 query string 417 code int 418 }{ 419 { 420 query: "/query?q=SHOW+DATABASES", 421 code: http.StatusUnauthorized, 422 }, 423 { 424 user: "user1", 425 password: "abcd", 426 query: "/query?q=SHOW+DATABASES", 427 code: http.StatusForbidden, 428 }, 429 { 430 user: "user2", 431 password: "abcd", 432 query: "/query?q=SHOW+DATABASES", 433 code: http.StatusUnauthorized, 434 }, 435 } { 436 w := httptest.NewRecorder() 437 r := MustNewJSONRequest("GET", tt.query, nil) 438 params := r.URL.Query() 439 if tt.user != "" { 440 params.Set("u", tt.user) 441 } 442 if tt.password != "" { 443 params.Set("p", tt.password) 444 } 445 r.URL.RawQuery = params.Encode() 446 447 h.ServeHTTP(w, r) 448 if w.Code != tt.code { 449 t.Errorf("%d. unexpected status: got=%d exp=%d\noutput: %s", i, w.Code, tt.code, w.Body.String()) 450 } 451 } 452} 453 454// Ensure the handler returns a status 200 if an error is returned in the result. 455func TestHandler_Query_ErrResult(t *testing.T) { 456 h := NewHandler(false) 457 h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 458 return errors.New("measurement not found") 459 } 460 461 w := httptest.NewRecorder() 462 h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SHOW+SERIES+from+bin", nil)) 463 if w.Code != http.StatusOK { 464 t.Fatalf("unexpected status: %d", w.Code) 465 } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":0,"error":"measurement not found"}]}` { 466 t.Fatalf("unexpected body: %s", body) 467 } 468} 469 470// Ensure that closing the HTTP connection causes the query to be interrupted. 471func TestHandler_Query_CloseNotify(t *testing.T) { 472 // Avoid leaking a goroutine when this fails. 473 done := make(chan struct{}) 474 defer close(done) 475 476 interrupted := make(chan struct{}) 477 h := NewHandler(false) 478 h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 479 select { 480 case <-ctx.Done(): 481 case <-done: 482 } 483 close(interrupted) 484 return nil 485 } 486 487 s := httptest.NewServer(h) 488 defer s.Close() 489 490 // Parse the URL and generate a query request. 491 u, err := url.Parse(s.URL) 492 if err != nil { 493 t.Fatal(err) 494 } 495 u.Path = "/query" 496 497 values := url.Values{} 498 values.Set("q", "SELECT * FROM cpu") 499 values.Set("db", "db0") 500 values.Set("rp", "rp0") 501 values.Set("chunked", "true") 502 u.RawQuery = values.Encode() 503 504 req, err := http.NewRequest("GET", u.String(), nil) 505 if err != nil { 506 t.Fatal(err) 507 } 508 509 // Perform the request and retrieve the response. 510 resp, err := http.DefaultClient.Do(req) 511 if err != nil { 512 t.Fatal(err) 513 } 514 515 // Validate that the interrupted channel has NOT been closed yet. 516 timer := time.NewTimer(100 * time.Millisecond) 517 select { 518 case <-interrupted: 519 timer.Stop() 520 t.Fatal("query interrupted unexpectedly") 521 case <-timer.C: 522 } 523 524 // Close the response body which should abort the query in the handler. 525 resp.Body.Close() 526 527 // The query should abort within 100 milliseconds. 528 timer.Reset(100 * time.Millisecond) 529 select { 530 case <-interrupted: 531 timer.Stop() 532 case <-timer.C: 533 t.Fatal("timeout while waiting for query to abort") 534 } 535} 536 537// Ensure the prometheus remote write works 538func TestHandler_PromWrite(t *testing.T) { 539 req := &remote.WriteRequest{ 540 Timeseries: []*remote.TimeSeries{ 541 { 542 Labels: []*remote.LabelPair{ 543 {Name: "host", Value: "a"}, 544 {Name: "region", Value: "west"}, 545 }, 546 Samples: []*remote.Sample{ 547 {TimestampMs: 1, Value: 1.2}, 548 {TimestampMs: 2, Value: math.NaN()}, 549 }, 550 }, 551 }, 552 } 553 554 data, err := proto.Marshal(req) 555 if err != nil { 556 t.Fatal("couldn't marshal prometheus request") 557 } 558 compressed := snappy.Encode(nil, data) 559 560 b := bytes.NewReader(compressed) 561 h := NewHandler(false) 562 h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo { 563 return &meta.DatabaseInfo{} 564 } 565 called := false 566 h.PointsWriter.WritePointsFn = func(db, rp string, _ models.ConsistencyLevel, _ meta.User, points []models.Point) error { 567 called = true 568 point := points[0] 569 if point.UnixNano() != int64(time.Millisecond) { 570 t.Fatalf("Exp point time %d but got %d", int64(time.Millisecond), point.UnixNano()) 571 } 572 tags := point.Tags() 573 expectedTags := models.Tags{models.Tag{Key: []byte("host"), Value: []byte("a")}, models.Tag{Key: []byte("region"), Value: []byte("west")}} 574 if !reflect.DeepEqual(tags, expectedTags) { 575 t.Fatalf("tags don't match\n\texp: %v\n\tgot: %v", expectedTags, tags) 576 } 577 578 fields, err := point.Fields() 579 if err != nil { 580 t.Fatal(err.Error()) 581 } 582 expFields := models.Fields{"value": 1.2} 583 if !reflect.DeepEqual(fields, expFields) { 584 t.Fatalf("fields don't match\n\texp: %v\n\tgot: %v", expFields, fields) 585 } 586 return nil 587 } 588 589 w := httptest.NewRecorder() 590 h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/write?db=foo", b)) 591 if !called { 592 t.Fatal("WritePoints: expected call") 593 } 594 if w.Code != http.StatusNoContent { 595 t.Fatalf("unexpected status: %d", w.Code) 596 } 597} 598 599// Ensure Prometheus remote read requests are converted to the correct InfluxQL query and 600// data is returned 601func TestHandler_PromRead(t *testing.T) { 602 req := &remote.ReadRequest{ 603 Queries: []*remote.Query{{ 604 Matchers: []*remote.LabelMatcher{ 605 { 606 Type: remote.MatchType_EQUAL, 607 Name: "__name__", 608 Value: "value", 609 }, 610 }, 611 StartTimestampMs: 1, 612 EndTimestampMs: 2, 613 }}, 614 } 615 data, err := proto.Marshal(req) 616 if err != nil { 617 t.Fatal("couldn't marshal prometheus request") 618 } 619 compressed := snappy.Encode(nil, data) 620 b := bytes.NewReader(compressed) 621 h := NewHandler(false) 622 w := httptest.NewRecorder() 623 624 // Number of results in the result set 625 var i int64 626 h.Store.ResultSet.NextFn = func() bool { 627 i++ 628 return i <= 2 629 } 630 631 // data for each cursor. 632 h.Store.ResultSet.CursorFn = func() tsdb.Cursor { 633 cursor := internal.NewFloatBatchCursorMock() 634 635 var i int64 636 cursor.NextFn = func() ([]int64, []float64) { 637 i++ 638 ts := []int64{22000000 * i, 10000000000 * i} 639 vs := []float64{2.3, 2992.33} 640 if i > 2 { 641 ts, vs = nil, nil 642 } 643 return ts, vs 644 } 645 646 return cursor 647 } 648 649 // Tags for each cursor. 650 h.Store.ResultSet.TagsFn = func() models.Tags { 651 return models.NewTags(map[string]string{ 652 "host": fmt.Sprintf("server-%d", i), 653 "_measurement": "mem", 654 }) 655 } 656 657 h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/read?db=foo&rp=bar", b)) 658 if w.Code != http.StatusOK { 659 t.Fatalf("unexpected status: %d", w.Code) 660 } 661 662 reqBuf, err := snappy.Decode(nil, w.Body.Bytes()) 663 if err != nil { 664 t.Fatal(err) 665 } 666 667 var resp remote.ReadResponse 668 if err := proto.Unmarshal(reqBuf, &resp); err != nil { 669 t.Fatal(err) 670 } 671 672 expResults := []*remote.QueryResult{ 673 { 674 Timeseries: []*remote.TimeSeries{ 675 { 676 Labels: []*remote.LabelPair{ 677 {Name: "host", Value: "server-1"}, 678 }, 679 Samples: []*remote.Sample{ 680 {TimestampMs: 22, Value: 2.3}, 681 {TimestampMs: 10000, Value: 2992.33}, 682 {TimestampMs: 44, Value: 2.3}, 683 {TimestampMs: 20000, Value: 2992.33}, 684 }, 685 }, 686 { 687 Labels: []*remote.LabelPair{ 688 {Name: "host", Value: "server-2"}, 689 }, 690 Samples: []*remote.Sample{ 691 {TimestampMs: 22, Value: 2.3}, 692 {TimestampMs: 10000, Value: 2992.33}, 693 {TimestampMs: 44, Value: 2.3}, 694 {TimestampMs: 20000, Value: 2992.33}, 695 }, 696 }, 697 }, 698 }, 699 } 700 701 if !reflect.DeepEqual(resp.Results, expResults) { 702 t.Fatalf("Results differ:\n%v", cmp.Diff(resp.Results, expResults)) 703 } 704} 705 706func TestHandler_PromRead_NoResults(t *testing.T) { 707 req := &remote.ReadRequest{Queries: []*remote.Query{&remote.Query{ 708 Matchers: []*remote.LabelMatcher{ 709 { 710 Type: remote.MatchType_EQUAL, 711 Name: "__name__", 712 Value: "value", 713 }, 714 }, 715 StartTimestampMs: 0, 716 EndTimestampMs: models.MaxNanoTime / int64(time.Millisecond), 717 }}} 718 data, err := proto.Marshal(req) 719 if err != nil { 720 t.Fatal("couldn't marshal prometheus request") 721 } 722 compressed := snappy.Encode(nil, data) 723 h := NewHandler(false) 724 w := httptest.NewRecorder() 725 726 b := bytes.NewReader(compressed) 727 h.ServeHTTP(w, MustNewJSONRequest("POST", "/api/v1/prom/read?db=foo", b)) 728 if w.Code != http.StatusOK { 729 t.Fatalf("unexpected status: %d", w.Code) 730 } 731 reqBuf, err := snappy.Decode(nil, w.Body.Bytes()) 732 if err != nil { 733 t.Fatal(err.Error()) 734 } 735 736 var resp remote.ReadResponse 737 if err := proto.Unmarshal(reqBuf, &resp); err != nil { 738 t.Fatal(err.Error()) 739 } 740} 741 742func TestHandler_PromRead_UnsupportedCursors(t *testing.T) { 743 req := &remote.ReadRequest{Queries: []*remote.Query{&remote.Query{ 744 Matchers: []*remote.LabelMatcher{ 745 { 746 Type: remote.MatchType_EQUAL, 747 Name: "__name__", 748 Value: "value", 749 }, 750 }, 751 StartTimestampMs: 0, 752 EndTimestampMs: models.MaxNanoTime / int64(time.Millisecond), 753 }}} 754 data, err := proto.Marshal(req) 755 if err != nil { 756 t.Fatal("couldn't marshal prometheus request") 757 } 758 compressed := snappy.Encode(nil, data) 759 760 unsupported := []tsdb.Cursor{ 761 internal.NewIntegerBatchCursorMock(), 762 internal.NewBooleanBatchCursorMock(), 763 internal.NewUnsignedBatchCursorMock(), 764 internal.NewStringBatchCursorMock(), 765 } 766 767 for _, cursor := range unsupported { 768 h := NewHandler(false) 769 w := httptest.NewRecorder() 770 var lb bytes.Buffer 771 h.Logger = logger.New(&lb) 772 773 more := true 774 h.Store.ResultSet.NextFn = func() bool { defer func() { more = false }(); return more } 775 776 // Set the cursor type that will be returned while iterating over 777 // the mock store. 778 h.Store.ResultSet.CursorFn = func() tsdb.Cursor { 779 return cursor 780 } 781 782 b := bytes.NewReader(compressed) 783 h.ServeHTTP(w, MustNewJSONRequest("POST", "/api/v1/prom/read?db=foo", b)) 784 if w.Code != http.StatusOK { 785 t.Fatalf("unexpected status: %d", w.Code) 786 } 787 reqBuf, err := snappy.Decode(nil, w.Body.Bytes()) 788 if err != nil { 789 t.Fatal(err.Error()) 790 } 791 792 var resp remote.ReadResponse 793 if err := proto.Unmarshal(reqBuf, &resp); err != nil { 794 t.Fatal(err.Error()) 795 } 796 797 if !strings.Contains(lb.String(), "cursor_type=") { 798 t.Fatalf("got log message %q, expected to contain \"cursor_type\"", lb.String()) 799 } 800 } 801} 802 803// Ensure the handler handles ping requests correctly. 804// TODO: This should be expanded to verify the MetaClient check in servePing is working correctly 805func TestHandler_Ping(t *testing.T) { 806 h := NewHandler(false) 807 w := httptest.NewRecorder() 808 h.ServeHTTP(w, MustNewRequest("GET", "/ping", nil)) 809 if w.Code != http.StatusNoContent { 810 t.Fatalf("unexpected status: %d", w.Code) 811 } 812 h.ServeHTTP(w, MustNewRequest("HEAD", "/ping", nil)) 813 if w.Code != http.StatusNoContent { 814 t.Fatalf("unexpected status: %d", w.Code) 815 } 816} 817 818// Ensure the handler returns the version correctly from the different endpoints. 819func TestHandler_Version(t *testing.T) { 820 h := NewHandler(false) 821 h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 822 return nil 823 } 824 tests := []struct { 825 method string 826 endpoint string 827 body io.Reader 828 }{ 829 { 830 method: "GET", 831 endpoint: "/ping", 832 body: nil, 833 }, 834 { 835 method: "GET", 836 endpoint: "/query?db=foo&q=SELECT+*+FROM+bar", 837 body: nil, 838 }, 839 { 840 method: "POST", 841 endpoint: "/write", 842 body: bytes.NewReader(make([]byte, 10)), 843 }, 844 { 845 method: "GET", 846 endpoint: "/notfound", 847 body: nil, 848 }, 849 } 850 851 for _, test := range tests { 852 w := httptest.NewRecorder() 853 h.ServeHTTP(w, MustNewRequest(test.method, test.endpoint, test.body)) 854 if v := w.HeaderMap["X-Influxdb-Version"]; len(v) > 0 { 855 if v[0] != "0.0.0" { 856 t.Fatalf("unexpected version: %s", v) 857 } 858 } else { 859 t.Fatalf("Header entry 'X-Influxdb-Version' not present") 860 } 861 862 if v := w.HeaderMap["X-Influxdb-Build"]; len(v) > 0 { 863 if v[0] != "OSS" { 864 t.Fatalf("unexpected BuildType: %s", v) 865 } 866 } else { 867 t.Fatalf("Header entry 'X-Influxdb-Build' not present") 868 } 869 } 870} 871 872// Ensure the handler handles status requests correctly. 873func TestHandler_Status(t *testing.T) { 874 h := NewHandler(false) 875 w := httptest.NewRecorder() 876 h.ServeHTTP(w, MustNewRequest("GET", "/status", nil)) 877 if w.Code != http.StatusNoContent { 878 t.Fatalf("unexpected status: %d", w.Code) 879 } 880 h.ServeHTTP(w, MustNewRequest("HEAD", "/status", nil)) 881 if w.Code != http.StatusNoContent { 882 t.Fatalf("unexpected status: %d", w.Code) 883 } 884} 885 886// Ensure write endpoint can handle bad requests 887func TestHandler_HandleBadRequestBody(t *testing.T) { 888 b := bytes.NewReader(make([]byte, 10)) 889 h := NewHandler(false) 890 w := httptest.NewRecorder() 891 h.ServeHTTP(w, MustNewRequest("POST", "/write", b)) 892 if w.Code != http.StatusBadRequest { 893 t.Fatalf("unexpected status: %d", w.Code) 894 } 895} 896 897func TestHandler_Write_EntityTooLarge_ContentLength(t *testing.T) { 898 b := bytes.NewReader(make([]byte, 100)) 899 h := NewHandler(false) 900 h.Config.MaxBodySize = 5 901 h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo { 902 return &meta.DatabaseInfo{} 903 } 904 905 w := httptest.NewRecorder() 906 h.ServeHTTP(w, MustNewRequest("POST", "/write?db=foo", b)) 907 if w.Code != http.StatusRequestEntityTooLarge { 908 t.Fatalf("unexpected status: %d", w.Code) 909 } 910} 911 912func TestHandler_Write_SuppressLog(t *testing.T) { 913 var buf bytes.Buffer 914 c := httpd.NewConfig() 915 c.SuppressWriteLog = true 916 h := NewHandlerWithConfig(c) 917 h.CLFLogger = log.New(&buf, "", log.LstdFlags) 918 h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo { 919 return &meta.DatabaseInfo{} 920 } 921 h.PointsWriter.WritePointsFn = func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error { 922 return nil 923 } 924 925 b := strings.NewReader("cpu,host=server01 value=2\n") 926 w := httptest.NewRecorder() 927 h.ServeHTTP(w, MustNewRequest("POST", "/write?db=foo", b)) 928 if w.Code != http.StatusNoContent { 929 t.Fatalf("unexpected status: %d", w.Code) 930 } 931 932 // If the log has anything in it, this failed. 933 if buf.Len() > 0 { 934 t.Fatalf("expected no bytes to be written to the log, got %d", buf.Len()) 935 } 936} 937 938// onlyReader implements io.Reader only to ensure Request.ContentLength is not set 939type onlyReader struct { 940 r io.Reader 941} 942 943func (o onlyReader) Read(p []byte) (n int, err error) { 944 return o.r.Read(p) 945} 946 947func TestHandler_Write_EntityTooLarge_NoContentLength(t *testing.T) { 948 b := onlyReader{bytes.NewReader(make([]byte, 100))} 949 h := NewHandler(false) 950 h.Config.MaxBodySize = 5 951 h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo { 952 return &meta.DatabaseInfo{} 953 } 954 955 w := httptest.NewRecorder() 956 h.ServeHTTP(w, MustNewRequest("POST", "/write?db=foo", b)) 957 if w.Code != http.StatusRequestEntityTooLarge { 958 t.Fatalf("unexpected status: %d", w.Code) 959 } 960} 961 962// TestHandler_Write_NegativeMaxBodySize verifies no error occurs if MaxBodySize is < 0 963func TestHandler_Write_NegativeMaxBodySize(t *testing.T) { 964 b := bytes.NewReader([]byte(`foo n=1`)) 965 h := NewHandler(false) 966 h.Config.MaxBodySize = -1 967 h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo { 968 return &meta.DatabaseInfo{} 969 } 970 called := false 971 h.PointsWriter.WritePointsFn = func(_, _ string, _ models.ConsistencyLevel, _ meta.User, _ []models.Point) error { 972 called = true 973 return nil 974 } 975 976 w := httptest.NewRecorder() 977 h.ServeHTTP(w, MustNewRequest("POST", "/write?db=foo", b)) 978 if !called { 979 t.Fatal("WritePoints: expected call") 980 } 981 if w.Code != http.StatusNoContent { 982 t.Fatalf("unexpected status: %d", w.Code) 983 } 984} 985 986// Ensure X-Forwarded-For header writes the correct log message. 987func TestHandler_XForwardedFor(t *testing.T) { 988 var buf bytes.Buffer 989 h := NewHandler(false) 990 h.CLFLogger = log.New(&buf, "", 0) 991 992 req := MustNewRequest("GET", "/query", nil) 993 req.Header.Set("X-Forwarded-For", "192.168.0.1") 994 req.RemoteAddr = "127.0.0.1" 995 h.ServeHTTP(httptest.NewRecorder(), req) 996 997 parts := strings.Split(buf.String(), " ") 998 if parts[0] != "192.168.0.1,127.0.0.1" { 999 t.Errorf("unexpected host ip address: %s", parts[0]) 1000 } 1001} 1002 1003func TestHandler_XRequestId(t *testing.T) { 1004 var buf bytes.Buffer 1005 h := NewHandler(false) 1006 h.CLFLogger = log.New(&buf, "", 0) 1007 1008 cases := []map[string]string{ 1009 {"X-Request-Id": "abc123", "Request-Id": ""}, // X-Request-Id is used. 1010 {"X-REQUEST-ID": "cde", "Request-Id": ""}, // X-REQUEST-ID is used. 1011 {"X-Request-Id": "", "Request-Id": "foobarzoo"}, // Request-Id is used. 1012 {"X-Request-Id": "abc123", "Request-Id": "foobarzoo"}, // X-Request-Id takes precedence. 1013 {"X-Request-Id": "", "Request-Id": ""}, // v1 UUID generated. 1014 } 1015 1016 for _, c := range cases { 1017 t.Run(fmt.Sprint(c), func(t *testing.T) { 1018 buf.Reset() 1019 req := MustNewRequest("GET", "/ping", nil) 1020 req.RemoteAddr = "127.0.0.1" 1021 1022 // Set the relevant request ID headers 1023 var allEmpty = true 1024 for k, v := range c { 1025 req.Header.Set(k, v) 1026 if v != "" { 1027 allEmpty = false 1028 } 1029 } 1030 1031 w := httptest.NewRecorder() 1032 h.ServeHTTP(w, req) 1033 1034 // Split up the HTTP log line. The request ID is currently located in 1035 // index 12. If the log line gets changed in the future, this test 1036 // will likely break and the index will need to be updated. 1037 parts := strings.Split(buf.String(), " ") 1038 i := 12 1039 1040 // If neither header is set then we expect a v1 UUID to be generated. 1041 if allEmpty { 1042 if got, exp := len(parts[i]), 36; got != exp { 1043 t.Fatalf("got ID of length %d, expected one of length %d", got, exp) 1044 } 1045 } else if c["X-Request-Id"] != "" { 1046 if got, exp := parts[i], c["X-Request-Id"]; got != exp { 1047 t.Fatalf("got ID of %q, expected %q", got, exp) 1048 } 1049 } else if c["X-REQUEST-ID"] != "" { 1050 if got, exp := parts[i], c["X-REQUEST-ID"]; got != exp { 1051 t.Fatalf("got ID of %q, expected %q", got, exp) 1052 } 1053 } else { 1054 if got, exp := parts[i], c["Request-Id"]; got != exp { 1055 t.Fatalf("got ID of %q, expected %q", got, exp) 1056 } 1057 } 1058 1059 // Check response headers 1060 if got, exp := w.Header().Get("Request-Id"), parts[i]; got != exp { 1061 t.Fatalf("Request-Id header was %s, expected %s", got, exp) 1062 } else if got, exp := w.Header().Get("X-Request-Id"), parts[i]; got != exp { 1063 t.Fatalf("X-Request-Id header was %s, expected %s", got, exp) 1064 } 1065 }) 1066 } 1067} 1068 1069func TestThrottler_Handler(t *testing.T) { 1070 t.Run("OK", func(t *testing.T) { 1071 throttler := httpd.NewThrottler(2, 98) 1072 1073 // Send the total number of concurrent requests to the channel. 1074 var concurrentN int32 1075 concurrentCh := make(chan int) 1076 1077 h := throttler.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 1078 atomic.AddInt32(&concurrentN, 1) 1079 concurrentCh <- int(atomic.LoadInt32(&concurrentN)) 1080 time.Sleep(1 * time.Millisecond) 1081 atomic.AddInt32(&concurrentN, -1) 1082 })) 1083 1084 // Execute requests concurrently. 1085 const n = 100 1086 for i := 0; i < n; i++ { 1087 go func() { h.ServeHTTP(nil, nil) }() 1088 } 1089 1090 // Read the number of concurrent requests for every execution. 1091 for i := 0; i < n; i++ { 1092 if v := <-concurrentCh; v > 2 { 1093 t.Fatalf("concurrent requests exceed maximum: %d", v) 1094 } 1095 } 1096 }) 1097 1098 t.Run("ErrTimeout", func(t *testing.T) { 1099 throttler := httpd.NewThrottler(2, 1) 1100 throttler.EnqueueTimeout = 1 * time.Millisecond 1101 1102 begin, end := make(chan struct{}), make(chan struct{}) 1103 h := throttler.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 1104 begin <- struct{}{} 1105 end <- struct{}{} 1106 })) 1107 1108 // First two requests should execute immediately. 1109 go func() { h.ServeHTTP(nil, nil) }() 1110 go func() { h.ServeHTTP(nil, nil) }() 1111 1112 <-begin 1113 <-begin 1114 1115 // Third request should be enqueued but timeout. 1116 w := httptest.NewRecorder() 1117 h.ServeHTTP(w, nil) 1118 if w.Code != http.StatusServiceUnavailable { 1119 t.Fatalf("unexpected status code: %d", w.Code) 1120 } else if body := w.Body.String(); body != "request throttled, exceeds timeout\n" { 1121 t.Fatalf("unexpected response body: %q", body) 1122 } 1123 1124 // Allow 2 existing requests to complete. 1125 <-end 1126 <-end 1127 }) 1128 1129 t.Run("ErrFull", func(t *testing.T) { 1130 delay := 100 * time.Millisecond 1131 if os.Getenv("CI") != "" { 1132 delay = 2 * time.Second 1133 } 1134 1135 throttler := httpd.NewThrottler(2, 1) 1136 1137 resp := make(chan struct{}) 1138 h := throttler.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 1139 resp <- struct{}{} 1140 })) 1141 1142 // First two requests should execute immediately and third should be queued. 1143 go func() { h.ServeHTTP(nil, nil) }() 1144 go func() { h.ServeHTTP(nil, nil) }() 1145 go func() { h.ServeHTTP(nil, nil) }() 1146 time.Sleep(delay) 1147 1148 // Fourth request should fail when trying to enqueue. 1149 w := httptest.NewRecorder() 1150 h.ServeHTTP(w, nil) 1151 if w.Code != http.StatusServiceUnavailable { 1152 t.Fatalf("unexpected status code: %d", w.Code) 1153 } else if body := w.Body.String(); body != "request throttled, queue full\n" { 1154 t.Fatalf("unexpected response body: %q", body) 1155 } 1156 1157 // Allow 3 existing requests to complete. 1158 <-resp 1159 <-resp 1160 <-resp 1161 }) 1162} 1163 1164// NewHandler represents a test wrapper for httpd.Handler. 1165type Handler struct { 1166 *httpd.Handler 1167 MetaClient *internal.MetaClientMock 1168 StatementExecutor HandlerStatementExecutor 1169 QueryAuthorizer HandlerQueryAuthorizer 1170 PointsWriter HandlerPointsWriter 1171 Store *internal.StorageStoreMock 1172} 1173 1174// NewHandler returns a new instance of Handler. 1175func NewHandler(requireAuthentication bool) *Handler { 1176 config := httpd.NewConfig() 1177 config.AuthEnabled = requireAuthentication 1178 config.SharedSecret = "super secret key" 1179 return NewHandlerWithConfig(config) 1180} 1181 1182func NewHandlerWithConfig(config httpd.Config) *Handler { 1183 h := &Handler{ 1184 Handler: httpd.NewHandler(config), 1185 } 1186 1187 h.MetaClient = &internal.MetaClientMock{} 1188 h.Store = internal.NewStorageStoreMock() 1189 1190 h.Handler.MetaClient = h.MetaClient 1191 h.Handler.Store = h.Store 1192 h.Handler.QueryExecutor = query.NewExecutor() 1193 h.Handler.QueryExecutor.StatementExecutor = &h.StatementExecutor 1194 h.Handler.QueryAuthorizer = &h.QueryAuthorizer 1195 h.Handler.PointsWriter = &h.PointsWriter 1196 h.Handler.Version = "0.0.0" 1197 h.Handler.BuildType = "OSS" 1198 1199 if testing.Verbose() { 1200 l := logger.New(os.Stdout) 1201 h.Handler.Logger = l 1202 h.Handler.Store.WithLogger(l) 1203 } 1204 1205 return h 1206} 1207 1208// HandlerStatementExecutor is a mock implementation of Handler.StatementExecutor. 1209type HandlerStatementExecutor struct { 1210 ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error 1211} 1212 1213func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error { 1214 return e.ExecuteStatementFn(stmt, ctx) 1215} 1216 1217// HandlerQueryAuthorizer is a mock implementation of Handler.QueryAuthorizer. 1218type HandlerQueryAuthorizer struct { 1219 AuthorizeQueryFn func(u meta.User, query *influxql.Query, database string) error 1220} 1221 1222func (a *HandlerQueryAuthorizer) AuthorizeQuery(u meta.User, query *influxql.Query, database string) error { 1223 return a.AuthorizeQueryFn(u, query, database) 1224} 1225 1226type HandlerPointsWriter struct { 1227 WritePointsFn func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error 1228} 1229 1230func (h *HandlerPointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error { 1231 return h.WritePointsFn(database, retentionPolicy, consistencyLevel, user, points) 1232} 1233 1234// MustNewRequest returns a new HTTP request. Panic on error. 1235func MustNewRequest(method, urlStr string, body io.Reader) *http.Request { 1236 r, err := http.NewRequest(method, urlStr, body) 1237 if err != nil { 1238 panic(err.Error()) 1239 } 1240 return r 1241} 1242 1243// MustNewRequest returns a new HTTP request with the content type set. Panic on error. 1244func MustNewJSONRequest(method, urlStr string, body io.Reader) *http.Request { 1245 r := MustNewRequest(method, urlStr, body) 1246 r.Header.Set("Accept", "application/json") 1247 return r 1248} 1249 1250// MustJWTToken returns a new JWT token and signed string or panics trying. 1251func MustJWTToken(username, secret string, expired bool) (*jwt.Token, string) { 1252 token := jwt.New(jwt.GetSigningMethod("HS512")) 1253 token.Claims.(jwt.MapClaims)["username"] = username 1254 if expired { 1255 token.Claims.(jwt.MapClaims)["exp"] = time.Now().Add(-time.Second).Unix() 1256 } else { 1257 token.Claims.(jwt.MapClaims)["exp"] = time.Now().Add(time.Minute * 10).Unix() 1258 } 1259 signed, err := token.SignedString([]byte(secret)) 1260 if err != nil { 1261 panic(err) 1262 } 1263 return token, signed 1264} 1265