1// Copyright 2015 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package promql 15 16import ( 17 "context" 18 "fmt" 19 "io/ioutil" 20 "math" 21 "regexp" 22 "strconv" 23 "strings" 24 "time" 25 26 "github.com/pkg/errors" 27 "github.com/prometheus/common/model" 28 29 "github.com/prometheus/prometheus/pkg/exemplar" 30 "github.com/prometheus/prometheus/pkg/labels" 31 "github.com/prometheus/prometheus/pkg/timestamp" 32 "github.com/prometheus/prometheus/promql/parser" 33 "github.com/prometheus/prometheus/storage" 34 "github.com/prometheus/prometheus/tsdb" 35 "github.com/prometheus/prometheus/util/teststorage" 36 "github.com/prometheus/prometheus/util/testutil" 37) 38 39var ( 40 minNormal = math.Float64frombits(0x0010000000000000) // The smallest positive normal value of type float64. 41 42 patSpace = regexp.MustCompile("[\t ]+") 43 patLoad = regexp.MustCompile(`^load\s+(.+?)$`) 44 patEvalInstant = regexp.MustCompile(`^eval(?:_(fail|ordered))?\s+instant\s+(?:at\s+(.+?))?\s+(.+)$`) 45) 46 47const ( 48 epsilon = 0.000001 // Relative error allowed for sample values. 49) 50 51var testStartTime = time.Unix(0, 0).UTC() 52 53// Test is a sequence of read and write commands that are run 54// against a test storage. 55type Test struct { 56 testutil.T 57 58 cmds []testCommand 59 60 storage *teststorage.TestStorage 61 62 queryEngine *Engine 63 context context.Context 64 cancelCtx context.CancelFunc 65} 66 67// NewTest returns an initialized empty Test. 68func NewTest(t testutil.T, input string) (*Test, error) { 69 test := &Test{ 70 T: t, 71 cmds: []testCommand{}, 72 } 73 err := test.parse(input) 74 test.clear() 75 76 return test, err 77} 78 79func newTestFromFile(t testutil.T, filename string) (*Test, error) { 80 content, err := ioutil.ReadFile(filename) 81 if err != nil { 82 return nil, err 83 } 84 return NewTest(t, string(content)) 85} 86 87// QueryEngine returns the test's query engine. 88func (t *Test) QueryEngine() *Engine { 89 return t.queryEngine 90} 91 92// Queryable allows querying the test data. 93func (t *Test) Queryable() storage.Queryable { 94 return t.storage 95} 96 97// Context returns the test's context. 98func (t *Test) Context() context.Context { 99 return t.context 100} 101 102// Storage returns the test's storage. 103func (t *Test) Storage() storage.Storage { 104 return t.storage 105} 106 107// TSDB returns test's TSDB. 108func (t *Test) TSDB() *tsdb.DB { 109 return t.storage.DB 110} 111 112// ExemplarStorage returns the test's exemplar storage. 113func (t *Test) ExemplarStorage() storage.ExemplarStorage { 114 return t.storage 115} 116 117func (t *Test) ExemplarQueryable() storage.ExemplarQueryable { 118 return t.storage.ExemplarQueryable() 119} 120 121func raise(line int, format string, v ...interface{}) error { 122 return &parser.ParseErr{ 123 LineOffset: line, 124 Err: errors.Errorf(format, v...), 125 } 126} 127 128func parseLoad(lines []string, i int) (int, *loadCmd, error) { 129 if !patLoad.MatchString(lines[i]) { 130 return i, nil, raise(i, "invalid load command. (load <step:duration>)") 131 } 132 parts := patLoad.FindStringSubmatch(lines[i]) 133 134 gap, err := model.ParseDuration(parts[1]) 135 if err != nil { 136 return i, nil, raise(i, "invalid step definition %q: %s", parts[1], err) 137 } 138 cmd := newLoadCmd(time.Duration(gap)) 139 for i+1 < len(lines) { 140 i++ 141 defLine := lines[i] 142 if len(defLine) == 0 { 143 i-- 144 break 145 } 146 metric, vals, err := parser.ParseSeriesDesc(defLine) 147 if err != nil { 148 if perr, ok := err.(*parser.ParseErr); ok { 149 perr.LineOffset = i 150 } 151 return i, nil, err 152 } 153 cmd.set(metric, vals...) 154 } 155 return i, cmd, nil 156} 157 158func (t *Test) parseEval(lines []string, i int) (int, *evalCmd, error) { 159 if !patEvalInstant.MatchString(lines[i]) { 160 return i, nil, raise(i, "invalid evaluation command. (eval[_fail|_ordered] instant [at <offset:duration>] <query>") 161 } 162 parts := patEvalInstant.FindStringSubmatch(lines[i]) 163 var ( 164 mod = parts[1] 165 at = parts[2] 166 expr = parts[3] 167 ) 168 _, err := parser.ParseExpr(expr) 169 if err != nil { 170 if perr, ok := err.(*parser.ParseErr); ok { 171 perr.LineOffset = i 172 posOffset := parser.Pos(strings.Index(lines[i], expr)) 173 perr.PositionRange.Start += posOffset 174 perr.PositionRange.End += posOffset 175 perr.Query = lines[i] 176 } 177 return i, nil, err 178 } 179 180 offset, err := model.ParseDuration(at) 181 if err != nil { 182 return i, nil, raise(i, "invalid step definition %q: %s", parts[1], err) 183 } 184 ts := testStartTime.Add(time.Duration(offset)) 185 186 cmd := newEvalCmd(expr, ts, i+1) 187 switch mod { 188 case "ordered": 189 cmd.ordered = true 190 case "fail": 191 cmd.fail = true 192 } 193 194 for j := 1; i+1 < len(lines); j++ { 195 i++ 196 defLine := lines[i] 197 if len(defLine) == 0 { 198 i-- 199 break 200 } 201 if f, err := parseNumber(defLine); err == nil { 202 cmd.expect(0, nil, parser.SequenceValue{Value: f}) 203 break 204 } 205 metric, vals, err := parser.ParseSeriesDesc(defLine) 206 if err != nil { 207 if perr, ok := err.(*parser.ParseErr); ok { 208 perr.LineOffset = i 209 } 210 return i, nil, err 211 } 212 213 // Currently, we are not expecting any matrices. 214 if len(vals) > 1 { 215 return i, nil, raise(i, "expecting multiple values in instant evaluation not allowed") 216 } 217 cmd.expect(j, metric, vals...) 218 } 219 return i, cmd, nil 220} 221 222// getLines returns trimmed lines after removing the comments. 223func getLines(input string) []string { 224 lines := strings.Split(input, "\n") 225 for i, l := range lines { 226 l = strings.TrimSpace(l) 227 if strings.HasPrefix(l, "#") { 228 l = "" 229 } 230 lines[i] = l 231 } 232 return lines 233} 234 235// parse the given command sequence and appends it to the test. 236func (t *Test) parse(input string) error { 237 lines := getLines(input) 238 var err error 239 // Scan for steps line by line. 240 for i := 0; i < len(lines); i++ { 241 l := lines[i] 242 if len(l) == 0 { 243 continue 244 } 245 var cmd testCommand 246 247 switch c := strings.ToLower(patSpace.Split(l, 2)[0]); { 248 case c == "clear": 249 cmd = &clearCmd{} 250 case c == "load": 251 i, cmd, err = parseLoad(lines, i) 252 case strings.HasPrefix(c, "eval"): 253 i, cmd, err = t.parseEval(lines, i) 254 default: 255 return raise(i, "invalid command %q", l) 256 } 257 if err != nil { 258 return err 259 } 260 t.cmds = append(t.cmds, cmd) 261 } 262 return nil 263} 264 265// testCommand is an interface that ensures that only the package internal 266// types can be a valid command for a test. 267type testCommand interface { 268 testCmd() 269} 270 271func (*clearCmd) testCmd() {} 272func (*loadCmd) testCmd() {} 273func (*evalCmd) testCmd() {} 274 275// loadCmd is a command that loads sequences of sample values for specific 276// metrics into the storage. 277type loadCmd struct { 278 gap time.Duration 279 metrics map[uint64]labels.Labels 280 defs map[uint64][]Point 281 exemplars map[uint64][]exemplar.Exemplar 282} 283 284func newLoadCmd(gap time.Duration) *loadCmd { 285 return &loadCmd{ 286 gap: gap, 287 metrics: map[uint64]labels.Labels{}, 288 defs: map[uint64][]Point{}, 289 exemplars: map[uint64][]exemplar.Exemplar{}, 290 } 291} 292 293func (cmd loadCmd) String() string { 294 return "load" 295} 296 297// set a sequence of sample values for the given metric. 298func (cmd *loadCmd) set(m labels.Labels, vals ...parser.SequenceValue) { 299 h := m.Hash() 300 301 samples := make([]Point, 0, len(vals)) 302 ts := testStartTime 303 for _, v := range vals { 304 if !v.Omitted { 305 samples = append(samples, Point{ 306 T: ts.UnixNano() / int64(time.Millisecond/time.Nanosecond), 307 V: v.Value, 308 }) 309 } 310 ts = ts.Add(cmd.gap) 311 } 312 cmd.defs[h] = samples 313 cmd.metrics[h] = m 314} 315 316// append the defined time series to the storage. 317func (cmd *loadCmd) append(a storage.Appender) error { 318 for h, smpls := range cmd.defs { 319 m := cmd.metrics[h] 320 321 for _, s := range smpls { 322 if _, err := a.Append(0, m, s.T, s.V); err != nil { 323 return err 324 } 325 } 326 } 327 return nil 328} 329 330// evalCmd is a command that evaluates an expression for the given time (range) 331// and expects a specific result. 332type evalCmd struct { 333 expr string 334 start time.Time 335 line int 336 337 fail, ordered bool 338 339 metrics map[uint64]labels.Labels 340 expected map[uint64]entry 341} 342 343type entry struct { 344 pos int 345 vals []parser.SequenceValue 346} 347 348func (e entry) String() string { 349 return fmt.Sprintf("%d: %s", e.pos, e.vals) 350} 351 352func newEvalCmd(expr string, start time.Time, line int) *evalCmd { 353 return &evalCmd{ 354 expr: expr, 355 start: start, 356 line: line, 357 358 metrics: map[uint64]labels.Labels{}, 359 expected: map[uint64]entry{}, 360 } 361} 362 363func (ev *evalCmd) String() string { 364 return "eval" 365} 366 367// expect adds a new metric with a sequence of values to the set of expected 368// results for the query. 369func (ev *evalCmd) expect(pos int, m labels.Labels, vals ...parser.SequenceValue) { 370 if m == nil { 371 ev.expected[0] = entry{pos: pos, vals: vals} 372 return 373 } 374 h := m.Hash() 375 ev.metrics[h] = m 376 ev.expected[h] = entry{pos: pos, vals: vals} 377} 378 379// compareResult compares the result value with the defined expectation. 380func (ev *evalCmd) compareResult(result parser.Value) error { 381 switch val := result.(type) { 382 case Matrix: 383 return errors.New("received range result on instant evaluation") 384 385 case Vector: 386 seen := map[uint64]bool{} 387 for pos, v := range val { 388 fp := v.Metric.Hash() 389 if _, ok := ev.metrics[fp]; !ok { 390 return errors.Errorf("unexpected metric %s in result", v.Metric) 391 } 392 exp := ev.expected[fp] 393 if ev.ordered && exp.pos != pos+1 { 394 return errors.Errorf("expected metric %s with %v at position %d but was at %d", v.Metric, exp.vals, exp.pos, pos+1) 395 } 396 if !almostEqual(exp.vals[0].Value, v.V) { 397 return errors.Errorf("expected %v for %s but got %v", exp.vals[0].Value, v.Metric, v.V) 398 } 399 400 seen[fp] = true 401 } 402 for fp, expVals := range ev.expected { 403 if !seen[fp] { 404 fmt.Println("vector result", len(val), ev.expr) 405 for _, ss := range val { 406 fmt.Println(" ", ss.Metric, ss.Point) 407 } 408 return errors.Errorf("expected metric %s with %v not found", ev.metrics[fp], expVals) 409 } 410 } 411 412 case Scalar: 413 if !almostEqual(ev.expected[0].vals[0].Value, val.V) { 414 return errors.Errorf("expected Scalar %v but got %v", val.V, ev.expected[0].vals[0].Value) 415 } 416 417 default: 418 panic(errors.Errorf("promql.Test.compareResult: unexpected result type %T", result)) 419 } 420 return nil 421} 422 423// clearCmd is a command that wipes the test's storage state. 424type clearCmd struct{} 425 426func (cmd clearCmd) String() string { 427 return "clear" 428} 429 430// Run executes the command sequence of the test. Until the maximum error number 431// is reached, evaluation errors do not terminate execution. 432func (t *Test) Run() error { 433 for _, cmd := range t.cmds { 434 // TODO(fabxc): aggregate command errors, yield diffs for result 435 // comparison errors. 436 if err := t.exec(cmd); err != nil { 437 return err 438 } 439 } 440 return nil 441} 442 443type atModifierTestCase struct { 444 expr string 445 evalTime time.Time 446} 447 448func atModifierTestCases(exprStr string, evalTime time.Time) ([]atModifierTestCase, error) { 449 expr, err := parser.ParseExpr(exprStr) 450 if err != nil { 451 return nil, err 452 } 453 ts := timestamp.FromTime(evalTime) 454 455 containsNonStepInvariant := false 456 // Setting the @ timestamp for all selectors to be evalTime. 457 // If there is a subquery, then the selectors inside it don't get the @ timestamp. 458 // If any selector already has the @ timestamp set, then it is untouched. 459 parser.Inspect(expr, func(node parser.Node, path []parser.Node) error { 460 _, _, subqTs := subqueryTimes(path) 461 if subqTs != nil { 462 // There is a subquery with timestamp in the path, 463 // hence don't change any timestamps further. 464 return nil 465 } 466 switch n := node.(type) { 467 case *parser.VectorSelector: 468 if n.Timestamp == nil { 469 n.Timestamp = makeInt64Pointer(ts) 470 } 471 472 case *parser.MatrixSelector: 473 if vs := n.VectorSelector.(*parser.VectorSelector); vs.Timestamp == nil { 474 vs.Timestamp = makeInt64Pointer(ts) 475 } 476 477 case *parser.SubqueryExpr: 478 if n.Timestamp == nil { 479 n.Timestamp = makeInt64Pointer(ts) 480 } 481 482 case *parser.Call: 483 _, ok := AtModifierUnsafeFunctions[n.Func.Name] 484 containsNonStepInvariant = containsNonStepInvariant || ok 485 } 486 return nil 487 }) 488 489 if containsNonStepInvariant { 490 // Since there is a step invariant function, we cannot automatically 491 // generate step invariant test cases for it sanely. 492 return nil, nil 493 } 494 495 newExpr := expr.String() // With all the @ evalTime set. 496 additionalEvalTimes := []int64{-10 * ts, 0, ts / 5, ts, 10 * ts} 497 if ts == 0 { 498 additionalEvalTimes = []int64{-1000, -ts, 1000} 499 } 500 testCases := make([]atModifierTestCase, 0, len(additionalEvalTimes)) 501 for _, et := range additionalEvalTimes { 502 testCases = append(testCases, atModifierTestCase{ 503 expr: newExpr, 504 evalTime: timestamp.Time(et), 505 }) 506 } 507 508 return testCases, nil 509} 510 511// exec processes a single step of the test. 512func (t *Test) exec(tc testCommand) error { 513 switch cmd := tc.(type) { 514 case *clearCmd: 515 t.clear() 516 517 case *loadCmd: 518 app := t.storage.Appender(t.context) 519 if err := cmd.append(app); err != nil { 520 app.Rollback() 521 return err 522 } 523 524 if err := app.Commit(); err != nil { 525 return err 526 } 527 528 case *evalCmd: 529 queries, err := atModifierTestCases(cmd.expr, cmd.start) 530 if err != nil { 531 return err 532 } 533 queries = append([]atModifierTestCase{{expr: cmd.expr, evalTime: cmd.start}}, queries...) 534 for _, iq := range queries { 535 q, err := t.QueryEngine().NewInstantQuery(t.storage, iq.expr, iq.evalTime) 536 if err != nil { 537 return err 538 } 539 defer q.Close() 540 res := q.Exec(t.context) 541 if res.Err != nil { 542 if cmd.fail { 543 continue 544 } 545 return errors.Wrapf(res.Err, "error evaluating query %q (line %d)", iq.expr, cmd.line) 546 } 547 if res.Err == nil && cmd.fail { 548 return errors.Errorf("expected error evaluating query %q (line %d) but got none", iq.expr, cmd.line) 549 } 550 err = cmd.compareResult(res.Value) 551 if err != nil { 552 return errors.Wrapf(err, "error in %s %s", cmd, iq.expr) 553 } 554 555 // Check query returns same result in range mode, 556 // by checking against the middle step. 557 q, err = t.queryEngine.NewRangeQuery(t.storage, iq.expr, iq.evalTime.Add(-time.Minute), iq.evalTime.Add(time.Minute), time.Minute) 558 if err != nil { 559 return err 560 } 561 rangeRes := q.Exec(t.context) 562 if rangeRes.Err != nil { 563 return errors.Wrapf(rangeRes.Err, "error evaluating query %q (line %d) in range mode", iq.expr, cmd.line) 564 } 565 defer q.Close() 566 if cmd.ordered { 567 // Ordering isn't defined for range queries. 568 continue 569 } 570 mat := rangeRes.Value.(Matrix) 571 vec := make(Vector, 0, len(mat)) 572 for _, series := range mat { 573 for _, point := range series.Points { 574 if point.T == timeMilliseconds(iq.evalTime) { 575 vec = append(vec, Sample{Metric: series.Metric, Point: point}) 576 break 577 } 578 } 579 } 580 if _, ok := res.Value.(Scalar); ok { 581 err = cmd.compareResult(Scalar{V: vec[0].Point.V}) 582 } else { 583 err = cmd.compareResult(vec) 584 } 585 if err != nil { 586 return errors.Wrapf(err, "error in %s %s (line %d) rande mode", cmd, iq.expr, cmd.line) 587 } 588 589 } 590 591 default: 592 panic("promql.Test.exec: unknown test command type") 593 } 594 return nil 595} 596 597// clear the current test storage of all inserted samples. 598func (t *Test) clear() { 599 if t.storage != nil { 600 if err := t.storage.Close(); err != nil { 601 t.T.Fatalf("closing test storage: %s", err) 602 } 603 } 604 if t.cancelCtx != nil { 605 t.cancelCtx() 606 } 607 t.storage = teststorage.New(t) 608 609 opts := EngineOpts{ 610 Logger: nil, 611 Reg: nil, 612 MaxSamples: 10000, 613 Timeout: 100 * time.Second, 614 NoStepSubqueryIntervalFn: func(int64) int64 { return durationMilliseconds(1 * time.Minute) }, 615 EnableAtModifier: true, 616 } 617 618 t.queryEngine = NewEngine(opts) 619 t.context, t.cancelCtx = context.WithCancel(context.Background()) 620} 621 622// Close closes resources associated with the Test. 623func (t *Test) Close() { 624 t.cancelCtx() 625 626 if err := t.storage.Close(); err != nil { 627 t.T.Fatalf("closing test storage: %s", err) 628 } 629} 630 631// samplesAlmostEqual returns true if the two sample lines only differ by a 632// small relative error in their sample value. 633func almostEqual(a, b float64) bool { 634 // NaN has no equality but for testing we still want to know whether both values 635 // are NaN. 636 if math.IsNaN(a) && math.IsNaN(b) { 637 return true 638 } 639 640 // Cf. http://floating-point-gui.de/errors/comparison/ 641 if a == b { 642 return true 643 } 644 645 diff := math.Abs(a - b) 646 647 if a == 0 || b == 0 || diff < minNormal { 648 return diff < epsilon*minNormal 649 } 650 return diff/(math.Abs(a)+math.Abs(b)) < epsilon 651} 652 653func parseNumber(s string) (float64, error) { 654 n, err := strconv.ParseInt(s, 0, 64) 655 f := float64(n) 656 if err != nil { 657 f, err = strconv.ParseFloat(s, 64) 658 } 659 if err != nil { 660 return 0, errors.Wrap(err, "error parsing number") 661 } 662 return f, nil 663} 664 665// LazyLoader lazily loads samples into storage. 666// This is specifically implemented for unit testing of rules. 667type LazyLoader struct { 668 testutil.T 669 670 loadCmd *loadCmd 671 672 storage storage.Storage 673 SubqueryInterval time.Duration 674 675 queryEngine *Engine 676 context context.Context 677 cancelCtx context.CancelFunc 678} 679 680// NewLazyLoader returns an initialized empty LazyLoader. 681func NewLazyLoader(t testutil.T, input string) (*LazyLoader, error) { 682 ll := &LazyLoader{ 683 T: t, 684 } 685 err := ll.parse(input) 686 ll.clear() 687 return ll, err 688} 689 690// parse the given load command. 691func (ll *LazyLoader) parse(input string) error { 692 lines := getLines(input) 693 // Accepts only 'load' command. 694 for i := 0; i < len(lines); i++ { 695 l := lines[i] 696 if len(l) == 0 { 697 continue 698 } 699 if strings.ToLower(patSpace.Split(l, 2)[0]) == "load" { 700 _, cmd, err := parseLoad(lines, i) 701 if err != nil { 702 return err 703 } 704 ll.loadCmd = cmd 705 return nil 706 } 707 708 return raise(i, "invalid command %q", l) 709 } 710 return errors.New("no \"load\" command found") 711} 712 713// clear the current test storage of all inserted samples. 714func (ll *LazyLoader) clear() { 715 if ll.storage != nil { 716 if err := ll.storage.Close(); err != nil { 717 ll.T.Fatalf("closing test storage: %s", err) 718 } 719 } 720 if ll.cancelCtx != nil { 721 ll.cancelCtx() 722 } 723 ll.storage = teststorage.New(ll) 724 725 opts := EngineOpts{ 726 Logger: nil, 727 Reg: nil, 728 MaxSamples: 10000, 729 Timeout: 100 * time.Second, 730 NoStepSubqueryIntervalFn: func(int64) int64 { return durationMilliseconds(ll.SubqueryInterval) }, 731 EnableAtModifier: true, 732 } 733 734 ll.queryEngine = NewEngine(opts) 735 ll.context, ll.cancelCtx = context.WithCancel(context.Background()) 736} 737 738// appendTill appends the defined time series to the storage till the given timestamp (in milliseconds). 739func (ll *LazyLoader) appendTill(ts int64) error { 740 app := ll.storage.Appender(ll.Context()) 741 for h, smpls := range ll.loadCmd.defs { 742 m := ll.loadCmd.metrics[h] 743 for i, s := range smpls { 744 if s.T > ts { 745 // Removing the already added samples. 746 ll.loadCmd.defs[h] = smpls[i:] 747 break 748 } 749 if _, err := app.Append(0, m, s.T, s.V); err != nil { 750 return err 751 } 752 if i == len(smpls)-1 { 753 ll.loadCmd.defs[h] = nil 754 } 755 } 756 } 757 return app.Commit() 758} 759 760// WithSamplesTill loads the samples till given timestamp and executes the given function. 761func (ll *LazyLoader) WithSamplesTill(ts time.Time, fn func(error)) { 762 tsMilli := ts.Sub(time.Unix(0, 0).UTC()) / time.Millisecond 763 fn(ll.appendTill(int64(tsMilli))) 764} 765 766// QueryEngine returns the LazyLoader's query engine. 767func (ll *LazyLoader) QueryEngine() *Engine { 768 return ll.queryEngine 769} 770 771// Queryable allows querying the LazyLoader's data. 772// Note: only the samples till the max timestamp used 773// in `WithSamplesTill` can be queried. 774func (ll *LazyLoader) Queryable() storage.Queryable { 775 return ll.storage 776} 777 778// Context returns the LazyLoader's context. 779func (ll *LazyLoader) Context() context.Context { 780 return ll.context 781} 782 783// Storage returns the LazyLoader's storage. 784func (ll *LazyLoader) Storage() storage.Storage { 785 return ll.storage 786} 787 788// Close closes resources associated with the LazyLoader. 789func (ll *LazyLoader) Close() { 790 ll.cancelCtx() 791 792 if err := ll.storage.Close(); err != nil { 793 ll.T.Fatalf("closing test storage: %s", err) 794 } 795} 796