// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retrieval

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
	"reflect"
	"sort"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/prometheus/common/model"
	"golang.org/x/net/context"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/storage"
)

func TestNewScrapePool(t *testing.T) {
	var (
		app = &nopAppender{}
		cfg = &config.ScrapeConfig{}
		sp  = newScrapePool(context.Background(), cfg, app)
	)

	if a, ok := sp.appender.(*nopAppender); !ok || a != app {
		t.Fatalf("Wrong sample appender")
	}
	if sp.config != cfg {
		t.Fatalf("Wrong scrape config")
	}
	if sp.newLoop == nil {
		t.Fatalf("newLoop function not initialized")
	}
}

// testLoop is a stub implementation of the loop interface whose start and
// stop behavior is configurable per test.
type testLoop struct {
	startFunc func(interval, timeout time.Duration, errc chan<- error)
	stopFunc  func()
}

func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) {
	l.startFunc(interval, timeout, errc)
}

func (l *testLoop) stop() {
	l.stopFunc()
}

func TestScrapePoolStop(t *testing.T) {
	sp := &scrapePool{
		targets: map[uint64]*Target{},
		loops:   map[uint64]loop{},
	}
	var mtx sync.Mutex
	stopped := map[uint64]bool{}
	numTargets := 20

	// Stopping the scrape pool must call stop() on all scrape loops and clean
	// up both the loops and their targets. It must wait until each loop's stop
	// function has returned before returning itself.

	for i := 0; i < numTargets; i++ {
		t := &Target{
			labels: model.LabelSet{
				model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)),
			},
		}
		l := &testLoop{}
		l.stopFunc = func() {
			time.Sleep(time.Duration(i*20) * time.Millisecond)

			mtx.Lock()
			stopped[t.hash()] = true
			mtx.Unlock()
		}

		sp.targets[t.hash()] = t
		sp.loops[t.hash()] = l
	}

	done := make(chan struct{})
	stopTime := time.Now()

	go func() {
		sp.stop()
		close(done)
	}()

	select {
	case <-time.After(5 * time.Second):
		t.Fatalf("scrapePool.stop() did not return in time")
	case <-done:
		// This should have taken at least as long as the last target slept.
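		// stop() must wait for every loop's stop function, each of which
		// sleeps before recording itself as stopped, so an elapsed time
		// below the bound means stop() returned too early.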
		if time.Since(stopTime) < time.Duration(numTargets*20)*time.Millisecond {
			t.Fatalf("scrapePool.stop() exited before all targets stopped")
		}
	}

	mtx.Lock()
	if len(stopped) != numTargets {
		t.Fatalf("Expected %d stopped loops, got %d", numTargets, len(stopped))
	}
	mtx.Unlock()

	if len(sp.targets) > 0 {
		t.Fatalf("Targets were not cleared on stopping: %d left", len(sp.targets))
	}
	if len(sp.loops) > 0 {
		t.Fatalf("Loops were not cleared on stopping: %d left", len(sp.loops))
	}
}

func TestScrapePoolReload(t *testing.T) {
	var mtx sync.Mutex
	numTargets := 20

	stopped := map[uint64]bool{}

	reloadCfg := &config.ScrapeConfig{
		ScrapeInterval: model.Duration(3 * time.Second),
		ScrapeTimeout:  model.Duration(2 * time.Second),
	}
	// On starting to run, new loops created on reload check whether their
	// preceding equivalents have been stopped.
	newLoop := func(ctx context.Context, s scraper, app storage.SampleAppender, tl model.LabelSet, cfg *config.ScrapeConfig) loop {
		l := &testLoop{}
		l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
			if interval != 3*time.Second {
				t.Errorf("Expected scrape interval %d but got %d", 3*time.Second, interval)
			}
			if timeout != 2*time.Second {
				t.Errorf("Expected scrape timeout %d but got %d", 2*time.Second, timeout)
			}
			mtx.Lock()
			if !stopped[s.(*targetScraper).hash()] {
				t.Errorf("Scrape loop for %v not stopped yet", s.(*targetScraper))
			}
			mtx.Unlock()
		}
		return l
	}
	sp := &scrapePool{
		targets: map[uint64]*Target{},
		loops:   map[uint64]loop{},
		newLoop: newLoop,
	}

	// Reloading a scrape pool with a new scrape configuration must stop all
	// scrape loops and start new ones. A new loop must not be started before
	// the preceding one has terminated.

	for i := 0; i < numTargets; i++ {
		t := &Target{
			labels: model.LabelSet{
				model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)),
			},
		}
		l := &testLoop{}
		l.stopFunc = func() {
			time.Sleep(time.Duration(i*20) * time.Millisecond)

			mtx.Lock()
			stopped[t.hash()] = true
			mtx.Unlock()
		}

		sp.targets[t.hash()] = t
		sp.loops[t.hash()] = l
	}
	done := make(chan struct{})

	beforeTargets := map[uint64]*Target{}
	for h, t := range sp.targets {
		beforeTargets[h] = t
	}

	reloadTime := time.Now()

	go func() {
		sp.reload(reloadCfg)
		close(done)
	}()

	select {
	case <-time.After(5 * time.Second):
		t.Fatalf("scrapePool.reload() did not return in time")
	case <-done:
		// This should have taken at least as long as the last target slept.
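		// As above: reload() replaces a loop only after the old loop's stop
		// function has returned, so the sleeps bound the total duration.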
		if time.Since(reloadTime) < time.Duration(numTargets*20)*time.Millisecond {
			t.Fatalf("scrapePool.reload() exited before all targets stopped")
		}
	}

	mtx.Lock()
	if len(stopped) != numTargets {
		t.Fatalf("Expected %d stopped loops, got %d", numTargets, len(stopped))
	}
	mtx.Unlock()

	if !reflect.DeepEqual(sp.targets, beforeTargets) {
		t.Fatalf("Reloading affected target states unexpectedly")
	}
	if len(sp.loops) != numTargets {
		t.Fatalf("Expected %d loops after reload but got %d", numTargets, len(sp.loops))
	}
}

func TestScrapeLoopWrapSampleAppender(t *testing.T) {
	cfg := &config.ScrapeConfig{
		MetricRelabelConfigs: []*config.RelabelConfig{
			{
				Action:       config.RelabelDrop,
				SourceLabels: model.LabelNames{"__name__"},
				Regex:        config.MustNewRegexp("does_not_match_.*"),
			},
			{
				Action:       config.RelabelDrop,
				SourceLabels: model.LabelNames{"__name__"},
				Regex:        config.MustNewRegexp("does_not_match_either_*"),
			},
		},
	}

	target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
	app := &nopAppender{}

	sp := newScrapePool(context.Background(), cfg, app)

	cfg.HonorLabels = false

	sl := sp.newLoop(
		sp.ctx,
		&targetScraper{Target: target, client: sp.client},
		sp.appender,
		target.Labels(),
		sp.config,
	).(*scrapeLoop)
	wrapped, _ := sl.wrapAppender(sl.appender)

	rl, ok := wrapped.(ruleLabelsAppender)
	if !ok {
		t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
	}
	re, ok := rl.SampleAppender.(relabelAppender)
	if !ok {
		t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender)
	}
	co, ok := re.SampleAppender.(*countingAppender)
	if !ok {
		t.Fatalf("Expected *countingAppender but got %T", re.SampleAppender)
	}
	if co.SampleAppender != app {
		t.Fatalf("Expected base appender but got %T", co.SampleAppender)
	}

	cfg.HonorLabels = true
	sl = sp.newLoop(
		sp.ctx,
		&targetScraper{Target: target, client: sp.client},
		sp.appender,
		target.Labels(),
		sp.config,
	).(*scrapeLoop)
	wrapped, _ = sl.wrapAppender(sl.appender)

	hl, ok := wrapped.(honorLabelsAppender)
	if !ok {
		t.Fatalf("Expected honorLabelsAppender but got %T", wrapped)
	}
	re, ok = hl.SampleAppender.(relabelAppender)
	if !ok {
		t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender)
	}
	co, ok = re.SampleAppender.(*countingAppender)
	if !ok {
		t.Fatalf("Expected *countingAppender but got %T", re.SampleAppender)
	}
	if co.SampleAppender != app {
		t.Fatalf("Expected base appender but got %T", co.SampleAppender)
	}
}

func TestScrapeLoopSampleProcessing(t *testing.T) {
	readSamples := model.Samples{
		{
			Metric: model.Metric{"__name__": "a_metric"},
		},
		{
			Metric: model.Metric{"__name__": "b_metric"},
		},
	}

	testCases := []struct {
		scrapedSamples                  model.Samples
		scrapeConfig                    *config.ScrapeConfig
		expectedReportedSamples         model.Samples
		expectedPostRelabelSamplesCount int
	}{
		{ // 0: no relabeling; both scraped samples are ingested.
			scrapedSamples: readSamples,
			scrapeConfig:   &config.ScrapeConfig{},
			expectedReportedSamples: model.Samples{
				{
					Metric: model.Metric{"__name__": "up"},
					Value:  1,
				},
				{
					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
					Value:  42,
				},
				{
					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
					Value:  2,
				},
				{
					Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
					Value:  2,
				},
			},
			expectedPostRelabelSamplesCount: 2,
		},
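		// Cases 1-3 exercise metric relabeling and its interaction with
		// sample_limit: the limit applies to the sample count remaining
		// after relabeling, and only case 3 exceeds it, which reports the
		// target as down (up=0).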
		{ // 1: relabeling drops a_metric, leaving one sample.
			scrapedSamples: readSamples,
			scrapeConfig: &config.ScrapeConfig{
				MetricRelabelConfigs: []*config.RelabelConfig{
					{
						Action:       config.RelabelDrop,
						SourceLabels: model.LabelNames{"__name__"},
						Regex:        config.MustNewRegexp("a.*"),
					},
				},
			},
			expectedReportedSamples: model.Samples{
				{
					Metric: model.Metric{"__name__": "up"},
					Value:  1,
				},
				{
					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
					Value:  42,
				},
				{
					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
					Value:  2,
				},
				{
					Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
					Value:  1,
				},
			},
			expectedPostRelabelSamplesCount: 1,
		},
		{ // 2: the sample limit is met exactly after relabeling, so not exceeded.
			scrapedSamples: readSamples,
			scrapeConfig: &config.ScrapeConfig{
				SampleLimit: 1,
				MetricRelabelConfigs: []*config.RelabelConfig{
					{
						Action:       config.RelabelDrop,
						SourceLabels: model.LabelNames{"__name__"},
						Regex:        config.MustNewRegexp("a.*"),
					},
				},
			},
			expectedReportedSamples: model.Samples{
				{
					Metric: model.Metric{"__name__": "up"},
					Value:  1,
				},
				{
					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
					Value:  42,
				},
				{
					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
					Value:  2,
				},
				{
					Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
					Value:  1,
				},
			},
			expectedPostRelabelSamplesCount: 1,
		},
		{ // 3: the sample limit is exceeded; the target is reported down.
			scrapedSamples: readSamples,
			scrapeConfig: &config.ScrapeConfig{
				SampleLimit: 1,
			},
			expectedReportedSamples: model.Samples{
				{
					Metric: model.Metric{"__name__": "up"},
					Value:  0,
				},
				{
					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
					Value:  42,
				},
				{
					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
					Value:  2,
				},
				{
					Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
					Value:  2,
				},
			},
			expectedPostRelabelSamplesCount: 2,
		},
	}

	for i, test := range testCases {
		ingestedSamples := &bufferAppender{buffer: model.Samples{}}

		target := newTestTarget("example.com:80", 10*time.Millisecond, nil)

		scraper := &testScraper{}
		sl := newScrapeLoop(context.Background(), scraper, ingestedSamples, target.Labels(), test.scrapeConfig).(*scrapeLoop)
		num, err := sl.append(test.scrapedSamples)
		sl.report(time.Unix(0, 0), 42*time.Second, len(test.scrapedSamples), num, err)
		reportedSamples := ingestedSamples.buffer
		if err == nil {
			// On success the buffer holds the ingested scrape samples first,
			// followed by the report samples; strip the former.
			reportedSamples = reportedSamples[num:]
		}

		if !reflect.DeepEqual(reportedSamples, test.expectedReportedSamples) {
			t.Errorf("Reported samples did not match expected metrics for case %d", i)
			t.Errorf("Expected: %v", test.expectedReportedSamples)
			t.Fatalf("Got: %v", reportedSamples)
		}
		if test.expectedPostRelabelSamplesCount != num {
			t.Fatalf("Case %d: ingested sample count %d did not match expected value %d", i, num, test.expectedPostRelabelSamplesCount)
		}
	}
}
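
// TestScrapeLoopStop verifies that stopping a scrape loop that has not yet
// started blocks until its run method has been called and has exited.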
func TestScrapeLoopStop(t *testing.T) {
	scraper := &testScraper{}
	sl := newScrapeLoop(context.Background(), scraper, nil, nil, &config.ScrapeConfig{})

	// The scrape pool synchronizes on stopping scrape loops. However, new
	// scrape loops are started asynchronously. Thus it is possible that a
	// loop is stopped before it has properly started.
	// Stopping a not-yet-started loop must block until the run method has
	// been called and has exited. The run method must exit immediately.

	stopDone := make(chan struct{})
	go func() {
		sl.stop()
		close(stopDone)
	}()

	select {
	case <-stopDone:
		t.Fatalf("Stopping terminated before run exited successfully")
	case <-time.After(500 * time.Millisecond):
	}

	// Running the terminated scrape loop must exit without calling the
	// scraper even once.
	scraper.scrapeFunc = func(context.Context, time.Time) (model.Samples, error) {
		t.Fatalf("scraper was called for terminated scrape loop")
		return nil, nil
	}

	runDone := make(chan struct{})
	go func() {
		sl.run(1, 0, nil)
		close(runDone)
	}()

	select {
	case <-runDone:
	case <-time.After(1 * time.Second):
		t.Fatalf("Running terminated scrape loop did not exit")
	}

	select {
	case <-stopDone:
	case <-time.After(1 * time.Second):
		t.Fatalf("Stopping did not terminate after running exited")
	}
}

func TestScrapeLoopRun(t *testing.T) {
	var (
		signal = make(chan struct{})
		errc   = make(chan error)

		scraper = &testScraper{}
		app     = &nopAppender{}
	)
	defer close(signal)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newScrapeLoop(ctx, scraper, app, nil, &config.ScrapeConfig{})

	// The loop must terminate during the initial offset if the context
	// is canceled.
	scraper.offsetDur = time.Hour

	go func() {
		sl.run(time.Second, time.Hour, errc)
		signal <- struct{}{}
	}()

	// Wait to make sure we are actually waiting on the offset.
	time.Sleep(1 * time.Second)

	cancel()
	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		t.Fatalf("Cancelation during initial offset failed")
	case err := <-errc:
		t.Fatalf("Unexpected error: %s", err)
	}

	// The provided timeout must cause cancelation of the context passed down
	// to the scraper. The scraper has to respect the context.
	scraper.offsetDur = 0

	block := make(chan struct{})
	scraper.scrapeFunc = func(ctx context.Context, ts time.Time) (model.Samples, error) {
		select {
		case <-block:
		case <-ctx.Done():
			return nil, ctx.Err()
		}
		return nil, nil
	}

	ctx, cancel = context.WithCancel(context.Background())
	sl = newScrapeLoop(ctx, scraper, app, nil, &config.ScrapeConfig{})

	go func() {
		sl.run(time.Second, 100*time.Millisecond, errc)
		signal <- struct{}{}
	}()

	select {
	case err := <-errc:
		if err != context.DeadlineExceeded {
			t.Fatalf("Expected timeout error but got: %s", err)
		}
	case <-time.After(3 * time.Second):
		t.Fatalf("Expected timeout error but got none")
	}

	// We already caught the timeout error and are certainly in the loop.
	// Let the scrape return immediately to cause no further timeout errors
	// and check whether canceling the parent context terminates the loop.
	close(block)
	cancel()

	select {
	case <-signal:
		// Loop terminated as expected.
	case err := <-errc:
		t.Fatalf("Unexpected error: %s", err)
	case <-time.After(3 * time.Second):
		t.Fatalf("Loop did not terminate on context cancelation")
	}
}
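
// TestTargetScraperScrapeOK verifies that a scrape sends the
// X-Prometheus-Scrape-Timeout-Seconds header derived from the configured
// timeout and parses the returned text exposition format into samples.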
func TestTargetScraperScrapeOK(t *testing.T) {
	const (
		configTimeout   = 1500 * time.Millisecond
		expectedTimeout = "1.500000"
	)

	server := httptest.NewServer(
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds")
			if timeout != expectedTimeout {
				t.Errorf("Scrape timeout did not match expected timeout")
				t.Errorf("Expected: %v", expectedTimeout)
				t.Fatalf("Got: %v", timeout)
			}

			w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
			w.Write([]byte("metric_a 1\nmetric_b 2\n"))
		}),
	)
	defer server.Close()

	serverURL, err := url.Parse(server.URL)
	if err != nil {
		panic(err)
	}

	ts := &targetScraper{
		Target: &Target{
			labels: model.LabelSet{
				model.SchemeLabel:  model.LabelValue(serverURL.Scheme),
				model.AddressLabel: model.LabelValue(serverURL.Host),
			},
		},
		client:  http.DefaultClient,
		timeout: configTimeout,
	}
	now := time.Now()

	samples, err := ts.scrape(context.Background(), now)
	if err != nil {
		t.Fatalf("Unexpected scrape error: %s", err)
	}

	expectedSamples := model.Samples{
		{
			Metric:    model.Metric{"__name__": "metric_a"},
			Timestamp: model.TimeFromUnixNano(now.UnixNano()),
			Value:     1,
		},
		{
			Metric:    model.Metric{"__name__": "metric_b"},
			Timestamp: model.TimeFromUnixNano(now.UnixNano()),
			Value:     2,
		},
	}
	sort.Sort(expectedSamples)
	sort.Sort(samples)

	if !reflect.DeepEqual(samples, expectedSamples) {
		t.Errorf("Scraped samples did not match served metrics")
		t.Errorf("Expected: %v", expectedSamples)
		t.Fatalf("Got: %v", samples)
	}
}

func TestTargetScrapeScrapeCancel(t *testing.T) {
	block := make(chan struct{})

	server := httptest.NewServer(
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			<-block
		}),
	)
	defer server.Close()

	serverURL, err := url.Parse(server.URL)
	if err != nil {
		panic(err)
	}

	ts := &targetScraper{
		Target: &Target{
			labels: model.LabelSet{
				model.SchemeLabel:  model.LabelValue(serverURL.Scheme),
				model.AddressLabel: model.LabelValue(serverURL.Host),
			},
		},
		client: http.DefaultClient,
	}
	ctx, cancel := context.WithCancel(context.Background())

	errc := make(chan error)

	go func() {
		time.Sleep(1 * time.Second)
		cancel()
	}()

	go func() {
		if _, err := ts.scrape(ctx, time.Now()); err != context.Canceled {
			errc <- fmt.Errorf("Expected context cancelation error but got: %s", err)
		}
		close(errc)
	}()

	select {
	case <-time.After(5 * time.Second):
		t.Fatalf("Scrape function did not return in time")
	case err := <-errc:
		if err != nil {
			t.Fatal(err)
		}
	}
	// If block were closed in a defer at the top of the function, the test
	// server would not terminate and the test wouldn't complete.
	close(block)
}
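
// A failing HTTP status must surface as a scrape error that carries the
// status code.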
func TestTargetScrapeScrapeNotFound(t *testing.T) {
	server := httptest.NewServer(
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusNotFound)
		}),
	)
	defer server.Close()

	serverURL, err := url.Parse(server.URL)
	if err != nil {
		panic(err)
	}

	ts := &targetScraper{
		Target: &Target{
			labels: model.LabelSet{
				model.SchemeLabel:  model.LabelValue(serverURL.Scheme),
				model.AddressLabel: model.LabelValue(serverURL.Host),
			},
		},
		client: http.DefaultClient,
	}

	if _, err := ts.scrape(context.Background(), time.Now()); !strings.Contains(err.Error(), "404") {
		t.Fatalf("Expected \"404 Not Found\" error but got: %s", err)
	}
}

// testScraper implements the scraper interface and allows setting values
// returned by its methods. It also allows setting a custom scrape function.
type testScraper struct {
	offsetDur time.Duration

	lastStart    time.Time
	lastDuration time.Duration
	lastError    error

	samples    model.Samples
	scrapeErr  error
	scrapeFunc func(context.Context, time.Time) (model.Samples, error)
}

func (ts *testScraper) offset(interval time.Duration) time.Duration {
	return ts.offsetDur
}

func (ts *testScraper) report(start time.Time, duration time.Duration, err error) {
	ts.lastStart = start
	ts.lastDuration = duration
	ts.lastError = err
}

func (ts *testScraper) scrape(ctx context.Context, t time.Time) (model.Samples, error) {
	if ts.scrapeFunc != nil {
		return ts.scrapeFunc(ctx, t)
	}
	return ts.samples, ts.scrapeErr
}
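
// Compile-time checks that the test doubles satisfy the interfaces they stand
// in for (assuming the loop and scraper interfaces used by scrapePool and
// scrapeLoop in this package).
var (
	_ loop    = &testLoop{}
	_ scraper = &testScraper{}
)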