1package query
2
3import (
4	"bytes"
5	"context"
6	"encoding/json"
7	"fmt"
8	"sync"
9	"time"
10
11	"github.com/influxdata/influxdb/models"
12	"github.com/influxdata/influxql"
13	"go.uber.org/zap"
14)
15
// DefaultQueryTimeout is the query execution timeout a TaskManager created by
// NewTaskManager starts with. Zero disables the timeout entirely.
const DefaultQueryTimeout time.Duration = 0
21
// TaskStatus is the lifecycle state of a query tracked by a TaskManager.
// The zero value is not a named state and renders as "unknown".
type TaskStatus int

const (
	// RunningTask is set when the task is running.
	RunningTask TaskStatus = iota + 1

	// KilledTask is set when the task is killed, but resources are still
	// being used.
	KilledTask
)

// String returns the lowercase name of the status: "running", "killed", or
// "unknown" for any other value.
func (t TaskStatus) String() string {
	switch t {
	case RunningTask:
		return "running"
	case KilledTask:
		return "killed"
	default:
		return "unknown"
	}
}

// MarshalJSON encodes the status as a JSON string containing its String()
// name, e.g. "running".
func (t TaskStatus) MarshalJSON() ([]byte, error) {
	s := t.String()
	return json.Marshal(s)
}

// UnmarshalJSON decodes a status produced by MarshalJSON.
//
// The input is normally a quoted JSON string such as "running". A bare,
// unquoted name (running) is also accepted for callers that invoke this method
// directly with the raw word. "unknown" decodes to the zero TaskStatus. Any
// other name is rejected with an error.
func (t *TaskStatus) UnmarshalJSON(data []byte) error {
	name := string(data)
	// MarshalJSON emits a quoted JSON string, so the quotes (and any escapes)
	// must be decoded before the name can be compared.
	if bytes.HasPrefix(data, []byte(`"`)) {
		if err := json.Unmarshal(data, &name); err != nil {
			return fmt.Errorf("unknown task status: %s", string(data))
		}
	}

	switch name {
	case "running":
		*t = RunningTask
	case "killed":
		*t = KilledTask
	case "unknown":
		*t = TaskStatus(0)
	default:
		return fmt.Errorf("unknown task status: %s", string(data))
	}
	return nil
}
61
// TaskManager takes care of all aspects related to managing running queries:
// it assigns query ids, keeps a table of attached queries, enforces the
// concurrency limit and query timeout, and executes SHOW QUERIES and
// KILL QUERY statements.
//
// Construct it with NewTaskManager. The zero value has a nil query table, so
// attaching a query to it would panic on the map write.
type TaskManager struct {
	// Query execution timeout.
	// If zero, queries never time out.
	QueryTimeout time.Duration

	// Log queries if they are slower than this time.
	// If zero, slow queries will never be logged.
	LogQueriesAfter time.Duration

	// Maximum number of concurrent queries.
	// Zero or a negative value means there is no limit.
	MaxConcurrentQueries int

	// Logger to use for all logging.
	// NewTaskManager sets this to zap.NewNop(), which discards all output.
	Logger *zap.Logger

	// Used for managing and tracking running queries. mu guards the three
	// other fields in this group: queries maps a query id to its task,
	// nextID is the id the next attached query will receive, and shutdown
	// is set by Close so that AttachQuery rejects further queries.
	queries  map[uint64]*Task
	nextID   uint64
	mu       sync.RWMutex
	shutdown bool
}
84
85// NewTaskManager creates a new TaskManager.
86func NewTaskManager() *TaskManager {
87	return &TaskManager{
88		QueryTimeout: DefaultQueryTimeout,
89		Logger:       zap.NewNop(),
90		queries:      make(map[uint64]*Task),
91		nextID:       1,
92	}
93}
94
95// ExecuteStatement executes a statement containing one of the task management queries.
96func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx *ExecutionContext) error {
97	switch stmt := stmt.(type) {
98	case *influxql.ShowQueriesStatement:
99		rows, err := t.executeShowQueriesStatement(stmt)
100		if err != nil {
101			return err
102		}
103
104		ctx.Send(&Result{
105			Series: rows,
106		})
107	case *influxql.KillQueryStatement:
108		var messages []*Message
109		if ctx.ReadOnly {
110			messages = append(messages, ReadOnlyWarning(stmt.String()))
111		}
112
113		if err := t.executeKillQueryStatement(stmt); err != nil {
114			return err
115		}
116		ctx.Send(&Result{
117			Messages: messages,
118		})
119	default:
120		return ErrInvalidQuery
121	}
122	return nil
123}
124
125func (t *TaskManager) executeKillQueryStatement(stmt *influxql.KillQueryStatement) error {
126	return t.KillQuery(stmt.QueryID)
127}
128
129func (t *TaskManager) executeShowQueriesStatement(q *influxql.ShowQueriesStatement) (models.Rows, error) {
130	t.mu.RLock()
131	defer t.mu.RUnlock()
132
133	now := time.Now()
134
135	values := make([][]interface{}, 0, len(t.queries))
136	for id, qi := range t.queries {
137		d := now.Sub(qi.startTime)
138
139		switch {
140		case d >= time.Second:
141			d = d - (d % time.Second)
142		case d >= time.Millisecond:
143			d = d - (d % time.Millisecond)
144		case d >= time.Microsecond:
145			d = d - (d % time.Microsecond)
146		}
147
148		values = append(values, []interface{}{id, qi.query, qi.database, d.String(), qi.status.String()})
149	}
150
151	return []*models.Row{{
152		Columns: []string{"qid", "query", "database", "duration", "status"},
153		Values:  values,
154	}}, nil
155}
156
157func (t *TaskManager) queryError(qid uint64, err error) {
158	t.mu.RLock()
159	query := t.queries[qid]
160	t.mu.RUnlock()
161	if query != nil {
162		query.setError(err)
163	}
164}
165
166// AttachQuery attaches a running query to be managed by the TaskManager.
167// Returns the query id of the newly attached query or an error if it was
168// unable to assign a query id or attach the query to the TaskManager.
169// This function also returns a channel that will be closed when this
170// query finishes running.
171//
172// After a query finishes running, the system is free to reuse a query id.
173func (t *TaskManager) AttachQuery(q *influxql.Query, opt ExecutionOptions, interrupt <-chan struct{}) (*ExecutionContext, func(), error) {
174	t.mu.Lock()
175	defer t.mu.Unlock()
176
177	if t.shutdown {
178		return nil, nil, ErrQueryEngineShutdown
179	}
180
181	if t.MaxConcurrentQueries > 0 && len(t.queries) >= t.MaxConcurrentQueries {
182		return nil, nil, ErrMaxConcurrentQueriesLimitExceeded(len(t.queries), t.MaxConcurrentQueries)
183	}
184
185	qid := t.nextID
186	query := &Task{
187		query:     q.String(),
188		database:  opt.Database,
189		status:    RunningTask,
190		startTime: time.Now(),
191		closing:   make(chan struct{}),
192		monitorCh: make(chan error),
193	}
194	t.queries[qid] = query
195
196	go t.waitForQuery(qid, query.closing, interrupt, query.monitorCh)
197	if t.LogQueriesAfter != 0 {
198		go query.monitor(func(closing <-chan struct{}) error {
199			timer := time.NewTimer(t.LogQueriesAfter)
200			defer timer.Stop()
201
202			select {
203			case <-timer.C:
204				t.Logger.Warn(fmt.Sprintf("Detected slow query: %s (qid: %d, database: %s, threshold: %s)",
205					query.query, qid, query.database, t.LogQueriesAfter))
206			case <-closing:
207			}
208			return nil
209		})
210	}
211	t.nextID++
212
213	ctx := &ExecutionContext{
214		Context:          context.Background(),
215		QueryID:          qid,
216		task:             query,
217		ExecutionOptions: opt,
218	}
219	ctx.watch()
220	return ctx, func() { t.DetachQuery(qid) }, nil
221}
222
223// KillQuery enters a query into the killed state and closes the channel
224// from the TaskManager. This method can be used to forcefully terminate a
225// running query.
226func (t *TaskManager) KillQuery(qid uint64) error {
227	t.mu.Lock()
228	query := t.queries[qid]
229	t.mu.Unlock()
230
231	if query == nil {
232		return fmt.Errorf("no such query id: %d", qid)
233	}
234	return query.kill()
235}
236
237// DetachQuery removes a query from the query table. If the query is not in the
238// killed state, this will also close the related channel.
239func (t *TaskManager) DetachQuery(qid uint64) error {
240	t.mu.Lock()
241	defer t.mu.Unlock()
242
243	query := t.queries[qid]
244	if query == nil {
245		return fmt.Errorf("no such query id: %d", qid)
246	}
247
248	query.close()
249	delete(t.queries, qid)
250	return nil
251}
252
// QueryInfo represents the information for a query, as reported by
// TaskManager.Queries.
//
// ID is the query id assigned when the query was attached. Query is the query
// text. Database is the database it runs against. Duration is the time since
// the query started; encoding/json writes it as an integer nanosecond count.
// Status is written by TaskStatus.MarshalJSON as a string such as "running".
type QueryInfo struct {
	ID       uint64        `json:"id"`
	Query    string        `json:"query"`
	Database string        `json:"database"`
	Duration time.Duration `json:"duration"`
	Status   TaskStatus    `json:"status"`
}
261
262// Queries returns a list of all running queries with information about them.
263func (t *TaskManager) Queries() []QueryInfo {
264	t.mu.RLock()
265	defer t.mu.RUnlock()
266
267	now := time.Now()
268	queries := make([]QueryInfo, 0, len(t.queries))
269	for id, qi := range t.queries {
270		queries = append(queries, QueryInfo{
271			ID:       id,
272			Query:    qi.query,
273			Database: qi.database,
274			Duration: now.Sub(qi.startTime),
275			Status:   qi.status,
276		})
277	}
278	return queries
279}
280
281func (t *TaskManager) waitForQuery(qid uint64, interrupt <-chan struct{}, closing <-chan struct{}, monitorCh <-chan error) {
282	var timerCh <-chan time.Time
283	if t.QueryTimeout != 0 {
284		timer := time.NewTimer(t.QueryTimeout)
285		timerCh = timer.C
286		defer timer.Stop()
287	}
288
289	select {
290	case <-closing:
291		t.queryError(qid, ErrQueryInterrupted)
292	case err := <-monitorCh:
293		if err == nil {
294			break
295		}
296
297		t.queryError(qid, err)
298	case <-timerCh:
299		t.queryError(qid, ErrQueryTimeoutLimitExceeded)
300	case <-interrupt:
301		// Query was manually closed so exit the select.
302		return
303	}
304	t.KillQuery(qid)
305}
306
307// Close kills all running queries and prevents new queries from being attached.
308func (t *TaskManager) Close() error {
309	t.mu.Lock()
310	defer t.mu.Unlock()
311
312	t.shutdown = true
313	for _, query := range t.queries {
314		query.setError(ErrQueryEngineShutdown)
315		query.close()
316	}
317	t.queries = nil
318	return nil
319}
320