1package continuous_querier
2
3import (
4	"errors"
5	"fmt"
6	"os"
7	"sync"
8	"testing"
9	"time"
10
11	"github.com/influxdata/influxdb/logger"
12	"github.com/influxdata/influxdb/models"
13	"github.com/influxdata/influxdb/query"
14	"github.com/influxdata/influxdb/services/meta"
15	"github.com/influxdata/influxql"
16)
17
18var (
19	errExpected   = errors.New("expected error")
20	errUnexpected = errors.New("unexpected error")
21)
22
23// Test closing never opened, open, open already open, close, and close already closed.
24func TestOpenAndClose(t *testing.T) {
25	s := NewTestService(t)
26
27	if err := s.Close(); err != nil {
28		t.Error(err)
29	} else if err = s.Open(); err != nil {
30		t.Error(err)
31	} else if err = s.Open(); err != nil {
32		t.Error(err)
33	} else if err = s.Close(); err != nil {
34		t.Error(err)
35	} else if err = s.Close(); err != nil {
36		t.Error(err)
37	}
38}
39
40// Test Run method.
41func TestContinuousQueryService_Run(t *testing.T) {
42	s := NewTestService(t)
43
44	// Set RunInterval high so we can trigger using Run method.
45	s.RunInterval = 10 * time.Minute
46
47	done := make(chan struct{})
48	expectCallCnt := 3
49	callCnt := 0
50
51	// Set a callback for ExecuteStatement.
52	s.QueryExecutor.StatementExecutor = &StatementExecutor{
53		ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
54			callCnt++
55			if callCnt >= expectCallCnt {
56				done <- struct{}{}
57			}
58			ctx.Results <- &query.Result{}
59			return nil
60		},
61	}
62
63	// Use a custom "now" time since the internals of last run care about
64	// what the actual time is. Truncate to 10 minutes we are starting on an interval.
65	now := time.Now().Truncate(10 * time.Minute)
66
67	s.Open()
68	// Trigger service to run all CQs.
69	s.Run("", "", now)
70	// Shouldn't time out.
71	if err := wait(done, 100*time.Millisecond); err != nil {
72		t.Error(err)
73	}
74	// This time it should timeout because ExecuteQuery should not get called again.
75	if err := wait(done, 100*time.Millisecond); err == nil {
76		t.Error("too many queries executed")
77	}
78	s.Close()
79
80	// Now test just one query.
81	expectCallCnt = 1
82	callCnt = 0
83	s.Open()
84	s.Run("db", "cq", now)
85	// Shouldn't time out.
86	if err := wait(done, 100*time.Millisecond); err != nil {
87		t.Error(err)
88	}
89	// This time it should timeout because ExecuteQuery should not get called again.
90	if err := wait(done, 100*time.Millisecond); err == nil {
91		t.Error("too many queries executed")
92	}
93	s.Close()
94}
95
96func TestContinuousQueryService_ResampleOptions(t *testing.T) {
97	s := NewTestService(t)
98	mc := NewMetaClient(t)
99	mc.CreateDatabase("db", "")
100	mc.CreateContinuousQuery("db", "cq", `CREATE CONTINUOUS QUERY cq ON db RESAMPLE EVERY 10s FOR 2m BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(1m) END`)
101	s.MetaClient = mc
102
103	db := s.MetaClient.Database("db")
104
105	cq, err := NewContinuousQuery(db.Name, &db.ContinuousQueries[0])
106	if err != nil {
107		t.Fatal(err)
108	} else if cq.Resample.Every != 10*time.Second {
109		t.Errorf("expected resample every to be 10s, got %s", influxql.FormatDuration(cq.Resample.Every))
110	} else if cq.Resample.For != 2*time.Minute {
111		t.Errorf("expected resample for 2m, got %s", influxql.FormatDuration(cq.Resample.For))
112	}
113
114	// Set RunInterval high so we can trigger using Run method.
115	s.RunInterval = 10 * time.Minute
116
117	done := make(chan struct{})
118	var expected struct {
119		min time.Time
120		max time.Time
121	}
122
123	// Set a callback for ExecuteStatement.
124	s.QueryExecutor.StatementExecutor = &StatementExecutor{
125		ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
126			s := stmt.(*influxql.SelectStatement)
127			valuer := &influxql.NowValuer{Location: s.Location}
128			_, timeRange, err := influxql.ConditionExpr(s.Condition, valuer)
129			if err != nil {
130				t.Errorf("unexpected error parsing time range: %s", err)
131			} else if !expected.min.Equal(timeRange.Min) || !expected.max.Equal(timeRange.Max) {
132				t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, expected.min, expected.max)
133			}
134			done <- struct{}{}
135			ctx.Results <- &query.Result{}
136			return nil
137		},
138	}
139
140	s.Open()
141	defer s.Close()
142
143	// Set the 'now' time to the start of a 10 minute interval. Then trigger a run.
144	// This should trigger two queries (one for the current time interval, one for the previous).
145	now := time.Now().UTC().Truncate(10 * time.Minute)
146	expected.min = now.Add(-2 * time.Minute)
147	expected.max = now.Add(-1)
148	s.RunCh <- &RunRequest{Now: now}
149
150	if err := wait(done, 100*time.Millisecond); err != nil {
151		t.Fatal(err)
152	}
153
154	// Trigger another run 10 seconds later. Another two queries should happen,
155	// but it will be a different two queries.
156	expected.min = expected.min.Add(time.Minute)
157	expected.max = expected.max.Add(time.Minute)
158	s.RunCh <- &RunRequest{Now: now.Add(10 * time.Second)}
159
160	if err := wait(done, 100*time.Millisecond); err != nil {
161		t.Fatal(err)
162	}
163
164	// Reset the time period and send the initial request at 5 seconds after the
165	// 10 minute mark. There should be exactly one call since the current interval is too
166	// young and only one interval matches the FOR duration.
167	expected.min = now.Add(-time.Minute)
168	expected.max = now.Add(-1)
169	s.Run("", "", now.Add(5*time.Second))
170
171	if err := wait(done, 100*time.Millisecond); err != nil {
172		t.Fatal(err)
173	}
174
175	// Send a message 10 minutes later and ensure that the system plays catchup.
176	expected.max = now.Add(10*time.Minute - 1)
177	s.RunCh <- &RunRequest{Now: now.Add(10 * time.Minute)}
178
179	if err := wait(done, 100*time.Millisecond); err != nil {
180		t.Fatal(err)
181	}
182
183	// No overflow should be sent.
184	if err := wait(done, 100*time.Millisecond); err == nil {
185		t.Error("too many queries executed")
186	}
187}
188
189func TestContinuousQueryService_EveryHigherThanInterval(t *testing.T) {
190	s := NewTestService(t)
191	ms := NewMetaClient(t)
192	ms.CreateDatabase("db", "")
193	ms.CreateContinuousQuery("db", "cq", `CREATE CONTINUOUS QUERY cq ON db RESAMPLE EVERY 1m BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(30s) END`)
194	s.MetaClient = ms
195
196	// Set RunInterval high so we can trigger using Run method.
197	s.RunInterval = 10 * time.Minute
198
199	done := make(chan struct{})
200	var expected struct {
201		min time.Time
202		max time.Time
203	}
204
205	// Set a callback for ExecuteQuery.
206	s.QueryExecutor.StatementExecutor = &StatementExecutor{
207		ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
208			s := stmt.(*influxql.SelectStatement)
209			valuer := &influxql.NowValuer{Location: s.Location}
210			_, timeRange, err := influxql.ConditionExpr(s.Condition, valuer)
211			if err != nil {
212				t.Errorf("unexpected error parsing time range: %s", err)
213			} else if !expected.min.Equal(timeRange.Min) || !expected.max.Equal(timeRange.Max) {
214				t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, expected.min, expected.max)
215			}
216			done <- struct{}{}
217			ctx.Results <- &query.Result{}
218			return nil
219		},
220	}
221
222	s.Open()
223	defer s.Close()
224
225	// Set the 'now' time to the start of a 10 minute interval. Then trigger a run.
226	// This should trigger two queries (one for the current time interval, one for the previous)
227	// since the default FOR interval should be EVERY, not the GROUP BY interval.
228	now := time.Now().Truncate(10 * time.Minute)
229	expected.min = now.Add(-time.Minute)
230	expected.max = now.Add(-1)
231	s.RunCh <- &RunRequest{Now: now}
232
233	if err := wait(done, 100*time.Millisecond); err != nil {
234		t.Fatal(err)
235	}
236
237	// Trigger 30 seconds later. Nothing should run.
238	s.RunCh <- &RunRequest{Now: now.Add(30 * time.Second)}
239
240	if err := wait(done, 100*time.Millisecond); err == nil {
241		t.Fatal("too many queries")
242	}
243
244	// Run again 1 minute later. Another two queries should run.
245	expected.min = now
246	expected.max = now.Add(time.Minute - 1)
247	s.RunCh <- &RunRequest{Now: now.Add(time.Minute)}
248
249	if err := wait(done, 100*time.Millisecond); err != nil {
250		t.Fatal(err)
251	}
252
253	// No overflow should be sent.
254	if err := wait(done, 100*time.Millisecond); err == nil {
255		t.Error("too many queries executed")
256	}
257}
258
259func TestContinuousQueryService_GroupByOffset(t *testing.T) {
260	s := NewTestService(t)
261	mc := NewMetaClient(t)
262	mc.CreateDatabase("db", "")
263	mc.CreateContinuousQuery("db", "cq", `CREATE CONTINUOUS QUERY cq ON db BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(1m, 30s) END`)
264	s.MetaClient = mc
265
266	// Set RunInterval high so we can trigger using Run method.
267	s.RunInterval = 10 * time.Minute
268
269	done := make(chan struct{})
270	var expected struct {
271		min time.Time
272		max time.Time
273	}
274
275	// Set a callback for ExecuteStatement.
276	s.QueryExecutor.StatementExecutor = &StatementExecutor{
277		ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
278			s := stmt.(*influxql.SelectStatement)
279			valuer := &influxql.NowValuer{Location: s.Location}
280			_, timeRange, err := influxql.ConditionExpr(s.Condition, valuer)
281			if err != nil {
282				t.Errorf("unexpected error parsing time range: %s", err)
283			} else if !expected.min.Equal(timeRange.Min) || !expected.max.Equal(timeRange.Max) {
284				t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, expected.min, expected.max)
285			}
286			done <- struct{}{}
287			ctx.Results <- &query.Result{}
288			return nil
289		},
290	}
291
292	s.Open()
293	defer s.Close()
294
295	// Set the 'now' time to the start of a 10 minute interval with a 30 second offset.
296	// Then trigger a run. This should trigger two queries (one for the current time
297	// interval, one for the previous).
298	now := time.Now().UTC().Truncate(10 * time.Minute).Add(30 * time.Second)
299	expected.min = now.Add(-time.Minute)
300	expected.max = now.Add(-1)
301	s.RunCh <- &RunRequest{Now: now}
302
303	if err := wait(done, 100*time.Millisecond); err != nil {
304		t.Fatal(err)
305	}
306}
307
308// Test service when not the cluster leader (CQs shouldn't run).
309func TestContinuousQueryService_NotLeader(t *testing.T) {
310	s := NewTestService(t)
311	// Set RunInterval high so we can test triggering with the RunCh below.
312	s.RunInterval = 10 * time.Second
313	s.MetaClient.(*MetaClient).Leader = false
314
315	done := make(chan struct{})
316	// Set a callback for ExecuteStatement. Shouldn't get called because we're not the leader.
317	s.QueryExecutor.StatementExecutor = &StatementExecutor{
318		ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
319			done <- struct{}{}
320			ctx.Results <- &query.Result{Err: errUnexpected}
321			return nil
322		},
323	}
324
325	s.Open()
326	// Trigger service to run CQs.
327	s.RunCh <- &RunRequest{Now: time.Now()}
328	// Expect timeout error because ExecuteQuery callback wasn't called.
329	if err := wait(done, 100*time.Millisecond); err == nil {
330		t.Error(err)
331	}
332	s.Close()
333}
334
335// Test ExecuteContinuousQuery with invalid queries.
336func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
337	s := NewTestService(t)
338	s.QueryExecutor.StatementExecutor = &StatementExecutor{
339		ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
340			return errUnexpected
341		},
342	}
343	dbis := s.MetaClient.Databases()
344	dbi := dbis[0]
345	cqi := dbi.ContinuousQueries[0]
346
347	cqi.Query = `this is not a query`
348	if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil {
349		t.Error("expected error but got nil")
350	}
351
352	// Valid query but invalid continuous query.
353	cqi.Query = `SELECT * FROM cpu`
354	if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil {
355		t.Error("expected error but got nil")
356	}
357
358	// Group by requires aggregate.
359	cqi.Query = `SELECT value INTO other_value FROM cpu WHERE time > now() - 1h GROUP BY time(1s)`
360	if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil {
361		t.Error("expected error but got nil")
362	}
363}
364
365// Test the time range for different CQ durations.
366func TestExecuteContinuousQuery_TimeRange(t *testing.T) {
367	// Choose a start date that is not on an interval border for anyone.
368	now := mustParseTime(t, "2000-01-01T00:00:00Z")
369	for _, tt := range []struct {
370		d          string
371		start, end time.Time
372	}{
373		{
374			d:     "10s",
375			start: mustParseTime(t, "2000-01-01T00:00:00Z"),
376			end:   mustParseTime(t, "2000-01-01T00:00:10Z"),
377		},
378		{
379			d:     "1m",
380			start: mustParseTime(t, "2000-01-01T00:00:00Z"),
381			end:   mustParseTime(t, "2000-01-01T00:01:00Z"),
382		},
383		{
384			d:     "10m",
385			start: mustParseTime(t, "2000-01-01T00:00:00Z"),
386			end:   mustParseTime(t, "2000-01-01T00:10:00Z"),
387		},
388		{
389			d:     "30m",
390			start: mustParseTime(t, "2000-01-01T00:00:00Z"),
391			end:   mustParseTime(t, "2000-01-01T00:30:00Z"),
392		},
393		{
394			d:     "1h",
395			start: mustParseTime(t, "2000-01-01T00:00:00Z"),
396			end:   mustParseTime(t, "2000-01-01T01:00:00Z"),
397		},
398		{
399			d:     "2h",
400			start: mustParseTime(t, "2000-01-01T00:00:00Z"),
401			end:   mustParseTime(t, "2000-01-01T02:00:00Z"),
402		},
403		{
404			d:     "12h",
405			start: mustParseTime(t, "2000-01-01T00:00:00Z"),
406			end:   mustParseTime(t, "2000-01-01T12:00:00Z"),
407		},
408		{
409			d:     "1d",
410			start: mustParseTime(t, "2000-01-01T00:00:00Z"),
411			end:   mustParseTime(t, "2000-01-02T00:00:00Z"),
412		},
413		{
414			d:     "1w",
415			start: mustParseTime(t, "1999-12-30T00:00:00Z"),
416			end:   mustParseTime(t, "2000-01-06T00:00:00Z"),
417		},
418	} {
419		t.Run(tt.d, func(t *testing.T) {
420			d, err := influxql.ParseDuration(tt.d)
421			if err != nil {
422				t.Fatalf("unable to parse duration: %s", err)
423			}
424
425			s := NewTestService(t)
426			mc := NewMetaClient(t)
427			mc.CreateDatabase("db", "")
428			mc.CreateContinuousQuery("db", "cq",
429				fmt.Sprintf(`CREATE CONTINUOUS QUERY cq ON db BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(%s) END`, tt.d))
430			s.MetaClient = mc
431
432			// Set RunInterval high so we can trigger using Run method.
433			s.RunInterval = 10 * time.Minute
434			done := make(chan struct{})
435
436			// Set a callback for ExecuteStatement.
437			s.QueryExecutor.StatementExecutor = &StatementExecutor{
438				ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
439					s := stmt.(*influxql.SelectStatement)
440					valuer := &influxql.NowValuer{Location: s.Location}
441					_, timeRange, err := influxql.ConditionExpr(s.Condition, valuer)
442					timeRange.Max = timeRange.Max.Add(time.Nanosecond)
443					if err != nil {
444						t.Errorf("unexpected error parsing time range: %s", err)
445					} else if !tt.start.Equal(timeRange.Min) || !tt.end.Equal(timeRange.Max) {
446						t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, tt.start, tt.end)
447					}
448					done <- struct{}{}
449					ctx.Results <- &query.Result{}
450					return nil
451				},
452			}
453
454			s.Open()
455			defer s.Close()
456
457			// Send an initial run request one nanosecond after the start to
458			// prime the last CQ map.
459			s.RunCh <- &RunRequest{Now: now.Add(time.Nanosecond)}
460			// Execute the real request after the time interval.
461			s.RunCh <- &RunRequest{Now: now.Add(d)}
462			if err := wait(done, 100*time.Millisecond); err != nil {
463				t.Fatal(err)
464			}
465		})
466	}
467}
468
469// Test the time range for different CQ durations.
470func TestExecuteContinuousQuery_TimeZone(t *testing.T) {
471	type test struct {
472		now        time.Time
473		start, end time.Time
474	}
475
476	// Choose a start date that is not on an interval border for anyone.
477	for _, tt := range []struct {
478		name    string
479		d       string
480		options string
481		initial time.Time
482		tests   []test
483	}{
484		{
485			name:    "DaylightSavingsStart/1d",
486			d:       "1d",
487			initial: mustParseTime(t, "2000-04-02T00:00:00-05:00"),
488			tests: []test{
489				{
490					start: mustParseTime(t, "2000-04-02T00:00:00-05:00"),
491					end:   mustParseTime(t, "2000-04-03T00:00:00-04:00"),
492				},
493			},
494		},
495		{
496			name:    "DaylightSavingsStart/2h",
497			d:       "2h",
498			initial: mustParseTime(t, "2000-04-02T00:00:00-05:00"),
499			tests: []test{
500				{
501					start: mustParseTime(t, "2000-04-02T00:00:00-05:00"),
502					end:   mustParseTime(t, "2000-04-02T03:00:00-04:00"),
503				},
504				{
505					start: mustParseTime(t, "2000-04-02T03:00:00-04:00"),
506					end:   mustParseTime(t, "2000-04-02T04:00:00-04:00"),
507				},
508			},
509		},
510		{
511			name:    "DaylightSavingsEnd/1d",
512			d:       "1d",
513			initial: mustParseTime(t, "2000-10-29T00:00:00-04:00"),
514			tests: []test{
515				{
516					start: mustParseTime(t, "2000-10-29T00:00:00-04:00"),
517					end:   mustParseTime(t, "2000-10-30T00:00:00-05:00"),
518				},
519			},
520		},
521		{
522			name:    "DaylightSavingsEnd/2h",
523			d:       "2h",
524			initial: mustParseTime(t, "2000-10-29T00:00:00-04:00"),
525			tests: []test{
526				{
527					start: mustParseTime(t, "2000-10-29T00:00:00-04:00"),
528					end:   mustParseTime(t, "2000-10-29T02:00:00-05:00"),
529				},
530				{
531					start: mustParseTime(t, "2000-10-29T02:00:00-05:00"),
532					end:   mustParseTime(t, "2000-10-29T04:00:00-05:00"),
533				},
534			},
535		},
536	} {
537		t.Run(tt.name, func(t *testing.T) {
538			s := NewTestService(t)
539			mc := NewMetaClient(t)
540			mc.CreateDatabase("db", "")
541			mc.CreateContinuousQuery("db", "cq",
542				fmt.Sprintf(`CREATE CONTINUOUS QUERY cq ON db %s BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(%s) TZ('America/New_York') END`, tt.options, tt.d))
543			s.MetaClient = mc
544
545			// Set RunInterval high so we can trigger using Run method.
546			s.RunInterval = 10 * time.Minute
547			done := make(chan struct{})
548
549			// Set a callback for ExecuteStatement.
550			tests := make(chan test, 1)
551			s.QueryExecutor.StatementExecutor = &StatementExecutor{
552				ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
553					test := <-tests
554					s := stmt.(*influxql.SelectStatement)
555					valuer := &influxql.NowValuer{Location: s.Location}
556					_, timeRange, err := influxql.ConditionExpr(s.Condition, valuer)
557					timeRange.Max = timeRange.Max.Add(time.Nanosecond)
558					if err != nil {
559						t.Errorf("unexpected error parsing time range: %s", err)
560					} else if !test.start.Equal(timeRange.Min) || !test.end.Equal(timeRange.Max) {
561						t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, test.start, test.end)
562					}
563					done <- struct{}{}
564					ctx.Results <- &query.Result{}
565					return nil
566				},
567			}
568
569			s.Open()
570			defer s.Close()
571
572			// Send an initial run request one nanosecond after the start to
573			// prime the last CQ map.
574			s.RunCh <- &RunRequest{Now: tt.initial.Add(time.Nanosecond)}
575			// Execute each of the tests and ensure the times are correct.
576			for i, test := range tt.tests {
577				tests <- test
578				now := test.now
579				if now.IsZero() {
580					now = test.end
581				}
582				s.RunCh <- &RunRequest{Now: now}
583				if err := wait(done, 100*time.Millisecond); err != nil {
584					t.Fatal(fmt.Errorf("%d. %s", i+1, err))
585				}
586			}
587		})
588	}
589}
590
591// Test ExecuteContinuousQuery when QueryExecutor returns an error.
592func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) {
593	s := NewTestService(t)
594	s.QueryExecutor.StatementExecutor = &StatementExecutor{
595		ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
596			return errExpected
597		},
598	}
599
600	dbis := s.MetaClient.Databases()
601	dbi := dbis[0]
602	cqi := dbi.ContinuousQueries[0]
603
604	now := time.Now().Truncate(10 * time.Minute)
605	if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, now); err != errExpected {
606		t.Errorf("exp = %s, got = %v", errExpected, err)
607	}
608}
609
610func TestService_ExecuteContinuousQuery_LogsToMonitor(t *testing.T) {
611	s := NewTestService(t)
612	const writeN = int64(50)
613
614	s.QueryExecutor.StatementExecutor = &StatementExecutor{
615		ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
616			ctx.Results <- &query.Result{
617				Series: []*models.Row{{
618					Name:    "result",
619					Columns: []string{"time", "written"},
620					Values:  [][]interface{}{{time.Time{}, writeN}},
621				}},
622			}
623			return nil
624		},
625	}
626	s.queryStatsEnabled = true
627	var point models.Point
628	s.Monitor = &monitor{
629		EnabledFn: func() bool { return true },
630		WritePointsFn: func(p models.Points) error {
631			if len(p) != 1 {
632				t.Fatalf("expected point")
633			}
634			point = p[0]
635			return nil
636		},
637	}
638
639	dbis := s.MetaClient.Databases()
640	dbi := dbis[0]
641	cqi := dbi.ContinuousQueries[0]
642
643	now := time.Now().Truncate(10 * time.Minute)
644	if ok, err := s.ExecuteContinuousQuery(&dbi, &cqi, now); !ok || err != nil {
645		t.Fatalf("ExecuteContinuousQuery failed, ok=%t, err=%v", ok, err)
646	}
647
648	if point == nil {
649		t.Fatal("expected Monitor.WritePoints call")
650	}
651
652	f, _ := point.Fields()
653	if got, ok := f["pointsWrittenOK"].(int64); !ok || got != writeN {
654		t.Errorf("unexpected value for written; exp=%d, got=%d", writeN, got)
655	}
656}
657
658func TestService_ExecuteContinuousQuery_LogToMonitor_DisabledByDefault(t *testing.T) {
659	s := NewTestService(t)
660	s.QueryExecutor.StatementExecutor = &StatementExecutor{
661		ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
662			ctx.Send(&query.Result{})
663			return nil
664		},
665	}
666	s.Monitor = &monitor{
667		EnabledFn: func() bool { return true },
668		WritePointsFn: func(p models.Points) error {
669			t.Fatalf("unexpected Monitor.WritePoints call")
670			return nil
671		},
672	}
673
674	dbis := s.MetaClient.Databases()
675	dbi := dbis[0]
676	cqi := dbi.ContinuousQueries[0]
677
678	now := time.Now().Truncate(10 * time.Minute)
679	if ok, err := s.ExecuteContinuousQuery(&dbi, &cqi, now); !ok || err != nil {
680		t.Fatalf("ExecuteContinuousQuery failed, ok=%t, err=%v", ok, err)
681	}
682}
683
684// NewTestService returns a new *Service with default mock object members.
685func NewTestService(t *testing.T) *Service {
686	s := NewService(NewConfig())
687	ms := NewMetaClient(t)
688	s.MetaClient = ms
689	s.QueryExecutor = query.NewExecutor()
690	s.RunInterval = time.Millisecond
691
692	// Set Logger to write to dev/null so stdout isn't polluted.
693	if testing.Verbose() {
694		s.WithLogger(logger.New(os.Stderr))
695	}
696
697	// Add a couple test databases and CQs.
698	ms.CreateDatabase("db", "rp")
699	ms.CreateContinuousQuery("db", "cq", `CREATE CONTINUOUS QUERY cq ON db BEGIN SELECT count(cpu) INTO cpu_count FROM cpu WHERE time > now() - 1h GROUP BY time(1s) END`)
700	ms.CreateDatabase("db2", "default")
701	ms.CreateContinuousQuery("db2", "cq2", `CREATE CONTINUOUS QUERY cq2 ON db2 BEGIN SELECT mean(value) INTO cpu_mean FROM cpu WHERE time > now() - 10m GROUP BY time(1m) END`)
702	ms.CreateDatabase("db3", "default")
703	ms.CreateContinuousQuery("db3", "cq3", `CREATE CONTINUOUS QUERY cq3 ON db3 BEGIN SELECT mean(value) INTO "1hAverages".:MEASUREMENT FROM /cpu[0-9]?/ GROUP BY time(10s) END`)
704
705	return s
706}
707
708// MetaClient is a mock meta store.
709type MetaClient struct {
710	mu            sync.RWMutex
711	Leader        bool
712	AllowLease    bool
713	DatabaseInfos []meta.DatabaseInfo
714	Err           error
715	t             *testing.T
716	nodeID        uint64
717}
718
719// NewMetaClient returns a *MetaClient.
720func NewMetaClient(t *testing.T) *MetaClient {
721	return &MetaClient{
722		Leader:     true,
723		AllowLease: true,
724		t:          t,
725		nodeID:     1,
726	}
727}
728
729// NodeID returns the client's node ID.
730func (ms *MetaClient) NodeID() uint64 { return ms.nodeID }
731
732// AcquireLease attempts to acquire the specified lease.
733func (ms *MetaClient) AcquireLease(name string) (l *meta.Lease, err error) {
734	if ms.Leader {
735		if ms.AllowLease {
736			return &meta.Lease{Name: name}, nil
737		}
738		return nil, errors.New("another node owns the lease")
739	}
740	return nil, meta.ErrServiceUnavailable
741}
742
743// Databases returns a list of database info about each database in the coordinator.
744func (ms *MetaClient) Databases() []meta.DatabaseInfo {
745	ms.mu.RLock()
746	defer ms.mu.RUnlock()
747	return ms.DatabaseInfos
748}
749
750// Database returns a single database by name.
751func (ms *MetaClient) Database(name string) *meta.DatabaseInfo {
752	ms.mu.RLock()
753	defer ms.mu.RUnlock()
754	return ms.database(name)
755}
756
757func (ms *MetaClient) database(name string) *meta.DatabaseInfo {
758	if ms.Err != nil {
759		return nil
760	}
761	for i := range ms.DatabaseInfos {
762		if ms.DatabaseInfos[i].Name == name {
763			return &ms.DatabaseInfos[i]
764		}
765	}
766	return nil
767}
768
769// CreateDatabase adds a new database to the meta store.
770func (ms *MetaClient) CreateDatabase(name, defaultRetentionPolicy string) error {
771	ms.mu.Lock()
772	defer ms.mu.Unlock()
773	if ms.Err != nil {
774		return ms.Err
775	}
776
777	// See if the database already exists.
778	for _, dbi := range ms.DatabaseInfos {
779		if dbi.Name == name {
780			return fmt.Errorf("database already exists: %s", name)
781		}
782	}
783
784	// Create database.
785	ms.DatabaseInfos = append(ms.DatabaseInfos, meta.DatabaseInfo{
786		Name: name,
787
788		DefaultRetentionPolicy: defaultRetentionPolicy,
789	})
790
791	return nil
792}
793
794// CreateContinuousQuery adds a CQ to the meta store.
795func (ms *MetaClient) CreateContinuousQuery(database, name, query string) error {
796	ms.mu.Lock()
797	defer ms.mu.Unlock()
798	if ms.Err != nil {
799		return ms.Err
800	}
801
802	dbi := ms.database(database)
803	if dbi == nil {
804		return fmt.Errorf("database not found: %s", database)
805	}
806
807	// See if CQ already exists.
808	for _, cqi := range dbi.ContinuousQueries {
809		if cqi.Name == name {
810			return fmt.Errorf("continuous query already exists: %s", name)
811		}
812	}
813
814	// Create a new CQ and store it.
815	dbi.ContinuousQueries = append(dbi.ContinuousQueries, meta.ContinuousQueryInfo{
816		Name:  name,
817		Query: query,
818	})
819
820	return nil
821}
822
823// StatementExecutor is a mock statement executor.
824type StatementExecutor struct {
825	ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error
826}
827
828func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
829	return e.ExecuteStatementFn(stmt, ctx)
830}
831
832func wait(c chan struct{}, d time.Duration) (err error) {
833	select {
834	case <-c:
835	case <-time.After(d):
836		err = errors.New("timed out")
837	}
838	return
839}
840
841type monitor struct {
842	EnabledFn     func() bool
843	WritePointsFn func(models.Points) error
844}
845
846func (m *monitor) Enabled() bool                     { return m.EnabledFn() }
847func (m *monitor) WritePoints(p models.Points) error { return m.WritePointsFn(p) }
848
849func mustParseTime(t *testing.T, value string) time.Time {
850	ts, err := time.Parse(time.RFC3339, value)
851	if err != nil {
852		t.Fatalf("unable to parse time: %s", err)
853	}
854	return ts
855}
856