1package continuous_querier 2 3import ( 4 "errors" 5 "fmt" 6 "os" 7 "sync" 8 "testing" 9 "time" 10 11 "github.com/influxdata/influxdb/logger" 12 "github.com/influxdata/influxdb/models" 13 "github.com/influxdata/influxdb/query" 14 "github.com/influxdata/influxdb/services/meta" 15 "github.com/influxdata/influxql" 16) 17 18var ( 19 errExpected = errors.New("expected error") 20 errUnexpected = errors.New("unexpected error") 21) 22 23// Test closing never opened, open, open already open, close, and close already closed. 24func TestOpenAndClose(t *testing.T) { 25 s := NewTestService(t) 26 27 if err := s.Close(); err != nil { 28 t.Error(err) 29 } else if err = s.Open(); err != nil { 30 t.Error(err) 31 } else if err = s.Open(); err != nil { 32 t.Error(err) 33 } else if err = s.Close(); err != nil { 34 t.Error(err) 35 } else if err = s.Close(); err != nil { 36 t.Error(err) 37 } 38} 39 40// Test Run method. 41func TestContinuousQueryService_Run(t *testing.T) { 42 s := NewTestService(t) 43 44 // Set RunInterval high so we can trigger using Run method. 45 s.RunInterval = 10 * time.Minute 46 47 done := make(chan struct{}) 48 expectCallCnt := 3 49 callCnt := 0 50 51 // Set a callback for ExecuteStatement. 52 s.QueryExecutor.StatementExecutor = &StatementExecutor{ 53 ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 54 callCnt++ 55 if callCnt >= expectCallCnt { 56 done <- struct{}{} 57 } 58 ctx.Results <- &query.Result{} 59 return nil 60 }, 61 } 62 63 // Use a custom "now" time since the internals of last run care about 64 // what the actual time is. Truncate to 10 minutes we are starting on an interval. 65 now := time.Now().Truncate(10 * time.Minute) 66 67 s.Open() 68 // Trigger service to run all CQs. 69 s.Run("", "", now) 70 // Shouldn't time out. 71 if err := wait(done, 100*time.Millisecond); err != nil { 72 t.Error(err) 73 } 74 // This time it should timeout because ExecuteQuery should not get called again. 75 if err := wait(done, 100*time.Millisecond); err == nil { 76 t.Error("too many queries executed") 77 } 78 s.Close() 79 80 // Now test just one query. 81 expectCallCnt = 1 82 callCnt = 0 83 s.Open() 84 s.Run("db", "cq", now) 85 // Shouldn't time out. 86 if err := wait(done, 100*time.Millisecond); err != nil { 87 t.Error(err) 88 } 89 // This time it should timeout because ExecuteQuery should not get called again. 90 if err := wait(done, 100*time.Millisecond); err == nil { 91 t.Error("too many queries executed") 92 } 93 s.Close() 94} 95 96func TestContinuousQueryService_ResampleOptions(t *testing.T) { 97 s := NewTestService(t) 98 mc := NewMetaClient(t) 99 mc.CreateDatabase("db", "") 100 mc.CreateContinuousQuery("db", "cq", `CREATE CONTINUOUS QUERY cq ON db RESAMPLE EVERY 10s FOR 2m BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(1m) END`) 101 s.MetaClient = mc 102 103 db := s.MetaClient.Database("db") 104 105 cq, err := NewContinuousQuery(db.Name, &db.ContinuousQueries[0]) 106 if err != nil { 107 t.Fatal(err) 108 } else if cq.Resample.Every != 10*time.Second { 109 t.Errorf("expected resample every to be 10s, got %s", influxql.FormatDuration(cq.Resample.Every)) 110 } else if cq.Resample.For != 2*time.Minute { 111 t.Errorf("expected resample for 2m, got %s", influxql.FormatDuration(cq.Resample.For)) 112 } 113 114 // Set RunInterval high so we can trigger using Run method. 115 s.RunInterval = 10 * time.Minute 116 117 done := make(chan struct{}) 118 var expected struct { 119 min time.Time 120 max time.Time 121 } 122 123 // Set a callback for ExecuteStatement. 124 s.QueryExecutor.StatementExecutor = &StatementExecutor{ 125 ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 126 s := stmt.(*influxql.SelectStatement) 127 valuer := &influxql.NowValuer{Location: s.Location} 128 _, timeRange, err := influxql.ConditionExpr(s.Condition, valuer) 129 if err != nil { 130 t.Errorf("unexpected error parsing time range: %s", err) 131 } else if !expected.min.Equal(timeRange.Min) || !expected.max.Equal(timeRange.Max) { 132 t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, expected.min, expected.max) 133 } 134 done <- struct{}{} 135 ctx.Results <- &query.Result{} 136 return nil 137 }, 138 } 139 140 s.Open() 141 defer s.Close() 142 143 // Set the 'now' time to the start of a 10 minute interval. Then trigger a run. 144 // This should trigger two queries (one for the current time interval, one for the previous). 145 now := time.Now().UTC().Truncate(10 * time.Minute) 146 expected.min = now.Add(-2 * time.Minute) 147 expected.max = now.Add(-1) 148 s.RunCh <- &RunRequest{Now: now} 149 150 if err := wait(done, 100*time.Millisecond); err != nil { 151 t.Fatal(err) 152 } 153 154 // Trigger another run 10 seconds later. Another two queries should happen, 155 // but it will be a different two queries. 156 expected.min = expected.min.Add(time.Minute) 157 expected.max = expected.max.Add(time.Minute) 158 s.RunCh <- &RunRequest{Now: now.Add(10 * time.Second)} 159 160 if err := wait(done, 100*time.Millisecond); err != nil { 161 t.Fatal(err) 162 } 163 164 // Reset the time period and send the initial request at 5 seconds after the 165 // 10 minute mark. There should be exactly one call since the current interval is too 166 // young and only one interval matches the FOR duration. 167 expected.min = now.Add(-time.Minute) 168 expected.max = now.Add(-1) 169 s.Run("", "", now.Add(5*time.Second)) 170 171 if err := wait(done, 100*time.Millisecond); err != nil { 172 t.Fatal(err) 173 } 174 175 // Send a message 10 minutes later and ensure that the system plays catchup. 176 expected.max = now.Add(10*time.Minute - 1) 177 s.RunCh <- &RunRequest{Now: now.Add(10 * time.Minute)} 178 179 if err := wait(done, 100*time.Millisecond); err != nil { 180 t.Fatal(err) 181 } 182 183 // No overflow should be sent. 184 if err := wait(done, 100*time.Millisecond); err == nil { 185 t.Error("too many queries executed") 186 } 187} 188 189func TestContinuousQueryService_EveryHigherThanInterval(t *testing.T) { 190 s := NewTestService(t) 191 ms := NewMetaClient(t) 192 ms.CreateDatabase("db", "") 193 ms.CreateContinuousQuery("db", "cq", `CREATE CONTINUOUS QUERY cq ON db RESAMPLE EVERY 1m BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(30s) END`) 194 s.MetaClient = ms 195 196 // Set RunInterval high so we can trigger using Run method. 197 s.RunInterval = 10 * time.Minute 198 199 done := make(chan struct{}) 200 var expected struct { 201 min time.Time 202 max time.Time 203 } 204 205 // Set a callback for ExecuteQuery. 206 s.QueryExecutor.StatementExecutor = &StatementExecutor{ 207 ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 208 s := stmt.(*influxql.SelectStatement) 209 valuer := &influxql.NowValuer{Location: s.Location} 210 _, timeRange, err := influxql.ConditionExpr(s.Condition, valuer) 211 if err != nil { 212 t.Errorf("unexpected error parsing time range: %s", err) 213 } else if !expected.min.Equal(timeRange.Min) || !expected.max.Equal(timeRange.Max) { 214 t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, expected.min, expected.max) 215 } 216 done <- struct{}{} 217 ctx.Results <- &query.Result{} 218 return nil 219 }, 220 } 221 222 s.Open() 223 defer s.Close() 224 225 // Set the 'now' time to the start of a 10 minute interval. Then trigger a run. 226 // This should trigger two queries (one for the current time interval, one for the previous) 227 // since the default FOR interval should be EVERY, not the GROUP BY interval. 228 now := time.Now().Truncate(10 * time.Minute) 229 expected.min = now.Add(-time.Minute) 230 expected.max = now.Add(-1) 231 s.RunCh <- &RunRequest{Now: now} 232 233 if err := wait(done, 100*time.Millisecond); err != nil { 234 t.Fatal(err) 235 } 236 237 // Trigger 30 seconds later. Nothing should run. 238 s.RunCh <- &RunRequest{Now: now.Add(30 * time.Second)} 239 240 if err := wait(done, 100*time.Millisecond); err == nil { 241 t.Fatal("too many queries") 242 } 243 244 // Run again 1 minute later. Another two queries should run. 245 expected.min = now 246 expected.max = now.Add(time.Minute - 1) 247 s.RunCh <- &RunRequest{Now: now.Add(time.Minute)} 248 249 if err := wait(done, 100*time.Millisecond); err != nil { 250 t.Fatal(err) 251 } 252 253 // No overflow should be sent. 254 if err := wait(done, 100*time.Millisecond); err == nil { 255 t.Error("too many queries executed") 256 } 257} 258 259func TestContinuousQueryService_GroupByOffset(t *testing.T) { 260 s := NewTestService(t) 261 mc := NewMetaClient(t) 262 mc.CreateDatabase("db", "") 263 mc.CreateContinuousQuery("db", "cq", `CREATE CONTINUOUS QUERY cq ON db BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(1m, 30s) END`) 264 s.MetaClient = mc 265 266 // Set RunInterval high so we can trigger using Run method. 267 s.RunInterval = 10 * time.Minute 268 269 done := make(chan struct{}) 270 var expected struct { 271 min time.Time 272 max time.Time 273 } 274 275 // Set a callback for ExecuteStatement. 276 s.QueryExecutor.StatementExecutor = &StatementExecutor{ 277 ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 278 s := stmt.(*influxql.SelectStatement) 279 valuer := &influxql.NowValuer{Location: s.Location} 280 _, timeRange, err := influxql.ConditionExpr(s.Condition, valuer) 281 if err != nil { 282 t.Errorf("unexpected error parsing time range: %s", err) 283 } else if !expected.min.Equal(timeRange.Min) || !expected.max.Equal(timeRange.Max) { 284 t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, expected.min, expected.max) 285 } 286 done <- struct{}{} 287 ctx.Results <- &query.Result{} 288 return nil 289 }, 290 } 291 292 s.Open() 293 defer s.Close() 294 295 // Set the 'now' time to the start of a 10 minute interval with a 30 second offset. 296 // Then trigger a run. This should trigger two queries (one for the current time 297 // interval, one for the previous). 298 now := time.Now().UTC().Truncate(10 * time.Minute).Add(30 * time.Second) 299 expected.min = now.Add(-time.Minute) 300 expected.max = now.Add(-1) 301 s.RunCh <- &RunRequest{Now: now} 302 303 if err := wait(done, 100*time.Millisecond); err != nil { 304 t.Fatal(err) 305 } 306} 307 308// Test service when not the cluster leader (CQs shouldn't run). 309func TestContinuousQueryService_NotLeader(t *testing.T) { 310 s := NewTestService(t) 311 // Set RunInterval high so we can test triggering with the RunCh below. 312 s.RunInterval = 10 * time.Second 313 s.MetaClient.(*MetaClient).Leader = false 314 315 done := make(chan struct{}) 316 // Set a callback for ExecuteStatement. Shouldn't get called because we're not the leader. 317 s.QueryExecutor.StatementExecutor = &StatementExecutor{ 318 ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 319 done <- struct{}{} 320 ctx.Results <- &query.Result{Err: errUnexpected} 321 return nil 322 }, 323 } 324 325 s.Open() 326 // Trigger service to run CQs. 327 s.RunCh <- &RunRequest{Now: time.Now()} 328 // Expect timeout error because ExecuteQuery callback wasn't called. 329 if err := wait(done, 100*time.Millisecond); err == nil { 330 t.Error(err) 331 } 332 s.Close() 333} 334 335// Test ExecuteContinuousQuery with invalid queries. 336func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) { 337 s := NewTestService(t) 338 s.QueryExecutor.StatementExecutor = &StatementExecutor{ 339 ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 340 return errUnexpected 341 }, 342 } 343 dbis := s.MetaClient.Databases() 344 dbi := dbis[0] 345 cqi := dbi.ContinuousQueries[0] 346 347 cqi.Query = `this is not a query` 348 if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil { 349 t.Error("expected error but got nil") 350 } 351 352 // Valid query but invalid continuous query. 353 cqi.Query = `SELECT * FROM cpu` 354 if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil { 355 t.Error("expected error but got nil") 356 } 357 358 // Group by requires aggregate. 359 cqi.Query = `SELECT value INTO other_value FROM cpu WHERE time > now() - 1h GROUP BY time(1s)` 360 if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil { 361 t.Error("expected error but got nil") 362 } 363} 364 365// Test the time range for different CQ durations. 366func TestExecuteContinuousQuery_TimeRange(t *testing.T) { 367 // Choose a start date that is not on an interval border for anyone. 368 now := mustParseTime(t, "2000-01-01T00:00:00Z") 369 for _, tt := range []struct { 370 d string 371 start, end time.Time 372 }{ 373 { 374 d: "10s", 375 start: mustParseTime(t, "2000-01-01T00:00:00Z"), 376 end: mustParseTime(t, "2000-01-01T00:00:10Z"), 377 }, 378 { 379 d: "1m", 380 start: mustParseTime(t, "2000-01-01T00:00:00Z"), 381 end: mustParseTime(t, "2000-01-01T00:01:00Z"), 382 }, 383 { 384 d: "10m", 385 start: mustParseTime(t, "2000-01-01T00:00:00Z"), 386 end: mustParseTime(t, "2000-01-01T00:10:00Z"), 387 }, 388 { 389 d: "30m", 390 start: mustParseTime(t, "2000-01-01T00:00:00Z"), 391 end: mustParseTime(t, "2000-01-01T00:30:00Z"), 392 }, 393 { 394 d: "1h", 395 start: mustParseTime(t, "2000-01-01T00:00:00Z"), 396 end: mustParseTime(t, "2000-01-01T01:00:00Z"), 397 }, 398 { 399 d: "2h", 400 start: mustParseTime(t, "2000-01-01T00:00:00Z"), 401 end: mustParseTime(t, "2000-01-01T02:00:00Z"), 402 }, 403 { 404 d: "12h", 405 start: mustParseTime(t, "2000-01-01T00:00:00Z"), 406 end: mustParseTime(t, "2000-01-01T12:00:00Z"), 407 }, 408 { 409 d: "1d", 410 start: mustParseTime(t, "2000-01-01T00:00:00Z"), 411 end: mustParseTime(t, "2000-01-02T00:00:00Z"), 412 }, 413 { 414 d: "1w", 415 start: mustParseTime(t, "1999-12-30T00:00:00Z"), 416 end: mustParseTime(t, "2000-01-06T00:00:00Z"), 417 }, 418 } { 419 t.Run(tt.d, func(t *testing.T) { 420 d, err := influxql.ParseDuration(tt.d) 421 if err != nil { 422 t.Fatalf("unable to parse duration: %s", err) 423 } 424 425 s := NewTestService(t) 426 mc := NewMetaClient(t) 427 mc.CreateDatabase("db", "") 428 mc.CreateContinuousQuery("db", "cq", 429 fmt.Sprintf(`CREATE CONTINUOUS QUERY cq ON db BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(%s) END`, tt.d)) 430 s.MetaClient = mc 431 432 // Set RunInterval high so we can trigger using Run method. 433 s.RunInterval = 10 * time.Minute 434 done := make(chan struct{}) 435 436 // Set a callback for ExecuteStatement. 437 s.QueryExecutor.StatementExecutor = &StatementExecutor{ 438 ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 439 s := stmt.(*influxql.SelectStatement) 440 valuer := &influxql.NowValuer{Location: s.Location} 441 _, timeRange, err := influxql.ConditionExpr(s.Condition, valuer) 442 timeRange.Max = timeRange.Max.Add(time.Nanosecond) 443 if err != nil { 444 t.Errorf("unexpected error parsing time range: %s", err) 445 } else if !tt.start.Equal(timeRange.Min) || !tt.end.Equal(timeRange.Max) { 446 t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, tt.start, tt.end) 447 } 448 done <- struct{}{} 449 ctx.Results <- &query.Result{} 450 return nil 451 }, 452 } 453 454 s.Open() 455 defer s.Close() 456 457 // Send an initial run request one nanosecond after the start to 458 // prime the last CQ map. 459 s.RunCh <- &RunRequest{Now: now.Add(time.Nanosecond)} 460 // Execute the real request after the time interval. 461 s.RunCh <- &RunRequest{Now: now.Add(d)} 462 if err := wait(done, 100*time.Millisecond); err != nil { 463 t.Fatal(err) 464 } 465 }) 466 } 467} 468 469// Test the time range for different CQ durations. 470func TestExecuteContinuousQuery_TimeZone(t *testing.T) { 471 type test struct { 472 now time.Time 473 start, end time.Time 474 } 475 476 // Choose a start date that is not on an interval border for anyone. 477 for _, tt := range []struct { 478 name string 479 d string 480 options string 481 initial time.Time 482 tests []test 483 }{ 484 { 485 name: "DaylightSavingsStart/1d", 486 d: "1d", 487 initial: mustParseTime(t, "2000-04-02T00:00:00-05:00"), 488 tests: []test{ 489 { 490 start: mustParseTime(t, "2000-04-02T00:00:00-05:00"), 491 end: mustParseTime(t, "2000-04-03T00:00:00-04:00"), 492 }, 493 }, 494 }, 495 { 496 name: "DaylightSavingsStart/2h", 497 d: "2h", 498 initial: mustParseTime(t, "2000-04-02T00:00:00-05:00"), 499 tests: []test{ 500 { 501 start: mustParseTime(t, "2000-04-02T00:00:00-05:00"), 502 end: mustParseTime(t, "2000-04-02T03:00:00-04:00"), 503 }, 504 { 505 start: mustParseTime(t, "2000-04-02T03:00:00-04:00"), 506 end: mustParseTime(t, "2000-04-02T04:00:00-04:00"), 507 }, 508 }, 509 }, 510 { 511 name: "DaylightSavingsEnd/1d", 512 d: "1d", 513 initial: mustParseTime(t, "2000-10-29T00:00:00-04:00"), 514 tests: []test{ 515 { 516 start: mustParseTime(t, "2000-10-29T00:00:00-04:00"), 517 end: mustParseTime(t, "2000-10-30T00:00:00-05:00"), 518 }, 519 }, 520 }, 521 { 522 name: "DaylightSavingsEnd/2h", 523 d: "2h", 524 initial: mustParseTime(t, "2000-10-29T00:00:00-04:00"), 525 tests: []test{ 526 { 527 start: mustParseTime(t, "2000-10-29T00:00:00-04:00"), 528 end: mustParseTime(t, "2000-10-29T02:00:00-05:00"), 529 }, 530 { 531 start: mustParseTime(t, "2000-10-29T02:00:00-05:00"), 532 end: mustParseTime(t, "2000-10-29T04:00:00-05:00"), 533 }, 534 }, 535 }, 536 } { 537 t.Run(tt.name, func(t *testing.T) { 538 s := NewTestService(t) 539 mc := NewMetaClient(t) 540 mc.CreateDatabase("db", "") 541 mc.CreateContinuousQuery("db", "cq", 542 fmt.Sprintf(`CREATE CONTINUOUS QUERY cq ON db %s BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(%s) TZ('America/New_York') END`, tt.options, tt.d)) 543 s.MetaClient = mc 544 545 // Set RunInterval high so we can trigger using Run method. 546 s.RunInterval = 10 * time.Minute 547 done := make(chan struct{}) 548 549 // Set a callback for ExecuteStatement. 550 tests := make(chan test, 1) 551 s.QueryExecutor.StatementExecutor = &StatementExecutor{ 552 ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 553 test := <-tests 554 s := stmt.(*influxql.SelectStatement) 555 valuer := &influxql.NowValuer{Location: s.Location} 556 _, timeRange, err := influxql.ConditionExpr(s.Condition, valuer) 557 timeRange.Max = timeRange.Max.Add(time.Nanosecond) 558 if err != nil { 559 t.Errorf("unexpected error parsing time range: %s", err) 560 } else if !test.start.Equal(timeRange.Min) || !test.end.Equal(timeRange.Max) { 561 t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, test.start, test.end) 562 } 563 done <- struct{}{} 564 ctx.Results <- &query.Result{} 565 return nil 566 }, 567 } 568 569 s.Open() 570 defer s.Close() 571 572 // Send an initial run request one nanosecond after the start to 573 // prime the last CQ map. 574 s.RunCh <- &RunRequest{Now: tt.initial.Add(time.Nanosecond)} 575 // Execute each of the tests and ensure the times are correct. 576 for i, test := range tt.tests { 577 tests <- test 578 now := test.now 579 if now.IsZero() { 580 now = test.end 581 } 582 s.RunCh <- &RunRequest{Now: now} 583 if err := wait(done, 100*time.Millisecond); err != nil { 584 t.Fatal(fmt.Errorf("%d. %s", i+1, err)) 585 } 586 } 587 }) 588 } 589} 590 591// Test ExecuteContinuousQuery when QueryExecutor returns an error. 592func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) { 593 s := NewTestService(t) 594 s.QueryExecutor.StatementExecutor = &StatementExecutor{ 595 ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 596 return errExpected 597 }, 598 } 599 600 dbis := s.MetaClient.Databases() 601 dbi := dbis[0] 602 cqi := dbi.ContinuousQueries[0] 603 604 now := time.Now().Truncate(10 * time.Minute) 605 if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, now); err != errExpected { 606 t.Errorf("exp = %s, got = %v", errExpected, err) 607 } 608} 609 610func TestService_ExecuteContinuousQuery_LogsToMonitor(t *testing.T) { 611 s := NewTestService(t) 612 const writeN = int64(50) 613 614 s.QueryExecutor.StatementExecutor = &StatementExecutor{ 615 ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 616 ctx.Results <- &query.Result{ 617 Series: []*models.Row{{ 618 Name: "result", 619 Columns: []string{"time", "written"}, 620 Values: [][]interface{}{{time.Time{}, writeN}}, 621 }}, 622 } 623 return nil 624 }, 625 } 626 s.queryStatsEnabled = true 627 var point models.Point 628 s.Monitor = &monitor{ 629 EnabledFn: func() bool { return true }, 630 WritePointsFn: func(p models.Points) error { 631 if len(p) != 1 { 632 t.Fatalf("expected point") 633 } 634 point = p[0] 635 return nil 636 }, 637 } 638 639 dbis := s.MetaClient.Databases() 640 dbi := dbis[0] 641 cqi := dbi.ContinuousQueries[0] 642 643 now := time.Now().Truncate(10 * time.Minute) 644 if ok, err := s.ExecuteContinuousQuery(&dbi, &cqi, now); !ok || err != nil { 645 t.Fatalf("ExecuteContinuousQuery failed, ok=%t, err=%v", ok, err) 646 } 647 648 if point == nil { 649 t.Fatal("expected Monitor.WritePoints call") 650 } 651 652 f, _ := point.Fields() 653 if got, ok := f["pointsWrittenOK"].(int64); !ok || got != writeN { 654 t.Errorf("unexpected value for written; exp=%d, got=%d", writeN, got) 655 } 656} 657 658func TestService_ExecuteContinuousQuery_LogToMonitor_DisabledByDefault(t *testing.T) { 659 s := NewTestService(t) 660 s.QueryExecutor.StatementExecutor = &StatementExecutor{ 661 ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error { 662 ctx.Send(&query.Result{}) 663 return nil 664 }, 665 } 666 s.Monitor = &monitor{ 667 EnabledFn: func() bool { return true }, 668 WritePointsFn: func(p models.Points) error { 669 t.Fatalf("unexpected Monitor.WritePoints call") 670 return nil 671 }, 672 } 673 674 dbis := s.MetaClient.Databases() 675 dbi := dbis[0] 676 cqi := dbi.ContinuousQueries[0] 677 678 now := time.Now().Truncate(10 * time.Minute) 679 if ok, err := s.ExecuteContinuousQuery(&dbi, &cqi, now); !ok || err != nil { 680 t.Fatalf("ExecuteContinuousQuery failed, ok=%t, err=%v", ok, err) 681 } 682} 683 684// NewTestService returns a new *Service with default mock object members. 685func NewTestService(t *testing.T) *Service { 686 s := NewService(NewConfig()) 687 ms := NewMetaClient(t) 688 s.MetaClient = ms 689 s.QueryExecutor = query.NewExecutor() 690 s.RunInterval = time.Millisecond 691 692 // Set Logger to write to dev/null so stdout isn't polluted. 693 if testing.Verbose() { 694 s.WithLogger(logger.New(os.Stderr)) 695 } 696 697 // Add a couple test databases and CQs. 698 ms.CreateDatabase("db", "rp") 699 ms.CreateContinuousQuery("db", "cq", `CREATE CONTINUOUS QUERY cq ON db BEGIN SELECT count(cpu) INTO cpu_count FROM cpu WHERE time > now() - 1h GROUP BY time(1s) END`) 700 ms.CreateDatabase("db2", "default") 701 ms.CreateContinuousQuery("db2", "cq2", `CREATE CONTINUOUS QUERY cq2 ON db2 BEGIN SELECT mean(value) INTO cpu_mean FROM cpu WHERE time > now() - 10m GROUP BY time(1m) END`) 702 ms.CreateDatabase("db3", "default") 703 ms.CreateContinuousQuery("db3", "cq3", `CREATE CONTINUOUS QUERY cq3 ON db3 BEGIN SELECT mean(value) INTO "1hAverages".:MEASUREMENT FROM /cpu[0-9]?/ GROUP BY time(10s) END`) 704 705 return s 706} 707 708// MetaClient is a mock meta store. 709type MetaClient struct { 710 mu sync.RWMutex 711 Leader bool 712 AllowLease bool 713 DatabaseInfos []meta.DatabaseInfo 714 Err error 715 t *testing.T 716 nodeID uint64 717} 718 719// NewMetaClient returns a *MetaClient. 720func NewMetaClient(t *testing.T) *MetaClient { 721 return &MetaClient{ 722 Leader: true, 723 AllowLease: true, 724 t: t, 725 nodeID: 1, 726 } 727} 728 729// NodeID returns the client's node ID. 730func (ms *MetaClient) NodeID() uint64 { return ms.nodeID } 731 732// AcquireLease attempts to acquire the specified lease. 733func (ms *MetaClient) AcquireLease(name string) (l *meta.Lease, err error) { 734 if ms.Leader { 735 if ms.AllowLease { 736 return &meta.Lease{Name: name}, nil 737 } 738 return nil, errors.New("another node owns the lease") 739 } 740 return nil, meta.ErrServiceUnavailable 741} 742 743// Databases returns a list of database info about each database in the coordinator. 744func (ms *MetaClient) Databases() []meta.DatabaseInfo { 745 ms.mu.RLock() 746 defer ms.mu.RUnlock() 747 return ms.DatabaseInfos 748} 749 750// Database returns a single database by name. 751func (ms *MetaClient) Database(name string) *meta.DatabaseInfo { 752 ms.mu.RLock() 753 defer ms.mu.RUnlock() 754 return ms.database(name) 755} 756 757func (ms *MetaClient) database(name string) *meta.DatabaseInfo { 758 if ms.Err != nil { 759 return nil 760 } 761 for i := range ms.DatabaseInfos { 762 if ms.DatabaseInfos[i].Name == name { 763 return &ms.DatabaseInfos[i] 764 } 765 } 766 return nil 767} 768 769// CreateDatabase adds a new database to the meta store. 770func (ms *MetaClient) CreateDatabase(name, defaultRetentionPolicy string) error { 771 ms.mu.Lock() 772 defer ms.mu.Unlock() 773 if ms.Err != nil { 774 return ms.Err 775 } 776 777 // See if the database already exists. 778 for _, dbi := range ms.DatabaseInfos { 779 if dbi.Name == name { 780 return fmt.Errorf("database already exists: %s", name) 781 } 782 } 783 784 // Create database. 785 ms.DatabaseInfos = append(ms.DatabaseInfos, meta.DatabaseInfo{ 786 Name: name, 787 788 DefaultRetentionPolicy: defaultRetentionPolicy, 789 }) 790 791 return nil 792} 793 794// CreateContinuousQuery adds a CQ to the meta store. 795func (ms *MetaClient) CreateContinuousQuery(database, name, query string) error { 796 ms.mu.Lock() 797 defer ms.mu.Unlock() 798 if ms.Err != nil { 799 return ms.Err 800 } 801 802 dbi := ms.database(database) 803 if dbi == nil { 804 return fmt.Errorf("database not found: %s", database) 805 } 806 807 // See if CQ already exists. 808 for _, cqi := range dbi.ContinuousQueries { 809 if cqi.Name == name { 810 return fmt.Errorf("continuous query already exists: %s", name) 811 } 812 } 813 814 // Create a new CQ and store it. 815 dbi.ContinuousQueries = append(dbi.ContinuousQueries, meta.ContinuousQueryInfo{ 816 Name: name, 817 Query: query, 818 }) 819 820 return nil 821} 822 823// StatementExecutor is a mock statement executor. 824type StatementExecutor struct { 825 ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error 826} 827 828func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error { 829 return e.ExecuteStatementFn(stmt, ctx) 830} 831 832func wait(c chan struct{}, d time.Duration) (err error) { 833 select { 834 case <-c: 835 case <-time.After(d): 836 err = errors.New("timed out") 837 } 838 return 839} 840 841type monitor struct { 842 EnabledFn func() bool 843 WritePointsFn func(models.Points) error 844} 845 846func (m *monitor) Enabled() bool { return m.EnabledFn() } 847func (m *monitor) WritePoints(p models.Points) error { return m.WritePointsFn(p) } 848 849func mustParseTime(t *testing.T, value string) time.Time { 850 ts, err := time.Parse(time.RFC3339, value) 851 if err != nil { 852 t.Fatalf("unable to parse time: %s", err) 853 } 854 return ts 855} 856