1// Copyright 2015 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package promql
15
16import (
17	"context"
18	"fmt"
19	"io/ioutil"
20	"math"
21	"regexp"
22	"strconv"
23	"strings"
24	"time"
25
26	"github.com/pkg/errors"
27	"github.com/prometheus/common/model"
28
29	"github.com/prometheus/prometheus/pkg/exemplar"
30	"github.com/prometheus/prometheus/pkg/labels"
31	"github.com/prometheus/prometheus/pkg/timestamp"
32	"github.com/prometheus/prometheus/promql/parser"
33	"github.com/prometheus/prometheus/storage"
34	"github.com/prometheus/prometheus/tsdb"
35	"github.com/prometheus/prometheus/util/teststorage"
36	"github.com/prometheus/prometheus/util/testutil"
37)
38
var (
	minNormal = math.Float64frombits(0x0010000000000000) // The smallest positive normal value of type float64.

	// patSpace matches a run of tabs and/or spaces. The parsers split a line
	// on it to isolate the leading command word.
	patSpace       = regexp.MustCompile("[\t ]+")
	// patLoad matches a "load <step>" line and captures the step text.
	patLoad        = regexp.MustCompile(`^load\s+(.+?)$`)
	// patEvalInstant matches an "eval[_fail|_ordered] instant at <offset> <query>"
	// line. Capture groups: 1 is "fail" or "ordered" (empty when absent),
	// 2 is the offset text following "at", 3 is the query expression.
	patEvalInstant = regexp.MustCompile(`^eval(?:_(fail|ordered))?\s+instant\s+(?:at\s+(.+?))?\s+(.+)$`)
)

const (
	epsilon = 0.000001 // Relative error allowed for sample values.
)

// testStartTime is the Unix epoch in UTC. Loaded series start at this time
// and eval offsets are added to it.
var testStartTime = time.Unix(0, 0).UTC()
52
// Test is a sequence of read and write commands that are run
// against a test storage.
type Test struct {
	// T is the embedded test handle; it is used to report storage close
	// failures and is passed to teststorage.New.
	testutil.T

	// cmds holds the parsed commands in the order they appeared in the input.
	cmds []testCommand

	// storage is the backing test storage; clear replaces it with a fresh one.
	storage *teststorage.TestStorage

	// queryEngine, context and cancelCtx are (re)created by clear.
	queryEngine *Engine
	context     context.Context
	cancelCtx   context.CancelFunc
}
66
67// NewTest returns an initialized empty Test.
68func NewTest(t testutil.T, input string) (*Test, error) {
69	test := &Test{
70		T:    t,
71		cmds: []testCommand{},
72	}
73	err := test.parse(input)
74	test.clear()
75
76	return test, err
77}
78
79func newTestFromFile(t testutil.T, filename string) (*Test, error) {
80	content, err := ioutil.ReadFile(filename)
81	if err != nil {
82		return nil, err
83	}
84	return NewTest(t, string(content))
85}
86
// QueryEngine returns the test's query engine, i.e. the engine most recently
// created by clear.
func (t *Test) QueryEngine() *Engine {
	return t.queryEngine
}
91
// Queryable allows querying the test data. It exposes the test storage
// through the storage.Queryable interface.
func (t *Test) Queryable() storage.Queryable {
	return t.storage
}
96
// Context returns the test's context. clear replaces it with a fresh
// cancellable context and cancels the previous one.
func (t *Test) Context() context.Context {
	return t.context
}
101
// Storage returns the test's storage through the storage.Storage interface.
func (t *Test) Storage() storage.Storage {
	return t.storage
}
106
// TSDB returns the test's TSDB, i.e. the DB field of the test storage.
func (t *Test) TSDB() *tsdb.DB {
	return t.storage.DB
}
111
// ExemplarStorage returns the test's exemplar storage. It exposes the test
// storage through the storage.ExemplarStorage interface.
func (t *Test) ExemplarStorage() storage.ExemplarStorage {
	return t.storage
}
116
// ExemplarQueryable returns the exemplar queryable provided by the test
// storage.
func (t *Test) ExemplarQueryable() storage.ExemplarQueryable {
	return t.storage.ExemplarQueryable()
}
120
121func raise(line int, format string, v ...interface{}) error {
122	return &parser.ParseErr{
123		LineOffset: line,
124		Err:        errors.Errorf(format, v...),
125	}
126}
127
128func parseLoad(lines []string, i int) (int, *loadCmd, error) {
129	if !patLoad.MatchString(lines[i]) {
130		return i, nil, raise(i, "invalid load command. (load <step:duration>)")
131	}
132	parts := patLoad.FindStringSubmatch(lines[i])
133
134	gap, err := model.ParseDuration(parts[1])
135	if err != nil {
136		return i, nil, raise(i, "invalid step definition %q: %s", parts[1], err)
137	}
138	cmd := newLoadCmd(time.Duration(gap))
139	for i+1 < len(lines) {
140		i++
141		defLine := lines[i]
142		if len(defLine) == 0 {
143			i--
144			break
145		}
146		metric, vals, err := parser.ParseSeriesDesc(defLine)
147		if err != nil {
148			if perr, ok := err.(*parser.ParseErr); ok {
149				perr.LineOffset = i
150			}
151			return i, nil, err
152		}
153		cmd.set(metric, vals...)
154	}
155	return i, cmd, nil
156}
157
158func (t *Test) parseEval(lines []string, i int) (int, *evalCmd, error) {
159	if !patEvalInstant.MatchString(lines[i]) {
160		return i, nil, raise(i, "invalid evaluation command. (eval[_fail|_ordered] instant [at <offset:duration>] <query>")
161	}
162	parts := patEvalInstant.FindStringSubmatch(lines[i])
163	var (
164		mod  = parts[1]
165		at   = parts[2]
166		expr = parts[3]
167	)
168	_, err := parser.ParseExpr(expr)
169	if err != nil {
170		if perr, ok := err.(*parser.ParseErr); ok {
171			perr.LineOffset = i
172			posOffset := parser.Pos(strings.Index(lines[i], expr))
173			perr.PositionRange.Start += posOffset
174			perr.PositionRange.End += posOffset
175			perr.Query = lines[i]
176		}
177		return i, nil, err
178	}
179
180	offset, err := model.ParseDuration(at)
181	if err != nil {
182		return i, nil, raise(i, "invalid step definition %q: %s", parts[1], err)
183	}
184	ts := testStartTime.Add(time.Duration(offset))
185
186	cmd := newEvalCmd(expr, ts, i+1)
187	switch mod {
188	case "ordered":
189		cmd.ordered = true
190	case "fail":
191		cmd.fail = true
192	}
193
194	for j := 1; i+1 < len(lines); j++ {
195		i++
196		defLine := lines[i]
197		if len(defLine) == 0 {
198			i--
199			break
200		}
201		if f, err := parseNumber(defLine); err == nil {
202			cmd.expect(0, nil, parser.SequenceValue{Value: f})
203			break
204		}
205		metric, vals, err := parser.ParseSeriesDesc(defLine)
206		if err != nil {
207			if perr, ok := err.(*parser.ParseErr); ok {
208				perr.LineOffset = i
209			}
210			return i, nil, err
211		}
212
213		// Currently, we are not expecting any matrices.
214		if len(vals) > 1 {
215			return i, nil, raise(i, "expecting multiple values in instant evaluation not allowed")
216		}
217		cmd.expect(j, metric, vals...)
218	}
219	return i, cmd, nil
220}
221
// getLines splits input on newlines and returns one entry per line with
// surrounding whitespace removed. Lines whose first non-space character is
// '#' are comments and are returned as empty strings, so line indices are
// preserved.
func getLines(input string) []string {
	raw := strings.Split(input, "\n")
	cleaned := make([]string, len(raw))
	for idx := range raw {
		trimmed := strings.TrimSpace(raw[idx])
		if !strings.HasPrefix(trimmed, "#") {
			cleaned[idx] = trimmed
		}
	}
	return cleaned
}
234
235// parse the given command sequence and appends it to the test.
236func (t *Test) parse(input string) error {
237	lines := getLines(input)
238	var err error
239	// Scan for steps line by line.
240	for i := 0; i < len(lines); i++ {
241		l := lines[i]
242		if len(l) == 0 {
243			continue
244		}
245		var cmd testCommand
246
247		switch c := strings.ToLower(patSpace.Split(l, 2)[0]); {
248		case c == "clear":
249			cmd = &clearCmd{}
250		case c == "load":
251			i, cmd, err = parseLoad(lines, i)
252		case strings.HasPrefix(c, "eval"):
253			i, cmd, err = t.parseEval(lines, i)
254		default:
255			return raise(i, "invalid command %q", l)
256		}
257		if err != nil {
258			return err
259		}
260		t.cmds = append(t.cmds, cmd)
261	}
262	return nil
263}
264
// testCommand is an interface that ensures that only the package internal
// types can be a valid command for a test.
type testCommand interface {
	// testCmd is an unexported marker method; the implementations are empty.
	testCmd()
}

// The three command types satisfy testCommand through empty marker methods.
func (*clearCmd) testCmd() {}
func (*loadCmd) testCmd()  {}
func (*evalCmd) testCmd()  {}
274
// loadCmd is a command that loads sequences of sample values for specific
// metrics into the storage.
type loadCmd struct {
	// gap is the time between two consecutive values of a series.
	gap       time.Duration
	// metrics maps a label-set hash to the label set itself.
	metrics   map[uint64]labels.Labels
	// defs maps a label-set hash to the samples defined for that series.
	defs      map[uint64][]Point
	// exemplars is initialized by newLoadCmd; nothing in the code visible
	// here populates it.
	exemplars map[uint64][]exemplar.Exemplar
}
283
284func newLoadCmd(gap time.Duration) *loadCmd {
285	return &loadCmd{
286		gap:       gap,
287		metrics:   map[uint64]labels.Labels{},
288		defs:      map[uint64][]Point{},
289		exemplars: map[uint64][]exemplar.Exemplar{},
290	}
291}
292
// String returns the command keyword "load".
func (cmd loadCmd) String() string {
	return "load"
}
296
297// set a sequence of sample values for the given metric.
298func (cmd *loadCmd) set(m labels.Labels, vals ...parser.SequenceValue) {
299	h := m.Hash()
300
301	samples := make([]Point, 0, len(vals))
302	ts := testStartTime
303	for _, v := range vals {
304		if !v.Omitted {
305			samples = append(samples, Point{
306				T: ts.UnixNano() / int64(time.Millisecond/time.Nanosecond),
307				V: v.Value,
308			})
309		}
310		ts = ts.Add(cmd.gap)
311	}
312	cmd.defs[h] = samples
313	cmd.metrics[h] = m
314}
315
316// append the defined time series to the storage.
317func (cmd *loadCmd) append(a storage.Appender) error {
318	for h, smpls := range cmd.defs {
319		m := cmd.metrics[h]
320
321		for _, s := range smpls {
322			if _, err := a.Append(0, m, s.T, s.V); err != nil {
323				return err
324			}
325		}
326	}
327	return nil
328}
329
// evalCmd is a command that evaluates an expression for the given time (range)
// and expects a specific result.
type evalCmd struct {
	// expr is the query text, start the evaluation time, and line the 1-based
	// line number of the eval command (used in error messages).
	expr  string
	start time.Time
	line  int

	// fail marks an eval_fail command (the query must error); ordered marks
	// an eval_ordered command (result positions are checked).
	fail, ordered bool

	// metrics and expected are keyed by label-set hash. A scalar expectation
	// is stored in expected under key 0 and has no metrics entry.
	metrics  map[uint64]labels.Labels
	expected map[uint64]entry
}
342
// entry is one expected result of an eval command.
type entry struct {
	// pos is the 1-based position of the series in the expectation list
	// (0 for a scalar expectation).
	pos  int
	// vals holds the expected values.
	vals []parser.SequenceValue
}
347
// String formats the entry as "<pos>: <vals>".
func (e entry) String() string {
	return fmt.Sprintf("%d: %s", e.pos, e.vals)
}
351
352func newEvalCmd(expr string, start time.Time, line int) *evalCmd {
353	return &evalCmd{
354		expr:  expr,
355		start: start,
356		line:  line,
357
358		metrics:  map[uint64]labels.Labels{},
359		expected: map[uint64]entry{},
360	}
361}
362
// String returns the command keyword "eval".
func (ev *evalCmd) String() string {
	return "eval"
}
366
367// expect adds a new metric with a sequence of values to the set of expected
368// results for the query.
369func (ev *evalCmd) expect(pos int, m labels.Labels, vals ...parser.SequenceValue) {
370	if m == nil {
371		ev.expected[0] = entry{pos: pos, vals: vals}
372		return
373	}
374	h := m.Hash()
375	ev.metrics[h] = m
376	ev.expected[h] = entry{pos: pos, vals: vals}
377}
378
379// compareResult compares the result value with the defined expectation.
380func (ev *evalCmd) compareResult(result parser.Value) error {
381	switch val := result.(type) {
382	case Matrix:
383		return errors.New("received range result on instant evaluation")
384
385	case Vector:
386		seen := map[uint64]bool{}
387		for pos, v := range val {
388			fp := v.Metric.Hash()
389			if _, ok := ev.metrics[fp]; !ok {
390				return errors.Errorf("unexpected metric %s in result", v.Metric)
391			}
392			exp := ev.expected[fp]
393			if ev.ordered && exp.pos != pos+1 {
394				return errors.Errorf("expected metric %s with %v at position %d but was at %d", v.Metric, exp.vals, exp.pos, pos+1)
395			}
396			if !almostEqual(exp.vals[0].Value, v.V) {
397				return errors.Errorf("expected %v for %s but got %v", exp.vals[0].Value, v.Metric, v.V)
398			}
399
400			seen[fp] = true
401		}
402		for fp, expVals := range ev.expected {
403			if !seen[fp] {
404				fmt.Println("vector result", len(val), ev.expr)
405				for _, ss := range val {
406					fmt.Println("    ", ss.Metric, ss.Point)
407				}
408				return errors.Errorf("expected metric %s with %v not found", ev.metrics[fp], expVals)
409			}
410		}
411
412	case Scalar:
413		if !almostEqual(ev.expected[0].vals[0].Value, val.V) {
414			return errors.Errorf("expected Scalar %v but got %v", val.V, ev.expected[0].vals[0].Value)
415		}
416
417	default:
418		panic(errors.Errorf("promql.Test.compareResult: unexpected result type %T", result))
419	}
420	return nil
421}
422
// clearCmd is a command that wipes the test's storage state.
type clearCmd struct{}

// String returns the command keyword "clear".
func (cmd clearCmd) String() string {
	return "clear"
}
429
430// Run executes the command sequence of the test. Until the maximum error number
431// is reached, evaluation errors do not terminate execution.
432func (t *Test) Run() error {
433	for _, cmd := range t.cmds {
434		// TODO(fabxc): aggregate command errors, yield diffs for result
435		// comparison errors.
436		if err := t.exec(cmd); err != nil {
437			return err
438		}
439	}
440	return nil
441}
442
// atModifierTestCase is a query expression paired with the time at which it
// is to be evaluated.
type atModifierTestCase struct {
	expr     string
	evalTime time.Time
}
447
448func atModifierTestCases(exprStr string, evalTime time.Time) ([]atModifierTestCase, error) {
449	expr, err := parser.ParseExpr(exprStr)
450	if err != nil {
451		return nil, err
452	}
453	ts := timestamp.FromTime(evalTime)
454
455	containsNonStepInvariant := false
456	// Setting the @ timestamp for all selectors to be evalTime.
457	// If there is a subquery, then the selectors inside it don't get the @ timestamp.
458	// If any selector already has the @ timestamp set, then it is untouched.
459	parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
460		_, _, subqTs := subqueryTimes(path)
461		if subqTs != nil {
462			// There is a subquery with timestamp in the path,
463			// hence don't change any timestamps further.
464			return nil
465		}
466		switch n := node.(type) {
467		case *parser.VectorSelector:
468			if n.Timestamp == nil {
469				n.Timestamp = makeInt64Pointer(ts)
470			}
471
472		case *parser.MatrixSelector:
473			if vs := n.VectorSelector.(*parser.VectorSelector); vs.Timestamp == nil {
474				vs.Timestamp = makeInt64Pointer(ts)
475			}
476
477		case *parser.SubqueryExpr:
478			if n.Timestamp == nil {
479				n.Timestamp = makeInt64Pointer(ts)
480			}
481
482		case *parser.Call:
483			_, ok := AtModifierUnsafeFunctions[n.Func.Name]
484			containsNonStepInvariant = containsNonStepInvariant || ok
485		}
486		return nil
487	})
488
489	if containsNonStepInvariant {
490		// Since there is a step invariant function, we cannot automatically
491		// generate step invariant test cases for it sanely.
492		return nil, nil
493	}
494
495	newExpr := expr.String() // With all the @ evalTime set.
496	additionalEvalTimes := []int64{-10 * ts, 0, ts / 5, ts, 10 * ts}
497	if ts == 0 {
498		additionalEvalTimes = []int64{-1000, -ts, 1000}
499	}
500	testCases := make([]atModifierTestCase, 0, len(additionalEvalTimes))
501	for _, et := range additionalEvalTimes {
502		testCases = append(testCases, atModifierTestCase{
503			expr:     newExpr,
504			evalTime: timestamp.Time(et),
505		})
506	}
507
508	return testCases, nil
509}
510
511// exec processes a single step of the test.
512func (t *Test) exec(tc testCommand) error {
513	switch cmd := tc.(type) {
514	case *clearCmd:
515		t.clear()
516
517	case *loadCmd:
518		app := t.storage.Appender(t.context)
519		if err := cmd.append(app); err != nil {
520			app.Rollback()
521			return err
522		}
523
524		if err := app.Commit(); err != nil {
525			return err
526		}
527
528	case *evalCmd:
529		queries, err := atModifierTestCases(cmd.expr, cmd.start)
530		if err != nil {
531			return err
532		}
533		queries = append([]atModifierTestCase{{expr: cmd.expr, evalTime: cmd.start}}, queries...)
534		for _, iq := range queries {
535			q, err := t.QueryEngine().NewInstantQuery(t.storage, iq.expr, iq.evalTime)
536			if err != nil {
537				return err
538			}
539			defer q.Close()
540			res := q.Exec(t.context)
541			if res.Err != nil {
542				if cmd.fail {
543					continue
544				}
545				return errors.Wrapf(res.Err, "error evaluating query %q (line %d)", iq.expr, cmd.line)
546			}
547			if res.Err == nil && cmd.fail {
548				return errors.Errorf("expected error evaluating query %q (line %d) but got none", iq.expr, cmd.line)
549			}
550			err = cmd.compareResult(res.Value)
551			if err != nil {
552				return errors.Wrapf(err, "error in %s %s", cmd, iq.expr)
553			}
554
555			// Check query returns same result in range mode,
556			// by checking against the middle step.
557			q, err = t.queryEngine.NewRangeQuery(t.storage, iq.expr, iq.evalTime.Add(-time.Minute), iq.evalTime.Add(time.Minute), time.Minute)
558			if err != nil {
559				return err
560			}
561			rangeRes := q.Exec(t.context)
562			if rangeRes.Err != nil {
563				return errors.Wrapf(rangeRes.Err, "error evaluating query %q (line %d) in range mode", iq.expr, cmd.line)
564			}
565			defer q.Close()
566			if cmd.ordered {
567				// Ordering isn't defined for range queries.
568				continue
569			}
570			mat := rangeRes.Value.(Matrix)
571			vec := make(Vector, 0, len(mat))
572			for _, series := range mat {
573				for _, point := range series.Points {
574					if point.T == timeMilliseconds(iq.evalTime) {
575						vec = append(vec, Sample{Metric: series.Metric, Point: point})
576						break
577					}
578				}
579			}
580			if _, ok := res.Value.(Scalar); ok {
581				err = cmd.compareResult(Scalar{V: vec[0].Point.V})
582			} else {
583				err = cmd.compareResult(vec)
584			}
585			if err != nil {
586				return errors.Wrapf(err, "error in %s %s (line %d) rande mode", cmd, iq.expr, cmd.line)
587			}
588
589		}
590
591	default:
592		panic("promql.Test.exec: unknown test command type")
593	}
594	return nil
595}
596
597// clear the current test storage of all inserted samples.
598func (t *Test) clear() {
599	if t.storage != nil {
600		if err := t.storage.Close(); err != nil {
601			t.T.Fatalf("closing test storage: %s", err)
602		}
603	}
604	if t.cancelCtx != nil {
605		t.cancelCtx()
606	}
607	t.storage = teststorage.New(t)
608
609	opts := EngineOpts{
610		Logger:                   nil,
611		Reg:                      nil,
612		MaxSamples:               10000,
613		Timeout:                  100 * time.Second,
614		NoStepSubqueryIntervalFn: func(int64) int64 { return durationMilliseconds(1 * time.Minute) },
615		EnableAtModifier:         true,
616	}
617
618	t.queryEngine = NewEngine(opts)
619	t.context, t.cancelCtx = context.WithCancel(context.Background())
620}
621
622// Close closes resources associated with the Test.
623func (t *Test) Close() {
624	t.cancelCtx()
625
626	if err := t.storage.Close(); err != nil {
627		t.T.Fatalf("closing test storage: %s", err)
628	}
629}
630
631// samplesAlmostEqual returns true if the two sample lines only differ by a
632// small relative error in their sample value.
633func almostEqual(a, b float64) bool {
634	// NaN has no equality but for testing we still want to know whether both values
635	// are NaN.
636	if math.IsNaN(a) && math.IsNaN(b) {
637		return true
638	}
639
640	// Cf. http://floating-point-gui.de/errors/comparison/
641	if a == b {
642		return true
643	}
644
645	diff := math.Abs(a - b)
646
647	if a == 0 || b == 0 || diff < minNormal {
648		return diff < epsilon*minNormal
649	}
650	return diff/(math.Abs(a)+math.Abs(b)) < epsilon
651}
652
653func parseNumber(s string) (float64, error) {
654	n, err := strconv.ParseInt(s, 0, 64)
655	f := float64(n)
656	if err != nil {
657		f, err = strconv.ParseFloat(s, 64)
658	}
659	if err != nil {
660		return 0, errors.Wrap(err, "error parsing number")
661	}
662	return f, nil
663}
664
// LazyLoader lazily loads samples into storage.
// This is specifically implemented for unit testing of rules.
type LazyLoader struct {
	// T is the embedded test handle; it is used to report storage close
	// failures and is passed to teststorage.New.
	testutil.T

	// loadCmd holds the parsed "load" command whose samples are appended
	// incrementally by appendTill.
	loadCmd *loadCmd

	// storage is the backing test storage; clear replaces it with a fresh one.
	storage          storage.Storage
	// SubqueryInterval is the value the engine's NoStepSubqueryIntervalFn
	// returns (converted to milliseconds); it is read each time the engine
	// calls that function.
	SubqueryInterval time.Duration

	// queryEngine, context and cancelCtx are (re)created by clear.
	queryEngine *Engine
	context     context.Context
	cancelCtx   context.CancelFunc
}
679
680// NewLazyLoader returns an initialized empty LazyLoader.
681func NewLazyLoader(t testutil.T, input string) (*LazyLoader, error) {
682	ll := &LazyLoader{
683		T: t,
684	}
685	err := ll.parse(input)
686	ll.clear()
687	return ll, err
688}
689
690// parse the given load command.
691func (ll *LazyLoader) parse(input string) error {
692	lines := getLines(input)
693	// Accepts only 'load' command.
694	for i := 0; i < len(lines); i++ {
695		l := lines[i]
696		if len(l) == 0 {
697			continue
698		}
699		if strings.ToLower(patSpace.Split(l, 2)[0]) == "load" {
700			_, cmd, err := parseLoad(lines, i)
701			if err != nil {
702				return err
703			}
704			ll.loadCmd = cmd
705			return nil
706		}
707
708		return raise(i, "invalid command %q", l)
709	}
710	return errors.New("no \"load\" command found")
711}
712
713// clear the current test storage of all inserted samples.
714func (ll *LazyLoader) clear() {
715	if ll.storage != nil {
716		if err := ll.storage.Close(); err != nil {
717			ll.T.Fatalf("closing test storage: %s", err)
718		}
719	}
720	if ll.cancelCtx != nil {
721		ll.cancelCtx()
722	}
723	ll.storage = teststorage.New(ll)
724
725	opts := EngineOpts{
726		Logger:                   nil,
727		Reg:                      nil,
728		MaxSamples:               10000,
729		Timeout:                  100 * time.Second,
730		NoStepSubqueryIntervalFn: func(int64) int64 { return durationMilliseconds(ll.SubqueryInterval) },
731		EnableAtModifier:         true,
732	}
733
734	ll.queryEngine = NewEngine(opts)
735	ll.context, ll.cancelCtx = context.WithCancel(context.Background())
736}
737
738// appendTill appends the defined time series to the storage till the given timestamp (in milliseconds).
739func (ll *LazyLoader) appendTill(ts int64) error {
740	app := ll.storage.Appender(ll.Context())
741	for h, smpls := range ll.loadCmd.defs {
742		m := ll.loadCmd.metrics[h]
743		for i, s := range smpls {
744			if s.T > ts {
745				// Removing the already added samples.
746				ll.loadCmd.defs[h] = smpls[i:]
747				break
748			}
749			if _, err := app.Append(0, m, s.T, s.V); err != nil {
750				return err
751			}
752			if i == len(smpls)-1 {
753				ll.loadCmd.defs[h] = nil
754			}
755		}
756	}
757	return app.Commit()
758}
759
760// WithSamplesTill loads the samples till given timestamp and executes the given function.
761func (ll *LazyLoader) WithSamplesTill(ts time.Time, fn func(error)) {
762	tsMilli := ts.Sub(time.Unix(0, 0).UTC()) / time.Millisecond
763	fn(ll.appendTill(int64(tsMilli)))
764}
765
// QueryEngine returns the LazyLoader's query engine, i.e. the engine most
// recently created by clear.
func (ll *LazyLoader) QueryEngine() *Engine {
	return ll.queryEngine
}
770
// Queryable allows querying the LazyLoader's data.
// Note: only the samples till the max timestamp used
//       in `WithSamplesTill` can be queried, because samples reach the
//       storage only through that method.
func (ll *LazyLoader) Queryable() storage.Queryable {
	return ll.storage
}
777
// Context returns the LazyLoader's context. clear replaces it with a fresh
// cancellable context and cancels the previous one.
func (ll *LazyLoader) Context() context.Context {
	return ll.context
}
782
// Storage returns the LazyLoader's storage, i.e. the storage most recently
// created by clear.
func (ll *LazyLoader) Storage() storage.Storage {
	return ll.storage
}
787
788// Close closes resources associated with the LazyLoader.
789func (ll *LazyLoader) Close() {
790	ll.cancelCtx()
791
792	if err := ll.storage.Close(); err != nil {
793		ll.T.Fatalf("closing test storage: %s", err)
794	}
795}
796