1package query
2
3import (
4	"context"
5	"errors"
6	"fmt"
7	"os"
8	"runtime/debug"
9	"strconv"
10	"sync"
11	"sync/atomic"
12	"time"
13
14	"github.com/influxdata/influxdb/models"
15	"github.com/influxdata/influxql"
16	"go.uber.org/zap"
17)
18
// Sentinel errors produced by the query executor. They are plain
// errors.New values, so they carry no additional context.
var (
	// ErrInvalidQuery is returned when executing an unknown query type.
	ErrInvalidQuery = errors.New("invalid query")

	// ErrNotExecuted is returned when a statement is not executed in a query.
	// This can occur when a previous statement in the same query has errored.
	ErrNotExecuted = errors.New("not executed")

	// ErrQueryInterrupted is an error returned when the query is interrupted.
	ErrQueryInterrupted = errors.New("query interrupted")

	// ErrQueryAborted is an error returned when the query is aborted.
	ErrQueryAborted = errors.New("query aborted")

	// ErrQueryEngineShutdown is an error sent when the query cannot be
	// created because the query engine was shut down.
	ErrQueryEngineShutdown = errors.New("query engine shutdown")

	// ErrQueryTimeoutLimitExceeded is an error when a query hits the max time allowed to run.
	ErrQueryTimeoutLimitExceeded = errors.New("query-timeout limit exceeded")

	// ErrAlreadyKilled is returned when attempting to kill a query that has already been killed.
	ErrAlreadyKilled = errors.New("already killed")
)
43
// Statistics for the Executor. The stat* constants are the keys of the Values
// map returned by (*Executor).Statistics.
const (
	statQueriesActive          = "queriesActive"   // Number of queries currently being executed.
	statQueriesExecuted        = "queriesExecuted" // Number of queries that have been executed (started).
	statQueriesFinished        = "queriesFinished" // Number of queries that have finished.
	statQueryExecutionDuration = "queryDurationNs" // Total (wall) time spent executing queries.
	statRecoveredPanics        = "recoveredPanics" // Number of panics recovered by Query Executor.

	// PanicCrashEnv is the name of the environment variable that is parsed as
	// a boolean at package init. When it parses as true, a panic recovered by
	// the Executor is logged and the process then exits instead of carrying
	// on.
	PanicCrashEnv = "INFLUXDB_PANIC_CRASH"
)
56
// ErrDatabaseNotFound builds the error reported when the database called name
// does not exist.
func ErrDatabaseNotFound(name string) error {
	const format = "database not found: %s"
	return fmt.Errorf(format, name)
}
59
// ErrMaxSelectPointsLimitExceeded is an error when a query hits the maximum number of points.
// n is the number of points seen so far and limit is the configured maximum;
// both are embedded in the message as "(n/limit)".
func ErrMaxSelectPointsLimitExceeded(n, limit int) error {
	return fmt.Errorf("max-select-point limit exceeded: (%d/%d)", n, limit)
}
64
// ErrMaxConcurrentQueriesLimitExceeded builds the error reported when a query
// is refused because the maximum number of concurrent queries is already
// running. n and limit are embedded in the message in that order.
func ErrMaxConcurrentQueriesLimitExceeded(n, limit int) error {
	const format = "max-concurrent-queries limit exceeded(%d, %d)"
	return fmt.Errorf(format, n, limit)
}
70
// Authorizer determines if certain operations are authorized.
//
// Use AuthorizerIsOpen rather than a nil comparison to test whether an
// Authorizer permits everything.
type Authorizer interface {
	// AuthorizeDatabase indicates whether the given Privilege is authorized on the database with the given name.
	AuthorizeDatabase(p influxql.Privilege, name string) bool

	// AuthorizeQuery returns an error if the query cannot be executed
	// against the given database, and nil if it is allowed.
	AuthorizeQuery(database string, query *influxql.Query) error

	// AuthorizeSeriesRead determines if a series is authorized for reading.
	AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool

	// AuthorizeSeriesWrite determines if a series is authorized for writing.
	AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool
}
85
// openAuthorizer is the Authorizer used when authorization is disabled.
// It allows all operations. It has no fields, so all values are equal.
type openAuthorizer struct{}

// OpenAuthorizer is the shared openAuthorizer value. It holds no state and
// can be shared by all goroutines.
var OpenAuthorizer = openAuthorizer{}

// AuthorizeDatabase returns true to allow any operation on a database.
func (a openAuthorizer) AuthorizeDatabase(influxql.Privilege, string) bool { return true }

// AuthorizeSeriesRead allows access to any series.
func (a openAuthorizer) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool {
	return true
}

// AuthorizeSeriesWrite allows access to any series.
func (a openAuthorizer) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
	return true
}

// AuthorizeQuery allows any query to execute; it always returns nil.
func (a openAuthorizer) AuthorizeQuery(_ string, _ *influxql.Query) error { return nil }
108
109// AuthorizerIsOpen returns true if the provided Authorizer is guaranteed to
110// authorize anything. A nil Authorizer returns true for this function, and this
111// function should be preferred over directly checking if an Authorizer is nil
112// or not.
113func AuthorizerIsOpen(a Authorizer) bool {
114	if u, ok := a.(interface{ AuthorizeUnrestricted() bool }); ok {
115		return u.AuthorizeUnrestricted()
116	}
117	return a == nil || a == OpenAuthorizer
118}
119
// ExecutionOptions contains the options for executing a query. A value is
// passed to Executor.ExecuteQuery for each query.
type ExecutionOptions struct {
	// The database the query is running against. When empty, a statement
	// that reports its own default database supplies the value used for
	// normalization.
	Database string

	// The retention policy the query is running against.
	RetentionPolicy string

	// How to determine whether the query is allowed to execute,
	// what resources can be returned in SHOW queries, etc.
	Authorizer Authorizer

	// The requested maximum number of points to return in each result.
	ChunkSize int

	// If this query is being executed in a read-only context.
	ReadOnly bool

	// Node to execute on.
	NodeID uint64

	// Quiet suppresses non-essential output from the query executor.
	Quiet bool

	// AbortCh is a channel that signals when results are no longer desired by the caller.
	AbortCh <-chan struct{}
}
147
// Unexported key types for values stored in a context.Context. Using distinct
// struct types keeps the keys from colliding with keys from other packages.
type (
	// iteratorsContextKey is the key used by NewContextWithIterators.
	iteratorsContextKey struct{}
	// monitorContextKey is not referenced in this part of the file.
	monitorContextKey   struct{}
)
152
153// NewContextWithIterators returns a new context.Context with the *Iterators slice added.
154// The query planner will add instances of AuxIterator to the Iterators slice.
155func NewContextWithIterators(ctx context.Context, itr *Iterators) context.Context {
156	return context.WithValue(ctx, iteratorsContextKey{}, itr)
157}
158
// StatementExecutor executes a statement within the Executor.
//
// If the implementation also satisfies StatementNormalizer, the Executor
// normalizes each statement before passing it to ExecuteStatement.
type StatementExecutor interface {
	// ExecuteStatement executes a statement. Results should be sent to the
	// results channel in the ExecutionContext. A non-nil error is reported by
	// the Executor as the result of that statement and stops the query.
	ExecuteStatement(stmt influxql.Statement, ctx *ExecutionContext) error
}
165
// StatementNormalizer normalizes a statement before it is executed. It is an
// optional interface: the Executor checks whether its StatementExecutor
// implements it.
type StatementNormalizer interface {
	// NormalizeStatement adds a default database and policy to the
	// measurements in the statement.
	NormalizeStatement(stmt influxql.Statement, database, retentionPolicy string) error
}
172
// Executor executes every statement in a Query.
type Executor struct {
	// Used for executing a statement in the query.
	// NewExecutor leaves this nil; it must be set before executing queries.
	StatementExecutor StatementExecutor

	// Used for tracking running queries.
	TaskManager *TaskManager

	// Logger to use for all logging.
	// Defaults to discarding all log output.
	Logger *zap.Logger

	// expvar-based stats. The counters are updated with sync/atomic.
	stats *Statistics
}
188
189// NewExecutor returns a new instance of Executor.
190func NewExecutor() *Executor {
191	return &Executor{
192		TaskManager: NewTaskManager(),
193		Logger:      zap.NewNop(),
194		stats:       &Statistics{},
195	}
196}
197
// Statistics keeps statistics related to the Executor. The fields are read
// and written with sync/atomic by the Executor.
type Statistics struct {
	ActiveQueries          int64 // Queries currently executing.
	ExecutedQueries        int64 // Queries that have started executing.
	FinishedQueries        int64 // Queries that have finished executing.
	QueryExecutionDuration int64 // Accumulated wall time of finished queries, in nanoseconds.
	RecoveredPanics        int64 // Panics recovered while executing queries.
}
206
207// Statistics returns statistics for periodic monitoring.
208func (e *Executor) Statistics(tags map[string]string) []models.Statistic {
209	return []models.Statistic{{
210		Name: "queryExecutor",
211		Tags: tags,
212		Values: map[string]interface{}{
213			statQueriesActive:          atomic.LoadInt64(&e.stats.ActiveQueries),
214			statQueriesExecuted:        atomic.LoadInt64(&e.stats.ExecutedQueries),
215			statQueriesFinished:        atomic.LoadInt64(&e.stats.FinishedQueries),
216			statQueryExecutionDuration: atomic.LoadInt64(&e.stats.QueryExecutionDuration),
217			statRecoveredPanics:        atomic.LoadInt64(&e.stats.RecoveredPanics),
218		},
219	}}
220}
221
// Close closes the Executor's TaskManager and returns its error.
// NOTE(review): per the original comment this kills all running queries and
// prevents new queries from being attached; that behavior lives in
// TaskManager.Close, which is not in this part of the file.
func (e *Executor) Close() error {
	return e.TaskManager.Close()
}
226
// WithLogger sets the logger used by the Executor and by its TaskManager.
// The logger stored is derived from log with the field service=query added.
// log must not be nil.
func (e *Executor) WithLogger(log *zap.Logger) {
	e.Logger = log.With(zap.String("service", "query"))
	e.TaskManager.Logger = e.Logger
}
233
234// ExecuteQuery executes each statement within a query.
235func (e *Executor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions, closing chan struct{}) <-chan *Result {
236	results := make(chan *Result)
237	go e.executeQuery(query, opt, closing, results)
238	return results
239}
240
241func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) {
242	defer close(results)
243	defer e.recover(query, results)
244
245	atomic.AddInt64(&e.stats.ActiveQueries, 1)
246	atomic.AddInt64(&e.stats.ExecutedQueries, 1)
247	defer func(start time.Time) {
248		atomic.AddInt64(&e.stats.ActiveQueries, -1)
249		atomic.AddInt64(&e.stats.FinishedQueries, 1)
250		atomic.AddInt64(&e.stats.QueryExecutionDuration, time.Since(start).Nanoseconds())
251	}(time.Now())
252
253	ctx, detach, err := e.TaskManager.AttachQuery(query, opt, closing)
254	if err != nil {
255		select {
256		case results <- &Result{Err: err}:
257		case <-opt.AbortCh:
258		}
259		return
260	}
261	defer detach()
262
263	// Setup the execution context that will be used when executing statements.
264	ctx.Results = results
265
266	var i int
267LOOP:
268	for ; i < len(query.Statements); i++ {
269		ctx.statementID = i
270		stmt := query.Statements[i]
271
272		// If a default database wasn't passed in by the caller, check the statement.
273		defaultDB := opt.Database
274		if defaultDB == "" {
275			if s, ok := stmt.(influxql.HasDefaultDatabase); ok {
276				defaultDB = s.DefaultDatabase()
277			}
278		}
279
280		// Do not let queries manually use the system measurements. If we find
281		// one, return an error. This prevents a person from using the
282		// measurement incorrectly and causing a panic.
283		if stmt, ok := stmt.(*influxql.SelectStatement); ok {
284			for _, s := range stmt.Sources {
285				switch s := s.(type) {
286				case *influxql.Measurement:
287					if influxql.IsSystemName(s.Name) {
288						command := "the appropriate meta command"
289						switch s.Name {
290						case "_fieldKeys":
291							command = "SHOW FIELD KEYS"
292						case "_measurements":
293							command = "SHOW MEASUREMENTS"
294						case "_series":
295							command = "SHOW SERIES"
296						case "_tagKeys":
297							command = "SHOW TAG KEYS"
298						case "_tags":
299							command = "SHOW TAG VALUES"
300						}
301						results <- &Result{
302							Err: fmt.Errorf("unable to use system source '%s': use %s instead", s.Name, command),
303						}
304						break LOOP
305					}
306				}
307			}
308		}
309
310		// Rewrite statements, if necessary.
311		// This can occur on meta read statements which convert to SELECT statements.
312		newStmt, err := RewriteStatement(stmt)
313		if err != nil {
314			results <- &Result{Err: err}
315			break
316		}
317		stmt = newStmt
318
319		// Normalize each statement if possible.
320		if normalizer, ok := e.StatementExecutor.(StatementNormalizer); ok {
321			if err := normalizer.NormalizeStatement(stmt, defaultDB, opt.RetentionPolicy); err != nil {
322				if err := ctx.send(&Result{Err: err}); err == ErrQueryAborted {
323					return
324				}
325				break
326			}
327		}
328
329		// Log each normalized statement.
330		if !ctx.Quiet {
331			e.Logger.Info("Executing query", zap.Stringer("query", stmt))
332		}
333
334		// Send any other statements to the underlying statement executor.
335		err = e.StatementExecutor.ExecuteStatement(stmt, ctx)
336		if err == ErrQueryInterrupted {
337			// Query was interrupted so retrieve the real interrupt error from
338			// the query task if there is one.
339			if qerr := ctx.Err(); qerr != nil {
340				err = qerr
341			}
342		}
343
344		// Send an error for this result if it failed for some reason.
345		if err != nil {
346			if err := ctx.send(&Result{
347				StatementID: i,
348				Err:         err,
349			}); err == ErrQueryAborted {
350				return
351			}
352			// Stop after the first error.
353			break
354		}
355
356		// Check if the query was interrupted during an uninterruptible statement.
357		interrupted := false
358		select {
359		case <-ctx.Done():
360			interrupted = true
361		default:
362			// Query has not been interrupted.
363		}
364
365		if interrupted {
366			break
367		}
368	}
369
370	// Send error results for any statements which were not executed.
371	for ; i < len(query.Statements)-1; i++ {
372		if err := ctx.send(&Result{
373			StatementID: i,
374			Err:         ErrNotExecuted,
375		}); err == ErrQueryAborted {
376			return
377		}
378	}
379}
380
381// Determines if the Executor will recover any panics or let them crash
382// the server.
383var willCrash bool
384
385func init() {
386	var err error
387	if willCrash, err = strconv.ParseBool(os.Getenv(PanicCrashEnv)); err != nil {
388		willCrash = false
389	}
390}
391
392func (e *Executor) recover(query *influxql.Query, results chan *Result) {
393	if err := recover(); err != nil {
394		atomic.AddInt64(&e.stats.RecoveredPanics, 1) // Capture the panic in _internal stats.
395		e.Logger.Error(fmt.Sprintf("%s [panic:%s] %s", query.String(), err, debug.Stack()))
396		results <- &Result{
397			StatementID: -1,
398			Err:         fmt.Errorf("%s [panic:%s]", query.String(), err),
399		}
400
401		if willCrash {
402			e.Logger.Error(fmt.Sprintf("\n\n=====\nAll goroutines now follow:"))
403			buf := debug.Stack()
404			e.Logger.Error(fmt.Sprintf("%s", buf))
405			os.Exit(1)
406		}
407	}
408}
409
// Task is the internal data structure for managing queries.
// NOTE(review): the original comment pointed readers to "Task" for the public
// data structure that gets returned, which is self-referential — confirm the
// intended type name.
type Task struct {
	query     string
	database  string
	status    TaskStatus    // Guarded by mu; set to KilledTask by close and kill.
	startTime time.Time
	closing   chan struct{} // Closed exactly once, by close or kill.
	monitorCh chan error    // Receives errors returned by monitor functions.
	err       error         // Guarded by mu; see Error and setError.
	mu        sync.Mutex
}
422
// Monitor starts a new goroutine that will monitor a query by running fn.
// fn is passed the task's closing channel. If fn returns a non-nil error, the
// error is forwarded on the task's monitor channel unless the closing channel
// is closed first.
// NOTE(review): the original comment states that the query is terminated when
// fn returns an error while the query is still running; that handling is done
// by whatever reads the monitor channel, outside this part of the file.
func (q *Task) Monitor(fn MonitorFunc) {
	go q.monitor(fn)
}
430
431// Error returns any asynchronous error that may have occurred while executing
432// the query.
433func (q *Task) Error() error {
434	q.mu.Lock()
435	defer q.mu.Unlock()
436	return q.err
437}
438
439func (q *Task) setError(err error) {
440	q.mu.Lock()
441	q.err = err
442	q.mu.Unlock()
443}
444
445func (q *Task) monitor(fn MonitorFunc) {
446	if err := fn(q.closing); err != nil {
447		select {
448		case <-q.closing:
449		case q.monitorCh <- err:
450		}
451	}
452}
453
454// close closes the query task closing channel if the query hasn't been previously killed.
455func (q *Task) close() {
456	q.mu.Lock()
457	if q.status != KilledTask {
458		// Set the status to killed to prevent closing the channel twice.
459		q.status = KilledTask
460		close(q.closing)
461	}
462	q.mu.Unlock()
463}
464
465func (q *Task) kill() error {
466	q.mu.Lock()
467	if q.status == KilledTask {
468		q.mu.Unlock()
469		return ErrAlreadyKilled
470	}
471	q.status = KilledTask
472	close(q.closing)
473	q.mu.Unlock()
474	return nil
475}
476