package coordinator

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/monitor"
	"github.com/influxdata/influxdb/pkg/tracing"
	"github.com/influxdata/influxdb/pkg/tracing/fields"
	"github.com/influxdata/influxdb/query"
	"github.com/influxdata/influxdb/services/meta"
	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxql"
)

// ErrDatabaseNameRequired is returned when executing statements that require
// a database but none has been provided.
var ErrDatabaseNameRequired = errors.New("database name required")

type pointsWriter interface {
	WritePointsInto(*IntoWriteRequest) error
}

// StatementExecutor executes a statement in the query.
type StatementExecutor struct {
	MetaClient MetaClient

	// TaskManager holds the StatementExecutor that handles task-related commands.
	TaskManager query.StatementExecutor

	// TSDB storage for local node.
	TSDBStore TSDBStore

	// ShardMapper for mapping shards when executing a SELECT statement.
	ShardMapper query.ShardMapper

	// Holds monitoring data for SHOW STATS and SHOW DIAGNOSTICS.
	Monitor *monitor.Monitor

	// Used for rewriting points back into system for SELECT INTO statements.
	PointsWriter pointsWriter

	// Select statement limits
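	// (a value of zero typically means the corresponding limit is not enforced).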
	MaxSelectPointN   int
	MaxSelectSeriesN  int
	MaxSelectBucketsN int
}

// ExecuteStatement executes the given statement with the given execution context.
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
	// Select statements are handled separately so that they can be streamed.
	if stmt, ok := stmt.(*influxql.SelectStatement); ok {
		return e.executeSelectStatement(stmt, ctx)
	}

	var rows models.Rows
	var messages []*query.Message
	var err error
	switch stmt := stmt.(type) {
	case *influxql.AlterRetentionPolicyStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeAlterRetentionPolicyStatement(stmt)
	case *influxql.CreateContinuousQueryStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeCreateContinuousQueryStatement(stmt)
	case *influxql.CreateDatabaseStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeCreateDatabaseStatement(stmt)
	case *influxql.CreateRetentionPolicyStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeCreateRetentionPolicyStatement(stmt)
	case *influxql.CreateSubscriptionStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeCreateSubscriptionStatement(stmt)
	case *influxql.CreateUserStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeCreateUserStatement(stmt)
	case *influxql.DeleteSeriesStatement:
		err = e.executeDeleteSeriesStatement(stmt, ctx.Database)
	case *influxql.DropContinuousQueryStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeDropContinuousQueryStatement(stmt)
	case *influxql.DropDatabaseStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeDropDatabaseStatement(stmt)
	case *influxql.DropMeasurementStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeDropMeasurementStatement(stmt, ctx.Database)
	case *influxql.DropSeriesStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeDropSeriesStatement(stmt, ctx.Database)
	case *influxql.DropRetentionPolicyStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeDropRetentionPolicyStatement(stmt)
	case *influxql.DropShardStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeDropShardStatement(stmt)
	case *influxql.DropSubscriptionStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeDropSubscriptionStatement(stmt)
	case *influxql.DropUserStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeDropUserStatement(stmt)
	case *influxql.ExplainStatement:
		if stmt.Analyze {
			rows, err = e.executeExplainAnalyzeStatement(stmt, ctx)
		} else {
			rows, err = e.executeExplainStatement(stmt, ctx)
		}
	case *influxql.GrantStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeGrantStatement(stmt)
	case *influxql.GrantAdminStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeGrantAdminStatement(stmt)
	case *influxql.RevokeStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeRevokeStatement(stmt)
	case *influxql.RevokeAdminStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeRevokeAdminStatement(stmt)
	case *influxql.ShowContinuousQueriesStatement:
		rows, err = e.executeShowContinuousQueriesStatement(stmt)
	case *influxql.ShowDatabasesStatement:
		rows, err = e.executeShowDatabasesStatement(stmt, ctx)
	case *influxql.ShowDiagnosticsStatement:
		rows, err = e.executeShowDiagnosticsStatement(stmt)
	case *influxql.ShowGrantsForUserStatement:
		rows, err = e.executeShowGrantsForUserStatement(stmt)
	case *influxql.ShowMeasurementsStatement:
		return e.executeShowMeasurementsStatement(stmt, ctx)
	case *influxql.ShowMeasurementCardinalityStatement:
		rows, err = e.executeShowMeasurementCardinalityStatement(stmt)
	case *influxql.ShowRetentionPoliciesStatement:
		rows, err = e.executeShowRetentionPoliciesStatement(stmt)
	case *influxql.ShowSeriesCardinalityStatement:
		rows, err = e.executeShowSeriesCardinalityStatement(stmt)
	case *influxql.ShowShardsStatement:
		rows, err = e.executeShowShardsStatement(stmt)
	case *influxql.ShowShardGroupsStatement:
		rows, err = e.executeShowShardGroupsStatement(stmt)
	case *influxql.ShowStatsStatement:
		rows, err = e.executeShowStatsStatement(stmt)
	case *influxql.ShowSubscriptionsStatement:
		rows, err = e.executeShowSubscriptionsStatement(stmt)
	case *influxql.ShowTagKeysStatement:
		return e.executeShowTagKeys(stmt, ctx)
	case *influxql.ShowTagValuesStatement:
		return e.executeShowTagValues(stmt, ctx)
	case *influxql.ShowUsersStatement:
		rows, err = e.executeShowUsersStatement(stmt)
	case *influxql.SetPasswordUserStatement:
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}
		err = e.executeSetPasswordUserStatement(stmt)
	case *influxql.ShowQueriesStatement, *influxql.KillQueryStatement:
		// Send query-related statements to the task manager.
		return e.TaskManager.ExecuteStatement(stmt, ctx)
	default:
		return query.ErrInvalidQuery
	}

	if err != nil {
		return err
	}

	return ctx.Send(&query.Result{
		Series:   rows,
		Messages: messages,
	})
}

func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) error {
	rpu := &meta.RetentionPolicyUpdate{
		Duration:           stmt.Duration,
		ReplicaN:           stmt.Replication,
		ShardGroupDuration: stmt.ShardGroupDuration,
	}

	// Update the retention policy.
	return e.MetaClient.UpdateRetentionPolicy(stmt.Database, stmt.Name, rpu, stmt.Default)
}

func (e *StatementExecutor) executeCreateContinuousQueryStatement(q *influxql.CreateContinuousQueryStatement) error {
	// Verify that retention policies exist.
	var err error
	verifyRPFn := func(n influxql.Node) {
		if err != nil {
			return
		}
		switch m := n.(type) {
		case *influxql.Measurement:
			var rp *meta.RetentionPolicyInfo
			if rp, err = e.MetaClient.RetentionPolicy(m.Database, m.RetentionPolicy); err != nil {
				return
			} else if rp == nil {
				err = fmt.Errorf("%s: %s.%s", meta.ErrRetentionPolicyNotFound, m.Database, m.RetentionPolicy)
			}
		default:
			return
		}
	}

	influxql.WalkFunc(q, verifyRPFn)

	if err != nil {
		return err
	}

	return e.MetaClient.CreateContinuousQuery(q.Database, q.Name, q.String())
}

func (e *StatementExecutor) executeCreateDatabaseStatement(stmt *influxql.CreateDatabaseStatement) error {
	if !meta.ValidName(stmt.Name) {
		// TODO This should probably be in `(*meta.Data).CreateDatabase`
		// but can't go there until 1.1 is used everywhere
		return meta.ErrInvalidName
	}

	if !stmt.RetentionPolicyCreate {
		_, err := e.MetaClient.CreateDatabase(stmt.Name)
		return err
	}

	// If we're doing, for example, CREATE DATABASE "db" WITH DURATION 1d then
	// the name will not yet be set. We only need to validate non-empty
	// retention policy names, such as in the statement:
	// 	CREATE DATABASE "db" WITH DURATION 1d NAME "xyz"
	if stmt.RetentionPolicyName != "" && !meta.ValidName(stmt.RetentionPolicyName) {
		return meta.ErrInvalidName
	}

	spec := meta.RetentionPolicySpec{
		Name:               stmt.RetentionPolicyName,
		Duration:           stmt.RetentionPolicyDuration,
		ReplicaN:           stmt.RetentionPolicyReplication,
		ShardGroupDuration: stmt.RetentionPolicyShardGroupDuration,
	}
	_, err := e.MetaClient.CreateDatabaseWithRetentionPolicy(stmt.Name, &spec)
	return err
}

func (e *StatementExecutor) executeCreateRetentionPolicyStatement(stmt *influxql.CreateRetentionPolicyStatement) error {
	if !meta.ValidName(stmt.Name) {
		// TODO This should probably be in `(*meta.Data).CreateRetentionPolicy`
		// but can't go there until 1.1 is used everywhere
		return meta.ErrInvalidName
	}

	spec := meta.RetentionPolicySpec{
		Name:               stmt.Name,
		Duration:           &stmt.Duration,
		ReplicaN:           &stmt.Replication,
		ShardGroupDuration: stmt.ShardGroupDuration,
	}

	// Create new retention policy.
	_, err := e.MetaClient.CreateRetentionPolicy(stmt.Database, &spec, stmt.Default)
	return err
}

func (e *StatementExecutor) executeCreateSubscriptionStatement(q *influxql.CreateSubscriptionStatement) error {
	return e.MetaClient.CreateSubscription(q.Database, q.RetentionPolicy, q.Name, q.Mode, q.Destinations)
}

func (e *StatementExecutor) executeCreateUserStatement(q *influxql.CreateUserStatement) error {
	_, err := e.MetaClient.CreateUser(q.Name, q.Password, q.Admin)
	return err
}

func (e *StatementExecutor) executeDeleteSeriesStatement(stmt *influxql.DeleteSeriesStatement, database string) error {
	if dbi := e.MetaClient.Database(database); dbi == nil {
		return query.ErrDatabaseNotFound(database)
	}

	// Convert "now()" to current time.
	stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: time.Now().UTC()})

	// Locally delete the series.
	return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition)
}

func (e *StatementExecutor) executeDropContinuousQueryStatement(q *influxql.DropContinuousQueryStatement) error {
	return e.MetaClient.DropContinuousQuery(q.Database, q.Name)
}

// executeDropDatabaseStatement drops a database from the cluster.
// It does not return an error if the database was not found on any of
// the nodes, or in the Meta store.
func (e *StatementExecutor) executeDropDatabaseStatement(stmt *influxql.DropDatabaseStatement) error {
	if e.MetaClient.Database(stmt.Name) == nil {
		return nil
	}

	// Locally delete the database.
	if err := e.TSDBStore.DeleteDatabase(stmt.Name); err != nil {
		return err
	}

	// Remove the database from the Meta Store.
	return e.MetaClient.DropDatabase(stmt.Name)
}

func (e *StatementExecutor) executeDropMeasurementStatement(stmt *influxql.DropMeasurementStatement, database string) error {
	if dbi := e.MetaClient.Database(database); dbi == nil {
		return query.ErrDatabaseNotFound(database)
	}

	// Locally drop the measurement
	return e.TSDBStore.DeleteMeasurement(database, stmt.Name)
}

func (e *StatementExecutor) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, database string) error {
	if dbi := e.MetaClient.Database(database); dbi == nil {
		return query.ErrDatabaseNotFound(database)
	}

	// Check for time in WHERE clause (not supported).
	if influxql.HasTimeExpr(stmt.Condition) {
		return errors.New("DROP SERIES doesn't support time in WHERE clause")
	}

	// Locally drop the series.
	return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition)
}

func (e *StatementExecutor) executeDropShardStatement(stmt *influxql.DropShardStatement) error {
	// Locally delete the shard.
	if err := e.TSDBStore.DeleteShard(stmt.ID); err != nil {
		return err
	}

	// Remove the shard reference from the Meta Store.
	return e.MetaClient.DropShard(stmt.ID)
}

func (e *StatementExecutor) executeDropRetentionPolicyStatement(stmt *influxql.DropRetentionPolicyStatement) error {
	dbi := e.MetaClient.Database(stmt.Database)
	if dbi == nil {
		return nil
	}

	if dbi.RetentionPolicy(stmt.Name) == nil {
		return nil
	}

	// Locally drop the retention policy.
	if err := e.TSDBStore.DeleteRetentionPolicy(stmt.Database, stmt.Name); err != nil {
		return err
	}

	return e.MetaClient.DropRetentionPolicy(stmt.Database, stmt.Name)
}

func (e *StatementExecutor) executeDropSubscriptionStatement(q *influxql.DropSubscriptionStatement) error {
	return e.MetaClient.DropSubscription(q.Database, q.RetentionPolicy, q.Name)
}

func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStatement) error {
	return e.MetaClient.DropUser(q.Name)
}

func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) (models.Rows, error) {
	opt := query.SelectOptions{
		NodeID:      ctx.ExecutionOptions.NodeID,
		MaxSeriesN:  e.MaxSelectSeriesN,
		MaxBucketsN: e.MaxSelectBucketsN,
		Authorizer:  ctx.Authorizer,
	}

	// Prepare the query for execution, but do not actually execute it.
	// This should perform any needed substitutions.
	p, err := query.Prepare(q.Statement, e.ShardMapper, opt)
	if err != nil {
		return nil, err
	}
	defer p.Close()

	plan, err := p.Explain()
	if err != nil {
		return nil, err
	}
	plan = strings.TrimSpace(plan)

	row := &models.Row{
		Columns: []string{"QUERY PLAN"},
	}
	for _, s := range strings.Split(plan, "\n") {
		row.Values = append(row.Values, []interface{}{s})
	}
	return models.Rows{row}, nil
}

func (e *StatementExecutor) executeExplainAnalyzeStatement(q *influxql.ExplainStatement, ectx *query.ExecutionContext) (models.Rows, error) {
	stmt := q.Statement
	t, span := tracing.NewTrace("select")
	ctx := tracing.NewContextWithTrace(ectx, t)
	ctx = tracing.NewContextWithSpan(ctx, span)
	var aux query.Iterators
	ctx = query.NewContextWithIterators(ctx, &aux)
	start := time.Now()

	cur, err := e.createIterators(ctx, stmt, ectx.ExecutionOptions)
	if err != nil {
		return nil, err
	}

	iterTime := time.Since(start)

	// Generate a row emitter from the iterator set.
	em := query.NewEmitter(cur, ectx.ChunkSize)

	// Emit rows to the results channel.
	var writeN int64
	for {
		var row *models.Row
		row, _, err = em.Emit()
		if err != nil {
			goto CLEANUP
		} else if row == nil {
			// Check if the query was interrupted while emitting.
			select {
			case <-ectx.Done():
				err = ectx.Err()
				goto CLEANUP
			default:
			}
			break
		}

		writeN += int64(len(row.Values))
	}

CLEANUP:
	em.Close()
	if err != nil {
		return nil, err
	}

	// Close the auxiliary iterators deterministically to finalize any captured measurements.
	aux.Close()

	totalTime := time.Since(start)
	span.MergeFields(
		fields.Duration("total_time", totalTime),
		fields.Duration("planning_time", iterTime),
		fields.Duration("execution_time", totalTime-iterTime),
	)
	span.Finish()

	row := &models.Row{
		Columns: []string{"EXPLAIN ANALYZE"},
	}
	for _, s := range strings.Split(t.Tree().String(), "\n") {
		row.Values = append(row.Values, []interface{}{s})
	}

	return models.Rows{row}, nil
}

func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement) error {
	return e.MetaClient.SetPrivilege(stmt.User, stmt.On, stmt.Privilege)
}

func (e *StatementExecutor) executeGrantAdminStatement(stmt *influxql.GrantAdminStatement) error {
	return e.MetaClient.SetAdminPrivilege(stmt.User, true)
}

func (e *StatementExecutor) executeRevokeStatement(stmt *influxql.RevokeStatement) error {
	priv := influxql.NoPrivileges

	// Revoking all privileges means there's no need to look at existing user privileges.
	if stmt.Privilege != influxql.AllPrivileges {
		p, err := e.MetaClient.UserPrivilege(stmt.User, stmt.On)
		if err != nil {
			return err
		}
		// Bit clear (AND NOT) the user's privilege with the revoked privilege.
		priv = *p &^ stmt.Privilege
	}
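	// Illustrative example (assuming the usual influxql definition where
	// AllPrivileges is the bitwise union of ReadPrivilege and WritePrivilege):
	// revoking WRITE from a user holding ALL leaves READ, i.e. ALL &^ WRITE == READ.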

	return e.MetaClient.SetPrivilege(stmt.User, stmt.On, priv)
}

func (e *StatementExecutor) executeRevokeAdminStatement(stmt *influxql.RevokeAdminStatement) error {
	return e.MetaClient.SetAdminPrivilege(stmt.User, false)
}

func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPasswordUserStatement) error {
	return e.MetaClient.UpdateUser(q.Name, q.Password)
}

func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) error {
	cur, err := e.createIterators(ctx, stmt, ctx.ExecutionOptions)
	if err != nil {
		return err
	}

	// Generate a row emitter from the iterator set.
	em := query.NewEmitter(cur, ctx.ChunkSize)
	defer em.Close()

	// Emit rows to the results channel.
	var writeN int64
	var emitted bool

	var pointsWriter *BufferedPointsWriter
	if stmt.Target != nil {
		pointsWriter = NewBufferedPointsWriter(e.PointsWriter, stmt.Target.Measurement.Database, stmt.Target.Measurement.RetentionPolicy, 10000)
	}

	for {
		row, partial, err := em.Emit()
		if err != nil {
			return err
		} else if row == nil {
			// Check if the query was interrupted while emitting.
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}
			break
		}

		// Write points back into system for INTO statements.
		if stmt.Target != nil {
			if err := e.writeInto(pointsWriter, stmt, row); err != nil {
				return err
			}
			writeN += int64(len(row.Values))
			continue
		}

		result := &query.Result{
			Series:  []*models.Row{row},
			Partial: partial,
		}

		// Send results or exit if closing.
		if err := ctx.Send(result); err != nil {
			return err
		}

		emitted = true
	}

	// Flush remaining points and emit write count if an INTO statement.
	if stmt.Target != nil {
		if err := pointsWriter.Flush(); err != nil {
			return err
		}

		var messages []*query.Message
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}

		return ctx.Send(&query.Result{
			Messages: messages,
			Series: []*models.Row{{
				Name:    "result",
				Columns: []string{"time", "written"},
				Values:  [][]interface{}{{time.Unix(0, 0).UTC(), writeN}},
			}},
		})
	}

	// Always emit at least one result.
	if !emitted {
		return ctx.Send(&query.Result{
			Series: make([]*models.Row, 0),
		})
	}

	return nil
}

func (e *StatementExecutor) createIterators(ctx context.Context, stmt *influxql.SelectStatement, opt query.ExecutionOptions) (query.Cursor, error) {
	sopt := query.SelectOptions{
		NodeID:      opt.NodeID,
		MaxSeriesN:  e.MaxSelectSeriesN,
		MaxPointN:   e.MaxSelectPointN,
		MaxBucketsN: e.MaxSelectBucketsN,
		Authorizer:  opt.Authorizer,
	}

	// Create a set of iterators from a selection.
	cur, err := query.Select(ctx, stmt, e.ShardMapper, sopt)
	if err != nil {
		return nil, err
	}
	return cur, nil
}

func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) (models.Rows, error) {
	dis := e.MetaClient.Databases()

	rows := []*models.Row{}
	for _, di := range dis {
		row := &models.Row{Columns: []string{"name", "query"}, Name: di.Name}
		for _, cqi := range di.ContinuousQueries {
			row.Values = append(row.Values, []interface{}{cqi.Name, cqi.Query})
		}
		rows = append(rows, row)
	}
	return rows, nil
}

func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement, ctx *query.ExecutionContext) (models.Rows, error) {
	dis := e.MetaClient.Databases()
	a := ctx.ExecutionOptions.Authorizer

	row := &models.Row{Name: "databases", Columns: []string{"name"}}
	for _, di := range dis {
		// Only include databases that the user is authorized to read or write.
		if a.AuthorizeDatabase(influxql.ReadPrivilege, di.Name) || a.AuthorizeDatabase(influxql.WritePrivilege, di.Name) {
			row.Values = append(row.Values, []interface{}{di.Name})
		}
	}
	return []*models.Row{row}, nil
}

func (e *StatementExecutor) executeShowDiagnosticsStatement(stmt *influxql.ShowDiagnosticsStatement) (models.Rows, error) {
	diags, err := e.Monitor.Diagnostics()
	if err != nil {
		return nil, err
	}

	// Get a sorted list of diagnostics keys.
	sortedKeys := make([]string, 0, len(diags))
	for k := range diags {
		sortedKeys = append(sortedKeys, k)
	}
	sort.Strings(sortedKeys)

	rows := make([]*models.Row, 0, len(diags))
	for _, k := range sortedKeys {
		if stmt.Module != "" && k != stmt.Module {
			continue
		}

		row := &models.Row{Name: k}

		row.Columns = diags[k].Columns
		row.Values = diags[k].Rows
		rows = append(rows, row)
	}
	return rows, nil
}

func (e *StatementExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGrantsForUserStatement) (models.Rows, error) {
	priv, err := e.MetaClient.UserPrivileges(q.Name)
	if err != nil {
		return nil, err
	}

	row := &models.Row{Columns: []string{"database", "privilege"}}
	for d, p := range priv {
		row.Values = append(row.Values, []interface{}{d, p.String()})
	}
	return []*models.Row{row}, nil
}

func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMeasurementsStatement, ctx *query.ExecutionContext) error {
	if q.Database == "" {
		return ErrDatabaseNameRequired
	}

	names, err := e.TSDBStore.MeasurementNames(ctx.Authorizer, q.Database, q.Condition)
	if err != nil || len(names) == 0 {
		return ctx.Send(&query.Result{
			Err: err,
		})
	}

	if q.Offset > 0 {
		if q.Offset >= len(names) {
			names = nil
		} else {
			names = names[q.Offset:]
		}
	}

	if q.Limit > 0 {
		if q.Limit < len(names) {
			names = names[:q.Limit]
		}
	}

	values := make([][]interface{}, len(names))
	for i, name := range names {
		values[i] = []interface{}{string(name)}
	}

	if len(values) == 0 {
		return ctx.Send(&query.Result{})
	}

	return ctx.Send(&query.Result{
		Series: []*models.Row{{
			Name:    "measurements",
			Columns: []string{"name"},
			Values:  values,
		}},
	})
}

func (e *StatementExecutor) executeShowMeasurementCardinalityStatement(stmt *influxql.ShowMeasurementCardinalityStatement) (models.Rows, error) {
	if stmt.Database == "" {
		return nil, ErrDatabaseNameRequired
	}

	n, err := e.TSDBStore.MeasurementsCardinality(stmt.Database)
	if err != nil {
		return nil, err
	}

	return []*models.Row{{
		Columns: []string{"cardinality estimation"},
		Values:  [][]interface{}{{n}},
	}}, nil
}

func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) {
	if q.Database == "" {
		return nil, ErrDatabaseNameRequired
	}

	di := e.MetaClient.Database(q.Database)
	if di == nil {
		return nil, influxdb.ErrDatabaseNotFound(q.Database)
	}

	row := &models.Row{Columns: []string{"name", "duration", "shardGroupDuration", "replicaN", "default"}}
	for _, rpi := range di.RetentionPolicies {
		row.Values = append(row.Values, []interface{}{rpi.Name, rpi.Duration.String(), rpi.ShardGroupDuration.String(), rpi.ReplicaN, di.DefaultRetentionPolicy == rpi.Name})
	}
	return []*models.Row{row}, nil
}

func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShardsStatement) (models.Rows, error) {
	dis := e.MetaClient.Databases()

	rows := []*models.Row{}
	for _, di := range dis {
		row := &models.Row{Columns: []string{"id", "database", "retention_policy", "shard_group", "start_time", "end_time", "expiry_time", "owners"}, Name: di.Name}
		for _, rpi := range di.RetentionPolicies {
			for _, sgi := range rpi.ShardGroups {
				// Shards associated with deleted shard groups are effectively deleted.
				// Don't list them.
				if sgi.Deleted() {
					continue
				}

				for _, si := range sgi.Shards {
					ownerIDs := make([]uint64, len(si.Owners))
					for i, owner := range si.Owners {
						ownerIDs[i] = owner.NodeID
					}

					row.Values = append(row.Values, []interface{}{
						si.ID,
						di.Name,
						rpi.Name,
						sgi.ID,
						sgi.StartTime.UTC().Format(time.RFC3339),
						sgi.EndTime.UTC().Format(time.RFC3339),
						sgi.EndTime.Add(rpi.Duration).UTC().Format(time.RFC3339),
						joinUint64(ownerIDs),
					})
				}
			}
		}
		rows = append(rows, row)
	}
	return rows, nil
}

func (e *StatementExecutor) executeShowSeriesCardinalityStatement(stmt *influxql.ShowSeriesCardinalityStatement) (models.Rows, error) {
	if stmt.Database == "" {
		return nil, ErrDatabaseNameRequired
	}

	n, err := e.TSDBStore.SeriesCardinality(stmt.Database)
	if err != nil {
		return nil, err
	}

	return []*models.Row{{
		Columns: []string{"cardinality estimation"},
		Values:  [][]interface{}{{n}},
	}}, nil
}

func (e *StatementExecutor) executeShowShardGroupsStatement(stmt *influxql.ShowShardGroupsStatement) (models.Rows, error) {
	dis := e.MetaClient.Databases()

	row := &models.Row{Columns: []string{"id", "database", "retention_policy", "start_time", "end_time", "expiry_time"}, Name: "shard groups"}
	for _, di := range dis {
		for _, rpi := range di.RetentionPolicies {
			for _, sgi := range rpi.ShardGroups {
				// Shards associated with deleted shard groups are effectively deleted.
				// Don't list them.
				if sgi.Deleted() {
					continue
				}

				row.Values = append(row.Values, []interface{}{
					sgi.ID,
					di.Name,
					rpi.Name,
					sgi.StartTime.UTC().Format(time.RFC3339),
					sgi.EndTime.UTC().Format(time.RFC3339),
					sgi.EndTime.Add(rpi.Duration).UTC().Format(time.RFC3339),
				})
			}
		}
	}

	return []*models.Row{row}, nil
}

func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatement) (models.Rows, error) {
	var rows []*models.Row

	if store, ok := e.TSDBStore.(*tsdb.Store); ok && stmt.Module == "indexes" {
		// The cost of collecting index metrics grows with the size of the indexes,
		// so only collect this stat when it is explicitly requested.
		b := store.IndexBytes()
		row := &models.Row{
			Name:    "indexes",
			Columns: []string{"memoryBytes"},
			Values:  [][]interface{}{{b}},
		}
		rows = append(rows, row)

	} else {
		stats, err := e.Monitor.Statistics(nil)
		if err != nil {
			return nil, err
		}

		for _, stat := range stats {
			if stmt.Module != "" && stat.Name != stmt.Module {
				continue
			}
			row := &models.Row{Name: stat.Name, Tags: stat.Tags}

			values := make([]interface{}, 0, len(stat.Values))
			for _, k := range stat.ValueNames() {
				row.Columns = append(row.Columns, k)
				values = append(values, stat.Values[k])
			}
			row.Values = [][]interface{}{values}
			rows = append(rows, row)
		}
	}
	return rows, nil
}

func (e *StatementExecutor) executeShowSubscriptionsStatement(stmt *influxql.ShowSubscriptionsStatement) (models.Rows, error) {
	dis := e.MetaClient.Databases()

	rows := []*models.Row{}
	for _, di := range dis {
		row := &models.Row{Columns: []string{"retention_policy", "name", "mode", "destinations"}, Name: di.Name}
		for _, rpi := range di.RetentionPolicies {
			for _, si := range rpi.Subscriptions {
				row.Values = append(row.Values, []interface{}{rpi.Name, si.Name, si.Mode, si.Destinations})
			}
		}
		if len(row.Values) > 0 {
			rows = append(rows, row)
		}
	}
	return rows, nil
}

func (e *StatementExecutor) executeShowTagKeys(q *influxql.ShowTagKeysStatement, ctx *query.ExecutionContext) error {
	if q.Database == "" {
		return ErrDatabaseNameRequired
	}

	// Determine shard set based on database and time range.
	// SHOW TAG KEYS returns all tag keys across all of the database's retention policies.
	di := e.MetaClient.Database(q.Database)
	if di == nil {
		return fmt.Errorf("database not found: %s", q.Database)
	}

	// Determine appropriate time range. If one or fewer time boundaries provided
	// then min/max possible time should be used instead.
	valuer := &influxql.NowValuer{Now: time.Now()}
	cond, timeRange, err := influxql.ConditionExpr(q.Condition, valuer)
	if err != nil {
		return err
	}

	// Get all shards for all retention policies.
	var allGroups []meta.ShardGroupInfo
	for _, rpi := range di.RetentionPolicies {
		sgis, err := e.MetaClient.ShardGroupsByTimeRange(q.Database, rpi.Name, timeRange.MinTime(), timeRange.MaxTime())
		if err != nil {
			return err
		}
		allGroups = append(allGroups, sgis...)
	}

	var shardIDs []uint64
	for _, sgi := range allGroups {
		for _, si := range sgi.Shards {
			shardIDs = append(shardIDs, si.ID)
		}
	}

	tagKeys, err := e.TSDBStore.TagKeys(ctx.Authorizer, shardIDs, cond)
	if err != nil {
		return ctx.Send(&query.Result{
			Err: err,
		})
	}

	emitted := false
	for _, m := range tagKeys {
		keys := m.Keys

		if q.Offset > 0 {
			if q.Offset >= len(keys) {
				keys = nil
			} else {
				keys = keys[q.Offset:]
			}
		}
		if q.Limit > 0 && q.Limit < len(keys) {
			keys = keys[:q.Limit]
		}

		if len(keys) == 0 {
			continue
		}

		row := &models.Row{
			Name:    m.Measurement,
			Columns: []string{"tagKey"},
			Values:  make([][]interface{}, len(keys)),
		}
		for i, key := range keys {
			row.Values[i] = []interface{}{key}
		}

		if err := ctx.Send(&query.Result{
			Series: []*models.Row{row},
		}); err != nil {
			return err
		}
		emitted = true
	}

	// Ensure at least one result is emitted.
	if !emitted {
		return ctx.Send(&query.Result{})
	}
	return nil
}

func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatement, ctx *query.ExecutionContext) error {
	if q.Database == "" {
		return ErrDatabaseNameRequired
	}

	// Determine shard set based on database and time range.
	// SHOW TAG VALUES returns all tag values across all of the database's retention policies.
	di := e.MetaClient.Database(q.Database)
	if di == nil {
		return fmt.Errorf("database not found: %s", q.Database)
	}

	// Determine appropriate time range. If one or fewer time boundaries provided
	// then min/max possible time should be used instead.
	valuer := &influxql.NowValuer{Now: time.Now()}
	cond, timeRange, err := influxql.ConditionExpr(q.Condition, valuer)
	if err != nil {
		return err
	}

	// Get all shards for all retention policies.
	var allGroups []meta.ShardGroupInfo
	for _, rpi := range di.RetentionPolicies {
		sgis, err := e.MetaClient.ShardGroupsByTimeRange(q.Database, rpi.Name, timeRange.MinTime(), timeRange.MaxTime())
		if err != nil {
			return err
		}
		allGroups = append(allGroups, sgis...)
	}

	var shardIDs []uint64
	for _, sgi := range allGroups {
		for _, si := range sgi.Shards {
			shardIDs = append(shardIDs, si.ID)
		}
	}

	tagValues, err := e.TSDBStore.TagValues(ctx.Authorizer, shardIDs, cond)
	if err != nil {
		return ctx.Send(&query.Result{Err: err})
	}

	emitted := false
	for _, m := range tagValues {
		values := m.Values

		if q.Offset > 0 {
			if q.Offset >= len(values) {
				values = nil
			} else {
				values = values[q.Offset:]
			}
		}

		if q.Limit > 0 {
			if q.Limit < len(values) {
				values = values[:q.Limit]
			}
		}

		if len(values) == 0 {
			continue
		}

		row := &models.Row{
			Name:    m.Measurement,
			Columns: []string{"key", "value"},
			Values:  make([][]interface{}, len(values)),
		}
		for i, v := range values {
			row.Values[i] = []interface{}{v.Key, v.Value}
		}

		if err := ctx.Send(&query.Result{
			Series: []*models.Row{row},
		}); err != nil {
			return err
		}
		emitted = true
	}

	// Ensure at least one result is emitted.
	if !emitted {
		return ctx.Send(&query.Result{})
	}
	return nil
}

func (e *StatementExecutor) executeShowUsersStatement(q *influxql.ShowUsersStatement) (models.Rows, error) {
	row := &models.Row{Columns: []string{"user", "admin"}}
	for _, ui := range e.MetaClient.Users() {
		row.Values = append(row.Values, []interface{}{ui.Name, ui.Admin})
	}
	return []*models.Row{row}, nil
}

// BufferedPointsWriter adds buffering to a pointsWriter so that SELECT INTO queries
// write their points to the destination in batches.
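//
// A minimal usage sketch (illustrative only; the destination and batch size
// below are made-up values):
//
//	bw := NewBufferedPointsWriter(w, "db0", "autogen", 10000)
//	if err := bw.WritePointsInto(&IntoWriteRequest{
//		Database:        "db0",
//		RetentionPolicy: "autogen",
//		Points:          points,
//	}); err != nil {
//		return err
//	}
//	// Points still buffered after the last write must be flushed explicitly.
//	if err := bw.Flush(); err != nil {
//		return err
//	}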
type BufferedPointsWriter struct {
	w               pointsWriter
	buf             []models.Point
	database        string
	retentionPolicy string
}

// NewBufferedPointsWriter returns a new BufferedPointsWriter.
func NewBufferedPointsWriter(w pointsWriter, database, retentionPolicy string, capacity int) *BufferedPointsWriter {
	return &BufferedPointsWriter{
		w:               w,
		buf:             make([]models.Point, 0, capacity),
		database:        database,
		retentionPolicy: retentionPolicy,
	}
}

// WritePointsInto implements pointsWriter for BufferedPointsWriter.
func (w *BufferedPointsWriter) WritePointsInto(req *IntoWriteRequest) error {
	// Make sure we're buffering points only for the expected destination.
	if req.Database != w.database || req.RetentionPolicy != w.retentionPolicy {
		return fmt.Errorf("writer for %s.%s can't write into %s.%s", w.database, w.retentionPolicy, req.Database, req.RetentionPolicy)
	}

	for i := 0; i < len(req.Points); {
		// Get the available space in the buffer.
		avail := cap(w.buf) - len(w.buf)

		// Calculate number of points to copy into the buffer.
		n := len(req.Points[i:])
		if n > avail {
			n = avail
		}

		// Copy points into buffer.
		w.buf = append(w.buf, req.Points[i:n+i]...)

		// Advance the index by number of points copied.
		i += n

		// If buffer is full, flush points to underlying writer.
		if len(w.buf) == cap(w.buf) {
			if err := w.Flush(); err != nil {
				return err
			}
		}
	}

	return nil
}

// Flush writes all buffered points to the underlying writer.
func (w *BufferedPointsWriter) Flush() error {
	if len(w.buf) == 0 {
		return nil
	}

	if err := w.w.WritePointsInto(&IntoWriteRequest{
		Database:        w.database,
		RetentionPolicy: w.retentionPolicy,
		Points:          w.buf,
	}); err != nil {
		return err
	}

	// Clear the buffer.
	w.buf = w.buf[:0]

	return nil
}

// Len returns the number of points buffered.
func (w *BufferedPointsWriter) Len() int { return len(w.buf) }

// Cap returns the capacity (in points) of the buffer.
func (w *BufferedPointsWriter) Cap() int { return cap(w.buf) }

func (e *StatementExecutor) writeInto(w pointsWriter, stmt *influxql.SelectStatement, row *models.Row) error {
	if stmt.Target.Measurement.Database == "" {
		return errNoDatabaseInTarget
	}

	// It might seem odd to do the write here, since rows have to be converted
	// back into points. But the executors (both aggregate and raw) are complex
	// enough that pushing the write into them would be clumsy, and the
	// interweaving of limitedRowWriter and ExecuteAggregate/Raw makes it very
	// hard to guarantee that the results would match a normal query.
	name := stmt.Target.Measurement.Name
	if name == "" {
		name = row.Name
	}

	points, err := convertRowToPoints(name, row)
	if err != nil {
		return err
	}

	if err := w.WritePointsInto(&IntoWriteRequest{
		Database:        stmt.Target.Measurement.Database,
		RetentionPolicy: stmt.Target.Measurement.RetentionPolicy,
		Points:          points,
	}); err != nil {
		return err
	}

	return nil
}

var errNoDatabaseInTarget = errors.New("no database in target")

// convertRowToPoints will convert a query result Row into Points that can be written back in.
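// For example (values are illustrative): with measurementName "cpu" and a row
// whose Columns are ["time", "value"], Tags are {"host": "serverA"}, and Values
// hold a single entry [t0, 1.5], the result is one point for measurement "cpu"
// with tag host=serverA, field value=1.5, and timestamp t0.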
func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point, error) {
	// figure out which parts of the result are the time and which are the fields
	timeIndex := -1
	fieldIndexes := make(map[string]int)
	for i, c := range row.Columns {
		if c == "time" {
			timeIndex = i
		} else {
			fieldIndexes[c] = i
		}
	}

	if timeIndex == -1 {
		return nil, errors.New("error finding time index in result")
	}

	points := make([]models.Point, 0, len(row.Values))
	for _, v := range row.Values {
		vals := make(map[string]interface{})
		for fieldName, fieldIndex := range fieldIndexes {
			val := v[fieldIndex]
			if val != nil {
				vals[fieldName] = v[fieldIndex]
			}
		}

		p, err := models.NewPoint(measurementName, models.NewTags(row.Tags), vals, v[timeIndex].(time.Time))
		if err != nil {
			// Drop points that can't be stored
			continue
		}

		points = append(points, p)
	}

	return points, nil
}

// NormalizeStatement adds a default database and policy to the measurements in statement.
// Parameter defaultRetentionPolicy can be "".
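// For example (illustrative), normalizing SELECT * FROM "cpu" with
// defaultDatabase "db0" and an empty defaultRetentionPolicy sets the
// measurement's Database to "db0" and its RetentionPolicy to db0's default
// retention policy, while the SHOW-style statements below only have a missing
// Database filled in.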
func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, defaultDatabase, defaultRetentionPolicy string) (err error) {
	influxql.WalkFunc(stmt, func(node influxql.Node) {
		if err != nil {
			return
		}
		switch node := node.(type) {
		case *influxql.ShowRetentionPoliciesStatement:
			if node.Database == "" {
				node.Database = defaultDatabase
			}
		case *influxql.ShowMeasurementsStatement:
			if node.Database == "" {
				node.Database = defaultDatabase
			}
		case *influxql.ShowTagKeysStatement:
			if node.Database == "" {
				node.Database = defaultDatabase
			}
		case *influxql.ShowTagValuesStatement:
			if node.Database == "" {
				node.Database = defaultDatabase
			}
		case *influxql.ShowMeasurementCardinalityStatement:
			if node.Database == "" {
				node.Database = defaultDatabase
			}
		case *influxql.ShowSeriesCardinalityStatement:
			if node.Database == "" {
				node.Database = defaultDatabase
			}
		case *influxql.Measurement:
			switch stmt.(type) {
			case *influxql.DropSeriesStatement, *influxql.DeleteSeriesStatement:
				// DB and RP not supported by these statements so don't rewrite into invalid
				// statements
			default:
				err = e.normalizeMeasurement(node, defaultDatabase, defaultRetentionPolicy)
			}
		}
	})
	return
}

func (e *StatementExecutor) normalizeMeasurement(m *influxql.Measurement, defaultDatabase, defaultRetentionPolicy string) error {
	// Targets (measurements in an INTO clause) can have blank names, which means
	// the target takes the name of the measurement it came from in the FROM clause.
	if !m.IsTarget && m.Name == "" && m.SystemIterator == "" && m.Regex == nil {
		return errors.New("invalid measurement")
	}

	// Measurement does not have an explicit database? Insert default.
	if m.Database == "" {
		m.Database = defaultDatabase
	}

	// The database must now be specified by this point.
	if m.Database == "" {
		return ErrDatabaseNameRequired
	}

	// Find database.
	di := e.MetaClient.Database(m.Database)
	if di == nil {
		return influxdb.ErrDatabaseNotFound(m.Database)
	}

	// If no retention policy was specified, use the default.
	if m.RetentionPolicy == "" {
		if defaultRetentionPolicy != "" {
			m.RetentionPolicy = defaultRetentionPolicy
		} else if di.DefaultRetentionPolicy != "" {
			m.RetentionPolicy = di.DefaultRetentionPolicy
		} else {
			return fmt.Errorf("default retention policy not set for: %s", di.Name)
		}
	}
	return nil
}

// IntoWriteRequest is a partial copy of cluster.WriteRequest
type IntoWriteRequest struct {
	Database        string
	RetentionPolicy string
	Points          []models.Point
}

// TSDBStore is an interface for accessing the time series data store.
type TSDBStore interface {
	CreateShard(database, policy string, shardID uint64, enabled bool) error
	WriteToShard(shardID uint64, points []models.Point) error

	RestoreShard(id uint64, r io.Reader) error
	BackupShard(id uint64, since time.Time, w io.Writer) error

	DeleteDatabase(name string) error
	DeleteMeasurement(database, name string) error
	DeleteRetentionPolicy(database, name string) error
	DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
	DeleteShard(id uint64) error

	MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
	TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
	TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)

	SeriesCardinality(database string) (int64, error)
	MeasurementsCardinality(database string) (int64, error)
}

var _ TSDBStore = LocalTSDBStore{}

// LocalTSDBStore embeds a tsdb.Store and implements IteratorCreator
// to satisfy the TSDBStore interface.
type LocalTSDBStore struct {
	*tsdb.Store
}

// ShardIteratorCreator is an interface for creating an IteratorCreator to access a specific shard.
type ShardIteratorCreator interface {
	ShardIteratorCreator(id uint64) query.IteratorCreator
}

// joinUint64 returns a comma-delimited string of uint64 numbers.
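// For example, joinUint64([]uint64{1, 2, 3}) returns "1,2,3".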
func joinUint64(a []uint64) string {
	var buf bytes.Buffer
	for i, x := range a {
		buf.WriteString(strconv.FormatUint(x, 10))
		if i < len(a)-1 {
			buf.WriteRune(',')
		}
	}
	return buf.String()
}