1package query 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "os" 8 "runtime/debug" 9 "strconv" 10 "sync" 11 "sync/atomic" 12 "time" 13 14 "github.com/influxdata/influxdb/models" 15 "github.com/influxdata/influxql" 16 "go.uber.org/zap" 17) 18 19var ( 20 // ErrInvalidQuery is returned when executing an unknown query type. 21 ErrInvalidQuery = errors.New("invalid query") 22 23 // ErrNotExecuted is returned when a statement is not executed in a query. 24 // This can occur when a previous statement in the same query has errored. 25 ErrNotExecuted = errors.New("not executed") 26 27 // ErrQueryInterrupted is an error returned when the query is interrupted. 28 ErrQueryInterrupted = errors.New("query interrupted") 29 30 // ErrQueryAborted is an error returned when the query is aborted. 31 ErrQueryAborted = errors.New("query aborted") 32 33 // ErrQueryEngineShutdown is an error sent when the query cannot be 34 // created because the query engine was shutdown. 35 ErrQueryEngineShutdown = errors.New("query engine shutdown") 36 37 // ErrQueryTimeoutLimitExceeded is an error when a query hits the max time allowed to run. 38 ErrQueryTimeoutLimitExceeded = errors.New("query-timeout limit exceeded") 39 40 // ErrAlreadyKilled is returned when attempting to kill a query that has already been killed. 41 ErrAlreadyKilled = errors.New("already killed") 42) 43 44// Statistics for the Executor 45const ( 46 statQueriesActive = "queriesActive" // Number of queries currently being executed. 47 statQueriesExecuted = "queriesExecuted" // Number of queries that have been executed (started). 48 statQueriesFinished = "queriesFinished" // Number of queries that have finished. 49 statQueryExecutionDuration = "queryDurationNs" // Total (wall) time spent executing queries. 50 statRecoveredPanics = "recoveredPanics" // Number of panics recovered by Query Executor. 51 52 // PanicCrashEnv is the environment variable that, when set, will prevent 53 // the handler from recovering any panics. 54 PanicCrashEnv = "INFLUXDB_PANIC_CRASH" 55) 56 57// ErrDatabaseNotFound returns a database not found error for the given database name. 58func ErrDatabaseNotFound(name string) error { return fmt.Errorf("database not found: %s", name) } 59 60// ErrMaxSelectPointsLimitExceeded is an error when a query hits the maximum number of points. 61func ErrMaxSelectPointsLimitExceeded(n, limit int) error { 62 return fmt.Errorf("max-select-point limit exceeed: (%d/%d)", n, limit) 63} 64 65// ErrMaxConcurrentQueriesLimitExceeded is an error when a query cannot be run 66// because the maximum number of queries has been reached. 67func ErrMaxConcurrentQueriesLimitExceeded(n, limit int) error { 68 return fmt.Errorf("max-concurrent-queries limit exceeded(%d, %d)", n, limit) 69} 70 71// Authorizer determines if certain operations are authorized. 72type Authorizer interface { 73 // AuthorizeDatabase indicates whether the given Privilege is authorized on the database with the given name. 74 AuthorizeDatabase(p influxql.Privilege, name string) bool 75 76 // AuthorizeQuery returns an error if the query cannot be executed 77 AuthorizeQuery(database string, query *influxql.Query) error 78 79 // AuthorizeSeriesRead determines if a series is authorized for reading 80 AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool 81 82 // AuthorizeSeriesWrite determines if a series is authorized for writing 83 AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool 84} 85 86// OpenAuthorizer is the Authorizer used when authorization is disabled. 87// It allows all operations. 88type openAuthorizer struct{} 89 90// OpenAuthorizer can be shared by all goroutines. 91var OpenAuthorizer = openAuthorizer{} 92 93// AuthorizeDatabase returns true to allow any operation on a database. 94func (a openAuthorizer) AuthorizeDatabase(influxql.Privilege, string) bool { return true } 95 96// AuthorizeSeriesRead allows access to any series. 97func (a openAuthorizer) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool { 98 return true 99} 100 101// AuthorizeSeriesWrite allows access to any series. 102func (a openAuthorizer) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool { 103 return true 104} 105 106// AuthorizeSeriesRead allows any query to execute. 107func (a openAuthorizer) AuthorizeQuery(_ string, _ *influxql.Query) error { return nil } 108 109// AuthorizerIsOpen returns true if the provided Authorizer is guaranteed to 110// authorize anything. A nil Authorizer returns true for this function, and this 111// function should be preferred over directly checking if an Authorizer is nil 112// or not. 113func AuthorizerIsOpen(a Authorizer) bool { 114 if u, ok := a.(interface{ AuthorizeUnrestricted() bool }); ok { 115 return u.AuthorizeUnrestricted() 116 } 117 return a == nil || a == OpenAuthorizer 118} 119 120// ExecutionOptions contains the options for executing a query. 121type ExecutionOptions struct { 122 // The database the query is running against. 123 Database string 124 125 // The retention policy the query is running against. 126 RetentionPolicy string 127 128 // How to determine whether the query is allowed to execute, 129 // what resources can be returned in SHOW queries, etc. 130 Authorizer Authorizer 131 132 // The requested maximum number of points to return in each result. 133 ChunkSize int 134 135 // If this query is being executed in a read-only context. 136 ReadOnly bool 137 138 // Node to execute on. 139 NodeID uint64 140 141 // Quiet suppresses non-essential output from the query executor. 142 Quiet bool 143 144 // AbortCh is a channel that signals when results are no longer desired by the caller. 145 AbortCh <-chan struct{} 146} 147 148type ( 149 iteratorsContextKey struct{} 150 monitorContextKey struct{} 151) 152 153// NewContextWithIterators returns a new context.Context with the *Iterators slice added. 154// The query planner will add instances of AuxIterator to the Iterators slice. 155func NewContextWithIterators(ctx context.Context, itr *Iterators) context.Context { 156 return context.WithValue(ctx, iteratorsContextKey{}, itr) 157} 158 159// StatementExecutor executes a statement within the Executor. 160type StatementExecutor interface { 161 // ExecuteStatement executes a statement. Results should be sent to the 162 // results channel in the ExecutionContext. 163 ExecuteStatement(stmt influxql.Statement, ctx *ExecutionContext) error 164} 165 166// StatementNormalizer normalizes a statement before it is executed. 167type StatementNormalizer interface { 168 // NormalizeStatement adds a default database and policy to the 169 // measurements in the statement. 170 NormalizeStatement(stmt influxql.Statement, database, retentionPolicy string) error 171} 172 173// Executor executes every statement in an Query. 174type Executor struct { 175 // Used for executing a statement in the query. 176 StatementExecutor StatementExecutor 177 178 // Used for tracking running queries. 179 TaskManager *TaskManager 180 181 // Logger to use for all logging. 182 // Defaults to discarding all log output. 183 Logger *zap.Logger 184 185 // expvar-based stats. 186 stats *Statistics 187} 188 189// NewExecutor returns a new instance of Executor. 190func NewExecutor() *Executor { 191 return &Executor{ 192 TaskManager: NewTaskManager(), 193 Logger: zap.NewNop(), 194 stats: &Statistics{}, 195 } 196} 197 198// Statistics keeps statistics related to the Executor. 199type Statistics struct { 200 ActiveQueries int64 201 ExecutedQueries int64 202 FinishedQueries int64 203 QueryExecutionDuration int64 204 RecoveredPanics int64 205} 206 207// Statistics returns statistics for periodic monitoring. 208func (e *Executor) Statistics(tags map[string]string) []models.Statistic { 209 return []models.Statistic{{ 210 Name: "queryExecutor", 211 Tags: tags, 212 Values: map[string]interface{}{ 213 statQueriesActive: atomic.LoadInt64(&e.stats.ActiveQueries), 214 statQueriesExecuted: atomic.LoadInt64(&e.stats.ExecutedQueries), 215 statQueriesFinished: atomic.LoadInt64(&e.stats.FinishedQueries), 216 statQueryExecutionDuration: atomic.LoadInt64(&e.stats.QueryExecutionDuration), 217 statRecoveredPanics: atomic.LoadInt64(&e.stats.RecoveredPanics), 218 }, 219 }} 220} 221 222// Close kills all running queries and prevents new queries from being attached. 223func (e *Executor) Close() error { 224 return e.TaskManager.Close() 225} 226 227// SetLogOutput sets the writer to which all logs are written. It must not be 228// called after Open is called. 229func (e *Executor) WithLogger(log *zap.Logger) { 230 e.Logger = log.With(zap.String("service", "query")) 231 e.TaskManager.Logger = e.Logger 232} 233 234// ExecuteQuery executes each statement within a query. 235func (e *Executor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions, closing chan struct{}) <-chan *Result { 236 results := make(chan *Result) 237 go e.executeQuery(query, opt, closing, results) 238 return results 239} 240 241func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) { 242 defer close(results) 243 defer e.recover(query, results) 244 245 atomic.AddInt64(&e.stats.ActiveQueries, 1) 246 atomic.AddInt64(&e.stats.ExecutedQueries, 1) 247 defer func(start time.Time) { 248 atomic.AddInt64(&e.stats.ActiveQueries, -1) 249 atomic.AddInt64(&e.stats.FinishedQueries, 1) 250 atomic.AddInt64(&e.stats.QueryExecutionDuration, time.Since(start).Nanoseconds()) 251 }(time.Now()) 252 253 ctx, detach, err := e.TaskManager.AttachQuery(query, opt, closing) 254 if err != nil { 255 select { 256 case results <- &Result{Err: err}: 257 case <-opt.AbortCh: 258 } 259 return 260 } 261 defer detach() 262 263 // Setup the execution context that will be used when executing statements. 264 ctx.Results = results 265 266 var i int 267LOOP: 268 for ; i < len(query.Statements); i++ { 269 ctx.statementID = i 270 stmt := query.Statements[i] 271 272 // If a default database wasn't passed in by the caller, check the statement. 273 defaultDB := opt.Database 274 if defaultDB == "" { 275 if s, ok := stmt.(influxql.HasDefaultDatabase); ok { 276 defaultDB = s.DefaultDatabase() 277 } 278 } 279 280 // Do not let queries manually use the system measurements. If we find 281 // one, return an error. This prevents a person from using the 282 // measurement incorrectly and causing a panic. 283 if stmt, ok := stmt.(*influxql.SelectStatement); ok { 284 for _, s := range stmt.Sources { 285 switch s := s.(type) { 286 case *influxql.Measurement: 287 if influxql.IsSystemName(s.Name) { 288 command := "the appropriate meta command" 289 switch s.Name { 290 case "_fieldKeys": 291 command = "SHOW FIELD KEYS" 292 case "_measurements": 293 command = "SHOW MEASUREMENTS" 294 case "_series": 295 command = "SHOW SERIES" 296 case "_tagKeys": 297 command = "SHOW TAG KEYS" 298 case "_tags": 299 command = "SHOW TAG VALUES" 300 } 301 results <- &Result{ 302 Err: fmt.Errorf("unable to use system source '%s': use %s instead", s.Name, command), 303 } 304 break LOOP 305 } 306 } 307 } 308 } 309 310 // Rewrite statements, if necessary. 311 // This can occur on meta read statements which convert to SELECT statements. 312 newStmt, err := RewriteStatement(stmt) 313 if err != nil { 314 results <- &Result{Err: err} 315 break 316 } 317 stmt = newStmt 318 319 // Normalize each statement if possible. 320 if normalizer, ok := e.StatementExecutor.(StatementNormalizer); ok { 321 if err := normalizer.NormalizeStatement(stmt, defaultDB, opt.RetentionPolicy); err != nil { 322 if err := ctx.send(&Result{Err: err}); err == ErrQueryAborted { 323 return 324 } 325 break 326 } 327 } 328 329 // Log each normalized statement. 330 if !ctx.Quiet { 331 e.Logger.Info("Executing query", zap.Stringer("query", stmt)) 332 } 333 334 // Send any other statements to the underlying statement executor. 335 err = e.StatementExecutor.ExecuteStatement(stmt, ctx) 336 if err == ErrQueryInterrupted { 337 // Query was interrupted so retrieve the real interrupt error from 338 // the query task if there is one. 339 if qerr := ctx.Err(); qerr != nil { 340 err = qerr 341 } 342 } 343 344 // Send an error for this result if it failed for some reason. 345 if err != nil { 346 if err := ctx.send(&Result{ 347 StatementID: i, 348 Err: err, 349 }); err == ErrQueryAborted { 350 return 351 } 352 // Stop after the first error. 353 break 354 } 355 356 // Check if the query was interrupted during an uninterruptible statement. 357 interrupted := false 358 select { 359 case <-ctx.Done(): 360 interrupted = true 361 default: 362 // Query has not been interrupted. 363 } 364 365 if interrupted { 366 break 367 } 368 } 369 370 // Send error results for any statements which were not executed. 371 for ; i < len(query.Statements)-1; i++ { 372 if err := ctx.send(&Result{ 373 StatementID: i, 374 Err: ErrNotExecuted, 375 }); err == ErrQueryAborted { 376 return 377 } 378 } 379} 380 381// Determines if the Executor will recover any panics or let them crash 382// the server. 383var willCrash bool 384 385func init() { 386 var err error 387 if willCrash, err = strconv.ParseBool(os.Getenv(PanicCrashEnv)); err != nil { 388 willCrash = false 389 } 390} 391 392func (e *Executor) recover(query *influxql.Query, results chan *Result) { 393 if err := recover(); err != nil { 394 atomic.AddInt64(&e.stats.RecoveredPanics, 1) // Capture the panic in _internal stats. 395 e.Logger.Error(fmt.Sprintf("%s [panic:%s] %s", query.String(), err, debug.Stack())) 396 results <- &Result{ 397 StatementID: -1, 398 Err: fmt.Errorf("%s [panic:%s]", query.String(), err), 399 } 400 401 if willCrash { 402 e.Logger.Error(fmt.Sprintf("\n\n=====\nAll goroutines now follow:")) 403 buf := debug.Stack() 404 e.Logger.Error(fmt.Sprintf("%s", buf)) 405 os.Exit(1) 406 } 407 } 408} 409 410// Task is the internal data structure for managing queries. 411// For the public use data structure that gets returned, see Task. 412type Task struct { 413 query string 414 database string 415 status TaskStatus 416 startTime time.Time 417 closing chan struct{} 418 monitorCh chan error 419 err error 420 mu sync.Mutex 421} 422 423// Monitor starts a new goroutine that will monitor a query. The function 424// will be passed in a channel to signal when the query has been finished 425// normally. If the function returns with an error and the query is still 426// running, the query will be terminated. 427func (q *Task) Monitor(fn MonitorFunc) { 428 go q.monitor(fn) 429} 430 431// Error returns any asynchronous error that may have occurred while executing 432// the query. 433func (q *Task) Error() error { 434 q.mu.Lock() 435 defer q.mu.Unlock() 436 return q.err 437} 438 439func (q *Task) setError(err error) { 440 q.mu.Lock() 441 q.err = err 442 q.mu.Unlock() 443} 444 445func (q *Task) monitor(fn MonitorFunc) { 446 if err := fn(q.closing); err != nil { 447 select { 448 case <-q.closing: 449 case q.monitorCh <- err: 450 } 451 } 452} 453 454// close closes the query task closing channel if the query hasn't been previously killed. 455func (q *Task) close() { 456 q.mu.Lock() 457 if q.status != KilledTask { 458 // Set the status to killed to prevent closing the channel twice. 459 q.status = KilledTask 460 close(q.closing) 461 } 462 q.mu.Unlock() 463} 464 465func (q *Task) kill() error { 466 q.mu.Lock() 467 if q.status == KilledTask { 468 q.mu.Unlock() 469 return ErrAlreadyKilled 470 } 471 q.status = KilledTask 472 close(q.closing) 473 q.mu.Unlock() 474 return nil 475} 476