1package query 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "sync" 9 "time" 10 11 "github.com/influxdata/influxdb/models" 12 "github.com/influxdata/influxql" 13 "go.uber.org/zap" 14) 15 16const ( 17 // DefaultQueryTimeout is the default timeout for executing a query. 18 // A value of zero will have no query timeout. 19 DefaultQueryTimeout = time.Duration(0) 20) 21 22type TaskStatus int 23 24const ( 25 // RunningTask is set when the task is running. 26 RunningTask TaskStatus = iota + 1 27 28 // KilledTask is set when the task is killed, but resources are still 29 // being used. 30 KilledTask 31) 32 33func (t TaskStatus) String() string { 34 switch t { 35 case RunningTask: 36 return "running" 37 case KilledTask: 38 return "killed" 39 default: 40 return "unknown" 41 } 42} 43 44func (t TaskStatus) MarshalJSON() ([]byte, error) { 45 s := t.String() 46 return json.Marshal(s) 47} 48 49func (t *TaskStatus) UnmarshalJSON(data []byte) error { 50 if bytes.Equal(data, []byte("running")) { 51 *t = RunningTask 52 } else if bytes.Equal(data, []byte("killed")) { 53 *t = KilledTask 54 } else if bytes.Equal(data, []byte("unknown")) { 55 *t = TaskStatus(0) 56 } else { 57 return fmt.Errorf("unknown task status: %s", string(data)) 58 } 59 return nil 60} 61 62// TaskManager takes care of all aspects related to managing running queries. 63type TaskManager struct { 64 // Query execution timeout. 65 QueryTimeout time.Duration 66 67 // Log queries if they are slower than this time. 68 // If zero, slow queries will never be logged. 69 LogQueriesAfter time.Duration 70 71 // Maximum number of concurrent queries. 72 MaxConcurrentQueries int 73 74 // Logger to use for all logging. 75 // Defaults to discarding all log output. 76 Logger *zap.Logger 77 78 // Used for managing and tracking running queries. 79 queries map[uint64]*Task 80 nextID uint64 81 mu sync.RWMutex 82 shutdown bool 83} 84 85// NewTaskManager creates a new TaskManager. 86func NewTaskManager() *TaskManager { 87 return &TaskManager{ 88 QueryTimeout: DefaultQueryTimeout, 89 Logger: zap.NewNop(), 90 queries: make(map[uint64]*Task), 91 nextID: 1, 92 } 93} 94 95// ExecuteStatement executes a statement containing one of the task management queries. 96func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx *ExecutionContext) error { 97 switch stmt := stmt.(type) { 98 case *influxql.ShowQueriesStatement: 99 rows, err := t.executeShowQueriesStatement(stmt) 100 if err != nil { 101 return err 102 } 103 104 ctx.Send(&Result{ 105 Series: rows, 106 }) 107 case *influxql.KillQueryStatement: 108 var messages []*Message 109 if ctx.ReadOnly { 110 messages = append(messages, ReadOnlyWarning(stmt.String())) 111 } 112 113 if err := t.executeKillQueryStatement(stmt); err != nil { 114 return err 115 } 116 ctx.Send(&Result{ 117 Messages: messages, 118 }) 119 default: 120 return ErrInvalidQuery 121 } 122 return nil 123} 124 125func (t *TaskManager) executeKillQueryStatement(stmt *influxql.KillQueryStatement) error { 126 return t.KillQuery(stmt.QueryID) 127} 128 129func (t *TaskManager) executeShowQueriesStatement(q *influxql.ShowQueriesStatement) (models.Rows, error) { 130 t.mu.RLock() 131 defer t.mu.RUnlock() 132 133 now := time.Now() 134 135 values := make([][]interface{}, 0, len(t.queries)) 136 for id, qi := range t.queries { 137 d := now.Sub(qi.startTime) 138 139 switch { 140 case d >= time.Second: 141 d = d - (d % time.Second) 142 case d >= time.Millisecond: 143 d = d - (d % time.Millisecond) 144 case d >= time.Microsecond: 145 d = d - (d % time.Microsecond) 146 } 147 148 values = append(values, []interface{}{id, qi.query, qi.database, d.String(), qi.status.String()}) 149 } 150 151 return []*models.Row{{ 152 Columns: []string{"qid", "query", "database", "duration", "status"}, 153 Values: values, 154 }}, nil 155} 156 157func (t *TaskManager) queryError(qid uint64, err error) { 158 t.mu.RLock() 159 query := t.queries[qid] 160 t.mu.RUnlock() 161 if query != nil { 162 query.setError(err) 163 } 164} 165 166// AttachQuery attaches a running query to be managed by the TaskManager. 167// Returns the query id of the newly attached query or an error if it was 168// unable to assign a query id or attach the query to the TaskManager. 169// This function also returns a channel that will be closed when this 170// query finishes running. 171// 172// After a query finishes running, the system is free to reuse a query id. 173func (t *TaskManager) AttachQuery(q *influxql.Query, opt ExecutionOptions, interrupt <-chan struct{}) (*ExecutionContext, func(), error) { 174 t.mu.Lock() 175 defer t.mu.Unlock() 176 177 if t.shutdown { 178 return nil, nil, ErrQueryEngineShutdown 179 } 180 181 if t.MaxConcurrentQueries > 0 && len(t.queries) >= t.MaxConcurrentQueries { 182 return nil, nil, ErrMaxConcurrentQueriesLimitExceeded(len(t.queries), t.MaxConcurrentQueries) 183 } 184 185 qid := t.nextID 186 query := &Task{ 187 query: q.String(), 188 database: opt.Database, 189 status: RunningTask, 190 startTime: time.Now(), 191 closing: make(chan struct{}), 192 monitorCh: make(chan error), 193 } 194 t.queries[qid] = query 195 196 go t.waitForQuery(qid, query.closing, interrupt, query.monitorCh) 197 if t.LogQueriesAfter != 0 { 198 go query.monitor(func(closing <-chan struct{}) error { 199 timer := time.NewTimer(t.LogQueriesAfter) 200 defer timer.Stop() 201 202 select { 203 case <-timer.C: 204 t.Logger.Warn(fmt.Sprintf("Detected slow query: %s (qid: %d, database: %s, threshold: %s)", 205 query.query, qid, query.database, t.LogQueriesAfter)) 206 case <-closing: 207 } 208 return nil 209 }) 210 } 211 t.nextID++ 212 213 ctx := &ExecutionContext{ 214 Context: context.Background(), 215 QueryID: qid, 216 task: query, 217 ExecutionOptions: opt, 218 } 219 ctx.watch() 220 return ctx, func() { t.DetachQuery(qid) }, nil 221} 222 223// KillQuery enters a query into the killed state and closes the channel 224// from the TaskManager. This method can be used to forcefully terminate a 225// running query. 226func (t *TaskManager) KillQuery(qid uint64) error { 227 t.mu.Lock() 228 query := t.queries[qid] 229 t.mu.Unlock() 230 231 if query == nil { 232 return fmt.Errorf("no such query id: %d", qid) 233 } 234 return query.kill() 235} 236 237// DetachQuery removes a query from the query table. If the query is not in the 238// killed state, this will also close the related channel. 239func (t *TaskManager) DetachQuery(qid uint64) error { 240 t.mu.Lock() 241 defer t.mu.Unlock() 242 243 query := t.queries[qid] 244 if query == nil { 245 return fmt.Errorf("no such query id: %d", qid) 246 } 247 248 query.close() 249 delete(t.queries, qid) 250 return nil 251} 252 253// QueryInfo represents the information for a query. 254type QueryInfo struct { 255 ID uint64 `json:"id"` 256 Query string `json:"query"` 257 Database string `json:"database"` 258 Duration time.Duration `json:"duration"` 259 Status TaskStatus `json:"status"` 260} 261 262// Queries returns a list of all running queries with information about them. 263func (t *TaskManager) Queries() []QueryInfo { 264 t.mu.RLock() 265 defer t.mu.RUnlock() 266 267 now := time.Now() 268 queries := make([]QueryInfo, 0, len(t.queries)) 269 for id, qi := range t.queries { 270 queries = append(queries, QueryInfo{ 271 ID: id, 272 Query: qi.query, 273 Database: qi.database, 274 Duration: now.Sub(qi.startTime), 275 Status: qi.status, 276 }) 277 } 278 return queries 279} 280 281func (t *TaskManager) waitForQuery(qid uint64, interrupt <-chan struct{}, closing <-chan struct{}, monitorCh <-chan error) { 282 var timerCh <-chan time.Time 283 if t.QueryTimeout != 0 { 284 timer := time.NewTimer(t.QueryTimeout) 285 timerCh = timer.C 286 defer timer.Stop() 287 } 288 289 select { 290 case <-closing: 291 t.queryError(qid, ErrQueryInterrupted) 292 case err := <-monitorCh: 293 if err == nil { 294 break 295 } 296 297 t.queryError(qid, err) 298 case <-timerCh: 299 t.queryError(qid, ErrQueryTimeoutLimitExceeded) 300 case <-interrupt: 301 // Query was manually closed so exit the select. 302 return 303 } 304 t.KillQuery(qid) 305} 306 307// Close kills all running queries and prevents new queries from being attached. 308func (t *TaskManager) Close() error { 309 t.mu.Lock() 310 defer t.mu.Unlock() 311 312 t.shutdown = true 313 for _, query := range t.queries { 314 query.setError(ErrQueryEngineShutdown) 315 query.close() 316 } 317 t.queries = nil 318 return nil 319} 320