package httpd

import (
	"bytes"
	"compress/gzip"
	"context"
	"encoding/json"
	"errors"
	"expvar"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"math"
	"net/http"
	"os"
	"runtime/debug"
	"strconv"
	"strings"
	"sync/atomic"
	"time"

	"github.com/bmizerany/pat"
	"github.com/dgrijalva/jwt-go"
	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/influxdata/flux"
	"github.com/influxdata/flux/lang"
	"github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/logger"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/monitor"
	"github.com/influxdata/influxdb/monitor/diagnostics"
	"github.com/influxdata/influxdb/prometheus"
	"github.com/influxdata/influxdb/prometheus/remote"
	"github.com/influxdata/influxdb/query"
	"github.com/influxdata/influxdb/services/meta"
	"github.com/influxdata/influxdb/services/storage"
	"github.com/influxdata/influxdb/storage/reads"
	"github.com/influxdata/influxdb/storage/reads/datatypes"
	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxdb/uuid"
	"github.com/influxdata/influxql"
	prom "github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"go.uber.org/zap"
)

const (
	// DefaultChunkSize specifies the maximum number of points that will
	// be read before sending results back to the engine.
	//
	// This has no relation to the number of bytes that are returned.
	DefaultChunkSize = 10000

	// DefaultDebugRequestsInterval is a ten second duration.
	// NOTE(review): presumably the default sampling window of the
	// /debug/requests endpoint; its use is not visible here, so confirm.
	DefaultDebugRequestsInterval = 10 * time.Second

	// MaxDebugRequestsInterval is a six hour duration.
	// NOTE(review): presumably the upper bound for the /debug/requests
	// sampling window; its use is not visible here, so confirm.
	MaxDebugRequestsInterval = 6 * time.Hour
)

// AuthenticationMethod defines the type of authentication used.
type AuthenticationMethod int

// Supported authentication methods.
const (
	// Authenticate using basic authentication.
	UserAuthentication AuthenticationMethod = iota

	// Authenticate with jwt.
	BearerAuthentication
)

// TODO: Check HTTP response codes: 400, 401, 403, 409.

// Route specifies how to handle a HTTP verb for a given endpoint.
type Route struct {
	// Name identifies the route; AddRoutes passes it to the logging and
	// recovery wrappers.
	Name string
	// Method is the HTTP verb the route is registered for.
	Method string
	// Pattern is the URL pattern handed to the pat multiplexer.
	Pattern string
	// Gzipped makes AddRoutes wrap the handler in gzipFilter.
	Gzipped bool
	// LoggingEnabled makes AddRoutes wrap the handler in the logging
	// middleware, provided Config.LogEnabled is also set.
	LoggingEnabled bool
	// HandlerFunc is the endpoint implementation. AddRoutes recognizes
	// func(http.ResponseWriter, *http.Request) and
	// func(http.ResponseWriter, *http.Request, meta.User); the latter is
	// wrapped in authentication.
	HandlerFunc interface{}
}

// Handler represents an HTTP handler for the InfluxDB server.
type Handler struct {
	// mux dispatches requests to the routes installed by AddRoutes.
	mux *pat.PatternServeMux
	// Version and BuildType are sent on every response as the
	// X-Influxdb-Version and X-Influxdb-Build headers.
	Version   string
	BuildType string

	// MetaClient provides database and user metadata lookups.
	MetaClient interface {
		Database(name string) *meta.DatabaseInfo
		Databases() []meta.DatabaseInfo
		Authenticate(username, password string) (ui meta.User, err error)
		User(username string) (meta.User, error)
		AdminUserExists() bool
	}

	// QueryAuthorizer is consulted by serveQuery when authentication is enabled.
	QueryAuthorizer interface {
		AuthorizeQuery(u meta.User, query *influxql.Query, database string) error
	}

	// WriteAuthorizer is consulted by the write endpoints when
	// authentication is enabled.
	WriteAuthorizer interface {
		AuthorizeWrite(username, database string) error
	}

	// QueryExecutor runs parsed InfluxQL queries.
	QueryExecutor *query.Executor

	// Monitor exposes server statistics and diagnostics.
	Monitor interface {
		Statistics(tags map[string]string) ([]*monitor.Statistic, error)
		Diagnostics() (map[string]*diagnostics.Diagnostics, error)
	}

	// PointsWriter persists points parsed by the write endpoints.
	PointsWriter interface {
		WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
	}

	// Store serves storage read requests (used by the Prometheus read endpoint).
	Store Store

	// Flux services
	Controller       Controller
	CompilerMappings flux.CompilerMappings
	// registered records that Open registered the Controller's Prometheus
	// collectors, so that Close knows to unregister them.
	registered bool

	Config    *Config
	Logger    *zap.Logger
	CLFLogger *log.Logger
	// accessLog is the access log file opened by Open; nil when access
	// logging goes to stderr.
	accessLog        *os.File
	accessLogFilters StatusFilters
	// stats holds counters that are updated with sync/atomic operations.
	stats *Statistics

	requestTracker *RequestTracker
	// writeThrottler limits concurrent and enqueued write requests.
	writeThrottler *Throttler
}

// NewHandler returns a new instance of handler with routes.
137func NewHandler(c Config) *Handler { 138 h := &Handler{ 139 mux: pat.New(), 140 Config: &c, 141 Logger: zap.NewNop(), 142 CLFLogger: log.New(os.Stderr, "[httpd] ", 0), 143 stats: &Statistics{}, 144 requestTracker: NewRequestTracker(), 145 } 146 147 // Limit the number of concurrent & enqueued write requests. 148 h.writeThrottler = NewThrottler(c.MaxConcurrentWriteLimit, c.MaxEnqueuedWriteLimit) 149 h.writeThrottler.EnqueueTimeout = c.EnqueuedWriteTimeout 150 151 // Disable the write log if they have been suppressed. 152 writeLogEnabled := c.LogEnabled 153 if c.SuppressWriteLog { 154 writeLogEnabled = false 155 } 156 157 h.AddRoutes([]Route{ 158 Route{ 159 "query-options", // Satisfy CORS checks. 160 "OPTIONS", "/query", false, true, h.serveOptions, 161 }, 162 Route{ 163 "query", // Query serving route. 164 "GET", "/query", true, true, h.serveQuery, 165 }, 166 Route{ 167 "query", // Query serving route. 168 "POST", "/query", true, true, h.serveQuery, 169 }, 170 Route{ 171 "write-options", // Satisfy CORS checks. 172 "OPTIONS", "/write", false, true, h.serveOptions, 173 }, 174 Route{ 175 "write", // Data-ingest route. 176 "POST", "/write", true, writeLogEnabled, h.serveWrite, 177 }, 178 Route{ 179 "prometheus-write", // Prometheus remote write 180 "POST", "/api/v1/prom/write", false, true, h.servePromWrite, 181 }, 182 Route{ 183 "prometheus-read", // Prometheus remote read 184 "POST", "/api/v1/prom/read", true, true, h.servePromRead, 185 }, 186 Route{ // Ping 187 "ping", 188 "GET", "/ping", false, true, h.servePing, 189 }, 190 Route{ // Ping 191 "ping-head", 192 "HEAD", "/ping", false, true, h.servePing, 193 }, 194 Route{ // Ping w/ status 195 "status", 196 "GET", "/status", false, true, h.serveStatus, 197 }, 198 Route{ // Ping w/ status 199 "status-head", 200 "HEAD", "/status", false, true, h.serveStatus, 201 }, 202 Route{ 203 "prometheus-metrics", 204 "GET", "/metrics", false, true, promhttp.Handler().ServeHTTP, 205 }, 206 }...) 
207 208 fluxRoute := Route{ 209 "flux-read", 210 "POST", "/api/v2/query", true, true, nil, 211 } 212 213 if !c.FluxEnabled { 214 fluxRoute.HandlerFunc = func(w http.ResponseWriter, r *http.Request) { 215 http.Error(w, "Flux query service disabled. Verify flux-enabled=true in the [http] section of the InfluxDB config.", http.StatusForbidden) 216 } 217 } else { 218 fluxRoute.HandlerFunc = h.serveFluxQuery 219 } 220 h.AddRoutes(fluxRoute) 221 222 return h 223} 224 225func (h *Handler) Open() { 226 if h.Config.LogEnabled { 227 path := "stderr" 228 229 if h.Config.AccessLogPath != "" { 230 f, err := os.OpenFile(h.Config.AccessLogPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) 231 if err != nil { 232 h.Logger.Error("unable to open access log, falling back to stderr", zap.Error(err), zap.String("path", h.Config.AccessLogPath)) 233 return 234 } 235 h.CLFLogger = log.New(f, "", 0) // [httpd] prefix stripped when logging to a file 236 h.accessLog = f 237 path = h.Config.AccessLogPath 238 } 239 h.Logger.Info("opened HTTP access log", zap.String("path", path)) 240 } 241 h.accessLogFilters = StatusFilters(h.Config.AccessLogStatusFilters) 242 243 if h.Config.AuthEnabled && h.Config.SharedSecret == "" { 244 h.Logger.Info("Auth is enabled but shared-secret is blank. BearerAuthentication is disabled.") 245 } 246 247 if h.Config.FluxEnabled { 248 h.registered = true 249 prom.MustRegister(h.Controller.PrometheusCollectors()...) 250 } 251} 252 253func (h *Handler) Close() { 254 if h.accessLog != nil { 255 h.accessLog.Close() 256 h.accessLog = nil 257 h.accessLogFilters = nil 258 } 259 260 if h.registered { 261 for _, col := range h.Controller.PrometheusCollectors() { 262 prom.Unregister(col) 263 } 264 h.registered = false 265 } 266} 267 268// Statistics maintains statistics for the httpd service. 
269type Statistics struct { 270 Requests int64 271 CQRequests int64 272 QueryRequests int64 273 WriteRequests int64 274 PingRequests int64 275 StatusRequests int64 276 WriteRequestBytesReceived int64 277 QueryRequestBytesTransmitted int64 278 PointsWrittenOK int64 279 PointsWrittenDropped int64 280 PointsWrittenFail int64 281 AuthenticationFailures int64 282 RequestDuration int64 283 QueryRequestDuration int64 284 WriteRequestDuration int64 285 ActiveRequests int64 286 ActiveWriteRequests int64 287 ClientErrors int64 288 ServerErrors int64 289 RecoveredPanics int64 290 PromWriteRequests int64 291 PromReadRequests int64 292 FluxQueryRequests int64 293 FluxQueryRequestDuration int64 294} 295 296// Statistics returns statistics for periodic monitoring. 297func (h *Handler) Statistics(tags map[string]string) []models.Statistic { 298 return []models.Statistic{{ 299 Name: "httpd", 300 Tags: tags, 301 Values: map[string]interface{}{ 302 statRequest: atomic.LoadInt64(&h.stats.Requests), 303 statQueryRequest: atomic.LoadInt64(&h.stats.QueryRequests), 304 statWriteRequest: atomic.LoadInt64(&h.stats.WriteRequests), 305 statPingRequest: atomic.LoadInt64(&h.stats.PingRequests), 306 statStatusRequest: atomic.LoadInt64(&h.stats.StatusRequests), 307 statWriteRequestBytesReceived: atomic.LoadInt64(&h.stats.WriteRequestBytesReceived), 308 statQueryRequestBytesTransmitted: atomic.LoadInt64(&h.stats.QueryRequestBytesTransmitted), 309 statPointsWrittenOK: atomic.LoadInt64(&h.stats.PointsWrittenOK), 310 statPointsWrittenDropped: atomic.LoadInt64(&h.stats.PointsWrittenDropped), 311 statPointsWrittenFail: atomic.LoadInt64(&h.stats.PointsWrittenFail), 312 statAuthFail: atomic.LoadInt64(&h.stats.AuthenticationFailures), 313 statRequestDuration: atomic.LoadInt64(&h.stats.RequestDuration), 314 statQueryRequestDuration: atomic.LoadInt64(&h.stats.QueryRequestDuration), 315 statWriteRequestDuration: atomic.LoadInt64(&h.stats.WriteRequestDuration), 316 statRequestsActive: 
atomic.LoadInt64(&h.stats.ActiveRequests), 317 statWriteRequestsActive: atomic.LoadInt64(&h.stats.ActiveWriteRequests), 318 statClientError: atomic.LoadInt64(&h.stats.ClientErrors), 319 statServerError: atomic.LoadInt64(&h.stats.ServerErrors), 320 statRecoveredPanics: atomic.LoadInt64(&h.stats.RecoveredPanics), 321 statPromWriteRequest: atomic.LoadInt64(&h.stats.PromWriteRequests), 322 statPromReadRequest: atomic.LoadInt64(&h.stats.PromReadRequests), 323 statFluxQueryRequests: atomic.LoadInt64(&h.stats.FluxQueryRequests), 324 statFluxQueryRequestDuration: atomic.LoadInt64(&h.stats.FluxQueryRequestDuration), 325 }, 326 }} 327} 328 329// AddRoutes sets the provided routes on the handler. 330func (h *Handler) AddRoutes(routes ...Route) { 331 for _, r := range routes { 332 var handler http.Handler 333 334 // If it's a handler func that requires authorization, wrap it in authentication 335 if hf, ok := r.HandlerFunc.(func(http.ResponseWriter, *http.Request, meta.User)); ok { 336 handler = authenticate(hf, h, h.Config.AuthEnabled) 337 } 338 339 // This is a normal handler signature and does not require authentication 340 if hf, ok := r.HandlerFunc.(func(http.ResponseWriter, *http.Request)); ok { 341 handler = http.HandlerFunc(hf) 342 } 343 344 // Throttle route if this is a write endpoint. 345 if r.Method == http.MethodPost { 346 switch r.Pattern { 347 case "/write", "/api/v1/prom/write": 348 handler = h.writeThrottler.Handler(handler) 349 default: 350 } 351 } 352 353 handler = h.responseWriter(handler) 354 if r.Gzipped { 355 handler = gzipFilter(handler) 356 } 357 handler = cors(handler) 358 handler = requestID(handler) 359 if h.Config.LogEnabled && r.LoggingEnabled { 360 handler = h.logging(handler, r.Name) 361 } 362 handler = h.recovery(handler, r.Name) // make sure recovery is always last 363 364 h.mux.Add(r.Method, r.Pattern, handler) 365 } 366} 367 368// ServeHTTP responds to HTTP request to the handler. 
369func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 370 atomic.AddInt64(&h.stats.Requests, 1) 371 atomic.AddInt64(&h.stats.ActiveRequests, 1) 372 defer atomic.AddInt64(&h.stats.ActiveRequests, -1) 373 start := time.Now() 374 375 // Add version and build header to all InfluxDB requests. 376 w.Header().Add("X-Influxdb-Version", h.Version) 377 w.Header().Add("X-Influxdb-Build", h.BuildType) 378 379 if strings.HasPrefix(r.URL.Path, "/debug/pprof") && h.Config.PprofEnabled { 380 h.handleProfiles(w, r) 381 } else if strings.HasPrefix(r.URL.Path, "/debug/vars") { 382 h.serveExpvar(w, r) 383 } else if strings.HasPrefix(r.URL.Path, "/debug/requests") { 384 h.serveDebugRequests(w, r) 385 } else { 386 h.mux.ServeHTTP(w, r) 387 } 388 389 atomic.AddInt64(&h.stats.RequestDuration, time.Since(start).Nanoseconds()) 390} 391 392// writeHeader writes the provided status code in the response, and 393// updates relevant http error statistics. 394func (h *Handler) writeHeader(w http.ResponseWriter, code int) { 395 switch code / 100 { 396 case 4: 397 atomic.AddInt64(&h.stats.ClientErrors, 1) 398 case 5: 399 atomic.AddInt64(&h.stats.ServerErrors, 1) 400 } 401 w.WriteHeader(code) 402} 403 404// serveQuery parses an incoming query and, if valid, executes the query. 405func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User) { 406 atomic.AddInt64(&h.stats.QueryRequests, 1) 407 defer func(start time.Time) { 408 atomic.AddInt64(&h.stats.QueryRequestDuration, time.Since(start).Nanoseconds()) 409 }(time.Now()) 410 h.requestTracker.Add(r, user) 411 412 // Retrieve the underlying ResponseWriter or initialize our own. 413 rw, ok := w.(ResponseWriter) 414 if !ok { 415 rw = NewResponseWriter(w, r) 416 } 417 418 // Retrieve the node id the query should be executed on. 419 nodeID, _ := strconv.ParseUint(r.FormValue("node_id"), 10, 64) 420 421 var qr io.Reader 422 // Attempt to read the form value from the "q" form value. 
423 if qp := strings.TrimSpace(r.FormValue("q")); qp != "" { 424 qr = strings.NewReader(qp) 425 } else if r.MultipartForm != nil && r.MultipartForm.File != nil { 426 // If we have a multipart/form-data, try to retrieve a file from 'q'. 427 if fhs := r.MultipartForm.File["q"]; len(fhs) > 0 { 428 f, err := fhs[0].Open() 429 if err != nil { 430 h.httpError(rw, err.Error(), http.StatusBadRequest) 431 return 432 } 433 defer f.Close() 434 qr = f 435 } 436 } 437 438 if qr == nil { 439 h.httpError(rw, `missing required parameter "q"`, http.StatusBadRequest) 440 return 441 } 442 443 epoch := strings.TrimSpace(r.FormValue("epoch")) 444 445 p := influxql.NewParser(qr) 446 db := r.FormValue("db") 447 448 // Sanitize the request query params so it doesn't show up in the response logger. 449 // Do this before anything else so a parsing error doesn't leak passwords. 450 sanitize(r) 451 452 // Parse the parameters 453 rawParams := r.FormValue("params") 454 if rawParams != "" { 455 var params map[string]interface{} 456 decoder := json.NewDecoder(strings.NewReader(rawParams)) 457 decoder.UseNumber() 458 if err := decoder.Decode(¶ms); err != nil { 459 h.httpError(rw, "error parsing query parameters: "+err.Error(), http.StatusBadRequest) 460 return 461 } 462 463 // Convert json.Number into int64 and float64 values 464 for k, v := range params { 465 if v, ok := v.(json.Number); ok { 466 var err error 467 if strings.Contains(string(v), ".") { 468 params[k], err = v.Float64() 469 } else { 470 params[k], err = v.Int64() 471 } 472 473 if err != nil { 474 h.httpError(rw, "error parsing json value: "+err.Error(), http.StatusBadRequest) 475 return 476 } 477 } 478 } 479 p.SetParams(params) 480 } 481 482 // Parse query from query string. 483 q, err := p.ParseQuery() 484 if err != nil { 485 h.httpError(rw, "error parsing query: "+err.Error(), http.StatusBadRequest) 486 return 487 } 488 489 // Check authorization. 
490 if h.Config.AuthEnabled { 491 if err := h.QueryAuthorizer.AuthorizeQuery(user, q, db); err != nil { 492 if err, ok := err.(meta.ErrAuthorize); ok { 493 h.Logger.Info("Unauthorized request", 494 zap.String("user", err.User), 495 zap.Stringer("query", err.Query), 496 logger.Database(err.Database)) 497 } 498 h.httpError(rw, "error authorizing query: "+err.Error(), http.StatusForbidden) 499 return 500 } 501 } 502 503 // Parse chunk size. Use default if not provided or unparsable. 504 chunked := r.FormValue("chunked") == "true" 505 chunkSize := DefaultChunkSize 506 if chunked { 507 if n, err := strconv.ParseInt(r.FormValue("chunk_size"), 10, 64); err == nil && int(n) > 0 { 508 chunkSize = int(n) 509 } 510 } 511 512 // Parse whether this is an async command. 513 async := r.FormValue("async") == "true" 514 515 opts := query.ExecutionOptions{ 516 Database: db, 517 RetentionPolicy: r.FormValue("rp"), 518 ChunkSize: chunkSize, 519 ReadOnly: r.Method == "GET", 520 NodeID: nodeID, 521 } 522 523 if h.Config.AuthEnabled { 524 if user != nil && user.AuthorizeUnrestricted() { 525 opts.Authorizer = query.OpenAuthorizer 526 } else { 527 // The current user determines the authorized actions. 528 opts.Authorizer = user 529 } 530 } else { 531 // Auth is disabled, so allow everything. 532 opts.Authorizer = query.OpenAuthorizer 533 } 534 535 // Make sure if the client disconnects we signal the query to abort 536 var closing chan struct{} 537 if !async { 538 closing = make(chan struct{}) 539 if notifier, ok := w.(http.CloseNotifier); ok { 540 // CloseNotify() is not guaranteed to send a notification when the query 541 // is closed. Use this channel to signal that the query is finished to 542 // prevent lingering goroutines that may be stuck. 
543 done := make(chan struct{}) 544 defer close(done) 545 546 notify := notifier.CloseNotify() 547 go func() { 548 // Wait for either the request to finish 549 // or for the client to disconnect 550 select { 551 case <-done: 552 case <-notify: 553 close(closing) 554 } 555 }() 556 opts.AbortCh = done 557 } else { 558 defer close(closing) 559 } 560 } 561 562 // Execute query. 563 results := h.QueryExecutor.ExecuteQuery(q, opts, closing) 564 565 // If we are running in async mode, open a goroutine to drain the results 566 // and return with a StatusNoContent. 567 if async { 568 go h.async(q, results) 569 h.writeHeader(w, http.StatusNoContent) 570 return 571 } 572 573 // if we're not chunking, this will be the in memory buffer for all results before sending to client 574 resp := Response{Results: make([]*query.Result, 0)} 575 576 // Status header is OK once this point is reached. 577 // Attempt to flush the header immediately so the client gets the header information 578 // and knows the query was accepted. 579 h.writeHeader(rw, http.StatusOK) 580 if w, ok := w.(http.Flusher); ok { 581 w.Flush() 582 } 583 584 // pull all results from the channel 585 rows := 0 586 for r := range results { 587 // Ignore nil results. 588 if r == nil { 589 continue 590 } 591 592 // if requested, convert result timestamps to epoch 593 if epoch != "" { 594 convertToEpoch(r, epoch) 595 } 596 597 // Write out result immediately if chunked. 598 if chunked { 599 n, _ := rw.WriteResponse(Response{ 600 Results: []*query.Result{r}, 601 }) 602 atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n)) 603 w.(http.Flusher).Flush() 604 continue 605 } 606 607 // Limit the number of rows that can be returned in a non-chunked 608 // response. This is to prevent the server from going OOM when 609 // returning a large response. If you want to return more than the 610 // default chunk size, then use chunking to process multiple blobs. 
611 // Iterate through the series in this result to count the rows and 612 // truncate any rows we shouldn't return. 613 if h.Config.MaxRowLimit > 0 { 614 for i, series := range r.Series { 615 n := h.Config.MaxRowLimit - rows 616 if n < len(series.Values) { 617 // We have reached the maximum number of values. Truncate 618 // the values within this row. 619 series.Values = series.Values[:n] 620 // Since this was truncated, it will always be a partial return. 621 // Add this so the client knows we truncated the response. 622 series.Partial = true 623 } 624 rows += len(series.Values) 625 626 if rows >= h.Config.MaxRowLimit { 627 // Drop any remaining series since we have already reached the row limit. 628 if i < len(r.Series) { 629 r.Series = r.Series[:i+1] 630 } 631 break 632 } 633 } 634 } 635 636 // It's not chunked so buffer results in memory. 637 // Results for statements need to be combined together. 638 // We need to check if this new result is for the same statement as 639 // the last result, or for the next statement 640 l := len(resp.Results) 641 if l == 0 { 642 resp.Results = append(resp.Results, r) 643 } else if resp.Results[l-1].StatementID == r.StatementID { 644 if r.Err != nil { 645 resp.Results[l-1] = r 646 continue 647 } 648 649 cr := resp.Results[l-1] 650 rowsMerged := 0 651 if len(cr.Series) > 0 { 652 lastSeries := cr.Series[len(cr.Series)-1] 653 654 for _, row := range r.Series { 655 if !lastSeries.SameSeries(row) { 656 // Next row is for a different series than last. 657 break 658 } 659 // Values are for the same series, so append them. 660 lastSeries.Values = append(lastSeries.Values, row.Values...) 661 rowsMerged++ 662 } 663 } 664 665 // Append remaining rows as new rows. 666 r.Series = r.Series[rowsMerged:] 667 cr.Series = append(cr.Series, r.Series...) 668 cr.Messages = append(cr.Messages, r.Messages...) 
669 cr.Partial = r.Partial 670 } else { 671 resp.Results = append(resp.Results, r) 672 } 673 674 // Drop out of this loop and do not process further results when we hit the row limit. 675 if h.Config.MaxRowLimit > 0 && rows >= h.Config.MaxRowLimit { 676 // If the result is marked as partial, remove that partial marking 677 // here. While the series is partial and we would normally have 678 // tried to return the rest in the next chunk, we are not using 679 // chunking and are truncating the series so we don't want to 680 // signal to the client that we plan on sending another JSON blob 681 // with another result. The series, on the other hand, still 682 // returns partial true if it was truncated or had more data to 683 // send in a future chunk. 684 r.Partial = false 685 break 686 } 687 } 688 689 // If it's not chunked we buffered everything in memory, so write it out 690 if !chunked { 691 n, _ := rw.WriteResponse(resp) 692 atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n)) 693 } 694} 695 696// async drains the results from an async query and logs a message if it fails. 697func (h *Handler) async(q *influxql.Query, results <-chan *query.Result) { 698 for r := range results { 699 // Drain the results and do nothing with them. 700 // If it fails, log the failure so there is at least a record of it. 701 if r.Err != nil { 702 // Do not log when a statement was not executed since there would 703 // have been an earlier error that was already logged. 704 if r.Err == query.ErrNotExecuted { 705 continue 706 } 707 h.Logger.Info("Error while running async query", 708 zap.Stringer("query", q), 709 zap.Error(r.Err)) 710 } 711 } 712} 713 714// serveWrite receives incoming series data in line protocol format and writes it to the database. 
715func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.User) { 716 atomic.AddInt64(&h.stats.WriteRequests, 1) 717 atomic.AddInt64(&h.stats.ActiveWriteRequests, 1) 718 defer func(start time.Time) { 719 atomic.AddInt64(&h.stats.ActiveWriteRequests, -1) 720 atomic.AddInt64(&h.stats.WriteRequestDuration, time.Since(start).Nanoseconds()) 721 }(time.Now()) 722 h.requestTracker.Add(r, user) 723 724 database := r.URL.Query().Get("db") 725 if database == "" { 726 h.httpError(w, "database is required", http.StatusBadRequest) 727 return 728 } 729 730 if di := h.MetaClient.Database(database); di == nil { 731 h.httpError(w, fmt.Sprintf("database not found: %q", database), http.StatusNotFound) 732 return 733 } 734 735 if h.Config.AuthEnabled { 736 if user == nil { 737 h.httpError(w, fmt.Sprintf("user is required to write to database %q", database), http.StatusForbidden) 738 return 739 } 740 741 if err := h.WriteAuthorizer.AuthorizeWrite(user.ID(), database); err != nil { 742 h.httpError(w, fmt.Sprintf("%q user is not authorized to write to database %q", user.ID(), database), http.StatusForbidden) 743 return 744 } 745 } 746 747 body := r.Body 748 if h.Config.MaxBodySize > 0 { 749 body = truncateReader(body, int64(h.Config.MaxBodySize)) 750 } 751 752 // Handle gzip decoding of the body 753 if r.Header.Get("Content-Encoding") == "gzip" { 754 b, err := gzip.NewReader(r.Body) 755 if err != nil { 756 h.httpError(w, err.Error(), http.StatusBadRequest) 757 return 758 } 759 defer b.Close() 760 body = b 761 } 762 763 var bs []byte 764 if r.ContentLength > 0 { 765 if h.Config.MaxBodySize > 0 && r.ContentLength > int64(h.Config.MaxBodySize) { 766 h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) 767 return 768 } 769 770 // This will just be an initial hint for the gzip reader, as the 771 // bytes.Buffer will grow as needed when ReadFrom is called 772 bs = make([]byte, 0, r.ContentLength) 773 } 774 buf := 
bytes.NewBuffer(bs) 775 776 _, err := buf.ReadFrom(body) 777 if err != nil { 778 if err == errTruncated { 779 h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) 780 return 781 } 782 783 if h.Config.WriteTracing { 784 h.Logger.Info("Write handler unable to read bytes from request body") 785 } 786 h.httpError(w, err.Error(), http.StatusBadRequest) 787 return 788 } 789 atomic.AddInt64(&h.stats.WriteRequestBytesReceived, int64(buf.Len())) 790 791 if h.Config.WriteTracing { 792 h.Logger.Info("Write body received by handler", zap.ByteString("body", buf.Bytes())) 793 } 794 795 points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.URL.Query().Get("precision")) 796 // Not points parsed correctly so return the error now 797 if parseError != nil && len(points) == 0 { 798 if parseError.Error() == "EOF" { 799 h.writeHeader(w, http.StatusOK) 800 return 801 } 802 h.httpError(w, parseError.Error(), http.StatusBadRequest) 803 return 804 } 805 806 // Determine required consistency level. 807 level := r.URL.Query().Get("consistency") 808 consistency := models.ConsistencyLevelOne 809 if level != "" { 810 var err error 811 consistency, err = models.ParseConsistencyLevel(level) 812 if err != nil { 813 h.httpError(w, err.Error(), http.StatusBadRequest) 814 return 815 } 816 } 817 818 // Write points. 
819 if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); influxdb.IsClientError(err) { 820 atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points))) 821 h.httpError(w, err.Error(), http.StatusBadRequest) 822 return 823 } else if influxdb.IsAuthorizationError(err) { 824 atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points))) 825 h.httpError(w, err.Error(), http.StatusForbidden) 826 return 827 } else if werr, ok := err.(tsdb.PartialWriteError); ok { 828 atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)-werr.Dropped)) 829 atomic.AddInt64(&h.stats.PointsWrittenDropped, int64(werr.Dropped)) 830 h.httpError(w, werr.Error(), http.StatusBadRequest) 831 return 832 } else if err != nil { 833 atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points))) 834 h.httpError(w, err.Error(), http.StatusInternalServerError) 835 return 836 } else if parseError != nil { 837 // We wrote some of the points 838 atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points))) 839 // The other points failed to parse which means the client sent invalid line protocol. We return a 400 840 // response code as well as the lines that failed to parse. 841 h.httpError(w, tsdb.PartialWriteError{Reason: parseError.Error()}.Error(), http.StatusBadRequest) 842 return 843 } 844 845 atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points))) 846 h.writeHeader(w, http.StatusNoContent) 847} 848 849// serveOptions returns an empty response to comply with OPTIONS pre-flight requests 850func (h *Handler) serveOptions(w http.ResponseWriter, r *http.Request) { 851 h.writeHeader(w, http.StatusNoContent) 852} 853 854// servePing returns a simple response to let the client know the server is running. 
855func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) { 856 verbose := r.URL.Query().Get("verbose") 857 atomic.AddInt64(&h.stats.PingRequests, 1) 858 859 if verbose != "" && verbose != "0" && verbose != "false" { 860 h.writeHeader(w, http.StatusOK) 861 b, _ := json.Marshal(map[string]string{"version": h.Version}) 862 w.Write(b) 863 } else { 864 h.writeHeader(w, http.StatusNoContent) 865 } 866} 867 868// serveStatus has been deprecated. 869func (h *Handler) serveStatus(w http.ResponseWriter, r *http.Request) { 870 h.Logger.Info("WARNING: /status has been deprecated. Use /ping instead.") 871 atomic.AddInt64(&h.stats.StatusRequests, 1) 872 h.writeHeader(w, http.StatusNoContent) 873} 874 875// convertToEpoch converts result timestamps from time.Time to the specified epoch. 876func convertToEpoch(r *query.Result, epoch string) { 877 divisor := int64(1) 878 879 switch epoch { 880 case "u": 881 divisor = int64(time.Microsecond) 882 case "ms": 883 divisor = int64(time.Millisecond) 884 case "s": 885 divisor = int64(time.Second) 886 case "m": 887 divisor = int64(time.Minute) 888 case "h": 889 divisor = int64(time.Hour) 890 } 891 892 for _, s := range r.Series { 893 for _, v := range s.Values { 894 if ts, ok := v[0].(time.Time); ok { 895 v[0] = ts.UnixNano() / divisor 896 } 897 } 898 } 899} 900 901// servePromWrite receives data in the Prometheus remote write protocol and writes it 902// to the database 903func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user meta.User) { 904 atomic.AddInt64(&h.stats.WriteRequests, 1) 905 atomic.AddInt64(&h.stats.ActiveWriteRequests, 1) 906 atomic.AddInt64(&h.stats.PromWriteRequests, 1) 907 defer func(start time.Time) { 908 atomic.AddInt64(&h.stats.ActiveWriteRequests, -1) 909 atomic.AddInt64(&h.stats.WriteRequestDuration, time.Since(start).Nanoseconds()) 910 }(time.Now()) 911 h.requestTracker.Add(r, user) 912 913 database := r.URL.Query().Get("db") 914 if database == "" { 915 h.httpError(w, "database 
is required", http.StatusBadRequest) 916 return 917 } 918 919 if di := h.MetaClient.Database(database); di == nil { 920 h.httpError(w, fmt.Sprintf("database not found: %q", database), http.StatusNotFound) 921 return 922 } 923 924 if h.Config.AuthEnabled { 925 if user == nil { 926 h.httpError(w, fmt.Sprintf("user is required to write to database %q", database), http.StatusForbidden) 927 return 928 } 929 930 if err := h.WriteAuthorizer.AuthorizeWrite(user.ID(), database); err != nil { 931 h.httpError(w, fmt.Sprintf("%q user is not authorized to write to database %q", user.ID(), database), http.StatusForbidden) 932 return 933 } 934 } 935 936 body := r.Body 937 if h.Config.MaxBodySize > 0 { 938 body = truncateReader(body, int64(h.Config.MaxBodySize)) 939 } 940 941 var bs []byte 942 if r.ContentLength > 0 { 943 if h.Config.MaxBodySize > 0 && r.ContentLength > int64(h.Config.MaxBodySize) { 944 h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) 945 return 946 } 947 948 // This will just be an initial hint for the reader, as the 949 // bytes.Buffer will grow as needed when ReadFrom is called 950 bs = make([]byte, 0, r.ContentLength) 951 } 952 buf := bytes.NewBuffer(bs) 953 954 _, err := buf.ReadFrom(body) 955 if err != nil { 956 if err == errTruncated { 957 h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) 958 return 959 } 960 961 if h.Config.WriteTracing { 962 h.Logger.Info("Prom write handler unable to read bytes from request body") 963 } 964 h.httpError(w, err.Error(), http.StatusBadRequest) 965 return 966 } 967 atomic.AddInt64(&h.stats.WriteRequestBytesReceived, int64(buf.Len())) 968 969 if h.Config.WriteTracing { 970 h.Logger.Info("Prom write body received by handler", zap.ByteString("body", buf.Bytes())) 971 } 972 973 reqBuf, err := snappy.Decode(nil, buf.Bytes()) 974 if err != nil { 975 h.httpError(w, err.Error(), http.StatusBadRequest) 976 return 977 } 978 979 // 
Convert the Prometheus remote write request to Influx Points 980 var req remote.WriteRequest 981 if err := proto.Unmarshal(reqBuf, &req); err != nil { 982 h.httpError(w, err.Error(), http.StatusBadRequest) 983 return 984 } 985 986 points, err := prometheus.WriteRequestToPoints(&req) 987 if err != nil { 988 if h.Config.WriteTracing { 989 h.Logger.Info("Prom write handler", zap.Error(err)) 990 } 991 992 // Check if the error was from something other than dropping invalid values. 993 if _, ok := err.(prometheus.DroppedValuesError); !ok { 994 h.httpError(w, err.Error(), http.StatusBadRequest) 995 return 996 } 997 } 998 999 // Determine required consistency level. 1000 level := r.URL.Query().Get("consistency") 1001 consistency := models.ConsistencyLevelOne 1002 if level != "" { 1003 consistency, err = models.ParseConsistencyLevel(level) 1004 if err != nil { 1005 h.httpError(w, err.Error(), http.StatusBadRequest) 1006 return 1007 } 1008 } 1009 1010 // Write points. 1011 if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); influxdb.IsClientError(err) { 1012 atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points))) 1013 h.httpError(w, err.Error(), http.StatusBadRequest) 1014 return 1015 } else if influxdb.IsAuthorizationError(err) { 1016 atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points))) 1017 h.httpError(w, err.Error(), http.StatusForbidden) 1018 return 1019 } else if werr, ok := err.(tsdb.PartialWriteError); ok { 1020 atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)-werr.Dropped)) 1021 atomic.AddInt64(&h.stats.PointsWrittenDropped, int64(werr.Dropped)) 1022 h.httpError(w, werr.Error(), http.StatusBadRequest) 1023 return 1024 } else if err != nil { 1025 atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points))) 1026 h.httpError(w, err.Error(), http.StatusInternalServerError) 1027 return 1028 } 1029 1030 atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points))) 1031 h.writeHeader(w, 
http.StatusNoContent) 1032} 1033 1034// servePromRead will convert a Prometheus remote read request into a storage 1035// query and returns data in Prometheus remote read protobuf format. 1036func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user meta.User) { 1037 compressed, err := ioutil.ReadAll(r.Body) 1038 if err != nil { 1039 h.httpError(w, err.Error(), http.StatusInternalServerError) 1040 return 1041 } 1042 1043 reqBuf, err := snappy.Decode(nil, compressed) 1044 if err != nil { 1045 h.httpError(w, err.Error(), http.StatusBadRequest) 1046 return 1047 } 1048 1049 var req remote.ReadRequest 1050 if err := proto.Unmarshal(reqBuf, &req); err != nil { 1051 h.httpError(w, err.Error(), http.StatusBadRequest) 1052 return 1053 } 1054 1055 // Query the DB and create a ReadResponse for Prometheus 1056 db := r.FormValue("db") 1057 rp := r.FormValue("rp") 1058 1059 readRequest, err := prometheus.ReadRequestToInfluxStorageRequest(&req, db, rp) 1060 if err != nil { 1061 h.httpError(w, err.Error(), http.StatusBadRequest) 1062 return 1063 } 1064 1065 respond := func(resp *remote.ReadResponse) { 1066 data, err := proto.Marshal(resp) 1067 if err != nil { 1068 h.httpError(w, err.Error(), http.StatusInternalServerError) 1069 return 1070 } 1071 1072 w.Header().Set("Content-Type", "application/x-protobuf") 1073 w.Header().Set("Content-Encoding", "snappy") 1074 1075 compressed = snappy.Encode(nil, data) 1076 if _, err := w.Write(compressed); err != nil { 1077 h.httpError(w, err.Error(), http.StatusInternalServerError) 1078 return 1079 } 1080 1081 atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(len(compressed))) 1082 } 1083 1084 ctx := context.Background() 1085 rs, err := h.Store.Read(ctx, readRequest) 1086 if err != nil { 1087 h.httpError(w, err.Error(), http.StatusBadRequest) 1088 return 1089 } 1090 1091 resp := &remote.ReadResponse{ 1092 Results: []*remote.QueryResult{{}}, 1093 } 1094 1095 if rs == nil { 1096 respond(resp) 1097 return 1098 } 1099 
defer rs.Close() 1100 1101 for rs.Next() { 1102 cur := rs.Cursor() 1103 if cur == nil { 1104 // no data for series key + field combination 1105 continue 1106 } 1107 1108 tags := prometheus.RemoveInfluxSystemTags(rs.Tags()) 1109 var unsupportedCursor string 1110 switch cur := cur.(type) { 1111 case tsdb.FloatArrayCursor: 1112 var series *remote.TimeSeries 1113 for { 1114 a := cur.Next() 1115 if a.Len() == 0 { 1116 break 1117 } 1118 1119 // We have some data for this series. 1120 if series == nil { 1121 series = &remote.TimeSeries{ 1122 Labels: prometheus.ModelTagsToLabelPairs(tags), 1123 } 1124 } 1125 1126 for i, ts := range a.Timestamps { 1127 series.Samples = append(series.Samples, &remote.Sample{ 1128 TimestampMs: ts / int64(time.Millisecond), 1129 Value: a.Values[i], 1130 }) 1131 } 1132 } 1133 1134 // There was data for the series. 1135 if series != nil { 1136 resp.Results[0].Timeseries = append(resp.Results[0].Timeseries, series) 1137 } 1138 case tsdb.IntegerArrayCursor: 1139 unsupportedCursor = "int64" 1140 case tsdb.UnsignedArrayCursor: 1141 unsupportedCursor = "uint" 1142 case tsdb.BooleanArrayCursor: 1143 unsupportedCursor = "bool" 1144 case tsdb.StringArrayCursor: 1145 unsupportedCursor = "string" 1146 default: 1147 panic(fmt.Sprintf("unreachable: %T", cur)) 1148 } 1149 cur.Close() 1150 1151 if len(unsupportedCursor) > 0 { 1152 h.Logger.Info("Prometheus can't read cursor", 1153 zap.String("cursor_type", unsupportedCursor), 1154 zap.Stringer("series", tags), 1155 ) 1156 } 1157 } 1158 1159 respond(resp) 1160} 1161 1162func (h *Handler) serveFluxQuery(w http.ResponseWriter, r *http.Request, user meta.User) { 1163 atomic.AddInt64(&h.stats.FluxQueryRequests, 1) 1164 defer func(start time.Time) { 1165 atomic.AddInt64(&h.stats.FluxQueryRequestDuration, time.Since(start).Nanoseconds()) 1166 }(time.Now()) 1167 1168 req, err := decodeQueryRequest(r) 1169 if err != nil { 1170 h.httpError(w, err.Error(), http.StatusBadRequest) 1171 return 1172 } 1173 1174 ctx := 
r.Context() 1175 if val := r.FormValue("node_id"); val != "" { 1176 if nodeID, err := strconv.ParseUint(val, 10, 64); err == nil { 1177 ctx = storage.NewContextWithReadOptions(ctx, &storage.ReadOptions{NodeID: nodeID}) 1178 } 1179 } 1180 1181 if h.Config.AuthEnabled { 1182 ctx = meta.NewContextWithUser(ctx, user) 1183 } 1184 1185 pr := req.ProxyRequest() 1186 1187 // Logging 1188 var ( 1189 stats flux.Statistics 1190 n int64 1191 ) 1192 if h.Config.FluxLogEnabled { 1193 defer func() { 1194 h.logFluxQuery(n, stats, pr.Compiler, err) 1195 }() 1196 } 1197 1198 q, err := h.Controller.Query(ctx, pr.Compiler) 1199 if err != nil { 1200 h.httpError(w, err.Error(), http.StatusInternalServerError) 1201 return 1202 } 1203 defer func() { 1204 q.Cancel() 1205 q.Done() 1206 }() 1207 1208 // NOTE: We do not write out the headers here. 1209 // It is possible that if the encoding step fails 1210 // that we can write an error header so long as 1211 // the encoder did not write anything. 1212 // As such we rely on the http.ResponseWriter behavior 1213 // to write an StatusOK header with the first write. 1214 1215 switch r.Header.Get("Accept") { 1216 case "text/csv": 1217 fallthrough 1218 default: 1219 if hd, ok := pr.Dialect.(httpDialect); !ok { 1220 h.httpError(w, fmt.Sprintf("unsupported dialect over HTTP %T", req.Dialect), http.StatusBadRequest) 1221 return 1222 } else { 1223 hd.SetHeaders(w) 1224 } 1225 encoder := pr.Dialect.Encoder() 1226 results := flux.NewResultIteratorFromQuery(q) 1227 if h.Config.FluxLogEnabled { 1228 defer func() { 1229 stats = results.Statistics() 1230 }() 1231 } 1232 defer results.Release() 1233 1234 n, err = encoder.Encode(w, results) 1235 if err != nil { 1236 if n == 0 { 1237 // If the encoder did not write anything, we can write an error header. 
1238 h.httpError(w, err.Error(), http.StatusInternalServerError) 1239 } 1240 } 1241 } 1242} 1243 1244func (h *Handler) logFluxQuery(n int64, stats flux.Statistics, compiler flux.Compiler, err error) { 1245 var q string 1246 switch c := compiler.(type) { 1247 case lang.SpecCompiler: 1248 q = fmt.Sprint(flux.Formatted(c.Spec)) 1249 case lang.FluxCompiler: 1250 q = c.Query 1251 } 1252 1253 h.Logger.Info("Executed Flux query", 1254 zap.String("compiler_type", string(compiler.CompilerType())), 1255 zap.Int64("response_size", n), 1256 zap.String("query", q), 1257 zap.Error(err), 1258 zap.Duration("stat_total_duration", stats.TotalDuration), 1259 zap.Duration("stat_compile_duration", stats.CompileDuration), 1260 zap.Duration("stat_queue_duration", stats.QueueDuration), 1261 zap.Duration("stat_plan_duration", stats.PlanDuration), 1262 zap.Duration("stat_requeue_duration", stats.RequeueDuration), 1263 zap.Duration("stat_execute_duration", stats.ExecuteDuration), 1264 zap.Int64("stat_max_allocated", stats.MaxAllocated), 1265 zap.Int("stat_concurrency", stats.Concurrency), 1266 ) 1267} 1268 1269// serveExpvar serves internal metrics in /debug/vars format over HTTP. 1270func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) { 1271 // Retrieve statistics from the monitor. 1272 stats, err := h.Monitor.Statistics(nil) 1273 if err != nil { 1274 h.httpError(w, err.Error(), http.StatusInternalServerError) 1275 return 1276 } 1277 1278 // Retrieve diagnostics from the monitor. 
1279 diags, err := h.Monitor.Diagnostics() 1280 if err != nil { 1281 h.httpError(w, err.Error(), http.StatusInternalServerError) 1282 return 1283 } 1284 1285 w.Header().Set("Content-Type", "application/json; charset=utf-8") 1286 1287 first := true 1288 if val := diags["system"]; val != nil { 1289 jv, err := parseSystemDiagnostics(val) 1290 if err != nil { 1291 h.httpError(w, err.Error(), http.StatusInternalServerError) 1292 return 1293 } 1294 1295 data, err := json.Marshal(jv) 1296 if err != nil { 1297 h.httpError(w, err.Error(), http.StatusInternalServerError) 1298 return 1299 } 1300 1301 first = false 1302 fmt.Fprintln(w, "{") 1303 fmt.Fprintf(w, "\"system\": %s", data) 1304 } else { 1305 fmt.Fprintln(w, "{") 1306 } 1307 1308 if val := expvar.Get("cmdline"); val != nil { 1309 if !first { 1310 fmt.Fprintln(w, ",") 1311 } 1312 first = false 1313 fmt.Fprintf(w, "\"cmdline\": %s", val) 1314 } 1315 if val := expvar.Get("memstats"); val != nil { 1316 if !first { 1317 fmt.Fprintln(w, ",") 1318 } 1319 first = false 1320 fmt.Fprintf(w, "\"memstats\": %s", val) 1321 } 1322 1323 for _, s := range stats { 1324 val, err := json.Marshal(s) 1325 if err != nil { 1326 continue 1327 } 1328 1329 // Very hackily create a unique key. 
1330 buf := bytes.NewBufferString(s.Name) 1331 if path, ok := s.Tags["path"]; ok { 1332 fmt.Fprintf(buf, ":%s", path) 1333 if id, ok := s.Tags["id"]; ok { 1334 fmt.Fprintf(buf, ":%s", id) 1335 } 1336 } else if bind, ok := s.Tags["bind"]; ok { 1337 if proto, ok := s.Tags["proto"]; ok { 1338 fmt.Fprintf(buf, ":%s", proto) 1339 } 1340 fmt.Fprintf(buf, ":%s", bind) 1341 } else if database, ok := s.Tags["database"]; ok { 1342 fmt.Fprintf(buf, ":%s", database) 1343 if rp, ok := s.Tags["retention_policy"]; ok { 1344 fmt.Fprintf(buf, ":%s", rp) 1345 if name, ok := s.Tags["name"]; ok { 1346 fmt.Fprintf(buf, ":%s", name) 1347 } 1348 if dest, ok := s.Tags["destination"]; ok { 1349 fmt.Fprintf(buf, ":%s", dest) 1350 } 1351 } 1352 } 1353 key := buf.String() 1354 1355 if !first { 1356 fmt.Fprintln(w, ",") 1357 } 1358 first = false 1359 fmt.Fprintf(w, "%q: ", key) 1360 w.Write(bytes.TrimSpace(val)) 1361 } 1362 fmt.Fprintln(w, "\n}") 1363} 1364 1365// serveDebugRequests will track requests for a period of time. 
1366func (h *Handler) serveDebugRequests(w http.ResponseWriter, r *http.Request) { 1367 var d time.Duration 1368 if s := r.URL.Query().Get("seconds"); s == "" { 1369 d = DefaultDebugRequestsInterval 1370 } else if seconds, err := strconv.ParseInt(s, 10, 64); err != nil { 1371 h.httpError(w, err.Error(), http.StatusBadRequest) 1372 return 1373 } else { 1374 d = time.Duration(seconds) * time.Second 1375 if d > MaxDebugRequestsInterval { 1376 h.httpError(w, fmt.Sprintf("exceeded maximum interval time: %s > %s", 1377 influxql.FormatDuration(d), 1378 influxql.FormatDuration(MaxDebugRequestsInterval)), 1379 http.StatusBadRequest) 1380 return 1381 } 1382 } 1383 1384 var closing <-chan bool 1385 if notifier, ok := w.(http.CloseNotifier); ok { 1386 closing = notifier.CloseNotify() 1387 } 1388 1389 profile := h.requestTracker.TrackRequests() 1390 1391 timer := time.NewTimer(d) 1392 select { 1393 case <-timer.C: 1394 profile.Stop() 1395 case <-closing: 1396 // Connection was closed early. 1397 profile.Stop() 1398 timer.Stop() 1399 return 1400 } 1401 1402 w.Header().Set("Content-Type", "application/json; charset=utf-8") 1403 w.Header().Add("Connection", "close") 1404 1405 fmt.Fprintln(w, "{") 1406 first := true 1407 for req, st := range profile.Requests { 1408 val, err := json.Marshal(st) 1409 if err != nil { 1410 continue 1411 } 1412 1413 if !first { 1414 fmt.Fprintln(w, ",") 1415 } 1416 first = false 1417 fmt.Fprintf(w, "%q: ", req.String()) 1418 w.Write(bytes.TrimSpace(val)) 1419 } 1420 fmt.Fprintln(w, "\n}") 1421} 1422 1423// parseSystemDiagnostics converts the system diagnostics into an appropriate 1424// format for marshaling to JSON in the /debug/vars format. 1425func parseSystemDiagnostics(d *diagnostics.Diagnostics) (map[string]interface{}, error) { 1426 // We don't need PID in this case. 1427 m := map[string]interface{}{"currentTime": nil, "started": nil, "uptime": nil} 1428 for key := range m { 1429 // Find the associated column. 
1430 ci := -1 1431 for i, col := range d.Columns { 1432 if col == key { 1433 ci = i 1434 break 1435 } 1436 } 1437 1438 if ci == -1 { 1439 return nil, fmt.Errorf("unable to find column %q", key) 1440 } 1441 1442 if len(d.Rows) < 1 || len(d.Rows[0]) <= ci { 1443 return nil, fmt.Errorf("no data for column %q", key) 1444 } 1445 1446 var res interface{} 1447 switch v := d.Rows[0][ci].(type) { 1448 case time.Time: 1449 res = v 1450 case string: 1451 // Should be a string representation of a time.Duration 1452 d, err := time.ParseDuration(v) 1453 if err != nil { 1454 return nil, err 1455 } 1456 res = int64(d.Seconds()) 1457 default: 1458 return nil, fmt.Errorf("value for column %q is not parsable (got %T)", key, v) 1459 } 1460 m[key] = res 1461 } 1462 return m, nil 1463} 1464 1465// httpError writes an error to the client in a standard format. 1466func (h *Handler) httpError(w http.ResponseWriter, errmsg string, code int) { 1467 if code == http.StatusUnauthorized { 1468 // If an unauthorized header will be sent back, add a WWW-Authenticate header 1469 // as an authorization challenge. 1470 w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic realm=\"%s\"", h.Config.Realm)) 1471 } else if code/100 != 2 { 1472 sz := math.Min(float64(len(errmsg)), 1024.0) 1473 w.Header().Set("X-InfluxDB-Error", errmsg[:int(sz)]) 1474 } 1475 1476 response := Response{Err: errors.New(errmsg)} 1477 if rw, ok := w.(ResponseWriter); ok { 1478 h.writeHeader(w, code) 1479 rw.WriteResponse(response) 1480 return 1481 } 1482 1483 // Default implementation if the response writer hasn't been replaced 1484 // with our special response writer type. 
1485 w.Header().Add("Content-Type", "application/json") 1486 h.writeHeader(w, code) 1487 b, _ := json.Marshal(response) 1488 w.Write(b) 1489} 1490 1491// Filters and filter helpers 1492 1493type credentials struct { 1494 Method AuthenticationMethod 1495 Username string 1496 Password string 1497 Token string 1498} 1499 1500func parseToken(token string) (user, pass string, ok bool) { 1501 s := strings.IndexByte(token, ':') 1502 if s < 0 { 1503 return 1504 } 1505 return token[:s], token[s+1:], true 1506} 1507 1508// parseCredentials parses a request and returns the authentication credentials. 1509// The credentials may be present as URL query params, or as a Basic 1510// Authentication header. 1511// As params: http://127.0.0.1/query?u=username&p=password 1512// As basic auth: http://username:password@127.0.0.1 1513// As Bearer token in Authorization header: Bearer <JWT_TOKEN_BLOB> 1514// As Token in Authorization header: Token <username:password> 1515func parseCredentials(r *http.Request) (*credentials, error) { 1516 q := r.URL.Query() 1517 1518 // Check for username and password in URL params. 1519 if u, p := q.Get("u"), q.Get("p"); u != "" && p != "" { 1520 return &credentials{ 1521 Method: UserAuthentication, 1522 Username: u, 1523 Password: p, 1524 }, nil 1525 } 1526 1527 // Check for the HTTP Authorization header. 1528 if s := r.Header.Get("Authorization"); s != "" { 1529 // Check for Bearer token. 1530 strs := strings.Split(s, " ") 1531 if len(strs) == 2 { 1532 switch strs[0] { 1533 case "Bearer": 1534 return &credentials{ 1535 Method: BearerAuthentication, 1536 Token: strs[1], 1537 }, nil 1538 case "Token": 1539 if u, p, ok := parseToken(strs[1]); ok { 1540 return &credentials{ 1541 Method: UserAuthentication, 1542 Username: u, 1543 Password: p, 1544 }, nil 1545 } 1546 } 1547 } 1548 1549 // Check for basic auth. 
1550 if u, p, ok := r.BasicAuth(); ok { 1551 return &credentials{ 1552 Method: UserAuthentication, 1553 Username: u, 1554 Password: p, 1555 }, nil 1556 } 1557 } 1558 1559 return nil, fmt.Errorf("unable to parse authentication credentials") 1560} 1561 1562// authenticate wraps a handler and ensures that if user credentials are passed in 1563// an attempt is made to authenticate that user. If authentication fails, an error is returned. 1564// 1565// There is one exception: if there are no users in the system, authentication is not required. This 1566// is to facilitate bootstrapping of a system with authentication enabled. 1567func authenticate(inner func(http.ResponseWriter, *http.Request, meta.User), h *Handler, requireAuthentication bool) http.Handler { 1568 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 1569 // Return early if we are not authenticating 1570 if !requireAuthentication { 1571 inner(w, r, nil) 1572 return 1573 } 1574 var user meta.User 1575 1576 // TODO corylanou: never allow this in the future without users 1577 if requireAuthentication && h.MetaClient.AdminUserExists() { 1578 creds, err := parseCredentials(r) 1579 if err != nil { 1580 atomic.AddInt64(&h.stats.AuthenticationFailures, 1) 1581 h.httpError(w, err.Error(), http.StatusUnauthorized) 1582 return 1583 } 1584 1585 switch creds.Method { 1586 case UserAuthentication: 1587 if creds.Username == "" { 1588 atomic.AddInt64(&h.stats.AuthenticationFailures, 1) 1589 h.httpError(w, "username required", http.StatusUnauthorized) 1590 return 1591 } 1592 1593 user, err = h.MetaClient.Authenticate(creds.Username, creds.Password) 1594 if err != nil { 1595 atomic.AddInt64(&h.stats.AuthenticationFailures, 1) 1596 h.httpError(w, "authorization failed", http.StatusUnauthorized) 1597 return 1598 } 1599 case BearerAuthentication: 1600 if h.Config.SharedSecret == "" { 1601 atomic.AddInt64(&h.stats.AuthenticationFailures, 1) 1602 h.httpError(w, "bearer auth disabled", http.StatusUnauthorized) 
1603 return 1604 } 1605 keyLookupFn := func(token *jwt.Token) (interface{}, error) { 1606 // Check for expected signing method. 1607 if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok { 1608 return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"]) 1609 } 1610 return []byte(h.Config.SharedSecret), nil 1611 } 1612 1613 // Parse and validate the token. 1614 token, err := jwt.Parse(creds.Token, keyLookupFn) 1615 if err != nil { 1616 h.httpError(w, err.Error(), http.StatusUnauthorized) 1617 return 1618 } else if !token.Valid { 1619 h.httpError(w, "invalid token", http.StatusUnauthorized) 1620 return 1621 } 1622 1623 claims, ok := token.Claims.(jwt.MapClaims) 1624 if !ok { 1625 h.httpError(w, "problem authenticating token", http.StatusInternalServerError) 1626 h.Logger.Info("Could not assert JWT token claims as jwt.MapClaims") 1627 return 1628 } 1629 1630 // Make sure an expiration was set on the token. 1631 if exp, ok := claims["exp"].(float64); !ok || exp <= 0.0 { 1632 h.httpError(w, "token expiration required", http.StatusUnauthorized) 1633 return 1634 } 1635 1636 // Get the username from the token. 1637 username, ok := claims["username"].(string) 1638 if !ok { 1639 h.httpError(w, "username in token must be a string", http.StatusUnauthorized) 1640 return 1641 } else if username == "" { 1642 h.httpError(w, "token must contain a username", http.StatusUnauthorized) 1643 return 1644 } 1645 1646 // Lookup user in the metastore. 
1647 if user, err = h.MetaClient.User(username); err != nil { 1648 h.httpError(w, err.Error(), http.StatusUnauthorized) 1649 return 1650 } else if user == nil { 1651 h.httpError(w, meta.ErrUserNotFound.Error(), http.StatusUnauthorized) 1652 return 1653 } 1654 default: 1655 h.httpError(w, "unsupported authentication", http.StatusUnauthorized) 1656 } 1657 1658 } 1659 inner(w, r, user) 1660 }) 1661} 1662 1663// cors responds to incoming requests and adds the appropriate cors headers 1664// TODO: corylanou: add the ability to configure this in our config 1665func cors(inner http.Handler) http.Handler { 1666 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 1667 if origin := r.Header.Get("Origin"); origin != "" { 1668 w.Header().Set(`Access-Control-Allow-Origin`, origin) 1669 w.Header().Set(`Access-Control-Allow-Methods`, strings.Join([]string{ 1670 `DELETE`, 1671 `GET`, 1672 `OPTIONS`, 1673 `POST`, 1674 `PUT`, 1675 }, ", ")) 1676 1677 w.Header().Set(`Access-Control-Allow-Headers`, strings.Join([]string{ 1678 `Accept`, 1679 `Accept-Encoding`, 1680 `Authorization`, 1681 `Content-Length`, 1682 `Content-Type`, 1683 `X-CSRF-Token`, 1684 `X-HTTP-Method-Override`, 1685 }, ", ")) 1686 1687 w.Header().Set(`Access-Control-Expose-Headers`, strings.Join([]string{ 1688 `Date`, 1689 `X-InfluxDB-Version`, 1690 `X-InfluxDB-Build`, 1691 }, ", ")) 1692 } 1693 1694 if r.Method == "OPTIONS" { 1695 return 1696 } 1697 1698 inner.ServeHTTP(w, r) 1699 }) 1700} 1701 1702func requestID(inner http.Handler) http.Handler { 1703 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 1704 // X-Request-Id takes priority. 1705 rid := r.Header.Get("X-Request-Id") 1706 1707 // If X-Request-Id is empty, then check Request-Id 1708 if rid == "" { 1709 rid = r.Header.Get("Request-Id") 1710 } 1711 1712 // If Request-Id is empty then generate a v1 UUID. 
1713 if rid == "" { 1714 rid = uuid.TimeUUID().String() 1715 } 1716 1717 // We read Request-Id in other handler code so we'll use that naming 1718 // convention from this point in the request cycle. 1719 r.Header.Set("Request-Id", rid) 1720 1721 // Set the request ID on the response headers. 1722 // X-Request-Id is the most common name for a request ID header. 1723 w.Header().Set("X-Request-Id", rid) 1724 1725 // We will also set Request-Id for backwards compatibility with previous 1726 // versions of InfluxDB. 1727 w.Header().Set("Request-Id", rid) 1728 1729 inner.ServeHTTP(w, r) 1730 }) 1731} 1732 1733func (h *Handler) logging(inner http.Handler, name string) http.Handler { 1734 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 1735 start := time.Now() 1736 l := &responseLogger{w: w} 1737 inner.ServeHTTP(l, r) 1738 1739 if h.accessLogFilters.Match(l.Status()) { 1740 h.CLFLogger.Println(buildLogLine(l, r, start)) 1741 } 1742 1743 // Log server errors. 1744 if l.Status()/100 == 5 { 1745 errStr := l.Header().Get("X-InfluxDB-Error") 1746 if errStr != "" { 1747 h.Logger.Error(fmt.Sprintf("[%d] - %q", l.Status(), errStr)) 1748 } 1749 } 1750 }) 1751} 1752 1753func (h *Handler) responseWriter(inner http.Handler) http.Handler { 1754 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 1755 w = NewResponseWriter(w, r) 1756 inner.ServeHTTP(w, r) 1757 }) 1758} 1759 1760// if the env var is set, and the value is truthy, then we will *not* 1761// recover from a panic. 
1762var willCrash bool 1763 1764func init() { 1765 var err error 1766 if willCrash, err = strconv.ParseBool(os.Getenv(query.PanicCrashEnv)); err != nil { 1767 willCrash = false 1768 } 1769} 1770 1771func (h *Handler) recovery(inner http.Handler, name string) http.Handler { 1772 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 1773 start := time.Now() 1774 l := &responseLogger{w: w} 1775 1776 defer func() { 1777 if err := recover(); err != nil { 1778 logLine := buildLogLine(l, r, start) 1779 logLine = fmt.Sprintf("%s [panic:%s] %s", logLine, err, debug.Stack()) 1780 h.CLFLogger.Println(logLine) 1781 http.Error(w, http.StatusText(http.StatusInternalServerError), 500) 1782 atomic.AddInt64(&h.stats.RecoveredPanics, 1) // Capture the panic in _internal stats. 1783 1784 if willCrash { 1785 h.CLFLogger.Println("\n\n=====\nAll goroutines now follow:") 1786 buf := debug.Stack() 1787 h.CLFLogger.Printf("%s\n", buf) 1788 os.Exit(1) // If we panic then the Go server will recover. 1789 } 1790 } 1791 }() 1792 1793 inner.ServeHTTP(l, r) 1794 }) 1795} 1796 1797// Store describes the behaviour of the storage packages Store type. 1798type Store interface { 1799 Read(ctx context.Context, req *datatypes.ReadRequest) (reads.ResultSet, error) 1800} 1801 1802// Response represents a list of statement results. 1803type Response struct { 1804 Results []*query.Result 1805 Err error 1806} 1807 1808// MarshalJSON encodes a Response struct into JSON. 1809func (r Response) MarshalJSON() ([]byte, error) { 1810 // Define a struct that outputs "error" as a string. 1811 var o struct { 1812 Results []*query.Result `json:"results,omitempty"` 1813 Err string `json:"error,omitempty"` 1814 } 1815 1816 // Copy fields to output struct. 1817 o.Results = r.Results 1818 if r.Err != nil { 1819 o.Err = r.Err.Error() 1820 } 1821 1822 return json.Marshal(&o) 1823} 1824 1825// UnmarshalJSON decodes the data into the Response struct. 
1826func (r *Response) UnmarshalJSON(b []byte) error { 1827 var o struct { 1828 Results []*query.Result `json:"results,omitempty"` 1829 Err string `json:"error,omitempty"` 1830 } 1831 1832 err := json.Unmarshal(b, &o) 1833 if err != nil { 1834 return err 1835 } 1836 r.Results = o.Results 1837 if o.Err != "" { 1838 r.Err = errors.New(o.Err) 1839 } 1840 return nil 1841} 1842 1843// Error returns the first error from any statement. 1844// Returns nil if no errors occurred on any statements. 1845func (r *Response) Error() error { 1846 if r.Err != nil { 1847 return r.Err 1848 } 1849 for _, rr := range r.Results { 1850 if rr.Err != nil { 1851 return rr.Err 1852 } 1853 } 1854 return nil 1855} 1856 1857// Throttler represents an HTTP throttler that limits the number of concurrent 1858// requests being processed as well as the number of enqueued requests. 1859type Throttler struct { 1860 current chan struct{} 1861 enqueued chan struct{} 1862 1863 // Maximum amount of time requests can wait in queue. 1864 // Must be set before adding middleware. 1865 EnqueueTimeout time.Duration 1866 1867 Logger *zap.Logger 1868} 1869 1870// NewThrottler returns a new instance of Throttler that limits to concurrentN. 1871// requests processed at a time and maxEnqueueN requests waiting to be processed. 1872func NewThrottler(concurrentN, maxEnqueueN int) *Throttler { 1873 return &Throttler{ 1874 current: make(chan struct{}, concurrentN), 1875 enqueued: make(chan struct{}, concurrentN+maxEnqueueN), 1876 Logger: zap.NewNop(), 1877 } 1878} 1879 1880// Handler wraps h in a middleware handler that throttles requests. 1881func (t *Throttler) Handler(h http.Handler) http.Handler { 1882 timeout := t.EnqueueTimeout 1883 1884 // Return original handler if concurrent requests is zero. 1885 if cap(t.current) == 0 { 1886 return h 1887 } 1888 1889 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 1890 // Start a timer to limit enqueued request times. 
1891 var timerCh <-chan time.Time 1892 if timeout > 0 { 1893 timer := time.NewTimer(timeout) 1894 defer timer.Stop() 1895 timerCh = timer.C 1896 } 1897 1898 // Wait for a spot in the queue. 1899 if cap(t.enqueued) > cap(t.current) { 1900 select { 1901 case t.enqueued <- struct{}{}: 1902 defer func() { <-t.enqueued }() 1903 default: 1904 t.Logger.Warn("request throttled, queue full", zap.Duration("d", timeout)) 1905 http.Error(w, "request throttled, queue full", http.StatusServiceUnavailable) 1906 return 1907 } 1908 } 1909 1910 // First check if we can immediately send in to current because there is 1911 // available capacity. This helps reduce racyness in tests. 1912 select { 1913 case t.current <- struct{}{}: 1914 default: 1915 // Wait for a spot in the list of concurrent requests, but allow checking the timeout. 1916 select { 1917 case t.current <- struct{}{}: 1918 case <-timerCh: 1919 t.Logger.Warn("request throttled, exceeds timeout", zap.Duration("d", timeout)) 1920 http.Error(w, "request throttled, exceeds timeout", http.StatusServiceUnavailable) 1921 return 1922 } 1923 } 1924 defer func() { <-t.current }() 1925 1926 // Execute request. 1927 h.ServeHTTP(w, r) 1928 }) 1929} 1930