1package coordinator_test 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "io" 9 "os" 10 "reflect" 11 "regexp" 12 "testing" 13 "time" 14 15 "github.com/davecgh/go-spew/spew" 16 "github.com/influxdata/influxdb/coordinator" 17 "github.com/influxdata/influxdb/internal" 18 "github.com/influxdata/influxdb/logger" 19 "github.com/influxdata/influxdb/models" 20 "github.com/influxdata/influxdb/query" 21 "github.com/influxdata/influxdb/services/meta" 22 "github.com/influxdata/influxdb/tsdb" 23 "github.com/influxdata/influxql" 24) 25 26const ( 27 // DefaultDatabase is the default database name used in tests. 28 DefaultDatabase = "db0" 29 30 // DefaultRetentionPolicy is the default retention policy name used in tests. 31 DefaultRetentionPolicy = "rp0" 32) 33 34// Ensure query executor can execute a simple SELECT statement. 35func TestQueryExecutor_ExecuteQuery_SelectStatement(t *testing.T) { 36 e := DefaultQueryExecutor() 37 38 // The meta client should return a single shard owned by the local node. 39 e.MetaClient.ShardGroupsByTimeRangeFn = func(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) { 40 return []meta.ShardGroupInfo{ 41 {ID: 1, Shards: []meta.ShardInfo{ 42 {ID: 100, Owners: []meta.ShardOwner{{NodeID: 0}}}, 43 }}, 44 }, nil 45 } 46 47 // The TSDB store should return an IteratorCreator for shard. 48 // This IteratorCreator returns a single iterator with "value" in the aux fields. 49 e.TSDBStore.ShardGroupFn = func(ids []uint64) tsdb.ShardGroup { 50 if !reflect.DeepEqual(ids, []uint64{100}) { 51 t.Fatalf("unexpected shard ids: %v", ids) 52 } 53 54 var sh MockShard 55 sh.CreateIteratorFn = func(_ context.Context, _ *influxql.Measurement, _ query.IteratorOptions) (query.Iterator, error) { 56 return &FloatIterator{Points: []query.FloatPoint{ 57 {Name: "cpu", Time: int64(0 * time.Second), Aux: []interface{}{float64(100)}}, 58 {Name: "cpu", Time: int64(1 * time.Second), Aux: []interface{}{float64(200)}}, 59 }}, nil 60 } 61 sh.FieldDimensionsFn = func(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) { 62 if !reflect.DeepEqual(measurements, []string{"cpu"}) { 63 t.Fatalf("unexpected source: %#v", measurements) 64 } 65 return map[string]influxql.DataType{"value": influxql.Float}, nil, nil 66 } 67 return &sh 68 } 69 70 // Verify all results from the query. 71 if a := ReadAllResults(e.ExecuteQuery(`SELECT * FROM cpu`, "db0", 0)); !reflect.DeepEqual(a, []*query.Result{ 72 { 73 StatementID: 0, 74 Series: []*models.Row{{ 75 Name: "cpu", 76 Columns: []string{"time", "value"}, 77 Values: [][]interface{}{ 78 {time.Unix(0, 0).UTC(), float64(100)}, 79 {time.Unix(1, 0).UTC(), float64(200)}, 80 }, 81 }}, 82 }, 83 }) { 84 t.Fatalf("unexpected results: %s", spew.Sdump(a)) 85 } 86} 87 88// Ensure query executor can enforce a maximum bucket selection count. 89func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) { 90 e := DefaultQueryExecutor() 91 e.StatementExecutor.MaxSelectBucketsN = 3 92 93 // The meta client should return a single shards on the local node. 94 e.MetaClient.ShardGroupsByTimeRangeFn = func(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) { 95 return []meta.ShardGroupInfo{ 96 {ID: 1, Shards: []meta.ShardInfo{ 97 {ID: 100, Owners: []meta.ShardOwner{{NodeID: 0}}}, 98 }}, 99 }, nil 100 } 101 102 e.TSDBStore.ShardGroupFn = func(ids []uint64) tsdb.ShardGroup { 103 if !reflect.DeepEqual(ids, []uint64{100}) { 104 t.Fatalf("unexpected shard ids: %v", ids) 105 } 106 107 var sh MockShard 108 sh.CreateIteratorFn = func(_ context.Context, _ *influxql.Measurement, _ query.IteratorOptions) (query.Iterator, error) { 109 return &FloatIterator{ 110 Points: []query.FloatPoint{{Name: "cpu", Time: int64(0 * time.Second), Aux: []interface{}{float64(100)}}}, 111 }, nil 112 } 113 sh.FieldDimensionsFn = func(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) { 114 if !reflect.DeepEqual(measurements, []string{"cpu"}) { 115 t.Fatalf("unexpected source: %#v", measurements) 116 } 117 return map[string]influxql.DataType{"value": influxql.Float}, nil, nil 118 } 119 return &sh 120 } 121 122 // Verify all results from the query. 123 if a := ReadAllResults(e.ExecuteQuery(`SELECT count(value) FROM cpu WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:35Z' GROUP BY time(10s)`, "db0", 0)); !reflect.DeepEqual(a, []*query.Result{ 124 { 125 StatementID: 0, 126 Err: errors.New("max-select-buckets limit exceeded: (4/3)"), 127 }, 128 }) { 129 t.Fatalf("unexpected results: %s", spew.Sdump(a)) 130 } 131} 132 133func TestStatementExecutor_ExecuteQuery_WriteInto(t *testing.T) { 134 for _, tt := range []struct { 135 name string 136 pw func(t *testing.T, req *coordinator.IntoWriteRequest) error 137 query string 138 source func() query.Iterator 139 written int64 140 }{ 141 { 142 name: "DropNullPoints", 143 pw: func(t *testing.T, req *coordinator.IntoWriteRequest) error { 144 if want, got := len(req.Points), 0; want != got { 145 t.Errorf("unexpected written points: %d != %d", want, got) 146 } 147 return nil 148 }, 149 query: `SELECT stddev(value) INTO cpu_stddev FROM cpu WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:35Z' GROUP BY time(10s)`, 150 source: func() query.Iterator { 151 return &FloatIterator{ 152 Points: []query.FloatPoint{{Name: "cpu", Time: int64(0 * time.Second), Value: 100}}, 153 } 154 }, 155 written: 0, 156 }, 157 { 158 name: "PartialDrop", 159 pw: func(t *testing.T, req *coordinator.IntoWriteRequest) error { 160 if want, got := len(req.Points), 1; want != got { 161 t.Errorf("unexpected written points: %d != %d", want, got) 162 } else { 163 fields, err := req.Points[0].Fields() 164 if err != nil { 165 return err 166 } else if want, got := len(fields), 1; want != got { 167 t.Errorf("unexpected number of fields: %d != %d", want, got) 168 } 169 } 170 return nil 171 }, 172 query: `SELECT max(value), stddev(value) INTO cpu_agg FROM cpu WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:35Z' GROUP BY time(10s)`, 173 source: func() query.Iterator { 174 return &FloatIterator{ 175 Points: []query.FloatPoint{{Name: "cpu", Time: int64(0 * time.Second), Value: 100}}, 176 } 177 }, 178 written: 1, 179 }, 180 } { 181 t.Run(tt.name, func(t *testing.T) { 182 e := DefaultQueryExecutor() 183 e.StatementExecutor.PointsWriter = writePointsIntoFunc(func(req *coordinator.IntoWriteRequest) error { 184 return tt.pw(t, req) 185 }) 186 187 // The meta client should return a single shards on the local node. 188 e.MetaClient.ShardGroupsByTimeRangeFn = func(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) { 189 return []meta.ShardGroupInfo{ 190 {ID: 1, Shards: []meta.ShardInfo{ 191 {ID: 100, Owners: []meta.ShardOwner{{NodeID: 0}}}, 192 }}, 193 }, nil 194 } 195 196 e.TSDBStore.ShardGroupFn = func(ids []uint64) tsdb.ShardGroup { 197 if !reflect.DeepEqual(ids, []uint64{100}) { 198 t.Fatalf("unexpected shard ids: %v", ids) 199 } 200 201 var sh MockShard 202 sh.CreateIteratorFn = func(_ context.Context, _ *influxql.Measurement, _ query.IteratorOptions) (query.Iterator, error) { 203 return tt.source(), nil 204 } 205 sh.FieldDimensionsFn = func(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) { 206 if !reflect.DeepEqual(measurements, []string{"cpu"}) { 207 t.Fatalf("unexpected source: %#v", measurements) 208 } 209 return map[string]influxql.DataType{"value": influxql.Float}, nil, nil 210 } 211 return &sh 212 } 213 214 // Verify all results from the query. 215 if a := ReadAllResults(e.ExecuteQuery(tt.query, "db0", 0)); !reflect.DeepEqual(a, []*query.Result{ 216 { 217 StatementID: 0, 218 Series: models.Rows{ 219 { 220 Name: "result", 221 Columns: []string{"time", "written"}, 222 Values: [][]interface{}{ 223 {ts("1970-01-01T00:00:00Z"), int64(tt.written)}, 224 }, 225 }, 226 }, 227 }, 228 }) { 229 t.Fatalf("unexpected results: %s", spew.Sdump(a)) 230 } 231 }) 232 } 233} 234 235func TestStatementExecutor_NormalizeStatement(t *testing.T) { 236 237 testCases := []struct { 238 name string 239 query string 240 defaultDB string 241 defaultRP string 242 expectedDB string 243 expectedRP string 244 }{ 245 { 246 name: "defaults", 247 query: "SELECT f FROM m", 248 defaultDB: DefaultDatabase, 249 defaultRP: "", 250 expectedDB: DefaultDatabase, 251 expectedRP: DefaultRetentionPolicy, 252 }, 253 { 254 name: "alternate database via param", 255 query: "SELECT f FROM m", 256 defaultDB: "dbalt", 257 defaultRP: "", 258 expectedDB: "dbalt", 259 expectedRP: DefaultRetentionPolicy, 260 }, 261 { 262 name: "alternate database via query", 263 query: fmt.Sprintf("SELECT f FROM dbalt.%s.m", DefaultRetentionPolicy), 264 defaultDB: DefaultDatabase, 265 defaultRP: "", 266 expectedDB: "dbalt", 267 expectedRP: DefaultRetentionPolicy, 268 }, 269 { 270 name: "alternate RP via param", 271 query: "SELECT f FROM m", 272 defaultDB: DefaultDatabase, 273 defaultRP: "rpalt", 274 expectedDB: DefaultDatabase, 275 expectedRP: "rpalt", 276 }, 277 { 278 name: "alternate RP via query", 279 query: fmt.Sprintf("SELECT f FROM %s.rpalt.m", DefaultDatabase), 280 defaultDB: DefaultDatabase, 281 defaultRP: "", 282 expectedDB: DefaultDatabase, 283 expectedRP: "rpalt", 284 }, 285 { 286 name: "alternate RP query disagrees with param and query wins", 287 query: fmt.Sprintf("SELECT f FROM %s.rpquery.m", DefaultDatabase), 288 defaultDB: DefaultDatabase, 289 defaultRP: "rpparam", 290 expectedDB: DefaultDatabase, 291 expectedRP: "rpquery", 292 }, 293 } 294 295 for _, testCase := range testCases { 296 t.Run(testCase.name, func(t *testing.T) { 297 q, err := influxql.ParseQuery(testCase.query) 298 if err != nil { 299 t.Fatalf("unexpected error parsing query: %v", err) 300 } 301 302 stmt := q.Statements[0].(*influxql.SelectStatement) 303 304 err = DefaultQueryExecutor().StatementExecutor.NormalizeStatement(stmt, testCase.defaultDB, testCase.defaultRP) 305 if err != nil { 306 t.Fatalf("unexpected error normalizing statement: %v", err) 307 } 308 309 m := stmt.Sources[0].(*influxql.Measurement) 310 if m.Database != testCase.expectedDB { 311 t.Errorf("database got %v, want %v", m.Database, testCase.expectedDB) 312 } 313 if m.RetentionPolicy != testCase.expectedRP { 314 t.Errorf("retention policy got %v, want %v", m.RetentionPolicy, testCase.expectedRP) 315 } 316 }) 317 } 318} 319 320func TestStatementExecutor_NormalizeDropSeries(t *testing.T) { 321 q, err := influxql.ParseQuery("DROP SERIES FROM cpu") 322 if err != nil { 323 t.Fatalf("unexpected error parsing query: %v", err) 324 } 325 326 stmt := q.Statements[0].(*influxql.DropSeriesStatement) 327 328 s := &coordinator.StatementExecutor{ 329 MetaClient: &internal.MetaClientMock{ 330 DatabaseFn: func(name string) *meta.DatabaseInfo { 331 t.Fatal("meta client should not be called") 332 return nil 333 }, 334 }, 335 } 336 if err := s.NormalizeStatement(stmt, "foo", "bar"); err != nil { 337 t.Fatalf("unexpected error normalizing statement: %v", err) 338 } 339 340 m := stmt.Sources[0].(*influxql.Measurement) 341 if m.Database != "" { 342 t.Fatalf("database rewritten when not supposed to: %v", m.Database) 343 } 344 if m.RetentionPolicy != "" { 345 t.Fatalf("retention policy rewritten when not supposed to: %v", m.RetentionPolicy) 346 } 347 348 if exp, got := "DROP SERIES FROM cpu", q.String(); exp != got { 349 t.Fatalf("generated query does match parsed: exp %v, got %v", exp, got) 350 } 351} 352 353func TestStatementExecutor_NormalizeDeleteSeries(t *testing.T) { 354 q, err := influxql.ParseQuery("DELETE FROM cpu") 355 if err != nil { 356 t.Fatalf("unexpected error parsing query: %v", err) 357 } 358 359 stmt := q.Statements[0].(*influxql.DeleteSeriesStatement) 360 361 s := &coordinator.StatementExecutor{ 362 MetaClient: &internal.MetaClientMock{ 363 DatabaseFn: func(name string) *meta.DatabaseInfo { 364 t.Fatal("meta client should not be called") 365 return nil 366 }, 367 }, 368 } 369 if err := s.NormalizeStatement(stmt, "foo", "bar"); err != nil { 370 t.Fatalf("unexpected error normalizing statement: %v", err) 371 } 372 373 m := stmt.Sources[0].(*influxql.Measurement) 374 if m.Database != "" { 375 t.Fatalf("database rewritten when not supposed to: %v", m.Database) 376 } 377 if m.RetentionPolicy != "" { 378 t.Fatalf("retention policy rewritten when not supposed to: %v", m.RetentionPolicy) 379 } 380 381 if exp, got := "DELETE FROM cpu", q.String(); exp != got { 382 t.Fatalf("generated query does match parsed: exp %v, got %v", exp, got) 383 } 384} 385 386type mockAuthorizer struct { 387 AuthorizeDatabaseFn func(influxql.Privilege, string) bool 388} 389 390func (a *mockAuthorizer) AuthorizeDatabase(p influxql.Privilege, name string) bool { 391 return a.AuthorizeDatabaseFn(p, name) 392} 393 394func (m *mockAuthorizer) AuthorizeQuery(database string, query *influxql.Query) error { 395 panic("fail") 396} 397 398func (m *mockAuthorizer) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool { 399 panic("fail") 400} 401 402func (m *mockAuthorizer) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool { 403 panic("fail") 404} 405 406func TestQueryExecutor_ExecuteQuery_ShowDatabases(t *testing.T) { 407 qe := query.NewExecutor() 408 qe.StatementExecutor = &coordinator.StatementExecutor{ 409 MetaClient: &internal.MetaClientMock{ 410 DatabasesFn: func() []meta.DatabaseInfo { 411 return []meta.DatabaseInfo{ 412 {Name: "db1"}, {Name: "db2"}, {Name: "db3"}, {Name: "db4"}, 413 } 414 }, 415 }, 416 } 417 418 opt := query.ExecutionOptions{ 419 Authorizer: &mockAuthorizer{ 420 AuthorizeDatabaseFn: func(p influxql.Privilege, name string) bool { 421 return name == "db2" || name == "db4" 422 }, 423 }, 424 } 425 426 q, err := influxql.ParseQuery("SHOW DATABASES") 427 if err != nil { 428 t.Fatal(err) 429 } 430 431 results := ReadAllResults(qe.ExecuteQuery(q, opt, make(chan struct{}))) 432 exp := []*query.Result{ 433 { 434 StatementID: 0, 435 Series: []*models.Row{{ 436 Name: "databases", 437 Columns: []string{"name"}, 438 Values: [][]interface{}{ 439 {"db2"}, {"db4"}, 440 }, 441 }}, 442 }, 443 } 444 if !reflect.DeepEqual(results, exp) { 445 t.Fatalf("unexpected results: exp %s, got %s", spew.Sdump(exp), spew.Sdump(results)) 446 } 447} 448 449// QueryExecutor is a test wrapper for coordinator.QueryExecutor. 450type QueryExecutor struct { 451 *query.Executor 452 453 MetaClient MetaClient 454 TSDBStore *internal.TSDBStoreMock 455 StatementExecutor *coordinator.StatementExecutor 456 LogOutput bytes.Buffer 457} 458 459// NewQueryExecutor returns a new instance of Executor. 460// This query executor always has a node id of 0. 461func NewQueryExecutor() *QueryExecutor { 462 e := &QueryExecutor{ 463 Executor: query.NewExecutor(), 464 TSDBStore: &internal.TSDBStoreMock{}, 465 } 466 467 e.TSDBStore.CreateShardFn = func(database, policy string, shardID uint64, enabled bool) error { 468 return nil 469 } 470 471 e.TSDBStore.MeasurementNamesFn = func(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { 472 return nil, nil 473 } 474 475 e.TSDBStore.TagValuesFn = func(_ query.Authorizer, _ []uint64, _ influxql.Expr) ([]tsdb.TagValues, error) { 476 return nil, nil 477 } 478 479 e.StatementExecutor = &coordinator.StatementExecutor{ 480 MetaClient: &e.MetaClient, 481 TSDBStore: e.TSDBStore, 482 ShardMapper: &coordinator.LocalShardMapper{ 483 MetaClient: &e.MetaClient, 484 TSDBStore: e.TSDBStore, 485 }, 486 } 487 e.Executor.StatementExecutor = e.StatementExecutor 488 489 var out io.Writer = &e.LogOutput 490 if testing.Verbose() { 491 out = io.MultiWriter(out, os.Stderr) 492 } 493 e.Executor.WithLogger(logger.New(out)) 494 495 return e 496} 497 498// DefaultQueryExecutor returns a Executor with a database (db0) and retention policy (rp0). 499func DefaultQueryExecutor() *QueryExecutor { 500 e := NewQueryExecutor() 501 e.MetaClient.DatabaseFn = DefaultMetaClientDatabaseFn 502 return e 503} 504 505// ExecuteQuery parses query and executes against the database. 506func (e *QueryExecutor) ExecuteQuery(q, database string, chunkSize int) <-chan *query.Result { 507 return e.Executor.ExecuteQuery(MustParseQuery(q), query.ExecutionOptions{ 508 Database: database, 509 ChunkSize: chunkSize, 510 }, make(chan struct{})) 511} 512 513type MockShard struct { 514 Measurements []string 515 FieldDimensionsFn func(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) 516 FieldKeysByMeasurementFn func(name []byte) []string 517 CreateIteratorFn func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) 518 IteratorCostFn func(m string, opt query.IteratorOptions) (query.IteratorCost, error) 519 ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error) 520} 521 522func (sh *MockShard) MeasurementsByRegex(re *regexp.Regexp) []string { 523 names := make([]string, 0, len(sh.Measurements)) 524 for _, name := range sh.Measurements { 525 if re.MatchString(name) { 526 names = append(names, name) 527 } 528 } 529 return names 530} 531 532func (sh *MockShard) FieldKeysByMeasurement(name []byte) []string { 533 return sh.FieldKeysByMeasurementFn(name) 534} 535 536func (sh *MockShard) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) { 537 return sh.FieldDimensionsFn(measurements) 538} 539 540func (sh *MockShard) MapType(measurement, field string) influxql.DataType { 541 f, d, err := sh.FieldDimensions([]string{measurement}) 542 if err != nil { 543 return influxql.Unknown 544 } 545 546 if typ, ok := f[field]; ok { 547 return typ 548 } else if _, ok := d[field]; ok { 549 return influxql.Tag 550 } 551 return influxql.Unknown 552} 553 554func (sh *MockShard) CreateIterator(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) { 555 return sh.CreateIteratorFn(ctx, measurement, opt) 556} 557 558func (sh *MockShard) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) { 559 return sh.IteratorCostFn(measurement, opt) 560} 561 562func (sh *MockShard) ExpandSources(sources influxql.Sources) (influxql.Sources, error) { 563 return sh.ExpandSourcesFn(sources) 564} 565 566// MustParseQuery parses s into a query. Panic on error. 567func MustParseQuery(s string) *influxql.Query { 568 q, err := influxql.ParseQuery(s) 569 if err != nil { 570 panic(err) 571 } 572 return q 573} 574 575// ReadAllResults reads all results from c and returns as a slice. 576func ReadAllResults(c <-chan *query.Result) []*query.Result { 577 var a []*query.Result 578 for result := range c { 579 a = append(a, result) 580 } 581 return a 582} 583 584// FloatIterator is a represents an iterator that reads from a slice. 585type FloatIterator struct { 586 Points []query.FloatPoint 587 stats query.IteratorStats 588} 589 590func (itr *FloatIterator) Stats() query.IteratorStats { return itr.stats } 591func (itr *FloatIterator) Close() error { return nil } 592 593// Next returns the next value and shifts it off the beginning of the points slice. 594func (itr *FloatIterator) Next() (*query.FloatPoint, error) { 595 if len(itr.Points) == 0 { 596 return nil, nil 597 } 598 599 v := &itr.Points[0] 600 itr.Points = itr.Points[1:] 601 return v, nil 602} 603 604func ts(s string) time.Time { 605 t, err := time.Parse(time.RFC3339, s) 606 if err != nil { 607 panic(err) 608 } 609 return t 610} 611 612type writePointsIntoFunc func(req *coordinator.IntoWriteRequest) error 613 614func (fn writePointsIntoFunc) WritePointsInto(req *coordinator.IntoWriteRequest) error { 615 return fn(req) 616} 617