1package coordinator_test
2
3import (
4	"bytes"
5	"context"
6	"errors"
7	"fmt"
8	"io"
9	"os"
10	"reflect"
11	"regexp"
12	"testing"
13	"time"
14
15	"github.com/davecgh/go-spew/spew"
16	"github.com/influxdata/influxdb/coordinator"
17	"github.com/influxdata/influxdb/internal"
18	"github.com/influxdata/influxdb/logger"
19	"github.com/influxdata/influxdb/models"
20	"github.com/influxdata/influxdb/query"
21	"github.com/influxdata/influxdb/services/meta"
22	"github.com/influxdata/influxdb/tsdb"
23	"github.com/influxdata/influxql"
24)
25
// Shared fixture names used by the tests in this file.
const (
	// DefaultDatabase is the default database name used in tests.
	DefaultDatabase = "db0"

	// DefaultRetentionPolicy is the default retention policy name used in tests.
	DefaultRetentionPolicy = "rp0"
)
33
34// Ensure query executor can execute a simple SELECT statement.
35func TestQueryExecutor_ExecuteQuery_SelectStatement(t *testing.T) {
36	e := DefaultQueryExecutor()
37
38	// The meta client should return a single shard owned by the local node.
39	e.MetaClient.ShardGroupsByTimeRangeFn = func(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) {
40		return []meta.ShardGroupInfo{
41			{ID: 1, Shards: []meta.ShardInfo{
42				{ID: 100, Owners: []meta.ShardOwner{{NodeID: 0}}},
43			}},
44		}, nil
45	}
46
47	// The TSDB store should return an IteratorCreator for shard.
48	// This IteratorCreator returns a single iterator with "value" in the aux fields.
49	e.TSDBStore.ShardGroupFn = func(ids []uint64) tsdb.ShardGroup {
50		if !reflect.DeepEqual(ids, []uint64{100}) {
51			t.Fatalf("unexpected shard ids: %v", ids)
52		}
53
54		var sh MockShard
55		sh.CreateIteratorFn = func(_ context.Context, _ *influxql.Measurement, _ query.IteratorOptions) (query.Iterator, error) {
56			return &FloatIterator{Points: []query.FloatPoint{
57				{Name: "cpu", Time: int64(0 * time.Second), Aux: []interface{}{float64(100)}},
58				{Name: "cpu", Time: int64(1 * time.Second), Aux: []interface{}{float64(200)}},
59			}}, nil
60		}
61		sh.FieldDimensionsFn = func(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
62			if !reflect.DeepEqual(measurements, []string{"cpu"}) {
63				t.Fatalf("unexpected source: %#v", measurements)
64			}
65			return map[string]influxql.DataType{"value": influxql.Float}, nil, nil
66		}
67		return &sh
68	}
69
70	// Verify all results from the query.
71	if a := ReadAllResults(e.ExecuteQuery(`SELECT * FROM cpu`, "db0", 0)); !reflect.DeepEqual(a, []*query.Result{
72		{
73			StatementID: 0,
74			Series: []*models.Row{{
75				Name:    "cpu",
76				Columns: []string{"time", "value"},
77				Values: [][]interface{}{
78					{time.Unix(0, 0).UTC(), float64(100)},
79					{time.Unix(1, 0).UTC(), float64(200)},
80				},
81			}},
82		},
83	}) {
84		t.Fatalf("unexpected results: %s", spew.Sdump(a))
85	}
86}
87
88// Ensure query executor can enforce a maximum bucket selection count.
89func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) {
90	e := DefaultQueryExecutor()
91	e.StatementExecutor.MaxSelectBucketsN = 3
92
93	// The meta client should return a single shards on the local node.
94	e.MetaClient.ShardGroupsByTimeRangeFn = func(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) {
95		return []meta.ShardGroupInfo{
96			{ID: 1, Shards: []meta.ShardInfo{
97				{ID: 100, Owners: []meta.ShardOwner{{NodeID: 0}}},
98			}},
99		}, nil
100	}
101
102	e.TSDBStore.ShardGroupFn = func(ids []uint64) tsdb.ShardGroup {
103		if !reflect.DeepEqual(ids, []uint64{100}) {
104			t.Fatalf("unexpected shard ids: %v", ids)
105		}
106
107		var sh MockShard
108		sh.CreateIteratorFn = func(_ context.Context, _ *influxql.Measurement, _ query.IteratorOptions) (query.Iterator, error) {
109			return &FloatIterator{
110				Points: []query.FloatPoint{{Name: "cpu", Time: int64(0 * time.Second), Aux: []interface{}{float64(100)}}},
111			}, nil
112		}
113		sh.FieldDimensionsFn = func(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
114			if !reflect.DeepEqual(measurements, []string{"cpu"}) {
115				t.Fatalf("unexpected source: %#v", measurements)
116			}
117			return map[string]influxql.DataType{"value": influxql.Float}, nil, nil
118		}
119		return &sh
120	}
121
122	// Verify all results from the query.
123	if a := ReadAllResults(e.ExecuteQuery(`SELECT count(value) FROM cpu WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:35Z' GROUP BY time(10s)`, "db0", 0)); !reflect.DeepEqual(a, []*query.Result{
124		{
125			StatementID: 0,
126			Err:         errors.New("max-select-buckets limit exceeded: (4/3)"),
127		},
128	}) {
129		t.Fatalf("unexpected results: %s", spew.Sdump(a))
130	}
131}
132
// TestStatementExecutor_ExecuteQuery_WriteInto verifies SELECT ... INTO write
// behavior: points whose aggregate fields are all NULL are dropped entirely,
// and points with a mix of NULL and non-NULL fields are written with only the
// non-NULL fields retained.
func TestStatementExecutor_ExecuteQuery_WriteInto(t *testing.T) {
	for _, tt := range []struct {
		name    string
		pw      func(t *testing.T, req *coordinator.IntoWriteRequest) error // validates the request handed to the points writer
		query   string
		source  func() query.Iterator // iterator served by the mock shard
		written int64                 // expected "written" count reported in the query result
	}{
		{
			// stddev of a single point is NULL, so nothing should be written.
			name: "DropNullPoints",
			pw: func(t *testing.T, req *coordinator.IntoWriteRequest) error {
				if want, got := len(req.Points), 0; want != got {
					t.Errorf("unexpected written points: %d != %d", want, got)
				}
				return nil
			},
			query: `SELECT stddev(value) INTO cpu_stddev FROM cpu WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:35Z' GROUP BY time(10s)`,
			source: func() query.Iterator {
				return &FloatIterator{
					Points: []query.FloatPoint{{Name: "cpu", Time: int64(0 * time.Second), Value: 100}},
				}
			},
			written: 0,
		},
		{
			// max is non-NULL but stddev is NULL; the written point should
			// carry only the single non-NULL field.
			name: "PartialDrop",
			pw: func(t *testing.T, req *coordinator.IntoWriteRequest) error {
				if want, got := len(req.Points), 1; want != got {
					t.Errorf("unexpected written points: %d != %d", want, got)
				} else {
					fields, err := req.Points[0].Fields()
					if err != nil {
						return err
					} else if want, got := len(fields), 1; want != got {
						t.Errorf("unexpected number of fields: %d != %d", want, got)
					}
				}
				return nil
			},
			query: `SELECT max(value), stddev(value) INTO cpu_agg FROM cpu WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:35Z' GROUP BY time(10s)`,
			source: func() query.Iterator {
				return &FloatIterator{
					Points: []query.FloatPoint{{Name: "cpu", Time: int64(0 * time.Second), Value: 100}},
				}
			},
			written: 1,
		},
	} {
		t.Run(tt.name, func(t *testing.T) {
			e := DefaultQueryExecutor()
			// Route INTO writes through the per-case validation hook.
			e.StatementExecutor.PointsWriter = writePointsIntoFunc(func(req *coordinator.IntoWriteRequest) error {
				return tt.pw(t, req)
			})

			// The meta client should return a single shards on the local node.
			e.MetaClient.ShardGroupsByTimeRangeFn = func(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) {
				return []meta.ShardGroupInfo{
					{ID: 1, Shards: []meta.ShardInfo{
						{ID: 100, Owners: []meta.ShardOwner{{NodeID: 0}}},
					}},
				}, nil
			}

			e.TSDBStore.ShardGroupFn = func(ids []uint64) tsdb.ShardGroup {
				if !reflect.DeepEqual(ids, []uint64{100}) {
					t.Fatalf("unexpected shard ids: %v", ids)
				}

				var sh MockShard
				sh.CreateIteratorFn = func(_ context.Context, _ *influxql.Measurement, _ query.IteratorOptions) (query.Iterator, error) {
					return tt.source(), nil
				}
				sh.FieldDimensionsFn = func(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
					if !reflect.DeepEqual(measurements, []string{"cpu"}) {
						t.Fatalf("unexpected source: %#v", measurements)
					}
					return map[string]influxql.DataType{"value": influxql.Float}, nil, nil
				}
				return &sh
			}

			// Verify all results from the query.
			if a := ReadAllResults(e.ExecuteQuery(tt.query, "db0", 0)); !reflect.DeepEqual(a, []*query.Result{
				{
					StatementID: 0,
					Series: models.Rows{
						{
							Name:    "result",
							Columns: []string{"time", "written"},
							Values: [][]interface{}{
								{ts("1970-01-01T00:00:00Z"), int64(tt.written)},
							},
						},
					},
				},
			}) {
				t.Fatalf("unexpected results: %s", spew.Sdump(a))
			}
		})
	}
}
234
235func TestStatementExecutor_NormalizeStatement(t *testing.T) {
236
237	testCases := []struct {
238		name       string
239		query      string
240		defaultDB  string
241		defaultRP  string
242		expectedDB string
243		expectedRP string
244	}{
245		{
246			name:       "defaults",
247			query:      "SELECT f FROM m",
248			defaultDB:  DefaultDatabase,
249			defaultRP:  "",
250			expectedDB: DefaultDatabase,
251			expectedRP: DefaultRetentionPolicy,
252		},
253		{
254			name:       "alternate database via param",
255			query:      "SELECT f FROM m",
256			defaultDB:  "dbalt",
257			defaultRP:  "",
258			expectedDB: "dbalt",
259			expectedRP: DefaultRetentionPolicy,
260		},
261		{
262			name:       "alternate database via query",
263			query:      fmt.Sprintf("SELECT f FROM dbalt.%s.m", DefaultRetentionPolicy),
264			defaultDB:  DefaultDatabase,
265			defaultRP:  "",
266			expectedDB: "dbalt",
267			expectedRP: DefaultRetentionPolicy,
268		},
269		{
270			name:       "alternate RP via param",
271			query:      "SELECT f FROM m",
272			defaultDB:  DefaultDatabase,
273			defaultRP:  "rpalt",
274			expectedDB: DefaultDatabase,
275			expectedRP: "rpalt",
276		},
277		{
278			name:       "alternate RP via query",
279			query:      fmt.Sprintf("SELECT f FROM %s.rpalt.m", DefaultDatabase),
280			defaultDB:  DefaultDatabase,
281			defaultRP:  "",
282			expectedDB: DefaultDatabase,
283			expectedRP: "rpalt",
284		},
285		{
286			name:       "alternate RP query disagrees with param and query wins",
287			query:      fmt.Sprintf("SELECT f FROM %s.rpquery.m", DefaultDatabase),
288			defaultDB:  DefaultDatabase,
289			defaultRP:  "rpparam",
290			expectedDB: DefaultDatabase,
291			expectedRP: "rpquery",
292		},
293	}
294
295	for _, testCase := range testCases {
296		t.Run(testCase.name, func(t *testing.T) {
297			q, err := influxql.ParseQuery(testCase.query)
298			if err != nil {
299				t.Fatalf("unexpected error parsing query: %v", err)
300			}
301
302			stmt := q.Statements[0].(*influxql.SelectStatement)
303
304			err = DefaultQueryExecutor().StatementExecutor.NormalizeStatement(stmt, testCase.defaultDB, testCase.defaultRP)
305			if err != nil {
306				t.Fatalf("unexpected error normalizing statement: %v", err)
307			}
308
309			m := stmt.Sources[0].(*influxql.Measurement)
310			if m.Database != testCase.expectedDB {
311				t.Errorf("database got %v, want %v", m.Database, testCase.expectedDB)
312			}
313			if m.RetentionPolicy != testCase.expectedRP {
314				t.Errorf("retention policy got %v, want %v", m.RetentionPolicy, testCase.expectedRP)
315			}
316		})
317	}
318}
319
320func TestStatementExecutor_NormalizeDropSeries(t *testing.T) {
321	q, err := influxql.ParseQuery("DROP SERIES FROM cpu")
322	if err != nil {
323		t.Fatalf("unexpected error parsing query: %v", err)
324	}
325
326	stmt := q.Statements[0].(*influxql.DropSeriesStatement)
327
328	s := &coordinator.StatementExecutor{
329		MetaClient: &internal.MetaClientMock{
330			DatabaseFn: func(name string) *meta.DatabaseInfo {
331				t.Fatal("meta client should not be called")
332				return nil
333			},
334		},
335	}
336	if err := s.NormalizeStatement(stmt, "foo", "bar"); err != nil {
337		t.Fatalf("unexpected error normalizing statement: %v", err)
338	}
339
340	m := stmt.Sources[0].(*influxql.Measurement)
341	if m.Database != "" {
342		t.Fatalf("database rewritten when not supposed to: %v", m.Database)
343	}
344	if m.RetentionPolicy != "" {
345		t.Fatalf("retention policy rewritten when not supposed to: %v", m.RetentionPolicy)
346	}
347
348	if exp, got := "DROP SERIES FROM cpu", q.String(); exp != got {
349		t.Fatalf("generated query does match parsed: exp %v, got %v", exp, got)
350	}
351}
352
353func TestStatementExecutor_NormalizeDeleteSeries(t *testing.T) {
354	q, err := influxql.ParseQuery("DELETE FROM cpu")
355	if err != nil {
356		t.Fatalf("unexpected error parsing query: %v", err)
357	}
358
359	stmt := q.Statements[0].(*influxql.DeleteSeriesStatement)
360
361	s := &coordinator.StatementExecutor{
362		MetaClient: &internal.MetaClientMock{
363			DatabaseFn: func(name string) *meta.DatabaseInfo {
364				t.Fatal("meta client should not be called")
365				return nil
366			},
367		},
368	}
369	if err := s.NormalizeStatement(stmt, "foo", "bar"); err != nil {
370		t.Fatalf("unexpected error normalizing statement: %v", err)
371	}
372
373	m := stmt.Sources[0].(*influxql.Measurement)
374	if m.Database != "" {
375		t.Fatalf("database rewritten when not supposed to: %v", m.Database)
376	}
377	if m.RetentionPolicy != "" {
378		t.Fatalf("retention policy rewritten when not supposed to: %v", m.RetentionPolicy)
379	}
380
381	if exp, got := "DELETE FROM cpu", q.String(); exp != got {
382		t.Fatalf("generated query does match parsed: exp %v, got %v", exp, got)
383	}
384}
385
386type mockAuthorizer struct {
387	AuthorizeDatabaseFn func(influxql.Privilege, string) bool
388}
389
390func (a *mockAuthorizer) AuthorizeDatabase(p influxql.Privilege, name string) bool {
391	return a.AuthorizeDatabaseFn(p, name)
392}
393
394func (m *mockAuthorizer) AuthorizeQuery(database string, query *influxql.Query) error {
395	panic("fail")
396}
397
398func (m *mockAuthorizer) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool {
399	panic("fail")
400}
401
402func (m *mockAuthorizer) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
403	panic("fail")
404}
405
406func TestQueryExecutor_ExecuteQuery_ShowDatabases(t *testing.T) {
407	qe := query.NewExecutor()
408	qe.StatementExecutor = &coordinator.StatementExecutor{
409		MetaClient: &internal.MetaClientMock{
410			DatabasesFn: func() []meta.DatabaseInfo {
411				return []meta.DatabaseInfo{
412					{Name: "db1"}, {Name: "db2"}, {Name: "db3"}, {Name: "db4"},
413				}
414			},
415		},
416	}
417
418	opt := query.ExecutionOptions{
419		Authorizer: &mockAuthorizer{
420			AuthorizeDatabaseFn: func(p influxql.Privilege, name string) bool {
421				return name == "db2" || name == "db4"
422			},
423		},
424	}
425
426	q, err := influxql.ParseQuery("SHOW DATABASES")
427	if err != nil {
428		t.Fatal(err)
429	}
430
431	results := ReadAllResults(qe.ExecuteQuery(q, opt, make(chan struct{})))
432	exp := []*query.Result{
433		{
434			StatementID: 0,
435			Series: []*models.Row{{
436				Name:    "databases",
437				Columns: []string{"name"},
438				Values: [][]interface{}{
439					{"db2"}, {"db4"},
440				},
441			}},
442		},
443	}
444	if !reflect.DeepEqual(results, exp) {
445		t.Fatalf("unexpected results: exp %s, got %s", spew.Sdump(exp), spew.Sdump(results))
446	}
447}
448
// QueryExecutor is a test wrapper for coordinator.QueryExecutor. It bundles a
// query.Executor together with the mocks it is wired to, so individual tests
// can override the mock hooks they need.
type QueryExecutor struct {
	*query.Executor

	MetaClient        MetaClient                     // mock meta client; hooks overridden per test
	TSDBStore         *internal.TSDBStoreMock        // mock TSDB store; hooks overridden per test
	StatementExecutor *coordinator.StatementExecutor // statement executor under test
	LogOutput         bytes.Buffer                   // captured executor log output
}
458
459// NewQueryExecutor returns a new instance of Executor.
460// This query executor always has a node id of 0.
461func NewQueryExecutor() *QueryExecutor {
462	e := &QueryExecutor{
463		Executor:  query.NewExecutor(),
464		TSDBStore: &internal.TSDBStoreMock{},
465	}
466
467	e.TSDBStore.CreateShardFn = func(database, policy string, shardID uint64, enabled bool) error {
468		return nil
469	}
470
471	e.TSDBStore.MeasurementNamesFn = func(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
472		return nil, nil
473	}
474
475	e.TSDBStore.TagValuesFn = func(_ query.Authorizer, _ []uint64, _ influxql.Expr) ([]tsdb.TagValues, error) {
476		return nil, nil
477	}
478
479	e.StatementExecutor = &coordinator.StatementExecutor{
480		MetaClient: &e.MetaClient,
481		TSDBStore:  e.TSDBStore,
482		ShardMapper: &coordinator.LocalShardMapper{
483			MetaClient: &e.MetaClient,
484			TSDBStore:  e.TSDBStore,
485		},
486	}
487	e.Executor.StatementExecutor = e.StatementExecutor
488
489	var out io.Writer = &e.LogOutput
490	if testing.Verbose() {
491		out = io.MultiWriter(out, os.Stderr)
492	}
493	e.Executor.WithLogger(logger.New(out))
494
495	return e
496}
497
498// DefaultQueryExecutor returns a Executor with a database (db0) and retention policy (rp0).
499func DefaultQueryExecutor() *QueryExecutor {
500	e := NewQueryExecutor()
501	e.MetaClient.DatabaseFn = DefaultMetaClientDatabaseFn
502	return e
503}
504
505// ExecuteQuery parses query and executes against the database.
506func (e *QueryExecutor) ExecuteQuery(q, database string, chunkSize int) <-chan *query.Result {
507	return e.Executor.ExecuteQuery(MustParseQuery(q), query.ExecutionOptions{
508		Database:  database,
509		ChunkSize: chunkSize,
510	}, make(chan struct{}))
511}
512
// MockShard is a tsdb.ShardGroup test double. Behavior is supplied through
// the *Fn hook fields; calling a method whose hook is unset panics with a
// nil-function dereference, which flags unexpected use in tests.
type MockShard struct {
	Measurements             []string // measurement names served by MeasurementsByRegex
	FieldDimensionsFn        func(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
	FieldKeysByMeasurementFn func(name []byte) []string
	CreateIteratorFn         func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error)
	IteratorCostFn           func(m string, opt query.IteratorOptions) (query.IteratorCost, error)
	ExpandSourcesFn          func(sources influxql.Sources) (influxql.Sources, error)
}
521
522func (sh *MockShard) MeasurementsByRegex(re *regexp.Regexp) []string {
523	names := make([]string, 0, len(sh.Measurements))
524	for _, name := range sh.Measurements {
525		if re.MatchString(name) {
526			names = append(names, name)
527		}
528	}
529	return names
530}
531
// FieldKeysByMeasurement delegates to the FieldKeysByMeasurementFn hook.
func (sh *MockShard) FieldKeysByMeasurement(name []byte) []string {
	return sh.FieldKeysByMeasurementFn(name)
}
535
// FieldDimensions delegates to the FieldDimensionsFn hook.
func (sh *MockShard) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
	return sh.FieldDimensionsFn(measurements)
}
539
540func (sh *MockShard) MapType(measurement, field string) influxql.DataType {
541	f, d, err := sh.FieldDimensions([]string{measurement})
542	if err != nil {
543		return influxql.Unknown
544	}
545
546	if typ, ok := f[field]; ok {
547		return typ
548	} else if _, ok := d[field]; ok {
549		return influxql.Tag
550	}
551	return influxql.Unknown
552}
553
// CreateIterator delegates to the CreateIteratorFn hook.
func (sh *MockShard) CreateIterator(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
	return sh.CreateIteratorFn(ctx, measurement, opt)
}
557
// IteratorCost delegates to the IteratorCostFn hook.
func (sh *MockShard) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
	return sh.IteratorCostFn(measurement, opt)
}
561
// ExpandSources delegates to the ExpandSourcesFn hook.
func (sh *MockShard) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
	return sh.ExpandSourcesFn(sources)
}
565
566// MustParseQuery parses s into a query. Panic on error.
567func MustParseQuery(s string) *influxql.Query {
568	q, err := influxql.ParseQuery(s)
569	if err != nil {
570		panic(err)
571	}
572	return q
573}
574
575// ReadAllResults reads all results from c and returns as a slice.
576func ReadAllResults(c <-chan *query.Result) []*query.Result {
577	var a []*query.Result
578	for result := range c {
579		a = append(a, result)
580	}
581	return a
582}
583
// FloatIterator represents an iterator that reads from a slice of points.
type FloatIterator struct {
	Points []query.FloatPoint // remaining points; consumed from the front by Next
	stats  query.IteratorStats
}
589
// Stats returns the iterator's stats snapshot.
func (itr *FloatIterator) Stats() query.IteratorStats { return itr.stats }

// Close is a no-op; there are no underlying resources to release.
func (itr *FloatIterator) Close() error               { return nil }
592
593// Next returns the next value and shifts it off the beginning of the points slice.
594func (itr *FloatIterator) Next() (*query.FloatPoint, error) {
595	if len(itr.Points) == 0 {
596		return nil, nil
597	}
598
599	v := &itr.Points[0]
600	itr.Points = itr.Points[1:]
601	return v, nil
602}
603
604func ts(s string) time.Time {
605	t, err := time.Parse(time.RFC3339, s)
606	if err != nil {
607		panic(err)
608	}
609	return t
610}
611
// writePointsIntoFunc adapts a plain function to the points-writer interface
// used by the statement executor's INTO handling (http.HandlerFunc style).
type writePointsIntoFunc func(req *coordinator.IntoWriteRequest) error

// WritePointsInto invokes fn with req.
func (fn writePointsIntoFunc) WritePointsInto(req *coordinator.IntoWriteRequest) error {
	return fn(req)
}
617