1// Copyright 2016 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package scrape 15 16import ( 17 "bytes" 18 "compress/gzip" 19 "context" 20 "fmt" 21 "io" 22 "io/ioutil" 23 "math" 24 "net/http" 25 "net/http/httptest" 26 "net/url" 27 "strings" 28 "sync" 29 "testing" 30 "time" 31 32 "github.com/go-kit/log" 33 "github.com/pkg/errors" 34 dto "github.com/prometheus/client_model/go" 35 config_util "github.com/prometheus/common/config" 36 "github.com/prometheus/common/model" 37 "github.com/stretchr/testify/require" 38 39 "github.com/prometheus/prometheus/config" 40 "github.com/prometheus/prometheus/discovery/targetgroup" 41 "github.com/prometheus/prometheus/pkg/exemplar" 42 "github.com/prometheus/prometheus/pkg/labels" 43 "github.com/prometheus/prometheus/pkg/relabel" 44 "github.com/prometheus/prometheus/pkg/textparse" 45 "github.com/prometheus/prometheus/pkg/timestamp" 46 "github.com/prometheus/prometheus/pkg/value" 47 "github.com/prometheus/prometheus/storage" 48 "github.com/prometheus/prometheus/util/teststorage" 49 "github.com/prometheus/prometheus/util/testutil" 50) 51 52func TestMain(m *testing.M) { 53 testutil.TolerantVerifyLeak(m) 54} 55 56func TestNewScrapePool(t *testing.T) { 57 var ( 58 app = &nopAppendable{} 59 cfg = &config.ScrapeConfig{} 60 sp, _ = newScrapePool(cfg, app, 0, nil, false) 61 ) 62 63 if a, ok := sp.appendable.(*nopAppendable); !ok || a != app { 64 t.Fatalf("Wrong sample appender") 65 } 66 if sp.config != cfg { 67 t.Fatalf("Wrong scrape config") 68 } 69 if sp.newLoop == nil { 70 t.Fatalf("newLoop function not initialized") 71 } 72} 73 74func TestDroppedTargetsList(t *testing.T) { 75 var ( 76 app = &nopAppendable{} 77 cfg = &config.ScrapeConfig{ 78 JobName: "dropMe", 79 ScrapeInterval: model.Duration(1), 80 RelabelConfigs: []*relabel.Config{ 81 { 82 Action: relabel.Drop, 83 Regex: relabel.MustNewRegexp("dropMe"), 84 SourceLabels: model.LabelNames{"job"}, 85 }, 86 }, 87 } 88 tgs = []*targetgroup.Group{ 89 { 90 Targets: []model.LabelSet{ 91 {model.AddressLabel: "127.0.0.1:9090"}, 92 }, 93 }, 94 } 95 sp, _ = newScrapePool(cfg, app, 0, nil, false) 96 expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __scrape_interval__=\"0s\", __scrape_timeout__=\"0s\", job=\"dropMe\"}" 97 expectedLength = 1 98 ) 99 sp.Sync(tgs) 100 sp.Sync(tgs) 101 if len(sp.droppedTargets) != expectedLength { 102 t.Fatalf("Length of dropped targets exceeded expected length, expected %v, got %v", expectedLength, len(sp.droppedTargets)) 103 } 104 if sp.droppedTargets[0].DiscoveredLabels().String() != expectedLabelSetString { 105 t.Fatalf("Got %v, expected %v", sp.droppedTargets[0].DiscoveredLabels().String(), expectedLabelSetString) 106 } 107} 108 109// TestDiscoveredLabelsUpdate checks that DiscoveredLabels are updated 110// even when new labels don't affect the target `hash`. 111func TestDiscoveredLabelsUpdate(t *testing.T) { 112 113 sp := &scrapePool{} 114 // These are used when syncing so need this to avoid a panic. 115 sp.config = &config.ScrapeConfig{ 116 ScrapeInterval: model.Duration(1), 117 ScrapeTimeout: model.Duration(1), 118 } 119 sp.activeTargets = make(map[uint64]*Target) 120 t1 := &Target{ 121 discoveredLabels: labels.Labels{ 122 labels.Label{ 123 Name: "label", 124 Value: "name", 125 }, 126 }, 127 } 128 sp.activeTargets[t1.hash()] = t1 129 130 t2 := &Target{ 131 discoveredLabels: labels.Labels{ 132 labels.Label{ 133 Name: "labelNew", 134 Value: "nameNew", 135 }, 136 }, 137 } 138 sp.sync([]*Target{t2}) 139 140 require.Equal(t, t2.DiscoveredLabels(), sp.activeTargets[t1.hash()].DiscoveredLabels()) 141} 142 143type testLoop struct { 144 startFunc func(interval, timeout time.Duration, errc chan<- error) 145 stopFunc func() 146 forcedErr error 147 forcedErrMtx sync.Mutex 148 runOnce bool 149 interval time.Duration 150 timeout time.Duration 151} 152 153func (l *testLoop) run(errc chan<- error) { 154 if l.runOnce { 155 panic("loop must be started only once") 156 } 157 l.runOnce = true 158 l.startFunc(l.interval, l.timeout, errc) 159} 160 161func (l *testLoop) disableEndOfRunStalenessMarkers() { 162} 163 164func (l *testLoop) setForcedError(err error) { 165 l.forcedErrMtx.Lock() 166 defer l.forcedErrMtx.Unlock() 167 l.forcedErr = err 168} 169 170func (l *testLoop) getForcedError() error { 171 l.forcedErrMtx.Lock() 172 defer l.forcedErrMtx.Unlock() 173 return l.forcedErr 174} 175 176func (l *testLoop) stop() { 177 l.stopFunc() 178} 179 180func (l *testLoop) getCache() *scrapeCache { 181 return nil 182} 183 184func TestScrapePoolStop(t *testing.T) { 185 sp := &scrapePool{ 186 activeTargets: map[uint64]*Target{}, 187 loops: map[uint64]loop{}, 188 cancel: func() {}, 189 client: http.DefaultClient, 190 } 191 var mtx sync.Mutex 192 stopped := map[uint64]bool{} 193 numTargets := 20 194 195 // Stopping the scrape pool must call stop() on all scrape loops, 196 // clean them and the respective targets up. It must wait until each loop's 197 // stop function returned before returning itself. 198 199 for i := 0; i < numTargets; i++ { 200 t := &Target{ 201 labels: labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i)), 202 } 203 l := &testLoop{} 204 l.stopFunc = func() { 205 time.Sleep(time.Duration(i*20) * time.Millisecond) 206 207 mtx.Lock() 208 stopped[t.hash()] = true 209 mtx.Unlock() 210 } 211 212 sp.activeTargets[t.hash()] = t 213 sp.loops[t.hash()] = l 214 } 215 216 done := make(chan struct{}) 217 stopTime := time.Now() 218 219 go func() { 220 sp.stop() 221 close(done) 222 }() 223 224 select { 225 case <-time.After(5 * time.Second): 226 t.Fatalf("scrapeLoop.stop() did not return as expected") 227 case <-done: 228 // This should have taken at least as long as the last target slept. 229 if time.Since(stopTime) < time.Duration(numTargets*20)*time.Millisecond { 230 t.Fatalf("scrapeLoop.stop() exited before all targets stopped") 231 } 232 } 233 234 mtx.Lock() 235 require.Equal(t, numTargets, len(stopped), "Unexpected number of stopped loops") 236 mtx.Unlock() 237 238 require.Equal(t, 0, len(sp.activeTargets), "Targets were not cleared on stopping: %d left", len(sp.activeTargets)) 239 require.Equal(t, 0, len(sp.loops), "Loops were not cleared on stopping: %d left", len(sp.loops)) 240} 241 242func TestScrapePoolReload(t *testing.T) { 243 var mtx sync.Mutex 244 numTargets := 20 245 246 stopped := map[uint64]bool{} 247 248 reloadCfg := &config.ScrapeConfig{ 249 ScrapeInterval: model.Duration(3 * time.Second), 250 ScrapeTimeout: model.Duration(2 * time.Second), 251 } 252 // On starting to run, new loops created on reload check whether their preceding 253 // equivalents have been stopped. 254 newLoop := func(opts scrapeLoopOptions) loop { 255 l := &testLoop{interval: time.Duration(reloadCfg.ScrapeInterval), timeout: time.Duration(reloadCfg.ScrapeTimeout)} 256 l.startFunc = func(interval, timeout time.Duration, errc chan<- error) { 257 require.Equal(t, 3*time.Second, interval, "Unexpected scrape interval") 258 require.Equal(t, 2*time.Second, timeout, "Unexpected scrape timeout") 259 260 mtx.Lock() 261 targetScraper := opts.scraper.(*targetScraper) 262 require.True(t, stopped[targetScraper.hash()], "Scrape loop for %v not stopped yet", targetScraper) 263 mtx.Unlock() 264 } 265 return l 266 } 267 sp := &scrapePool{ 268 appendable: &nopAppendable{}, 269 activeTargets: map[uint64]*Target{}, 270 loops: map[uint64]loop{}, 271 newLoop: newLoop, 272 logger: nil, 273 client: http.DefaultClient, 274 } 275 276 // Reloading a scrape pool with a new scrape configuration must stop all scrape 277 // loops and start new ones. A new loop must not be started before the preceding 278 // one terminated. 279 280 for i := 0; i < numTargets; i++ { 281 labels := labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i)) 282 t := &Target{ 283 labels: labels, 284 discoveredLabels: labels, 285 } 286 l := &testLoop{} 287 l.stopFunc = func() { 288 time.Sleep(time.Duration(i*20) * time.Millisecond) 289 290 mtx.Lock() 291 stopped[t.hash()] = true 292 mtx.Unlock() 293 } 294 295 sp.activeTargets[t.hash()] = t 296 sp.loops[t.hash()] = l 297 } 298 done := make(chan struct{}) 299 300 beforeTargets := map[uint64]*Target{} 301 for h, t := range sp.activeTargets { 302 beforeTargets[h] = t 303 } 304 305 reloadTime := time.Now() 306 307 go func() { 308 sp.reload(reloadCfg) 309 close(done) 310 }() 311 312 select { 313 case <-time.After(5 * time.Second): 314 t.Fatalf("scrapeLoop.reload() did not return as expected") 315 case <-done: 316 // This should have taken at least as long as the last target slept. 317 if time.Since(reloadTime) < time.Duration(numTargets*20)*time.Millisecond { 318 t.Fatalf("scrapeLoop.stop() exited before all targets stopped") 319 } 320 } 321 322 mtx.Lock() 323 require.Equal(t, numTargets, len(stopped), "Unexpected number of stopped loops") 324 mtx.Unlock() 325 326 require.Equal(t, sp.activeTargets, beforeTargets, "Reloading affected target states unexpectedly") 327 require.Equal(t, numTargets, len(sp.loops), "Unexpected number of stopped loops after reload") 328} 329 330func TestScrapePoolTargetLimit(t *testing.T) { 331 var wg sync.WaitGroup 332 // On starting to run, new loops created on reload check whether their preceding 333 // equivalents have been stopped. 334 newLoop := func(opts scrapeLoopOptions) loop { 335 wg.Add(1) 336 l := &testLoop{ 337 startFunc: func(interval, timeout time.Duration, errc chan<- error) { 338 wg.Done() 339 }, 340 stopFunc: func() {}, 341 } 342 return l 343 } 344 sp := &scrapePool{ 345 appendable: &nopAppendable{}, 346 activeTargets: map[uint64]*Target{}, 347 loops: map[uint64]loop{}, 348 newLoop: newLoop, 349 logger: log.NewNopLogger(), 350 client: http.DefaultClient, 351 } 352 353 var tgs = []*targetgroup.Group{} 354 for i := 0; i < 50; i++ { 355 tgs = append(tgs, 356 &targetgroup.Group{ 357 Targets: []model.LabelSet{ 358 {model.AddressLabel: model.LabelValue(fmt.Sprintf("127.0.0.1:%d", 9090+i))}, 359 }, 360 }, 361 ) 362 } 363 364 var limit uint 365 reloadWithLimit := func(l uint) { 366 limit = l 367 require.NoError(t, sp.reload(&config.ScrapeConfig{ 368 ScrapeInterval: model.Duration(3 * time.Second), 369 ScrapeTimeout: model.Duration(2 * time.Second), 370 TargetLimit: l, 371 })) 372 } 373 374 var targets int 375 loadTargets := func(n int) { 376 targets = n 377 sp.Sync(tgs[:n]) 378 } 379 380 validateIsRunning := func() { 381 wg.Wait() 382 for _, l := range sp.loops { 383 require.True(t, l.(*testLoop).runOnce, "loop should be running") 384 } 385 } 386 387 validateErrorMessage := func(shouldErr bool) { 388 for _, l := range sp.loops { 389 lerr := l.(*testLoop).getForcedError() 390 if shouldErr { 391 require.NotNil(t, lerr, "error was expected for %d targets with a limit of %d", targets, limit) 392 require.Equal(t, fmt.Sprintf("target_limit exceeded (number of targets: %d, limit: %d)", targets, limit), lerr.Error()) 393 } else { 394 require.Equal(t, nil, lerr) 395 } 396 } 397 } 398 399 reloadWithLimit(0) 400 loadTargets(50) 401 validateIsRunning() 402 403 // Simulate an initial config with a limit. 404 sp.config.TargetLimit = 30 405 limit = 30 406 loadTargets(50) 407 validateIsRunning() 408 validateErrorMessage(true) 409 410 reloadWithLimit(50) 411 validateIsRunning() 412 validateErrorMessage(false) 413 414 reloadWithLimit(40) 415 validateIsRunning() 416 validateErrorMessage(true) 417 418 loadTargets(30) 419 validateIsRunning() 420 validateErrorMessage(false) 421 422 loadTargets(40) 423 validateIsRunning() 424 validateErrorMessage(false) 425 426 loadTargets(41) 427 validateIsRunning() 428 validateErrorMessage(true) 429 430 reloadWithLimit(0) 431 validateIsRunning() 432 validateErrorMessage(false) 433 434 reloadWithLimit(51) 435 validateIsRunning() 436 validateErrorMessage(false) 437 438 tgs = append(tgs, 439 &targetgroup.Group{ 440 Targets: []model.LabelSet{ 441 {model.AddressLabel: model.LabelValue("127.0.0.1:1090")}, 442 }, 443 }, 444 &targetgroup.Group{ 445 Targets: []model.LabelSet{ 446 {model.AddressLabel: model.LabelValue("127.0.0.1:1090")}, 447 }, 448 }, 449 ) 450 451 sp.Sync(tgs) 452 validateIsRunning() 453 validateErrorMessage(false) 454} 455 456func TestScrapePoolAppender(t *testing.T) { 457 cfg := &config.ScrapeConfig{} 458 app := &nopAppendable{} 459 sp, _ := newScrapePool(cfg, app, 0, nil, false) 460 461 loop := sp.newLoop(scrapeLoopOptions{ 462 target: &Target{}, 463 }) 464 appl, ok := loop.(*scrapeLoop) 465 require.True(t, ok, "Expected scrapeLoop but got %T", loop) 466 467 wrapped := appl.appender(context.Background()) 468 469 tl, ok := wrapped.(*timeLimitAppender) 470 require.True(t, ok, "Expected timeLimitAppender but got %T", wrapped) 471 472 _, ok = tl.Appender.(nopAppender) 473 require.True(t, ok, "Expected base appender but got %T", tl.Appender) 474 475 loop = sp.newLoop(scrapeLoopOptions{ 476 target: &Target{}, 477 sampleLimit: 100, 478 }) 479 appl, ok = loop.(*scrapeLoop) 480 require.True(t, ok, "Expected scrapeLoop but got %T", loop) 481 482 wrapped = appl.appender(context.Background()) 483 484 sl, ok := wrapped.(*limitAppender) 485 require.True(t, ok, "Expected limitAppender but got %T", wrapped) 486 487 tl, ok = sl.Appender.(*timeLimitAppender) 488 require.True(t, ok, "Expected limitAppender but got %T", sl.Appender) 489 490 _, ok = tl.Appender.(nopAppender) 491 require.True(t, ok, "Expected base appender but got %T", tl.Appender) 492} 493 494func TestScrapePoolRaces(t *testing.T) { 495 interval, _ := model.ParseDuration("1s") 496 timeout, _ := model.ParseDuration("500ms") 497 newConfig := func() *config.ScrapeConfig { 498 return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout} 499 } 500 sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, false) 501 tgts := []*targetgroup.Group{ 502 { 503 Targets: []model.LabelSet{ 504 {model.AddressLabel: "127.0.0.1:9090"}, 505 {model.AddressLabel: "127.0.0.2:9090"}, 506 {model.AddressLabel: "127.0.0.3:9090"}, 507 {model.AddressLabel: "127.0.0.4:9090"}, 508 {model.AddressLabel: "127.0.0.5:9090"}, 509 {model.AddressLabel: "127.0.0.6:9090"}, 510 {model.AddressLabel: "127.0.0.7:9090"}, 511 {model.AddressLabel: "127.0.0.8:9090"}, 512 }, 513 }, 514 } 515 516 sp.Sync(tgts) 517 active := sp.ActiveTargets() 518 dropped := sp.DroppedTargets() 519 expectedActive, expectedDropped := len(tgts[0].Targets), 0 520 521 require.Equal(t, expectedActive, len(active), "Invalid number of active targets") 522 require.Equal(t, expectedDropped, len(dropped), "Invalid number of dropped targets") 523 524 for i := 0; i < 20; i++ { 525 time.Sleep(time.Duration(10 * time.Millisecond)) 526 sp.reload(newConfig()) 527 } 528 sp.stop() 529} 530 531func TestScrapePoolScrapeLoopsStarted(t *testing.T) { 532 var wg sync.WaitGroup 533 newLoop := func(opts scrapeLoopOptions) loop { 534 wg.Add(1) 535 l := &testLoop{ 536 startFunc: func(interval, timeout time.Duration, errc chan<- error) { 537 wg.Done() 538 }, 539 stopFunc: func() {}, 540 } 541 return l 542 } 543 sp := &scrapePool{ 544 appendable: &nopAppendable{}, 545 activeTargets: map[uint64]*Target{}, 546 loops: map[uint64]loop{}, 547 newLoop: newLoop, 548 logger: nil, 549 client: http.DefaultClient, 550 } 551 552 tgs := []*targetgroup.Group{ 553 { 554 Targets: []model.LabelSet{ 555 {model.AddressLabel: model.LabelValue("127.0.0.1:9090")}, 556 }, 557 }, 558 { 559 Targets: []model.LabelSet{ 560 {model.AddressLabel: model.LabelValue("127.0.0.1:9090")}, 561 }, 562 }, 563 } 564 565 require.NoError(t, sp.reload(&config.ScrapeConfig{ 566 ScrapeInterval: model.Duration(3 * time.Second), 567 ScrapeTimeout: model.Duration(2 * time.Second), 568 })) 569 sp.Sync(tgs) 570 571 require.Equal(t, 1, len(sp.loops)) 572 573 wg.Wait() 574 for _, l := range sp.loops { 575 require.True(t, l.(*testLoop).runOnce, "loop should be running") 576 } 577} 578 579func TestScrapeLoopStopBeforeRun(t *testing.T) { 580 scraper := &testScraper{} 581 582 sl := newScrapeLoop(context.Background(), 583 scraper, 584 nil, nil, 585 nopMutator, 586 nopMutator, 587 nil, nil, 0, 588 true, 589 0, 590 nil, 591 1, 592 0, 593 false, 594 ) 595 596 // The scrape pool synchronizes on stopping scrape loops. However, new scrape 597 // loops are started asynchronously. Thus it's possible, that a loop is stopped 598 // again before having started properly. 599 // Stopping not-yet-started loops must block until the run method was called and exited. 600 // The run method must exit immediately. 601 602 stopDone := make(chan struct{}) 603 go func() { 604 sl.stop() 605 close(stopDone) 606 }() 607 608 select { 609 case <-stopDone: 610 t.Fatalf("Stopping terminated before run exited successfully") 611 case <-time.After(500 * time.Millisecond): 612 } 613 614 // Running the scrape loop must exit before calling the scraper even once. 615 scraper.scrapeFunc = func(context.Context, io.Writer) error { 616 t.Fatalf("scraper was called for terminated scrape loop") 617 return nil 618 } 619 620 runDone := make(chan struct{}) 621 go func() { 622 sl.run(nil) 623 close(runDone) 624 }() 625 626 select { 627 case <-runDone: 628 case <-time.After(1 * time.Second): 629 t.Fatalf("Running terminated scrape loop did not exit") 630 } 631 632 select { 633 case <-stopDone: 634 case <-time.After(1 * time.Second): 635 t.Fatalf("Stopping did not terminate after running exited") 636 } 637} 638 639func nopMutator(l labels.Labels) labels.Labels { return l } 640 641func TestScrapeLoopStop(t *testing.T) { 642 var ( 643 signal = make(chan struct{}, 1) 644 appender = &collectResultAppender{} 645 scraper = &testScraper{} 646 app = func(ctx context.Context) storage.Appender { return appender } 647 ) 648 649 sl := newScrapeLoop(context.Background(), 650 scraper, 651 nil, nil, 652 nopMutator, 653 nopMutator, 654 app, 655 nil, 656 0, 657 true, 658 0, 659 nil, 660 10*time.Millisecond, 661 time.Hour, 662 false, 663 ) 664 665 // Terminate loop after 2 scrapes. 666 numScrapes := 0 667 668 scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { 669 numScrapes++ 670 if numScrapes == 2 { 671 go sl.stop() 672 <-sl.ctx.Done() 673 } 674 w.Write([]byte("metric_a 42\n")) 675 return ctx.Err() 676 } 677 678 go func() { 679 sl.run(nil) 680 signal <- struct{}{} 681 }() 682 683 select { 684 case <-signal: 685 case <-time.After(5 * time.Second): 686 t.Fatalf("Scrape wasn't stopped.") 687 } 688 689 // We expected 1 actual sample for each scrape plus 5 for report samples. 690 // At least 2 scrapes were made, plus the final stale markers. 691 if len(appender.result) < 6*3 || len(appender.result)%6 != 0 { 692 t.Fatalf("Expected at least 3 scrapes with 6 samples each, got %d samples", len(appender.result)) 693 } 694 // All samples in a scrape must have the same timestamp. 695 var ts int64 696 for i, s := range appender.result { 697 if i%6 == 0 { 698 ts = s.t 699 } else if s.t != ts { 700 t.Fatalf("Unexpected multiple timestamps within single scrape") 701 } 702 } 703 // All samples from the last scrape must be stale markers. 704 for _, s := range appender.result[len(appender.result)-5:] { 705 if !value.IsStaleNaN(s.v) { 706 t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(s.v)) 707 } 708 } 709} 710 711func TestScrapeLoopRun(t *testing.T) { 712 var ( 713 signal = make(chan struct{}, 1) 714 errc = make(chan error) 715 716 scraper = &testScraper{} 717 app = func(ctx context.Context) storage.Appender { return &nopAppender{} } 718 ) 719 720 ctx, cancel := context.WithCancel(context.Background()) 721 sl := newScrapeLoop(ctx, 722 scraper, 723 nil, nil, 724 nopMutator, 725 nopMutator, 726 app, 727 nil, 728 0, 729 true, 730 0, 731 nil, 732 time.Second, 733 time.Hour, 734 false, 735 ) 736 737 // The loop must terminate during the initial offset if the context 738 // is canceled. 739 scraper.offsetDur = time.Hour 740 741 go func() { 742 sl.run(errc) 743 signal <- struct{}{} 744 }() 745 746 // Wait to make sure we are actually waiting on the offset. 747 time.Sleep(1 * time.Second) 748 749 cancel() 750 select { 751 case <-signal: 752 case <-time.After(5 * time.Second): 753 t.Fatalf("Cancellation during initial offset failed") 754 case err := <-errc: 755 t.Fatalf("Unexpected error: %s", err) 756 } 757 758 // The provided timeout must cause cancellation of the context passed down to the 759 // scraper. The scraper has to respect the context. 760 scraper.offsetDur = 0 761 762 block := make(chan struct{}) 763 scraper.scrapeFunc = func(ctx context.Context, _ io.Writer) error { 764 select { 765 case <-block: 766 case <-ctx.Done(): 767 return ctx.Err() 768 } 769 return nil 770 } 771 772 ctx, cancel = context.WithCancel(context.Background()) 773 sl = newScrapeLoop(ctx, 774 scraper, 775 nil, nil, 776 nopMutator, 777 nopMutator, 778 app, 779 nil, 780 0, 781 true, 782 0, 783 nil, 784 time.Second, 785 100*time.Millisecond, 786 false, 787 ) 788 789 go func() { 790 sl.run(errc) 791 signal <- struct{}{} 792 }() 793 794 select { 795 case err := <-errc: 796 if err != context.DeadlineExceeded { 797 t.Fatalf("Expected timeout error but got: %s", err) 798 } 799 case <-time.After(3 * time.Second): 800 t.Fatalf("Expected timeout error but got none") 801 } 802 803 // We already caught the timeout error and are certainly in the loop. 804 // Let the scrapes returns immediately to cause no further timeout errors 805 // and check whether canceling the parent context terminates the loop. 806 close(block) 807 cancel() 808 809 select { 810 case <-signal: 811 // Loop terminated as expected. 812 case err := <-errc: 813 t.Fatalf("Unexpected error: %s", err) 814 case <-time.After(3 * time.Second): 815 t.Fatalf("Loop did not terminate on context cancellation") 816 } 817} 818 819func TestScrapeLoopForcedErr(t *testing.T) { 820 var ( 821 signal = make(chan struct{}, 1) 822 errc = make(chan error) 823 824 scraper = &testScraper{} 825 app = func(ctx context.Context) storage.Appender { return &nopAppender{} } 826 ) 827 828 ctx, cancel := context.WithCancel(context.Background()) 829 sl := newScrapeLoop(ctx, 830 scraper, 831 nil, nil, 832 nopMutator, 833 nopMutator, 834 app, 835 nil, 836 0, 837 true, 838 0, 839 nil, 840 time.Second, 841 time.Hour, 842 false, 843 ) 844 845 forcedErr := fmt.Errorf("forced err") 846 sl.setForcedError(forcedErr) 847 848 scraper.scrapeFunc = func(context.Context, io.Writer) error { 849 t.Fatalf("should not be scraped") 850 return nil 851 } 852 853 go func() { 854 sl.run(errc) 855 signal <- struct{}{} 856 }() 857 858 select { 859 case err := <-errc: 860 if err != forcedErr { 861 t.Fatalf("Expected forced error but got: %s", err) 862 } 863 case <-time.After(3 * time.Second): 864 t.Fatalf("Expected forced error but got none") 865 } 866 cancel() 867 868 select { 869 case <-signal: 870 case <-time.After(5 * time.Second): 871 t.Fatalf("Scrape not stopped") 872 } 873} 874 875func TestScrapeLoopMetadata(t *testing.T) { 876 var ( 877 signal = make(chan struct{}) 878 scraper = &testScraper{} 879 cache = newScrapeCache() 880 ) 881 defer close(signal) 882 883 ctx, cancel := context.WithCancel(context.Background()) 884 sl := newScrapeLoop(ctx, 885 scraper, 886 nil, nil, 887 nopMutator, 888 nopMutator, 889 func(ctx context.Context) storage.Appender { return nopAppender{} }, 890 cache, 891 0, 892 true, 893 0, 894 nil, 895 0, 896 0, 897 false, 898 ) 899 defer cancel() 900 901 slApp := sl.appender(ctx) 902 total, _, _, err := sl.append(slApp, []byte(`# TYPE test_metric counter 903# HELP test_metric some help text 904# UNIT test_metric metric 905test_metric 1 906# TYPE test_metric_no_help gauge 907# HELP test_metric_no_type other help text 908# EOF`), "application/openmetrics-text", time.Now()) 909 require.NoError(t, err) 910 require.NoError(t, slApp.Commit()) 911 require.Equal(t, 1, total) 912 913 md, ok := cache.GetMetadata("test_metric") 914 require.True(t, ok, "expected metadata to be present") 915 require.Equal(t, textparse.MetricTypeCounter, md.Type, "unexpected metric type") 916 require.Equal(t, "some help text", md.Help) 917 require.Equal(t, "metric", md.Unit) 918 919 md, ok = cache.GetMetadata("test_metric_no_help") 920 require.True(t, ok, "expected metadata to be present") 921 require.Equal(t, textparse.MetricTypeGauge, md.Type, "unexpected metric type") 922 require.Equal(t, "", md.Help) 923 require.Equal(t, "", md.Unit) 924 925 md, ok = cache.GetMetadata("test_metric_no_type") 926 require.True(t, ok, "expected metadata to be present") 927 require.Equal(t, textparse.MetricTypeUnknown, md.Type, "unexpected metric type") 928 require.Equal(t, "other help text", md.Help) 929 require.Equal(t, "", md.Unit) 930} 931 932func simpleTestScrapeLoop(t testing.TB) (context.Context, *scrapeLoop) { 933 // Need a full storage for correct Add/AddFast semantics. 934 s := teststorage.New(t) 935 t.Cleanup(func() { s.Close() }) 936 937 ctx, cancel := context.WithCancel(context.Background()) 938 sl := newScrapeLoop(ctx, 939 &testScraper{}, 940 nil, nil, 941 nopMutator, 942 nopMutator, 943 s.Appender, 944 nil, 945 0, 946 true, 947 0, 948 nil, 949 0, 950 0, 951 false, 952 ) 953 t.Cleanup(func() { cancel() }) 954 955 return ctx, sl 956} 957 958func TestScrapeLoopSeriesAdded(t *testing.T) { 959 ctx, sl := simpleTestScrapeLoop(t) 960 961 slApp := sl.appender(ctx) 962 total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{}) 963 require.NoError(t, err) 964 require.NoError(t, slApp.Commit()) 965 require.Equal(t, 1, total) 966 require.Equal(t, 1, added) 967 require.Equal(t, 1, seriesAdded) 968 969 slApp = sl.appender(ctx) 970 total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{}) 971 require.NoError(t, slApp.Commit()) 972 require.NoError(t, err) 973 require.Equal(t, 1, total) 974 require.Equal(t, 1, added) 975 require.Equal(t, 0, seriesAdded) 976} 977 978func makeTestMetrics(n int) []byte { 979 // Construct a metrics string to parse 980 sb := bytes.Buffer{} 981 for i := 0; i < n; i++ { 982 fmt.Fprintf(&sb, "# TYPE metric_a gauge\n") 983 fmt.Fprintf(&sb, "# HELP metric_a help text\n") 984 fmt.Fprintf(&sb, "metric_a{foo=\"%d\",bar=\"%d\"} 1\n", i, i*100) 985 } 986 return sb.Bytes() 987} 988 989func BenchmarkScrapeLoopAppend(b *testing.B) { 990 ctx, sl := simpleTestScrapeLoop(b) 991 992 slApp := sl.appender(ctx) 993 metrics := makeTestMetrics(100) 994 ts := time.Time{} 995 996 b.ResetTimer() 997 998 for i := 0; i < b.N; i++ { 999 ts = ts.Add(time.Second) 1000 _, _, _, _ = sl.append(slApp, metrics, "", ts) 1001 } 1002} 1003func BenchmarkScrapeLoopAppendOM(b *testing.B) { 1004 ctx, sl := simpleTestScrapeLoop(b) 1005 1006 slApp := sl.appender(ctx) 1007 metrics := makeTestMetrics(100) 1008 ts := time.Time{} 1009 1010 b.ResetTimer() 1011 1012 for i := 0; i < b.N; i++ { 1013 ts = ts.Add(time.Second) 1014 _, _, _, _ = sl.append(slApp, metrics, "application/openmetrics-text", ts) 1015 } 1016} 1017 1018func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { 1019 appender := &collectResultAppender{} 1020 var ( 1021 signal = make(chan struct{}, 1) 1022 scraper = &testScraper{} 1023 app = func(ctx context.Context) storage.Appender { return appender } 1024 ) 1025 1026 ctx, cancel := context.WithCancel(context.Background()) 1027 sl := newScrapeLoop(ctx, 1028 scraper, 1029 nil, nil, 1030 nopMutator, 1031 nopMutator, 1032 app, 1033 nil, 1034 0, 1035 true, 1036 0, 1037 nil, 1038 10*time.Millisecond, 1039 time.Hour, 1040 false, 1041 ) 1042 // Succeed once, several failures, then stop. 1043 numScrapes := 0 1044 1045 scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { 1046 numScrapes++ 1047 1048 if numScrapes == 1 { 1049 w.Write([]byte("metric_a 42\n")) 1050 return nil 1051 } else if numScrapes == 5 { 1052 cancel() 1053 } 1054 return errors.New("scrape failed") 1055 } 1056 1057 go func() { 1058 sl.run(nil) 1059 signal <- struct{}{} 1060 }() 1061 1062 select { 1063 case <-signal: 1064 case <-time.After(5 * time.Second): 1065 t.Fatalf("Scrape wasn't stopped.") 1066 } 1067 1068 // 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for 1069 // each scrape successful or not. 1070 require.Equal(t, 27, len(appender.result), "Appended samples not as expected") 1071 require.Equal(t, 42.0, appender.result[0].v, "Appended first sample not as expected") 1072 require.True(t, value.IsStaleNaN(appender.result[6].v), 1073 "Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[6].v)) 1074} 1075 1076func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { 1077 appender := &collectResultAppender{} 1078 var ( 1079 signal = make(chan struct{}, 1) 1080 scraper = &testScraper{} 1081 app = func(ctx context.Context) storage.Appender { return appender } 1082 numScrapes = 0 1083 ) 1084 1085 ctx, cancel := context.WithCancel(context.Background()) 1086 sl := newScrapeLoop(ctx, 1087 scraper, 1088 nil, nil, 1089 nopMutator, 1090 nopMutator, 1091 app, 1092 nil, 1093 0, 1094 true, 1095 0, 1096 nil, 1097 10*time.Millisecond, 1098 time.Hour, 1099 false, 1100 ) 1101 1102 // Succeed once, several failures, then stop. 1103 scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { 1104 numScrapes++ 1105 1106 if numScrapes == 1 { 1107 w.Write([]byte("metric_a 42\n")) 1108 return nil 1109 } else if numScrapes == 2 { 1110 w.Write([]byte("7&-\n")) 1111 return nil 1112 } else if numScrapes == 3 { 1113 cancel() 1114 } 1115 return errors.New("scrape failed") 1116 } 1117 1118 go func() { 1119 sl.run(nil) 1120 signal <- struct{}{} 1121 }() 1122 1123 select { 1124 case <-signal: 1125 case <-time.After(5 * time.Second): 1126 t.Fatalf("Scrape wasn't stopped.") 1127 } 1128 1129 // 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for 1130 // each scrape successful or not. 1131 require.Equal(t, 17, len(appender.result), "Appended samples not as expected") 1132 require.Equal(t, 42.0, appender.result[0].v, "Appended first sample not as expected") 1133 require.True(t, value.IsStaleNaN(appender.result[6].v), 1134 "Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[6].v)) 1135} 1136 1137func TestScrapeLoopCache(t *testing.T) { 1138 s := teststorage.New(t) 1139 defer s.Close() 1140 1141 appender := &collectResultAppender{} 1142 var ( 1143 signal = make(chan struct{}, 1) 1144 scraper = &testScraper{} 1145 app = func(ctx context.Context) storage.Appender { appender.next = s.Appender(ctx); return appender } 1146 ) 1147 1148 ctx, cancel := context.WithCancel(context.Background()) 1149 sl := newScrapeLoop(ctx, 1150 scraper, 1151 nil, nil, 1152 nopMutator, 1153 nopMutator, 1154 app, 1155 nil, 1156 0, 1157 true, 1158 0, 1159 nil, 1160 10*time.Millisecond, 1161 time.Hour, 1162 false, 1163 ) 1164 1165 numScrapes := 0 1166 1167 scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { 1168 if numScrapes == 1 || numScrapes == 2 { 1169 if _, ok := sl.cache.series["metric_a"]; !ok { 1170 t.Errorf("metric_a missing from cache after scrape %d", numScrapes) 1171 } 1172 if _, ok := sl.cache.series["metric_b"]; !ok { 1173 t.Errorf("metric_b missing from cache after scrape %d", numScrapes) 1174 } 1175 } else if numScrapes == 3 { 1176 if _, ok := sl.cache.series["metric_a"]; !ok { 1177 t.Errorf("metric_a missing from cache after scrape %d", numScrapes) 1178 } 1179 if _, ok := sl.cache.series["metric_b"]; ok { 1180 t.Errorf("metric_b present in cache after scrape %d", numScrapes) 1181 } 1182 } 1183 1184 numScrapes++ 1185 1186 if numScrapes == 1 { 1187 w.Write([]byte("metric_a 42\nmetric_b 43\n")) 1188 return nil 1189 } else if numScrapes == 3 { 1190 w.Write([]byte("metric_a 44\n")) 1191 return nil 1192 } else if numScrapes == 4 { 1193 cancel() 1194 } 1195 return fmt.Errorf("scrape failed") 1196 } 1197 1198 go func() { 1199 sl.run(nil) 1200 signal <- struct{}{} 1201 }() 1202 1203 select { 1204 case <-signal: 1205 case <-time.After(5 * time.Second): 1206 t.Fatalf("Scrape wasn't stopped.") 1207 } 1208 1209 // 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for 1210 // each scrape successful or not. 1211 require.Equal(t, 26, len(appender.result), "Appended samples not as expected") 1212} 1213 1214func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) { 1215 s := teststorage.New(t) 1216 defer s.Close() 1217 1218 sapp := s.Appender(context.Background()) 1219 1220 appender := &collectResultAppender{next: sapp} 1221 var ( 1222 signal = make(chan struct{}, 1) 1223 scraper = &testScraper{} 1224 app = func(ctx context.Context) storage.Appender { return appender } 1225 ) 1226 1227 ctx, cancel := context.WithCancel(context.Background()) 1228 sl := newScrapeLoop(ctx, 1229 scraper, 1230 nil, nil, 1231 nopMutator, 1232 nopMutator, 1233 app, 1234 nil, 1235 0, 1236 true, 1237 0, 1238 nil, 1239 10*time.Millisecond, 1240 time.Hour, 1241 false, 1242 ) 1243 1244 numScrapes := 0 1245 1246 scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { 1247 numScrapes++ 1248 if numScrapes < 5 { 1249 s := "" 1250 for i := 0; i < 500; i++ { 1251 s = fmt.Sprintf("%smetric_%d_%d 42\n", s, i, numScrapes) 1252 } 1253 w.Write([]byte(fmt.Sprintf(s + "&"))) 1254 } else { 1255 cancel() 1256 } 1257 return nil 1258 } 1259 1260 go func() { 1261 sl.run(nil) 1262 signal <- struct{}{} 1263 }() 1264 1265 select { 1266 case <-signal: 1267 case <-time.After(5 * time.Second): 1268 t.Fatalf("Scrape wasn't stopped.") 1269 } 1270 1271 if len(sl.cache.series) > 2000 { 1272 t.Fatalf("More than 2000 series cached. Got: %d", len(sl.cache.series)) 1273 } 1274} 1275 1276func TestScrapeLoopAppend(t *testing.T) { 1277 tests := []struct { 1278 title string 1279 honorLabels bool 1280 scrapeLabels string 1281 discoveryLabels []string 1282 expLset labels.Labels 1283 expValue float64 1284 }{ 1285 { 1286 // When "honor_labels" is not set 1287 // label name collision is handler by adding a prefix. 1288 title: "Label name collision", 1289 honorLabels: false, 1290 scrapeLabels: `metric{n="1"} 0`, 1291 discoveryLabels: []string{"n", "2"}, 1292 expLset: labels.FromStrings("__name__", "metric", "exported_n", "1", "n", "2"), 1293 expValue: 0, 1294 }, { 1295 // When "honor_labels" is not set 1296 // exported label from discovery don't get overwritten 1297 title: "Label name collision", 1298 honorLabels: false, 1299 scrapeLabels: `metric 0`, 1300 discoveryLabels: []string{"n", "2", "exported_n", "2"}, 1301 expLset: labels.FromStrings("__name__", "metric", "n", "2", "exported_n", "2"), 1302 expValue: 0, 1303 }, { 1304 // Labels with no value need to be removed as these should not be ingested. 1305 title: "Delete Empty labels", 1306 honorLabels: false, 1307 scrapeLabels: `metric{n=""} 0`, 1308 discoveryLabels: nil, 1309 expLset: labels.FromStrings("__name__", "metric"), 1310 expValue: 0, 1311 }, { 1312 // Honor Labels should ignore labels with the same name. 1313 title: "Honor Labels", 1314 honorLabels: true, 1315 scrapeLabels: `metric{n1="1" n2="2"} 0`, 1316 discoveryLabels: []string{"n1", "0"}, 1317 expLset: labels.FromStrings("__name__", "metric", "n1", "1", "n2", "2"), 1318 expValue: 0, 1319 }, { 1320 title: "Stale - NaN", 1321 honorLabels: false, 1322 scrapeLabels: `metric NaN`, 1323 discoveryLabels: nil, 1324 expLset: labels.FromStrings("__name__", "metric"), 1325 expValue: float64(value.NormalNaN), 1326 }, 1327 } 1328 1329 for _, test := range tests { 1330 app := &collectResultAppender{} 1331 1332 discoveryLabels := &Target{ 1333 labels: labels.FromStrings(test.discoveryLabels...), 1334 } 1335 1336 sl := newScrapeLoop(context.Background(), 1337 nil, nil, nil, 1338 func(l labels.Labels) labels.Labels { 1339 return mutateSampleLabels(l, discoveryLabels, test.honorLabels, nil) 1340 }, 1341 func(l labels.Labels) labels.Labels { 1342 return mutateReportSampleLabels(l, discoveryLabels) 1343 }, 1344 func(ctx context.Context) storage.Appender { return app }, 1345 nil, 1346 0, 1347 true, 1348 0, 1349 nil, 1350 0, 1351 0, 1352 false, 1353 ) 1354 1355 now := time.Now() 1356 1357 slApp := sl.appender(context.Background()) 1358 _, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", now) 1359 require.NoError(t, err) 1360 require.NoError(t, slApp.Commit()) 1361 1362 expected := []sample{ 1363 { 1364 metric: test.expLset, 1365 t: timestamp.FromTime(now), 1366 v: test.expValue, 1367 }, 1368 } 1369 1370 // When the expected value is NaN 1371 // DeepEqual will report NaNs as being different, 1372 // so replace it with the expected one. 1373 if test.expValue == float64(value.NormalNaN) { 1374 app.result[0].v = expected[0].v 1375 } 1376 1377 t.Logf("Test:%s", test.title) 1378 require.Equal(t, expected, app.result) 1379 } 1380} 1381 1382func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) { 1383 // collectResultAppender's AddFast always returns ErrNotFound if we don't give it a next. 1384 app := &collectResultAppender{} 1385 1386 sl := newScrapeLoop(context.Background(), 1387 nil, nil, nil, 1388 nopMutator, 1389 nopMutator, 1390 func(ctx context.Context) storage.Appender { return app }, 1391 nil, 1392 0, 1393 true, 1394 0, 1395 nil, 1396 0, 1397 0, 1398 false, 1399 ) 1400 1401 fakeRef := uint64(1) 1402 expValue := float64(1) 1403 metric := `metric{n="1"} 1` 1404 p := textparse.New([]byte(metric), "") 1405 1406 var lset labels.Labels 1407 p.Next() 1408 mets := p.Metric(&lset) 1409 hash := lset.Hash() 1410 1411 // Create a fake entry in the cache 1412 sl.cache.addRef(mets, fakeRef, lset, hash) 1413 now := time.Now() 1414 1415 slApp := sl.appender(context.Background()) 1416 _, _, _, err := sl.append(slApp, []byte(metric), "", now) 1417 require.NoError(t, err) 1418 require.NoError(t, slApp.Commit()) 1419 1420 expected := []sample{ 1421 { 1422 metric: lset, 1423 t: timestamp.FromTime(now), 1424 v: expValue, 1425 }, 1426 } 1427 1428 require.Equal(t, expected, app.result) 1429} 1430 1431func TestScrapeLoopAppendSampleLimit(t *testing.T) { 1432 resApp := &collectResultAppender{} 1433 app := &limitAppender{Appender: resApp, limit: 1} 1434 1435 sl := newScrapeLoop(context.Background(), 1436 nil, nil, nil, 1437 func(l labels.Labels) labels.Labels { 1438 if l.Has("deleteme") { 1439 return nil 1440 } 1441 return l 1442 }, 1443 nopMutator, 1444 func(ctx context.Context) storage.Appender { return app }, 1445 nil, 1446 0, 1447 true, 1448 app.limit, 1449 nil, 1450 0, 1451 0, 1452 false, 1453 ) 1454 1455 // Get the value of the Counter before performing the append. 1456 beforeMetric := dto.Metric{} 1457 err := targetScrapeSampleLimit.Write(&beforeMetric) 1458 require.NoError(t, err) 1459 1460 beforeMetricValue := beforeMetric.GetCounter().GetValue() 1461 1462 now := time.Now() 1463 slApp := sl.appender(context.Background()) 1464 total, added, seriesAdded, err := sl.append(app, []byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now) 1465 if err != errSampleLimit { 1466 t.Fatalf("Did not see expected sample limit error: %s", err) 1467 } 1468 require.NoError(t, slApp.Rollback()) 1469 require.Equal(t, 3, total) 1470 require.Equal(t, 3, added) 1471 require.Equal(t, 1, seriesAdded) 1472 1473 // Check that the Counter has been incremented a single time for the scrape, 1474 // not multiple times for each sample. 1475 metric := dto.Metric{} 1476 err = targetScrapeSampleLimit.Write(&metric) 1477 require.NoError(t, err) 1478 1479 value := metric.GetCounter().GetValue() 1480 change := value - beforeMetricValue 1481 require.Equal(t, 1.0, change, "Unexpected change of sample limit metric: %f", change) 1482 1483 // And verify that we got the samples that fit under the limit. 1484 want := []sample{ 1485 { 1486 metric: labels.FromStrings(model.MetricNameLabel, "metric_a"), 1487 t: timestamp.FromTime(now), 1488 v: 1, 1489 }, 1490 } 1491 require.Equal(t, want, resApp.rolledbackResult, "Appended samples not as expected") 1492 1493 now = time.Now() 1494 slApp = sl.appender(context.Background()) 1495 total, added, seriesAdded, err = sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now) 1496 if err != errSampleLimit { 1497 t.Fatalf("Did not see expected sample limit error: %s", err) 1498 } 1499 require.NoError(t, slApp.Rollback()) 1500 require.Equal(t, 9, total) 1501 require.Equal(t, 6, added) 1502 require.Equal(t, 0, seriesAdded) 1503} 1504 1505func TestScrapeLoop_ChangingMetricString(t *testing.T) { 1506 // This is a regression test for the scrape loop cache not properly maintaining 1507 // IDs when the string representation of a metric changes across a scrape. Thus 1508 // we use a real storage appender here. 1509 s := teststorage.New(t) 1510 defer s.Close() 1511 1512 capp := &collectResultAppender{} 1513 1514 sl := newScrapeLoop(context.Background(), 1515 nil, nil, nil, 1516 nopMutator, 1517 nopMutator, 1518 func(ctx context.Context) storage.Appender { capp.next = s.Appender(ctx); return capp }, 1519 nil, 1520 0, 1521 true, 1522 0, 1523 nil, 1524 0, 1525 0, 1526 false, 1527 ) 1528 1529 now := time.Now() 1530 slApp := sl.appender(context.Background()) 1531 _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", now) 1532 require.NoError(t, err) 1533 require.NoError(t, slApp.Commit()) 1534 1535 slApp = sl.appender(context.Background()) 1536 _, _, _, err = sl.append(slApp, []byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute)) 1537 require.NoError(t, err) 1538 require.NoError(t, slApp.Commit()) 1539 1540 // DeepEqual will report NaNs as being different, so replace with a different value. 1541 want := []sample{ 1542 { 1543 metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"), 1544 t: timestamp.FromTime(now), 1545 v: 1, 1546 }, 1547 { 1548 metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"), 1549 t: timestamp.FromTime(now.Add(time.Minute)), 1550 v: 2, 1551 }, 1552 } 1553 require.Equal(t, want, capp.result, "Appended samples not as expected") 1554} 1555 1556func TestScrapeLoopAppendStaleness(t *testing.T) { 1557 app := &collectResultAppender{} 1558 1559 sl := newScrapeLoop(context.Background(), 1560 nil, nil, nil, 1561 nopMutator, 1562 nopMutator, 1563 func(ctx context.Context) storage.Appender { return app }, 1564 nil, 1565 0, 1566 true, 1567 0, 1568 nil, 1569 0, 1570 0, 1571 false, 1572 ) 1573 1574 now := time.Now() 1575 slApp := sl.appender(context.Background()) 1576 _, _, _, err := sl.append(slApp, []byte("metric_a 1\n"), "", now) 1577 require.NoError(t, err) 1578 require.NoError(t, slApp.Commit()) 1579 1580 slApp = sl.appender(context.Background()) 1581 _, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second)) 1582 require.NoError(t, err) 1583 require.NoError(t, slApp.Commit()) 1584 1585 ingestedNaN := math.Float64bits(app.result[1].v) 1586 require.Equal(t, value.StaleNaN, ingestedNaN, "Appended stale sample wasn't as expected") 1587 1588 // DeepEqual will report NaNs as being different, so replace with a different value. 1589 app.result[1].v = 42 1590 want := []sample{ 1591 { 1592 metric: labels.FromStrings(model.MetricNameLabel, "metric_a"), 1593 t: timestamp.FromTime(now), 1594 v: 1, 1595 }, 1596 { 1597 metric: labels.FromStrings(model.MetricNameLabel, "metric_a"), 1598 t: timestamp.FromTime(now.Add(time.Second)), 1599 v: 42, 1600 }, 1601 } 1602 require.Equal(t, want, app.result, "Appended samples not as expected") 1603} 1604 1605func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { 1606 app := &collectResultAppender{} 1607 sl := newScrapeLoop(context.Background(), 1608 nil, nil, nil, 1609 nopMutator, 1610 nopMutator, 1611 func(ctx context.Context) storage.Appender { return app }, 1612 nil, 1613 0, 1614 true, 1615 0, 1616 nil, 1617 0, 1618 0, 1619 false, 1620 ) 1621 1622 now := time.Now() 1623 slApp := sl.appender(context.Background()) 1624 _, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now) 1625 require.NoError(t, err) 1626 require.NoError(t, slApp.Commit()) 1627 1628 slApp = sl.appender(context.Background()) 1629 _, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second)) 1630 require.NoError(t, err) 1631 require.NoError(t, slApp.Commit()) 1632 1633 want := []sample{ 1634 { 1635 metric: labels.FromStrings(model.MetricNameLabel, "metric_a"), 1636 t: 1000, 1637 v: 1, 1638 }, 1639 } 1640 require.Equal(t, want, app.result, "Appended samples not as expected") 1641} 1642 1643func TestScrapeLoopAppendExemplar(t *testing.T) { 1644 tests := []struct { 1645 title string 1646 scrapeText string 1647 discoveryLabels []string 1648 samples []sample 1649 exemplars []exemplar.Exemplar 1650 }{ 1651 { 1652 title: "Metric without exemplars", 1653 scrapeText: "metric_total{n=\"1\"} 0\n# EOF", 1654 discoveryLabels: []string{"n", "2"}, 1655 samples: []sample{{ 1656 metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"), 1657 v: 0, 1658 }}, 1659 }, 1660 { 1661 title: "Metric with exemplars", 1662 scrapeText: "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0\n# EOF", 1663 discoveryLabels: []string{"n", "2"}, 1664 samples: []sample{{ 1665 metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"), 1666 v: 0, 1667 }}, 1668 exemplars: []exemplar.Exemplar{ 1669 {Labels: labels.FromStrings("a", "abc"), Value: 1}, 1670 }, 1671 }, { 1672 title: "Metric with exemplars and TS", 1673 scrapeText: "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0 10000\n# EOF", 1674 discoveryLabels: []string{"n", "2"}, 1675 samples: []sample{{ 1676 metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"), 1677 v: 0, 1678 }}, 1679 exemplars: []exemplar.Exemplar{ 1680 {Labels: labels.FromStrings("a", "abc"), Value: 1, Ts: 10000000, HasTs: true}, 1681 }, 1682 }, { 1683 title: "Two metrics and exemplars", 1684 scrapeText: `metric_total{n="1"} 1 # {t="1"} 1.0 10000 1685metric_total{n="2"} 2 # {t="2"} 2.0 20000 1686# EOF`, 1687 samples: []sample{{ 1688 metric: labels.FromStrings("__name__", "metric_total", "n", "1"), 1689 v: 1, 1690 }, { 1691 metric: labels.FromStrings("__name__", "metric_total", "n", "2"), 1692 v: 2, 1693 }}, 1694 exemplars: []exemplar.Exemplar{ 1695 {Labels: labels.FromStrings("t", "1"), Value: 1, Ts: 10000000, HasTs: true}, 1696 {Labels: labels.FromStrings("t", "2"), Value: 2, Ts: 20000000, HasTs: true}, 1697 }, 1698 }, 1699 } 1700 1701 for _, test := range tests { 1702 t.Run(test.title, func(t *testing.T) { 1703 app := &collectResultAppender{} 1704 1705 discoveryLabels := &Target{ 1706 labels: labels.FromStrings(test.discoveryLabels...), 1707 } 1708 1709 sl := newScrapeLoop(context.Background(), 1710 nil, nil, nil, 1711 func(l labels.Labels) labels.Labels { 1712 return mutateSampleLabels(l, discoveryLabels, false, nil) 1713 }, 1714 func(l labels.Labels) labels.Labels { 1715 return mutateReportSampleLabels(l, discoveryLabels) 1716 }, 1717 func(ctx context.Context) storage.Appender { return app }, 1718 nil, 1719 0, 1720 true, 1721 0, 1722 nil, 1723 0, 1724 0, 1725 false, 1726 ) 1727 1728 now := time.Now() 1729 1730 for i := range test.samples { 1731 test.samples[i].t = timestamp.FromTime(now) 1732 } 1733 1734 // We need to set the timestamp for expected exemplars that does not have a timestamp. 1735 for i := range test.exemplars { 1736 if test.exemplars[i].Ts == 0 { 1737 test.exemplars[i].Ts = timestamp.FromTime(now) 1738 } 1739 } 1740 1741 _, _, _, err := sl.append(app, []byte(test.scrapeText), "application/openmetrics-text", now) 1742 require.NoError(t, err) 1743 require.NoError(t, app.Commit()) 1744 require.Equal(t, test.samples, app.result) 1745 require.Equal(t, test.exemplars, app.resultExemplars) 1746 }) 1747 } 1748} 1749 1750func TestScrapeLoopAppendExemplarSeries(t *testing.T) { 1751 scrapeText := []string{`metric_total{n="1"} 1 # {t="1"} 1.0 10000 1752# EOF`, `metric_total{n="1"} 2 # {t="2"} 2.0 20000 1753# EOF`} 1754 samples := []sample{{ 1755 metric: labels.FromStrings("__name__", "metric_total", "n", "1"), 1756 v: 1, 1757 }, { 1758 metric: labels.FromStrings("__name__", "metric_total", "n", "1"), 1759 v: 2, 1760 }} 1761 exemplars := []exemplar.Exemplar{ 1762 {Labels: labels.FromStrings("t", "1"), Value: 1, Ts: 10000000, HasTs: true}, 1763 {Labels: labels.FromStrings("t", "2"), Value: 2, Ts: 20000000, HasTs: true}, 1764 } 1765 discoveryLabels := &Target{ 1766 labels: labels.FromStrings(), 1767 } 1768 1769 app := &collectResultAppender{} 1770 1771 sl := newScrapeLoop(context.Background(), 1772 nil, nil, nil, 1773 func(l labels.Labels) labels.Labels { 1774 return mutateSampleLabels(l, discoveryLabels, false, nil) 1775 }, 1776 func(l labels.Labels) labels.Labels { 1777 return mutateReportSampleLabels(l, discoveryLabels) 1778 }, 1779 func(ctx context.Context) storage.Appender { return app }, 1780 nil, 1781 0, 1782 true, 1783 0, 1784 nil, 1785 0, 1786 0, 1787 false, 1788 ) 1789 1790 now := time.Now() 1791 1792 for i := range samples { 1793 ts := now.Add(time.Second * time.Duration(i)) 1794 samples[i].t = timestamp.FromTime(ts) 1795 } 1796 1797 // We need to set the timestamp for expected exemplars that does not have a timestamp. 1798 for i := range exemplars { 1799 if exemplars[i].Ts == 0 { 1800 ts := now.Add(time.Second * time.Duration(i)) 1801 exemplars[i].Ts = timestamp.FromTime(ts) 1802 } 1803 } 1804 1805 for i, st := range scrapeText { 1806 _, _, _, err := sl.append(app, []byte(st), "application/openmetrics-text", timestamp.Time(samples[i].t)) 1807 require.NoError(t, err) 1808 require.NoError(t, app.Commit()) 1809 } 1810 1811 require.Equal(t, samples, app.result) 1812 require.Equal(t, exemplars, app.resultExemplars) 1813} 1814 1815func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) { 1816 var ( 1817 scraper = &testScraper{} 1818 appender = &collectResultAppender{} 1819 app = func(ctx context.Context) storage.Appender { return appender } 1820 ) 1821 1822 ctx, cancel := context.WithCancel(context.Background()) 1823 sl := newScrapeLoop(ctx, 1824 scraper, 1825 nil, nil, 1826 nopMutator, 1827 nopMutator, 1828 app, 1829 nil, 1830 0, 1831 true, 1832 0, 1833 nil, 1834 10*time.Millisecond, 1835 time.Hour, 1836 false, 1837 ) 1838 1839 scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { 1840 cancel() 1841 return errors.New("scrape failed") 1842 } 1843 1844 sl.run(nil) 1845 require.Equal(t, 0.0, appender.result[0].v, "bad 'up' value") 1846} 1847 1848func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) { 1849 var ( 1850 scraper = &testScraper{} 1851 appender = &collectResultAppender{} 1852 app = func(ctx context.Context) storage.Appender { return appender } 1853 ) 1854 1855 ctx, cancel := context.WithCancel(context.Background()) 1856 sl := newScrapeLoop(ctx, 1857 scraper, 1858 nil, nil, 1859 nopMutator, 1860 nopMutator, 1861 app, 1862 nil, 1863 0, 1864 true, 1865 0, 1866 nil, 1867 10*time.Millisecond, 1868 time.Hour, 1869 false, 1870 ) 1871 1872 scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { 1873 cancel() 1874 w.Write([]byte("a{l=\"\xff\"} 1\n")) 1875 return nil 1876 } 1877 1878 sl.run(nil) 1879 require.Equal(t, 0.0, appender.result[0].v, "bad 'up' value") 1880} 1881 1882type errorAppender struct { 1883 collectResultAppender 1884} 1885 1886func (app *errorAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) { 1887 switch lset.Get(model.MetricNameLabel) { 1888 case "out_of_order": 1889 return 0, storage.ErrOutOfOrderSample 1890 case "amend": 1891 return 0, storage.ErrDuplicateSampleForTimestamp 1892 case "out_of_bounds": 1893 return 0, storage.ErrOutOfBounds 1894 default: 1895 return app.collectResultAppender.Append(ref, lset, t, v) 1896 } 1897} 1898 1899func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) { 1900 app := &errorAppender{} 1901 1902 sl := newScrapeLoop(context.Background(), 1903 nil, 1904 nil, nil, 1905 nopMutator, 1906 nopMutator, 1907 func(ctx context.Context) storage.Appender { return app }, 1908 nil, 1909 0, 1910 true, 1911 0, 1912 nil, 1913 0, 1914 0, 1915 false, 1916 ) 1917 1918 now := time.Unix(1, 0) 1919 slApp := sl.appender(context.Background()) 1920 total, added, seriesAdded, err := sl.append(slApp, []byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now) 1921 require.NoError(t, err) 1922 require.NoError(t, slApp.Commit()) 1923 1924 want := []sample{ 1925 { 1926 metric: labels.FromStrings(model.MetricNameLabel, "normal"), 1927 t: timestamp.FromTime(now), 1928 v: 1, 1929 }, 1930 } 1931 require.Equal(t, want, app.result, "Appended samples not as expected") 1932 require.Equal(t, 4, total) 1933 require.Equal(t, 4, added) 1934 require.Equal(t, 1, seriesAdded) 1935} 1936 1937func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { 1938 app := &collectResultAppender{} 1939 sl := newScrapeLoop(context.Background(), 1940 nil, 1941 nil, nil, 1942 nopMutator, 1943 nopMutator, 1944 func(ctx context.Context) storage.Appender { 1945 return &timeLimitAppender{ 1946 Appender: app, 1947 maxTime: timestamp.FromTime(time.Now().Add(10 * time.Minute)), 1948 } 1949 }, 1950 nil, 1951 0, 1952 true, 1953 0, 1954 nil, 1955 0, 1956 0, 1957 false, 1958 ) 1959 1960 now := time.Now().Add(20 * time.Minute) 1961 slApp := sl.appender(context.Background()) 1962 total, added, seriesAdded, err := sl.append(slApp, []byte("normal 1\n"), "", now) 1963 require.NoError(t, err) 1964 require.NoError(t, slApp.Commit()) 1965 require.Equal(t, 1, total) 1966 require.Equal(t, 1, added) 1967 require.Equal(t, 0, seriesAdded) 1968 1969} 1970 1971func TestTargetScraperScrapeOK(t *testing.T) { 1972 const ( 1973 configTimeout = 1500 * time.Millisecond 1974 expectedTimeout = "1.5" 1975 ) 1976 1977 server := httptest.NewServer( 1978 http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 1979 accept := r.Header.Get("Accept") 1980 if !strings.HasPrefix(accept, "application/openmetrics-text;") { 1981 t.Errorf("Expected Accept header to prefer application/openmetrics-text, got %q", accept) 1982 } 1983 1984 timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds") 1985 if timeout != expectedTimeout { 1986 t.Errorf("Expected scrape timeout header %q, got %q", expectedTimeout, timeout) 1987 } 1988 1989 w.Header().Set("Content-Type", `text/plain; version=0.0.4`) 1990 w.Write([]byte("metric_a 1\nmetric_b 2\n")) 1991 }), 1992 ) 1993 defer server.Close() 1994 1995 serverURL, err := url.Parse(server.URL) 1996 if err != nil { 1997 panic(err) 1998 } 1999 2000 ts := &targetScraper{ 2001 Target: &Target{ 2002 labels: labels.FromStrings( 2003 model.SchemeLabel, serverURL.Scheme, 2004 model.AddressLabel, serverURL.Host, 2005 ), 2006 }, 2007 client: http.DefaultClient, 2008 timeout: configTimeout, 2009 } 2010 var buf bytes.Buffer 2011 2012 contentType, err := ts.scrape(context.Background(), &buf) 2013 require.NoError(t, err) 2014 require.Equal(t, "text/plain; version=0.0.4", contentType) 2015 require.Equal(t, "metric_a 1\nmetric_b 2\n", buf.String()) 2016} 2017 2018func TestTargetScrapeScrapeCancel(t *testing.T) { 2019 block := make(chan struct{}) 2020 2021 server := httptest.NewServer( 2022 http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 2023 <-block 2024 }), 2025 ) 2026 defer server.Close() 2027 2028 serverURL, err := url.Parse(server.URL) 2029 if err != nil { 2030 panic(err) 2031 } 2032 2033 ts := &targetScraper{ 2034 Target: &Target{ 2035 labels: labels.FromStrings( 2036 model.SchemeLabel, serverURL.Scheme, 2037 model.AddressLabel, serverURL.Host, 2038 ), 2039 }, 2040 client: http.DefaultClient, 2041 } 2042 ctx, cancel := context.WithCancel(context.Background()) 2043 2044 errc := make(chan error, 1) 2045 2046 go func() { 2047 time.Sleep(1 * time.Second) 2048 cancel() 2049 }() 2050 2051 go func() { 2052 _, err := ts.scrape(ctx, ioutil.Discard) 2053 if err == nil { 2054 errc <- errors.New("Expected error but got nil") 2055 } else if ctx.Err() != context.Canceled { 2056 errc <- errors.Errorf("Expected context cancellation error but got: %s", ctx.Err()) 2057 } else { 2058 close(errc) 2059 } 2060 }() 2061 2062 select { 2063 case <-time.After(5 * time.Second): 2064 t.Fatalf("Scrape function did not return unexpectedly") 2065 case err := <-errc: 2066 require.NoError(t, err) 2067 } 2068 // If this is closed in a defer above the function the test server 2069 // doesn't terminate and the test doesn't complete. 2070 close(block) 2071} 2072 2073func TestTargetScrapeScrapeNotFound(t *testing.T) { 2074 server := httptest.NewServer( 2075 http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 2076 w.WriteHeader(http.StatusNotFound) 2077 }), 2078 ) 2079 defer server.Close() 2080 2081 serverURL, err := url.Parse(server.URL) 2082 if err != nil { 2083 panic(err) 2084 } 2085 2086 ts := &targetScraper{ 2087 Target: &Target{ 2088 labels: labels.FromStrings( 2089 model.SchemeLabel, serverURL.Scheme, 2090 model.AddressLabel, serverURL.Host, 2091 ), 2092 }, 2093 client: http.DefaultClient, 2094 } 2095 2096 _, err = ts.scrape(context.Background(), ioutil.Discard) 2097 require.Contains(t, err.Error(), "404", "Expected \"404 NotFound\" error but got: %s", err) 2098} 2099 2100func TestTargetScraperBodySizeLimit(t *testing.T) { 2101 const ( 2102 bodySizeLimit = 15 2103 responseBody = "metric_a 1\nmetric_b 2\n" 2104 ) 2105 var gzipResponse bool 2106 server := httptest.NewServer( 2107 http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 2108 w.Header().Set("Content-Type", `text/plain; version=0.0.4`) 2109 if gzipResponse { 2110 w.Header().Set("Content-Encoding", "gzip") 2111 gw := gzip.NewWriter(w) 2112 defer gw.Close() 2113 gw.Write([]byte(responseBody)) 2114 return 2115 } 2116 w.Write([]byte(responseBody)) 2117 }), 2118 ) 2119 defer server.Close() 2120 2121 serverURL, err := url.Parse(server.URL) 2122 if err != nil { 2123 panic(err) 2124 } 2125 2126 ts := &targetScraper{ 2127 Target: &Target{ 2128 labels: labels.FromStrings( 2129 model.SchemeLabel, serverURL.Scheme, 2130 model.AddressLabel, serverURL.Host, 2131 ), 2132 }, 2133 client: http.DefaultClient, 2134 bodySizeLimit: bodySizeLimit, 2135 } 2136 var buf bytes.Buffer 2137 2138 // Target response uncompressed body, scrape with body size limit. 2139 _, err = ts.scrape(context.Background(), &buf) 2140 require.ErrorIs(t, err, errBodySizeLimit) 2141 require.Equal(t, bodySizeLimit, buf.Len()) 2142 // Target response gzip compressed body, scrape with body size limit. 2143 gzipResponse = true 2144 buf.Reset() 2145 _, err = ts.scrape(context.Background(), &buf) 2146 require.ErrorIs(t, err, errBodySizeLimit) 2147 require.Equal(t, bodySizeLimit, buf.Len()) 2148 // Target response uncompressed body, scrape without body size limit. 2149 gzipResponse = false 2150 buf.Reset() 2151 ts.bodySizeLimit = 0 2152 _, err = ts.scrape(context.Background(), &buf) 2153 require.NoError(t, err) 2154 require.Equal(t, len(responseBody), buf.Len()) 2155 // Target response gzip compressed body, scrape without body size limit. 2156 gzipResponse = true 2157 buf.Reset() 2158 _, err = ts.scrape(context.Background(), &buf) 2159 require.NoError(t, err) 2160 require.Equal(t, len(responseBody), buf.Len()) 2161} 2162 2163// testScraper implements the scraper interface and allows setting values 2164// returned by its methods. It also allows setting a custom scrape function. 2165type testScraper struct { 2166 offsetDur time.Duration 2167 2168 lastStart time.Time 2169 lastDuration time.Duration 2170 lastError error 2171 2172 scrapeErr error 2173 scrapeFunc func(context.Context, io.Writer) error 2174} 2175 2176func (ts *testScraper) offset(interval time.Duration, jitterSeed uint64) time.Duration { 2177 return ts.offsetDur 2178} 2179 2180func (ts *testScraper) Report(start time.Time, duration time.Duration, err error) { 2181 ts.lastStart = start 2182 ts.lastDuration = duration 2183 ts.lastError = err 2184} 2185 2186func (ts *testScraper) scrape(ctx context.Context, w io.Writer) (string, error) { 2187 if ts.scrapeFunc != nil { 2188 return "", ts.scrapeFunc(ctx, w) 2189 } 2190 return "", ts.scrapeErr 2191} 2192 2193func TestScrapeLoop_RespectTimestamps(t *testing.T) { 2194 s := teststorage.New(t) 2195 defer s.Close() 2196 2197 app := s.Appender(context.Background()) 2198 2199 capp := &collectResultAppender{next: app} 2200 2201 sl := newScrapeLoop(context.Background(), 2202 nil, nil, nil, 2203 nopMutator, 2204 nopMutator, 2205 func(ctx context.Context) storage.Appender { return capp }, 2206 nil, 0, 2207 true, 2208 0, 2209 nil, 2210 0, 2211 0, 2212 false, 2213 ) 2214 2215 now := time.Now() 2216 slApp := sl.appender(context.Background()) 2217 _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now) 2218 require.NoError(t, err) 2219 require.NoError(t, slApp.Commit()) 2220 2221 want := []sample{ 2222 { 2223 metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"), 2224 t: 0, 2225 v: 1, 2226 }, 2227 } 2228 require.Equal(t, want, capp.result, "Appended samples not as expected") 2229} 2230 2231func TestScrapeLoop_DiscardTimestamps(t *testing.T) { 2232 s := teststorage.New(t) 2233 defer s.Close() 2234 2235 app := s.Appender(context.Background()) 2236 2237 capp := &collectResultAppender{next: app} 2238 2239 sl := newScrapeLoop(context.Background(), 2240 nil, nil, nil, 2241 nopMutator, 2242 nopMutator, 2243 func(ctx context.Context) storage.Appender { return capp }, 2244 nil, 0, 2245 false, 2246 0, 2247 nil, 2248 0, 2249 0, 2250 false, 2251 ) 2252 2253 now := time.Now() 2254 slApp := sl.appender(context.Background()) 2255 _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now) 2256 require.NoError(t, err) 2257 require.NoError(t, slApp.Commit()) 2258 2259 want := []sample{ 2260 { 2261 metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"), 2262 t: timestamp.FromTime(now), 2263 v: 1, 2264 }, 2265 } 2266 require.Equal(t, want, capp.result, "Appended samples not as expected") 2267} 2268 2269func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { 2270 s := teststorage.New(t) 2271 defer s.Close() 2272 2273 ctx, cancel := context.WithCancel(context.Background()) 2274 sl := newScrapeLoop(ctx, 2275 &testScraper{}, 2276 nil, nil, 2277 nopMutator, 2278 nopMutator, 2279 s.Appender, 2280 nil, 2281 0, 2282 true, 2283 0, 2284 nil, 2285 0, 2286 0, 2287 false, 2288 ) 2289 defer cancel() 2290 2291 // We add a good and a bad metric to check that both are discarded. 2292 slApp := sl.appender(ctx) 2293 _, _, _, err := sl.append(slApp, []byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{}) 2294 require.Error(t, err) 2295 require.NoError(t, slApp.Rollback()) 2296 2297 q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0) 2298 require.NoError(t, err) 2299 series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) 2300 require.Equal(t, false, series.Next(), "series found in tsdb") 2301 require.NoError(t, series.Err()) 2302 2303 // We add a good metric to check that it is recorded. 2304 slApp = sl.appender(ctx) 2305 _, _, _, err = sl.append(slApp, []byte("test_metric{le=\"500\"} 1\n"), "", time.Time{}) 2306 require.NoError(t, err) 2307 require.NoError(t, slApp.Commit()) 2308 2309 q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0) 2310 require.NoError(t, err) 2311 series = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "le", "500")) 2312 require.Equal(t, true, series.Next(), "series not found in tsdb") 2313 require.NoError(t, series.Err()) 2314 require.Equal(t, false, series.Next(), "more than one series found in tsdb") 2315} 2316 2317func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) { 2318 s := teststorage.New(t) 2319 defer s.Close() 2320 2321 app := s.Appender(context.Background()) 2322 2323 ctx, cancel := context.WithCancel(context.Background()) 2324 sl := newScrapeLoop(context.Background(), 2325 &testScraper{}, 2326 nil, nil, 2327 func(l labels.Labels) labels.Labels { 2328 if l.Has("drop") { 2329 return labels.Labels{} 2330 } 2331 return l 2332 }, 2333 nopMutator, 2334 func(ctx context.Context) storage.Appender { return app }, 2335 nil, 2336 0, 2337 true, 2338 0, 2339 nil, 2340 0, 2341 0, 2342 false, 2343 ) 2344 defer cancel() 2345 2346 slApp := sl.appender(context.Background()) 2347 _, _, _, err := sl.append(slApp, []byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{}) 2348 require.Error(t, err) 2349 require.NoError(t, slApp.Rollback()) 2350 require.Equal(t, errNameLabelMandatory, err) 2351 2352 q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0) 2353 require.NoError(t, err) 2354 series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) 2355 require.Equal(t, false, series.Next(), "series found in tsdb") 2356 require.NoError(t, series.Err()) 2357} 2358 2359func TestReusableConfig(t *testing.T) { 2360 variants := []*config.ScrapeConfig{ 2361 { 2362 JobName: "prometheus", 2363 ScrapeTimeout: model.Duration(15 * time.Second), 2364 }, 2365 { 2366 JobName: "httpd", 2367 ScrapeTimeout: model.Duration(15 * time.Second), 2368 }, 2369 { 2370 JobName: "prometheus", 2371 ScrapeTimeout: model.Duration(5 * time.Second), 2372 }, 2373 { 2374 JobName: "prometheus", 2375 MetricsPath: "/metrics", 2376 }, 2377 { 2378 JobName: "prometheus", 2379 MetricsPath: "/metrics2", 2380 }, 2381 { 2382 JobName: "prometheus", 2383 ScrapeTimeout: model.Duration(5 * time.Second), 2384 MetricsPath: "/metrics2", 2385 }, 2386 { 2387 JobName: "prometheus", 2388 ScrapeInterval: model.Duration(5 * time.Second), 2389 MetricsPath: "/metrics2", 2390 }, 2391 { 2392 JobName: "prometheus", 2393 ScrapeInterval: model.Duration(5 * time.Second), 2394 SampleLimit: 1000, 2395 MetricsPath: "/metrics2", 2396 }, 2397 } 2398 2399 match := [][]int{ 2400 {0, 2}, 2401 {4, 5}, 2402 {4, 6}, 2403 {4, 7}, 2404 {5, 6}, 2405 {5, 7}, 2406 {6, 7}, 2407 } 2408 noMatch := [][]int{ 2409 {1, 2}, 2410 {0, 4}, 2411 {3, 4}, 2412 } 2413 2414 for i, m := range match { 2415 require.Equal(t, true, reusableCache(variants[m[0]], variants[m[1]]), "match test %d", i) 2416 require.Equal(t, true, reusableCache(variants[m[1]], variants[m[0]]), "match test %d", i) 2417 require.Equal(t, true, reusableCache(variants[m[1]], variants[m[1]]), "match test %d", i) 2418 require.Equal(t, true, reusableCache(variants[m[0]], variants[m[0]]), "match test %d", i) 2419 } 2420 for i, m := range noMatch { 2421 require.Equal(t, false, reusableCache(variants[m[0]], variants[m[1]]), "not match test %d", i) 2422 require.Equal(t, false, reusableCache(variants[m[1]], variants[m[0]]), "not match test %d", i) 2423 } 2424} 2425 2426func TestReuseScrapeCache(t *testing.T) { 2427 var ( 2428 app = &nopAppendable{} 2429 cfg = &config.ScrapeConfig{ 2430 JobName: "Prometheus", 2431 ScrapeTimeout: model.Duration(5 * time.Second), 2432 ScrapeInterval: model.Duration(5 * time.Second), 2433 MetricsPath: "/metrics", 2434 } 2435 sp, _ = newScrapePool(cfg, app, 0, nil, false) 2436 t1 = &Target{ 2437 discoveredLabels: labels.Labels{ 2438 labels.Label{ 2439 Name: "labelNew", 2440 Value: "nameNew", 2441 }, 2442 }, 2443 } 2444 proxyURL, _ = url.Parse("http://localhost:2128") 2445 ) 2446 defer sp.stop() 2447 sp.sync([]*Target{t1}) 2448 2449 steps := []struct { 2450 keep bool 2451 newConfig *config.ScrapeConfig 2452 }{ 2453 { 2454 keep: true, 2455 newConfig: &config.ScrapeConfig{ 2456 JobName: "Prometheus", 2457 ScrapeInterval: model.Duration(5 * time.Second), 2458 ScrapeTimeout: model.Duration(5 * time.Second), 2459 MetricsPath: "/metrics", 2460 }, 2461 }, 2462 { 2463 keep: false, 2464 newConfig: &config.ScrapeConfig{ 2465 JobName: "Prometheus", 2466 ScrapeInterval: model.Duration(5 * time.Second), 2467 ScrapeTimeout: model.Duration(15 * time.Second), 2468 MetricsPath: "/metrics2", 2469 }, 2470 }, 2471 { 2472 keep: true, 2473 newConfig: &config.ScrapeConfig{ 2474 JobName: "Prometheus", 2475 SampleLimit: 400, 2476 ScrapeInterval: model.Duration(5 * time.Second), 2477 ScrapeTimeout: model.Duration(15 * time.Second), 2478 MetricsPath: "/metrics2", 2479 }, 2480 }, 2481 { 2482 keep: false, 2483 newConfig: &config.ScrapeConfig{ 2484 JobName: "Prometheus", 2485 HonorTimestamps: true, 2486 SampleLimit: 400, 2487 ScrapeInterval: model.Duration(5 * time.Second), 2488 ScrapeTimeout: model.Duration(15 * time.Second), 2489 MetricsPath: "/metrics2", 2490 }, 2491 }, 2492 { 2493 keep: true, 2494 newConfig: &config.ScrapeConfig{ 2495 JobName: "Prometheus", 2496 HonorTimestamps: true, 2497 SampleLimit: 400, 2498 HTTPClientConfig: config_util.HTTPClientConfig{ 2499 ProxyURL: config_util.URL{URL: proxyURL}, 2500 }, 2501 ScrapeInterval: model.Duration(5 * time.Second), 2502 ScrapeTimeout: model.Duration(15 * time.Second), 2503 MetricsPath: "/metrics2", 2504 }, 2505 }, 2506 { 2507 keep: false, 2508 newConfig: &config.ScrapeConfig{ 2509 JobName: "Prometheus", 2510 HonorTimestamps: true, 2511 HonorLabels: true, 2512 SampleLimit: 400, 2513 ScrapeInterval: model.Duration(5 * time.Second), 2514 ScrapeTimeout: model.Duration(15 * time.Second), 2515 MetricsPath: "/metrics2", 2516 }, 2517 }, 2518 } 2519 2520 cacheAddr := func(sp *scrapePool) map[uint64]string { 2521 r := make(map[uint64]string) 2522 for fp, l := range sp.loops { 2523 r[fp] = fmt.Sprintf("%p", l.getCache()) 2524 } 2525 return r 2526 } 2527 2528 for i, s := range steps { 2529 initCacheAddr := cacheAddr(sp) 2530 sp.reload(s.newConfig) 2531 for fp, newCacheAddr := range cacheAddr(sp) { 2532 if s.keep { 2533 require.Equal(t, initCacheAddr[fp], newCacheAddr, "step %d: old cache and new cache are not the same", i) 2534 } else { 2535 require.NotEqual(t, initCacheAddr[fp], newCacheAddr, "step %d: old cache and new cache are the same", i) 2536 } 2537 } 2538 initCacheAddr = cacheAddr(sp) 2539 sp.reload(s.newConfig) 2540 for fp, newCacheAddr := range cacheAddr(sp) { 2541 require.Equal(t, initCacheAddr[fp], newCacheAddr, "step %d: reloading the exact config invalidates the cache", i) 2542 } 2543 } 2544} 2545 2546func TestScrapeAddFast(t *testing.T) { 2547 s := teststorage.New(t) 2548 defer s.Close() 2549 2550 ctx, cancel := context.WithCancel(context.Background()) 2551 sl := newScrapeLoop(ctx, 2552 &testScraper{}, 2553 nil, nil, 2554 nopMutator, 2555 nopMutator, 2556 s.Appender, 2557 nil, 2558 0, 2559 true, 2560 0, 2561 nil, 2562 0, 2563 0, 2564 false, 2565 ) 2566 defer cancel() 2567 2568 slApp := sl.appender(ctx) 2569 _, _, _, err := sl.append(slApp, []byte("up 1\n"), "", time.Time{}) 2570 require.NoError(t, err) 2571 require.NoError(t, slApp.Commit()) 2572 2573 // Poison the cache. There is just one entry, and one series in the 2574 // storage. Changing the ref will create a 'not found' error. 2575 for _, v := range sl.getCache().series { 2576 v.ref++ 2577 } 2578 2579 slApp = sl.appender(ctx) 2580 _, _, _, err = sl.append(slApp, []byte("up 1\n"), "", time.Time{}.Add(time.Second)) 2581 require.NoError(t, err) 2582 require.NoError(t, slApp.Commit()) 2583} 2584 2585func TestReuseCacheRace(t *testing.T) { 2586 var ( 2587 app = &nopAppendable{} 2588 cfg = &config.ScrapeConfig{ 2589 JobName: "Prometheus", 2590 ScrapeTimeout: model.Duration(5 * time.Second), 2591 ScrapeInterval: model.Duration(5 * time.Second), 2592 MetricsPath: "/metrics", 2593 } 2594 sp, _ = newScrapePool(cfg, app, 0, nil, false) 2595 t1 = &Target{ 2596 discoveredLabels: labels.Labels{ 2597 labels.Label{ 2598 Name: "labelNew", 2599 Value: "nameNew", 2600 }, 2601 }, 2602 } 2603 ) 2604 defer sp.stop() 2605 sp.sync([]*Target{t1}) 2606 2607 start := time.Now() 2608 for i := uint(1); i > 0; i++ { 2609 if time.Since(start) > 5*time.Second { 2610 break 2611 } 2612 sp.reload(&config.ScrapeConfig{ 2613 JobName: "Prometheus", 2614 ScrapeTimeout: model.Duration(1 * time.Millisecond), 2615 ScrapeInterval: model.Duration(1 * time.Millisecond), 2616 MetricsPath: "/metrics", 2617 SampleLimit: i, 2618 }) 2619 } 2620} 2621 2622func TestCheckAddError(t *testing.T) { 2623 var appErrs appendErrors 2624 sl := scrapeLoop{l: log.NewNopLogger()} 2625 sl.checkAddError(nil, nil, nil, storage.ErrOutOfOrderSample, nil, &appErrs) 2626 require.Equal(t, 1, appErrs.numOutOfOrder) 2627} 2628 2629func TestScrapeReportSingleAppender(t *testing.T) { 2630 s := teststorage.New(t) 2631 defer s.Close() 2632 2633 var ( 2634 signal = make(chan struct{}, 1) 2635 scraper = &testScraper{} 2636 ) 2637 2638 ctx, cancel := context.WithCancel(context.Background()) 2639 sl := newScrapeLoop(ctx, 2640 scraper, 2641 nil, nil, 2642 nopMutator, 2643 nopMutator, 2644 s.Appender, 2645 nil, 2646 0, 2647 true, 2648 0, 2649 nil, 2650 10*time.Millisecond, 2651 time.Hour, 2652 false, 2653 ) 2654 2655 numScrapes := 0 2656 2657 scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { 2658 numScrapes++ 2659 if numScrapes%4 == 0 { 2660 return fmt.Errorf("scrape failed") 2661 } 2662 w.Write([]byte("metric_a 44\nmetric_b 44\nmetric_c 44\nmetric_d 44\n")) 2663 return nil 2664 } 2665 2666 go func() { 2667 sl.run(nil) 2668 signal <- struct{}{} 2669 }() 2670 2671 start := time.Now() 2672 for time.Since(start) < 3*time.Second { 2673 q, err := s.Querier(ctx, time.Time{}.UnixNano(), time.Now().UnixNano()) 2674 require.NoError(t, err) 2675 series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".+")) 2676 2677 c := 0 2678 for series.Next() { 2679 i := series.At().Iterator() 2680 for i.Next() { 2681 c++ 2682 } 2683 } 2684 2685 require.Equal(t, 0, c%9, "Appended samples not as expected: %d", c) 2686 q.Close() 2687 } 2688 cancel() 2689 2690 select { 2691 case <-signal: 2692 case <-time.After(5 * time.Second): 2693 t.Fatalf("Scrape wasn't stopped.") 2694 } 2695} 2696 2697func TestScrapeLoopLabelLimit(t *testing.T) { 2698 tests := []struct { 2699 title string 2700 scrapeLabels string 2701 discoveryLabels []string 2702 labelLimits labelLimits 2703 expectErr bool 2704 }{ 2705 { 2706 title: "Valid number of labels", 2707 scrapeLabels: `metric{l1="1", l2="2"} 0`, 2708 discoveryLabels: nil, 2709 labelLimits: labelLimits{labelLimit: 5}, 2710 expectErr: false, 2711 }, { 2712 title: "Too many labels", 2713 scrapeLabels: `metric{l1="1", l2="2", l3="3", l4="4", l5="5", l6="6"} 0`, 2714 discoveryLabels: nil, 2715 labelLimits: labelLimits{labelLimit: 5}, 2716 expectErr: true, 2717 }, { 2718 title: "Too many labels including discovery labels", 2719 scrapeLabels: `metric{l1="1", l2="2", l3="3", l4="4"} 0`, 2720 discoveryLabels: []string{"l5", "5", "l6", "6"}, 2721 labelLimits: labelLimits{labelLimit: 5}, 2722 expectErr: true, 2723 }, { 2724 title: "Valid labels name length", 2725 scrapeLabels: `metric{l1="1", l2="2"} 0`, 2726 discoveryLabels: nil, 2727 labelLimits: labelLimits{labelNameLengthLimit: 10}, 2728 expectErr: false, 2729 }, { 2730 title: "Label name too long", 2731 scrapeLabels: `metric{label_name_too_long="0"} 0`, 2732 discoveryLabels: nil, 2733 labelLimits: labelLimits{labelNameLengthLimit: 10}, 2734 expectErr: true, 2735 }, { 2736 title: "Discovery label name too long", 2737 scrapeLabels: `metric{l1="1", l2="2"} 0`, 2738 discoveryLabels: []string{"label_name_too_long", "0"}, 2739 labelLimits: labelLimits{labelNameLengthLimit: 10}, 2740 expectErr: true, 2741 }, { 2742 title: "Valid labels value length", 2743 scrapeLabels: `metric{l1="1", l2="2"} 0`, 2744 discoveryLabels: nil, 2745 labelLimits: labelLimits{labelValueLengthLimit: 10}, 2746 expectErr: false, 2747 }, { 2748 title: "Label value too long", 2749 scrapeLabels: `metric{l1="label_value_too_long"} 0`, 2750 discoveryLabels: nil, 2751 labelLimits: labelLimits{labelValueLengthLimit: 10}, 2752 expectErr: true, 2753 }, { 2754 title: "Discovery label value too long", 2755 scrapeLabels: `metric{l1="1", l2="2"} 0`, 2756 discoveryLabels: []string{"l1", "label_value_too_long"}, 2757 labelLimits: labelLimits{labelValueLengthLimit: 10}, 2758 expectErr: true, 2759 }, 2760 } 2761 2762 for _, test := range tests { 2763 app := &collectResultAppender{} 2764 2765 discoveryLabels := &Target{ 2766 labels: labels.FromStrings(test.discoveryLabels...), 2767 } 2768 2769 sl := newScrapeLoop(context.Background(), 2770 nil, nil, nil, 2771 func(l labels.Labels) labels.Labels { 2772 return mutateSampleLabels(l, discoveryLabels, false, nil) 2773 }, 2774 func(l labels.Labels) labels.Labels { 2775 return mutateReportSampleLabels(l, discoveryLabels) 2776 }, 2777 func(ctx context.Context) storage.Appender { return app }, 2778 nil, 2779 0, 2780 true, 2781 0, 2782 &test.labelLimits, 2783 0, 2784 0, 2785 false, 2786 ) 2787 2788 slApp := sl.appender(context.Background()) 2789 _, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", time.Now()) 2790 2791 t.Logf("Test:%s", test.title) 2792 if test.expectErr { 2793 require.Error(t, err) 2794 } else { 2795 require.NoError(t, err) 2796 require.NoError(t, slApp.Commit()) 2797 } 2798 } 2799} 2800 2801func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) { 2802 interval, _ := model.ParseDuration("2s") 2803 timeout, _ := model.ParseDuration("500ms") 2804 config := &config.ScrapeConfig{ 2805 ScrapeInterval: interval, 2806 ScrapeTimeout: timeout, 2807 RelabelConfigs: []*relabel.Config{ 2808 { 2809 SourceLabels: model.LabelNames{model.ScrapeIntervalLabel}, 2810 Regex: relabel.MustNewRegexp("2s"), 2811 Replacement: "3s", 2812 TargetLabel: model.ScrapeIntervalLabel, 2813 Action: relabel.Replace, 2814 }, 2815 { 2816 SourceLabels: model.LabelNames{model.ScrapeTimeoutLabel}, 2817 Regex: relabel.MustNewRegexp("500ms"), 2818 Replacement: "750ms", 2819 TargetLabel: model.ScrapeTimeoutLabel, 2820 Action: relabel.Replace, 2821 }, 2822 }, 2823 } 2824 sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, false) 2825 tgts := []*targetgroup.Group{ 2826 { 2827 Targets: []model.LabelSet{{model.AddressLabel: "127.0.0.1:9090"}}, 2828 }, 2829 } 2830 2831 sp.Sync(tgts) 2832 defer sp.stop() 2833 2834 require.Equal(t, "3s", sp.ActiveTargets()[0].labels.Get(model.ScrapeIntervalLabel)) 2835 require.Equal(t, "750ms", sp.ActiveTargets()[0].labels.Get(model.ScrapeTimeoutLabel)) 2836} 2837