// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scrape

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"net/http"
	"reflect"
	"strconv"
	"sync"
	"time"
	"unsafe"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	config_util "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/prometheus/common/version"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/pkg/exemplar"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/pkg/pool"
	"github.com/prometheus/prometheus/pkg/relabel"
	"github.com/prometheus/prometheus/pkg/textparse"
	"github.com/prometheus/prometheus/pkg/timestamp"
	"github.com/prometheus/prometheus/pkg/value"
	"github.com/prometheus/prometheus/storage"
)

// ScrapeTimestampTolerance is the tolerance for aligning scrape append
// timestamps, to enable better compression at the TSDB level.
// See https://github.com/prometheus/prometheus/issues/7846
var ScrapeTimestampTolerance = 2 * time.Millisecond

// AlignScrapeTimestamps enables aligning scrape timestamps within the
// tolerance described above.
var AlignScrapeTimestamps = true
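// As an informal illustration: with a 15s scrape interval and the default 2ms
// tolerance, a scrape that fires 1ms after its ideal tick is recorded with the
// aligned (ideal) timestamp, while a scrape firing 5ms late keeps its real
// wall-clock timestamp. The exact behavior is determined by the alignment
// logic in scrapeLoop.run below.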
var errNameLabelMandatory = fmt.Errorf("missing metric name (%s label)", labels.MetricName)

var (
	targetIntervalLength = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "prometheus_target_interval_length_seconds",
			Help:       "Actual intervals between scrapes.",
			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
		},
		[]string{"interval"},
	)
	targetReloadIntervalLength = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "prometheus_target_reload_length_seconds",
			Help:       "Actual interval to reload the scrape pool with a given configuration.",
			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
		},
		[]string{"interval"},
	)
	targetScrapePools = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrape_pools_total",
			Help: "Total number of scrape pool creation attempts.",
		},
	)
	targetScrapePoolsFailed = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrape_pools_failed_total",
			Help: "Total number of scrape pool creations that failed.",
		},
	)
	targetScrapePoolReloads = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrape_pool_reloads_total",
			Help: "Total number of scrape pool reloads.",
		},
	)
	targetScrapePoolReloadsFailed = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrape_pool_reloads_failed_total",
			Help: "Total number of failed scrape pool reloads.",
		},
	)
	targetScrapePoolExceededTargetLimit = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrape_pool_exceeded_target_limit_total",
			Help: "Total number of times scrape pools hit the target limit, during sync or config reload.",
		},
	)
	targetScrapePoolTargetLimit = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "prometheus_target_scrape_pool_target_limit",
			Help: "Maximum number of targets allowed in this scrape pool.",
		},
		[]string{"scrape_job"},
	)
	targetScrapePoolTargetsAdded = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "prometheus_target_scrape_pool_targets",
			Help: "Current number of targets in this scrape pool.",
		},
		[]string{"scrape_job"},
	)
	targetSyncIntervalLength = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "prometheus_target_sync_length_seconds",
			Help:       "Actual interval to sync the scrape pool.",
			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
		},
		[]string{"scrape_job"},
	)
	targetScrapePoolSyncsCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrape_pool_sync_total",
			Help: "Total number of syncs that were executed on a scrape pool.",
		},
		[]string{"scrape_job"},
	)
	targetScrapeExceededBodySizeLimit = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_exceeded_body_size_limit_total",
			Help: "Total number of scrapes that hit the body size limit.",
		},
	)
	targetScrapeSampleLimit = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_exceeded_sample_limit_total",
			Help: "Total number of scrapes that hit the sample limit and were rejected.",
		},
	)
	targetScrapeSampleDuplicate = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_sample_duplicate_timestamp_total",
			Help: "Total number of samples rejected due to duplicate timestamps but different values.",
		},
	)
	targetScrapeSampleOutOfOrder = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_sample_out_of_order_total",
			Help: "Total number of samples rejected due to being out of the expected order.",
		},
	)
	targetScrapeSampleOutOfBounds = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_sample_out_of_bounds_total",
			Help: "Total number of samples rejected due to timestamp falling outside of the time bounds.",
		},
	)
	targetScrapeCacheFlushForced = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_cache_flush_forced_total",
			Help: "How many times a scrape cache was flushed due to getting big while scrapes are failing.",
		},
	)
	targetScrapeExemplarOutOfOrder = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_exemplar_out_of_order_total",
			Help: "Total number of exemplars rejected due to being out of the expected order.",
		},
	)
	targetScrapePoolExceededLabelLimits = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrape_pool_exceeded_label_limits_total",
			Help: "Total number of times scrape pools hit the label limits, during sync or config reload.",
		},
	)
	targetSyncFailed = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "prometheus_target_sync_failed_total",
			Help: "Total number of target sync failures.",
		},
		[]string{"scrape_job"},
	)
)

func init() {
	prometheus.MustRegister(
		targetIntervalLength,
		targetReloadIntervalLength,
		targetScrapePools,
		targetScrapePoolsFailed,
		targetScrapePoolReloads,
		targetScrapePoolReloadsFailed,
		targetSyncIntervalLength,
		targetScrapePoolSyncsCounter,
		targetScrapeExceededBodySizeLimit,
		targetScrapeSampleLimit,
		targetScrapeSampleDuplicate,
		targetScrapeSampleOutOfOrder,
		targetScrapeSampleOutOfBounds,
		targetScrapePoolExceededTargetLimit,
		targetScrapePoolTargetLimit,
		targetScrapePoolTargetsAdded,
		targetScrapeCacheFlushForced,
		targetMetadataCache,
		targetScrapeExemplarOutOfOrder,
		targetScrapePoolExceededLabelLimits,
		targetSyncFailed,
	)
}

// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
	appendable storage.Appendable
	logger     log.Logger
	cancel     context.CancelFunc

	// mtx must not be taken after targetMtx.
	mtx    sync.Mutex
	config *config.ScrapeConfig
	client *http.Client
	loops  map[uint64]loop

	targetMtx sync.Mutex
	// activeTargets and loops must always be synchronized to have the same
	// set of hashes.
	activeTargets  map[uint64]*Target
	droppedTargets []*Target

	// Constructor for new scrape loops. This is settable for testing convenience.
	newLoop func(scrapeLoopOptions) loop
}
type labelLimits struct {
	labelLimit            int
	labelNameLengthLimit  int
	labelValueLengthLimit int
}

type scrapeLoopOptions struct {
	target          *Target
	scraper         scraper
	sampleLimit     int
	labelLimits     *labelLimits
	honorLabels     bool
	honorTimestamps bool
	interval        time.Duration
	timeout         time.Duration
	mrc             []*relabel.Config
	cache           *scrapeCache
}

const maxAheadTime = 10 * time.Minute

type labelsMutator func(labels.Labels) labels.Labels

func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger, reportScrapeTimeout bool) (*scrapePool, error) {
	targetScrapePools.Inc()
	if logger == nil {
		logger = log.NewNopLogger()
	}

	client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, config_util.WithHTTP2Disabled())
	if err != nil {
		targetScrapePoolsFailed.Inc()
		return nil, errors.Wrap(err, "error creating HTTP client")
	}

	buffers := pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })

	ctx, cancel := context.WithCancel(context.Background())
	sp := &scrapePool{
		cancel:        cancel,
		appendable:    app,
		config:        cfg,
		client:        client,
		activeTargets: map[uint64]*Target{},
		loops:         map[uint64]loop{},
		logger:        logger,
	}
	sp.newLoop = func(opts scrapeLoopOptions) loop {
		// Update the targets retrieval function for metadata to a new scrape cache.
		cache := opts.cache
		if cache == nil {
			cache = newScrapeCache()
		}
		opts.target.SetMetadataStore(cache)

		return newScrapeLoop(
			ctx,
			opts.scraper,
			log.With(logger, "target", opts.target),
			buffers,
			func(l labels.Labels) labels.Labels {
				return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc)
			},
			func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
			func(ctx context.Context) storage.Appender { return appender(app.Appender(ctx), opts.sampleLimit) },
			cache,
			jitterSeed,
			opts.honorTimestamps,
			opts.sampleLimit,
			opts.labelLimits,
			opts.interval,
			opts.timeout,
			reportScrapeTimeout,
		)
	}

	return sp, nil
}

func (sp *scrapePool) ActiveTargets() []*Target {
	sp.targetMtx.Lock()
	defer sp.targetMtx.Unlock()

	var tActive []*Target
	for _, t := range sp.activeTargets {
		tActive = append(tActive, t)
	}
	return tActive
}

func (sp *scrapePool) DroppedTargets() []*Target {
	sp.targetMtx.Lock()
	defer sp.targetMtx.Unlock()
	return sp.droppedTargets
}
// stop terminates all scrape loops and returns after they all terminated.
func (sp *scrapePool) stop() {
	sp.mtx.Lock()
	defer sp.mtx.Unlock()
	sp.cancel()
	var wg sync.WaitGroup

	sp.targetMtx.Lock()

	for fp, l := range sp.loops {
		wg.Add(1)

		go func(l loop) {
			l.stop()
			wg.Done()
		}(l)

		delete(sp.loops, fp)
		delete(sp.activeTargets, fp)
	}

	sp.targetMtx.Unlock()

	wg.Wait()
	sp.client.CloseIdleConnections()

	if sp.config != nil {
		targetScrapePoolSyncsCounter.DeleteLabelValues(sp.config.JobName)
		targetScrapePoolTargetLimit.DeleteLabelValues(sp.config.JobName)
		targetScrapePoolTargetsAdded.DeleteLabelValues(sp.config.JobName)
		targetSyncIntervalLength.DeleteLabelValues(sp.config.JobName)
		targetSyncFailed.DeleteLabelValues(sp.config.JobName)
	}
}

// reload the scrape pool with the given scrape configuration. The target state is preserved
// but all scrape loops are restarted with the new scrape configuration.
// This method returns after all scrape loops that were stopped have stopped scraping.
func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
	sp.mtx.Lock()
	defer sp.mtx.Unlock()
	targetScrapePoolReloads.Inc()
	start := time.Now()

	client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, config_util.WithHTTP2Disabled())
	if err != nil {
		targetScrapePoolReloadsFailed.Inc()
		return errors.Wrap(err, "error creating HTTP client")
	}

	reuseCache := reusableCache(sp.config, cfg)
	sp.config = cfg
	oldClient := sp.client
	sp.client = client

	targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))

	var (
		wg            sync.WaitGroup
		interval      = time.Duration(sp.config.ScrapeInterval)
		timeout       = time.Duration(sp.config.ScrapeTimeout)
		bodySizeLimit = int64(sp.config.BodySizeLimit)
		sampleLimit   = int(sp.config.SampleLimit)
		labelLimits   = &labelLimits{
			labelLimit:            int(sp.config.LabelLimit),
			labelNameLengthLimit:  int(sp.config.LabelNameLengthLimit),
			labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
		}
		honorLabels     = sp.config.HonorLabels
		honorTimestamps = sp.config.HonorTimestamps
		mrc             = sp.config.MetricRelabelConfigs
	)

	sp.targetMtx.Lock()

	forcedErr := sp.refreshTargetLimitErr()
	for fp, oldLoop := range sp.loops {
		var cache *scrapeCache
		if oc := oldLoop.getCache(); reuseCache && oc != nil {
			oldLoop.disableEndOfRunStalenessMarkers()
			cache = oc
		} else {
			cache = newScrapeCache()
		}

		var (
			t       = sp.activeTargets[fp]
			s       = &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit}
			newLoop = sp.newLoop(scrapeLoopOptions{
				target:          t,
				scraper:         s,
				sampleLimit:     sampleLimit,
				labelLimits:     labelLimits,
				honorLabels:     honorLabels,
				honorTimestamps: honorTimestamps,
				mrc:             mrc,
				cache:           cache,
				interval:        interval,
				timeout:         timeout,
			})
		)
		wg.Add(1)

		go func(oldLoop, newLoop loop) {
			oldLoop.stop()
			wg.Done()

			newLoop.setForcedError(forcedErr)
			newLoop.run(nil)
		}(oldLoop, newLoop)

		sp.loops[fp] = newLoop
	}

	sp.targetMtx.Unlock()

	wg.Wait()
	oldClient.CloseIdleConnections()
	targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
		time.Since(start).Seconds(),
	)
	return nil
}
// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
	sp.mtx.Lock()
	defer sp.mtx.Unlock()
	start := time.Now()

	sp.targetMtx.Lock()
	var all []*Target
	sp.droppedTargets = []*Target{}
	for _, tg := range tgs {
		targets, failures := targetsFromGroup(tg, sp.config)
		for _, err := range failures {
			level.Error(sp.logger).Log("msg", "Creating target failed", "err", err)
		}
		targetSyncFailed.WithLabelValues(sp.config.JobName).Add(float64(len(failures)))
		for _, t := range targets {
			if t.Labels().Len() > 0 {
				all = append(all, t)
			} else if t.DiscoveredLabels().Len() > 0 {
				sp.droppedTargets = append(sp.droppedTargets, t)
			}
		}
	}
	sp.targetMtx.Unlock()
	sp.sync(all)

	targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
		time.Since(start).Seconds(),
	)
	targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}

// sync takes a list of potentially duplicated targets, deduplicates them, starts
// scrape loops for new targets, and stops scrape loops for disappeared targets.
// It returns after all stopped scrape loops terminated.
func (sp *scrapePool) sync(targets []*Target) {
	var (
		uniqueLoops   = make(map[uint64]loop)
		interval      = time.Duration(sp.config.ScrapeInterval)
		timeout       = time.Duration(sp.config.ScrapeTimeout)
		bodySizeLimit = int64(sp.config.BodySizeLimit)
		sampleLimit   = int(sp.config.SampleLimit)
		labelLimits   = &labelLimits{
			labelLimit:            int(sp.config.LabelLimit),
			labelNameLengthLimit:  int(sp.config.LabelNameLengthLimit),
			labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
		}
		honorLabels     = sp.config.HonorLabels
		honorTimestamps = sp.config.HonorTimestamps
		mrc             = sp.config.MetricRelabelConfigs
	)

	sp.targetMtx.Lock()
	for _, t := range targets {
		hash := t.hash()

		if _, ok := sp.activeTargets[hash]; !ok {
			// The scrape interval and timeout labels are set to the config's values initially,
			// so whether changed via relabeling or not, they'll exist and hold the correct values
			// for every target.
			var err error
			interval, timeout, err = t.intervalAndTimeout(interval, timeout)

			s := &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit}
			l := sp.newLoop(scrapeLoopOptions{
				target:          t,
				scraper:         s,
				sampleLimit:     sampleLimit,
				labelLimits:     labelLimits,
				honorLabels:     honorLabels,
				honorTimestamps: honorTimestamps,
				mrc:             mrc,
				interval:        interval,
				timeout:         timeout,
			})
			if err != nil {
				l.setForcedError(err)
			}

			sp.activeTargets[hash] = t
			sp.loops[hash] = l

			uniqueLoops[hash] = l
		} else {
			// This might be a duplicated target.
			if _, ok := uniqueLoops[hash]; !ok {
				uniqueLoops[hash] = nil
			}
			// Need to keep the most updated labels information
			// for displaying it in the Service Discovery web page.
			sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
		}
	}

	var wg sync.WaitGroup

	// Stop and remove old targets and scraper loops.
	for hash := range sp.activeTargets {
		if _, ok := uniqueLoops[hash]; !ok {
			wg.Add(1)
			go func(l loop) {
				l.stop()
				wg.Done()
			}(sp.loops[hash])

			delete(sp.loops, hash)
			delete(sp.activeTargets, hash)
		}
	}

	sp.targetMtx.Unlock()

	targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
	forcedErr := sp.refreshTargetLimitErr()
	for _, l := range sp.loops {
		l.setForcedError(forcedErr)
	}
	for _, l := range uniqueLoops {
		if l != nil {
			go l.run(nil)
		}
	}
	// Wait for all potentially stopped scrapers to terminate.
	// This covers the case of flapping targets. If the server is under high load, a new scraper
	// may be active and tries to insert. The old scraper that didn't terminate yet could still
	// be inserting a previous sample set.
	wg.Wait()
}

// refreshTargetLimitErr returns an error that can be passed to the scrape loops
// if the number of targets exceeds the configured limit.
func (sp *scrapePool) refreshTargetLimitErr() error {
	if sp.config == nil || sp.config.TargetLimit == 0 {
		return nil
	}
	if l := len(sp.activeTargets); l > int(sp.config.TargetLimit) {
		targetScrapePoolExceededTargetLimit.Inc()
		return fmt.Errorf("target_limit exceeded (number of targets: %d, limit: %d)", l, sp.config.TargetLimit)
	}
	return nil
}

func verifyLabelLimits(lset labels.Labels, limits *labelLimits) error {
	if limits == nil {
		return nil
	}

	met := lset.Get(labels.MetricName)
	if limits.labelLimit > 0 {
		nbLabels := len(lset)
		if nbLabels > int(limits.labelLimit) {
			return fmt.Errorf("label_limit exceeded (metric: %.50s, number of label: %d, limit: %d)", met, nbLabels, limits.labelLimit)
		}
	}

	if limits.labelNameLengthLimit == 0 && limits.labelValueLengthLimit == 0 {
		return nil
	}

	for _, l := range lset {
		if limits.labelNameLengthLimit > 0 {
			nameLength := len(l.Name)
			if nameLength > int(limits.labelNameLengthLimit) {
				return fmt.Errorf("label_name_length_limit exceeded (metric: %.50s, label: %.50v, name length: %d, limit: %d)", met, l, nameLength, limits.labelNameLengthLimit)
			}
		}

		if limits.labelValueLengthLimit > 0 {
			valueLength := len(l.Value)
			if valueLength > int(limits.labelValueLengthLimit) {
				return fmt.Errorf("label_value_length_limit exceeded (metric: %.50s, label: %.50v, value length: %d, limit: %d)", met, l, valueLength, limits.labelValueLengthLimit)
			}
		}
	}
	return nil
}
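// mutateSampleLabels applies the target's labels to a scraped label set
// according to honor_labels and then runs metric relabeling. For example, when
// honor_labels is false, a scraped sample carrying job="payments" for a target
// whose own job label is "node" ends up with job="node" and
// exported_job="payments"; when honor_labels is true, the scraped
// job="payments" is kept as-is.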
func mutateSampleLabels(lset labels.Labels, target *Target, honor bool, rc []*relabel.Config) labels.Labels {
	lb := labels.NewBuilder(lset)

	if honor {
		for _, l := range target.Labels() {
			if !lset.Has(l.Name) {
				lb.Set(l.Name, l.Value)
			}
		}
	} else {
		for _, l := range target.Labels() {
			// existingValue will be empty if l.Name doesn't exist.
			existingValue := lset.Get(l.Name)
			if existingValue != "" {
				lb.Set(model.ExportedLabelPrefix+l.Name, existingValue)
			}
			// It is now safe to set the target label.
			lb.Set(l.Name, l.Value)
		}
	}

	res := lb.Labels()

	if len(rc) > 0 {
		res = relabel.Process(res, rc...)
	}

	return res
}
func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels {
	lb := labels.NewBuilder(lset)

	for _, l := range target.Labels() {
		lb.Set(model.ExportedLabelPrefix+l.Name, lset.Get(l.Name))
		lb.Set(l.Name, l.Value)
	}

	return lb.Labels()
}

// appender returns an appender for ingested samples from the target.
func appender(app storage.Appender, limit int) storage.Appender {
	app = &timeLimitAppender{
		Appender: app,
		maxTime:  timestamp.FromTime(time.Now().Add(maxAheadTime)),
	}

	// The limit is applied after metrics are potentially dropped via relabeling.
	if limit > 0 {
		app = &limitAppender{
			Appender: app,
			limit:    limit,
		}
	}
	return app
}

// A scraper retrieves samples and accepts a status report at the end.
type scraper interface {
	scrape(ctx context.Context, w io.Writer) (string, error)
	Report(start time.Time, dur time.Duration, err error)
	offset(interval time.Duration, jitterSeed uint64) time.Duration
}

// targetScraper implements the scraper interface for a target.
type targetScraper struct {
	*Target

	client  *http.Client
	req     *http.Request
	timeout time.Duration

	gzipr *gzip.Reader
	buf   *bufio.Reader

	bodySizeLimit int64
}

var errBodySizeLimit = errors.New("body size limit exceeded")

const acceptHeader = `application/openmetrics-text; version=0.0.1,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`

var userAgentHeader = fmt.Sprintf("Prometheus/%s", version.Version)

func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
	if s.req == nil {
		req, err := http.NewRequest("GET", s.URL().String(), nil)
		if err != nil {
			return "", err
		}
		req.Header.Add("Accept", acceptHeader)
		req.Header.Add("Accept-Encoding", "gzip")
		req.Header.Set("User-Agent", userAgentHeader)
		req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", strconv.FormatFloat(s.timeout.Seconds(), 'f', -1, 64))

		s.req = req
	}

	resp, err := s.client.Do(s.req.WithContext(ctx))
	if err != nil {
		return "", err
	}
	defer func() {
		io.Copy(ioutil.Discard, resp.Body)
		resp.Body.Close()
	}()

	if resp.StatusCode != http.StatusOK {
		return "", errors.Errorf("server returned HTTP status %s", resp.Status)
	}

	if s.bodySizeLimit <= 0 {
		s.bodySizeLimit = math.MaxInt64
	}
	if resp.Header.Get("Content-Encoding") != "gzip" {
		n, err := io.Copy(w, io.LimitReader(resp.Body, s.bodySizeLimit))
		if err != nil {
			return "", err
		}
		if n >= s.bodySizeLimit {
			targetScrapeExceededBodySizeLimit.Inc()
			return "", errBodySizeLimit
		}
		return resp.Header.Get("Content-Type"), nil
	}

	if s.gzipr == nil {
		s.buf = bufio.NewReader(resp.Body)
		s.gzipr, err = gzip.NewReader(s.buf)
		if err != nil {
			return "", err
		}
	} else {
		s.buf.Reset(resp.Body)
		if err = s.gzipr.Reset(s.buf); err != nil {
			return "", err
		}
	}

	n, err := io.Copy(w, io.LimitReader(s.gzipr, s.bodySizeLimit))
	s.gzipr.Close()
	if err != nil {
		return "", err
	}
	if n >= s.bodySizeLimit {
		targetScrapeExceededBodySizeLimit.Inc()
		return "", errBodySizeLimit
	}
	return resp.Header.Get("Content-Type"), nil
}
// A loop can run and be stopped again. It must not be reused after it was stopped.
type loop interface {
	run(errc chan<- error)
	setForcedError(err error)
	stop()
	getCache() *scrapeCache
	disableEndOfRunStalenessMarkers()
}

type cacheEntry struct {
	ref      uint64
	lastIter uint64
	hash     uint64
	lset     labels.Labels
}

type scrapeLoop struct {
	scraper         scraper
	l               log.Logger
	cache           *scrapeCache
	lastScrapeSize  int
	buffers         *pool.Pool
	jitterSeed      uint64
	honorTimestamps bool
	forcedErr       error
	forcedErrMtx    sync.Mutex
	sampleLimit     int
	labelLimits     *labelLimits
	interval        time.Duration
	timeout         time.Duration

	appender            func(ctx context.Context) storage.Appender
	sampleMutator       labelsMutator
	reportSampleMutator labelsMutator

	parentCtx context.Context
	ctx       context.Context
	cancel    func()
	stopped   chan struct{}

	disabledEndOfRunStalenessMarkers bool

	reportScrapeTimeout bool
}

// scrapeCache tracks mappings of exposed metric strings to label sets and
// storage references. Additionally, it tracks staleness of series between
// scrapes.
type scrapeCache struct {
	iter uint64 // Current scrape iteration.

	// How many series and metadata entries there were at the last success.
	successfulCount int

	// Parsed string to an entry with information about the actual label set
	// and its storage reference.
	series map[string]*cacheEntry

	// Cache of dropped metric strings and their iteration. The iteration must
	// be a pointer so we can update it without setting a new entry with an unsafe
	// string in addDropped().
	droppedSeries map[string]*uint64

	// seriesCur and seriesPrev store the labels of series that were seen
	// in the current and previous scrape.
	// We hold two maps and swap them out to save allocations.
	seriesCur  map[uint64]labels.Labels
	seriesPrev map[uint64]labels.Labels

	metaMtx  sync.Mutex
	metadata map[string]*metaEntry
}

// metaEntry holds meta information about a metric.
type metaEntry struct {
	lastIter uint64 // Last scrape iteration the entry was observed at.
	typ      textparse.MetricType
	help     string
	unit     string
}

func (m *metaEntry) size() int {
	// The attribute lastIter, although part of the struct, is not metadata.
	return len(m.help) + len(m.unit) + len(m.typ)
}

func newScrapeCache() *scrapeCache {
	return &scrapeCache{
		series:        map[string]*cacheEntry{},
		droppedSeries: map[string]*uint64{},
		seriesCur:     map[uint64]labels.Labels{},
		seriesPrev:    map[uint64]labels.Labels{},
		metadata:      map[string]*metaEntry{},
	}
}

func (c *scrapeCache) iterDone(flushCache bool) {
	c.metaMtx.Lock()
	count := len(c.series) + len(c.droppedSeries) + len(c.metadata)
	c.metaMtx.Unlock()

	if flushCache {
		c.successfulCount = count
	} else if count > c.successfulCount*2+1000 {
		// If a target had varying labels in scrapes that ultimately failed,
		// the caches would grow indefinitely. Force a flush when this happens.
		// We use the heuristic that this is a doubling of the cache size
		// since the last scrape, and allow an additional 1000 in case
		// initial scrapes all fail.
		flushCache = true
		targetScrapeCacheFlushForced.Inc()
	}
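	// As an illustration of the heuristic above: if the last successful scrape
	// left roughly 1,000 cache entries, failing scrapes with churning labels
	// can grow the cache past 3,000 entries (2*1000+1000) before a flush is
	// forced on a later iterDone call.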
	if flushCache {
		// All caches may grow over time through series churn
		// or multiple string representations of the same metric. Clean up entries
		// that haven't appeared in the last scrape.
		for s, e := range c.series {
			if c.iter != e.lastIter {
				delete(c.series, s)
			}
		}
		for s, iter := range c.droppedSeries {
			if c.iter != *iter {
				delete(c.droppedSeries, s)
			}
		}
		c.metaMtx.Lock()
		for m, e := range c.metadata {
			// Keep metadata around for 10 scrapes after its metric disappeared.
			if c.iter-e.lastIter > 10 {
				delete(c.metadata, m)
			}
		}
		c.metaMtx.Unlock()

		c.iter++
	}

	// Swap current and previous series.
	c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev

	// We have to delete every single key in the map.
	for k := range c.seriesCur {
		delete(c.seriesCur, k)
	}
}

func (c *scrapeCache) get(met string) (*cacheEntry, bool) {
	e, ok := c.series[met]
	if !ok {
		return nil, false
	}
	e.lastIter = c.iter
	return e, true
}

func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash uint64) {
	if ref == 0 {
		return
	}
	c.series[met] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
}

func (c *scrapeCache) addDropped(met string) {
	iter := c.iter
	c.droppedSeries[met] = &iter
}

func (c *scrapeCache) getDropped(met string) bool {
	iterp, ok := c.droppedSeries[met]
	if ok {
		*iterp = c.iter
	}
	return ok
}

func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) {
	c.seriesCur[hash] = lset
}

func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
	for h, lset := range c.seriesPrev {
		if _, ok := c.seriesCur[h]; !ok {
			if !f(lset) {
				break
			}
		}
	}
}

func (c *scrapeCache) setType(metric []byte, t textparse.MetricType) {
	c.metaMtx.Lock()

	e, ok := c.metadata[yoloString(metric)]
	if !ok {
		e = &metaEntry{typ: textparse.MetricTypeUnknown}
		c.metadata[string(metric)] = e
	}
	e.typ = t
	e.lastIter = c.iter

	c.metaMtx.Unlock()
}

func (c *scrapeCache) setHelp(metric, help []byte) {
	c.metaMtx.Lock()

	e, ok := c.metadata[yoloString(metric)]
	if !ok {
		e = &metaEntry{typ: textparse.MetricTypeUnknown}
		c.metadata[string(metric)] = e
	}
	if e.help != yoloString(help) {
		e.help = string(help)
	}
	e.lastIter = c.iter

	c.metaMtx.Unlock()
}

func (c *scrapeCache) setUnit(metric, unit []byte) {
	c.metaMtx.Lock()

	e, ok := c.metadata[yoloString(metric)]
	if !ok {
		e = &metaEntry{typ: textparse.MetricTypeUnknown}
		c.metadata[string(metric)] = e
	}
	if e.unit != yoloString(unit) {
		e.unit = string(unit)
	}
	e.lastIter = c.iter

	c.metaMtx.Unlock()
}

func (c *scrapeCache) GetMetadata(metric string) (MetricMetadata, bool) {
	c.metaMtx.Lock()
	defer c.metaMtx.Unlock()

	m, ok := c.metadata[metric]
	if !ok {
		return MetricMetadata{}, false
	}
	return MetricMetadata{
		Metric: metric,
		Type:   m.typ,
		Help:   m.help,
		Unit:   m.unit,
	}, true
}

func (c *scrapeCache) ListMetadata() []MetricMetadata {
	c.metaMtx.Lock()
	defer c.metaMtx.Unlock()

	res := make([]MetricMetadata, 0, len(c.metadata))

	for m, e := range c.metadata {
		res = append(res, MetricMetadata{
			Metric: m,
			Type:   e.typ,
			Help:   e.help,
			Unit:   e.unit,
		})
	}
	return res
}
// SizeMetadata returns the size of the metadata cache.
func (c *scrapeCache) SizeMetadata() (s int) {
	c.metaMtx.Lock()
	defer c.metaMtx.Unlock()
	for _, e := range c.metadata {
		s += e.size()
	}

	return s
}

// LengthMetadata returns the number of metadata entries in the cache.
func (c *scrapeCache) LengthMetadata() int {
	c.metaMtx.Lock()
	defer c.metaMtx.Unlock()

	return len(c.metadata)
}

func newScrapeLoop(ctx context.Context,
	sc scraper,
	l log.Logger,
	buffers *pool.Pool,
	sampleMutator labelsMutator,
	reportSampleMutator labelsMutator,
	appender func(ctx context.Context) storage.Appender,
	cache *scrapeCache,
	jitterSeed uint64,
	honorTimestamps bool,
	sampleLimit int,
	labelLimits *labelLimits,
	interval time.Duration,
	timeout time.Duration,
	reportScrapeTimeout bool,
) *scrapeLoop {
	if l == nil {
		l = log.NewNopLogger()
	}
	if buffers == nil {
		buffers = pool.New(1e3, 1e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
	}
	if cache == nil {
		cache = newScrapeCache()
	}
	sl := &scrapeLoop{
		scraper:             sc,
		buffers:             buffers,
		cache:               cache,
		appender:            appender,
		sampleMutator:       sampleMutator,
		reportSampleMutator: reportSampleMutator,
		stopped:             make(chan struct{}),
		jitterSeed:          jitterSeed,
		l:                   l,
		parentCtx:           ctx,
		honorTimestamps:     honorTimestamps,
		sampleLimit:         sampleLimit,
		labelLimits:         labelLimits,
		interval:            interval,
		timeout:             timeout,
		reportScrapeTimeout: reportScrapeTimeout,
	}
	sl.ctx, sl.cancel = context.WithCancel(ctx)

	return sl
}

func (sl *scrapeLoop) run(errc chan<- error) {
	select {
	case <-time.After(sl.scraper.offset(sl.interval, sl.jitterSeed)):
		// Continue after a scraping offset.
	case <-sl.ctx.Done():
		close(sl.stopped)
		return
	}

	var last time.Time

	alignedScrapeTime := time.Now().Round(0)
	ticker := time.NewTicker(sl.interval)
	defer ticker.Stop()

mainLoop:
	for {
		select {
		case <-sl.parentCtx.Done():
			close(sl.stopped)
			return
		case <-sl.ctx.Done():
			break mainLoop
		default:
		}

		// Temporary workaround for a jitter in go timers that causes disk space
		// increase in TSDB.
		// See https://github.com/prometheus/prometheus/issues/7846
		// Calling Round ensures the time used is the wall clock, as otherwise .Sub
		// and .Add on time.Time behave differently (see time package docs).
		scrapeTime := time.Now().Round(0)
		if AlignScrapeTimestamps && sl.interval > 100*ScrapeTimestampTolerance {
			// For some reason, a tick might have been skipped, in which case we
			// would call alignedScrapeTime.Add(interval) multiple times.
			for scrapeTime.Sub(alignedScrapeTime) >= sl.interval {
				alignedScrapeTime = alignedScrapeTime.Add(sl.interval)
			}
			// Align the scrape time if we are in the tolerance boundaries.
			if scrapeTime.Sub(alignedScrapeTime) <= ScrapeTimestampTolerance {
				scrapeTime = alignedScrapeTime
			}
		}

		last = sl.scrapeAndReport(sl.interval, sl.timeout, last, scrapeTime, errc)

		select {
		case <-sl.parentCtx.Done():
			close(sl.stopped)
			return
		case <-sl.ctx.Done():
			break mainLoop
		case <-ticker.C:
		}
	}

	close(sl.stopped)

	if !sl.disabledEndOfRunStalenessMarkers {
		sl.endOfRunStaleness(last, ticker, sl.interval)
	}
}

// scrapeAndReport performs a scrape and then appends the result to the storage
// together with reporting metrics, by using as few appenders as possible.
// In the happy scenario, a single appender is used.
// This function uses sl.parentCtx instead of sl.ctx on purpose. A scrape should
// only be cancelled on shutdown, not on reloads.
func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last, appendTime time.Time, errc chan<- error) time.Time {
	start := time.Now()

	// Only record after the first scrape.
	if !last.IsZero() {
		targetIntervalLength.WithLabelValues(interval.String()).Observe(
			time.Since(last).Seconds(),
		)
	}

	b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
	defer sl.buffers.Put(b)
	buf := bytes.NewBuffer(b)

	var total, added, seriesAdded int
	var err, appErr, scrapeErr error

	app := sl.appender(sl.parentCtx)
	defer func() {
		if err != nil {
			app.Rollback()
			return
		}
		err = app.Commit()
		if err != nil {
			level.Error(sl.l).Log("msg", "Scrape commit failed", "err", err)
		}
	}()

	defer func() {
		if err = sl.report(app, appendTime, timeout, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
			level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err)
		}
	}()

	if forcedErr := sl.getForcedError(); forcedErr != nil {
		scrapeErr = forcedErr
		// Add stale markers.
		if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
			app.Rollback()
			app = sl.appender(sl.parentCtx)
			level.Warn(sl.l).Log("msg", "Append failed", "err", err)
		}
		if errc != nil {
			errc <- forcedErr
		}

		return start
	}

	var contentType string
	scrapeCtx, cancel := context.WithTimeout(sl.parentCtx, timeout)
	contentType, scrapeErr = sl.scraper.scrape(scrapeCtx, buf)
	cancel()

	if scrapeErr == nil {
		b = buf.Bytes()
		// NOTE: There were issues with misbehaving clients in the past
		// that occasionally returned empty results. We don't want those
		// to falsely reset our buffer size.
		if len(b) > 0 {
			sl.lastScrapeSize = len(b)
		}
	} else {
		level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr)
		if errc != nil {
			errc <- scrapeErr
		}
	}

	// A failed scrape is the same as an empty scrape,
	// we still call sl.append to trigger stale markers.
	total, added, seriesAdded, appErr = sl.append(app, b, contentType, appendTime)
	if appErr != nil {
		app.Rollback()
		app = sl.appender(sl.parentCtx)
		level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
		// The append failed, probably due to a parse error or sample limit.
		// Call sl.append again with an empty scrape to trigger stale markers.
		if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
			app.Rollback()
			app = sl.appender(sl.parentCtx)
			level.Warn(sl.l).Log("msg", "Append failed", "err", err)
		}
	}

	if scrapeErr == nil {
		scrapeErr = appErr
	}

	return start
}

func (sl *scrapeLoop) setForcedError(err error) {
	sl.forcedErrMtx.Lock()
	defer sl.forcedErrMtx.Unlock()
	sl.forcedErr = err
}

func (sl *scrapeLoop) getForcedError() error {
	sl.forcedErrMtx.Lock()
	defer sl.forcedErrMtx.Unlock()
	return sl.forcedErr
}

func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) {
	// Scraping has stopped. We want to write stale markers but
	// the target may be recreated, so we wait just over 2 scrape intervals
	// before creating them.
	// If the context is canceled, we presume the server is shutting down
	// and will restart where it was. We do not attempt to write stale markers
	// in this case.

	if last.IsZero() {
		// There never was a scrape, so there will be no stale markers.
		return
	}

	// Wait for when the next scrape would have been, record its timestamp.
	var staleTime time.Time
	select {
	case <-sl.parentCtx.Done():
		return
	case <-ticker.C:
		staleTime = time.Now()
	}

	// Wait for when the next scrape would have been, if the target was recreated
	// samples should have been ingested by now.
	select {
	case <-sl.parentCtx.Done():
		return
	case <-ticker.C:
	}

	// Wait for an extra 10% of the interval, just to be safe.
	select {
	case <-sl.parentCtx.Done():
		return
	case <-time.After(interval / 10):
	}

	// Call sl.append again with an empty scrape to trigger stale markers.
	// If the target has since been recreated and scraped, the
	// stale markers will be out of order and ignored.
	app := sl.appender(sl.ctx)
	var err error
	defer func() {
		if err != nil {
			app.Rollback()
			return
		}
		err = app.Commit()
		if err != nil {
			level.Warn(sl.l).Log("msg", "Stale commit failed", "err", err)
		}
	}()
	if _, _, _, err = sl.append(app, []byte{}, "", staleTime); err != nil {
		app.Rollback()
		app = sl.appender(sl.ctx)
		level.Warn(sl.l).Log("msg", "Stale append failed", "err", err)
	}
	if err = sl.reportStale(app, staleTime); err != nil {
		level.Warn(sl.l).Log("msg", "Stale report failed", "err", err)
	}
}
// Stop the scraping. May still write data and stale markers after it has
// returned. Cancel the context to stop all writes.
func (sl *scrapeLoop) stop() {
	sl.cancel()
	<-sl.stopped
}

func (sl *scrapeLoop) disableEndOfRunStalenessMarkers() {
	sl.disabledEndOfRunStalenessMarkers = true
}

func (sl *scrapeLoop) getCache() *scrapeCache {
	return sl.cache
}

type appendErrors struct {
	numOutOfOrder         int
	numDuplicates         int
	numOutOfBounds        int
	numExemplarOutOfOrder int
}

func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
	var (
		p              = textparse.New(b, contentType)
		defTime        = timestamp.FromTime(ts)
		appErrs        = appendErrors{}
		sampleLimitErr error
		e              exemplar.Exemplar // escapes to heap so hoisted out of loop
	)

	defer func() {
		if err != nil {
			return
		}
		// Only perform cache cleaning if the scrape was not empty.
		// An empty scrape (usually) is used to indicate a failed scrape.
		sl.cache.iterDone(len(b) > 0)
	}()

loop:
	for {
		var (
			et          textparse.Entry
			sampleAdded bool
		)
		if et, err = p.Next(); err != nil {
			if err == io.EOF {
				err = nil
			}
			break
		}
		switch et {
		case textparse.EntryType:
			sl.cache.setType(p.Type())
			continue
		case textparse.EntryHelp:
			sl.cache.setHelp(p.Help())
			continue
		case textparse.EntryUnit:
			sl.cache.setUnit(p.Unit())
			continue
		case textparse.EntryComment:
			continue
		default:
		}
		total++

		t := defTime
		met, tp, v := p.Series()
		if !sl.honorTimestamps {
			tp = nil
		}
		if tp != nil {
			t = *tp
		}

		if sl.cache.getDropped(yoloString(met)) {
			continue
		}
		ce, ok := sl.cache.get(yoloString(met))
		var (
			ref  uint64
			lset labels.Labels
			mets string
			hash uint64
		)

		if ok {
			ref = ce.ref
			lset = ce.lset
		} else {
			mets = p.Metric(&lset)
			hash = lset.Hash()

			// Hash label set as it is seen local to the target. Then add target labels
			// and relabeling and store the final label set.
			lset = sl.sampleMutator(lset)

			// The label set may be set to nil to indicate dropping.
			if lset == nil {
				sl.cache.addDropped(mets)
				continue
			}

			if !lset.Has(labels.MetricName) {
				err = errNameLabelMandatory
				break loop
			}

			// If any label limit is exceeded, the scrape should fail.
			if err = verifyLabelLimits(lset, sl.labelLimits); err != nil {
				targetScrapePoolExceededLabelLimits.Inc()
				break loop
			}
		}

		ref, err = app.Append(ref, lset, t, v)
		sampleAdded, err = sl.checkAddError(ce, met, tp, err, &sampleLimitErr, &appErrs)
		if err != nil {
			if err != storage.ErrNotFound {
				level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
			}
			break loop
		}

		if !ok {
			if tp == nil {
				// Bypass staleness logic if there is an explicit timestamp.
				sl.cache.trackStaleness(hash, lset)
			}
			sl.cache.addRef(mets, ref, lset, hash)
			if sampleAdded && sampleLimitErr == nil {
				seriesAdded++
			}
		}

		// Increment added even if there's an error so we correctly report the
		// number of samples remaining after relabeling.
		added++

		if hasExemplar := p.Exemplar(&e); hasExemplar {
			if !e.HasTs {
				e.Ts = t
			}
			_, exemplarErr := app.AppendExemplar(ref, lset, e)
			exemplarErr = sl.checkAddExemplarError(exemplarErr, e, &appErrs)
			if exemplarErr != nil {
				// Since exemplar storage is still experimental, we don't fail the scrape on ingestion errors.
				level.Debug(sl.l).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr)
			}
			e = exemplar.Exemplar{} // reset for next time round loop
		}

	}
	if sampleLimitErr != nil {
		if err == nil {
			err = sampleLimitErr
		}
		// We only want to increment this once per scrape, so this is Inc'd outside the loop.
		targetScrapeSampleLimit.Inc()
	}
	if appErrs.numOutOfOrder > 0 {
		level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", appErrs.numOutOfOrder)
	}
	if appErrs.numDuplicates > 0 {
		level.Warn(sl.l).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", appErrs.numDuplicates)
	}
	if appErrs.numOutOfBounds > 0 {
		level.Warn(sl.l).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", appErrs.numOutOfBounds)
	}
	if appErrs.numExemplarOutOfOrder > 0 {
		level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder)
	}
	if err == nil {
		sl.cache.forEachStale(func(lset labels.Labels) bool {
			// Series no longer exposed, mark it stale.
			_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
			switch errors.Cause(err) {
			case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
				// Do not count these in logging, as this is expected if a target
				// goes away and comes back again with a new scrape loop.
				err = nil
			}
			return err == nil
		})
	}
	return
}

// yoloString converts a byte slice to a string without copying. The returned
// string is only valid as long as the underlying bytes are neither modified
// nor reused.
func yoloString(b []byte) string {
	return *((*string)(unsafe.Pointer(&b)))
}

// checkAddError checks the error returned when appending a sample. It reports
// whether the sample was added and, via the returned error, whether the caller
// should stop processing; any sample limit error is recorded in sampleLimitErr.
func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr *error, appErrs *appendErrors) (bool, error) {
	switch errors.Cause(err) {
	case nil:
		if tp == nil && ce != nil {
			sl.cache.trackStaleness(ce.hash, ce.lset)
		}
		return true, nil
	case storage.ErrNotFound:
		return false, storage.ErrNotFound
	case storage.ErrOutOfOrderSample:
		appErrs.numOutOfOrder++
		level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
		targetScrapeSampleOutOfOrder.Inc()
		return false, nil
	case storage.ErrDuplicateSampleForTimestamp:
		appErrs.numDuplicates++
		level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
		targetScrapeSampleDuplicate.Inc()
		return false, nil
	case storage.ErrOutOfBounds:
		appErrs.numOutOfBounds++
		level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
		targetScrapeSampleOutOfBounds.Inc()
		return false, nil
	case errSampleLimit:
		// Keep on parsing output if we hit the limit, so we report the correct
		// total number of samples scraped.
		*sampleLimitErr = err
		return false, nil
	default:
		return false, err
	}
}

func (sl *scrapeLoop) checkAddExemplarError(err error, e exemplar.Exemplar, appErrs *appendErrors) error {
	switch errors.Cause(err) {
	case storage.ErrNotFound:
		return storage.ErrNotFound
	case storage.ErrOutOfOrderExemplar:
		appErrs.numExemplarOutOfOrder++
		level.Debug(sl.l).Log("msg", "Out of order exemplar", "exemplar", fmt.Sprintf("%+v", e))
		targetScrapeExemplarOutOfOrder.Inc()
		return nil
	default:
		return err
	}
}

// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
// with scraped metrics in the cache.
const (
	scrapeHealthMetricName       = "up" + "\xff"
	scrapeDurationMetricName     = "scrape_duration_seconds" + "\xff"
	scrapeSamplesMetricName      = "scrape_samples_scraped" + "\xff"
	samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling" + "\xff"
	scrapeSeriesAddedMetricName  = "scrape_series_added" + "\xff"
	scrapeTimeoutMetricName      = "scrape_timeout_seconds" + "\xff"
	scrapeSampleLimitMetricName  = "scrape_sample_limit" + "\xff"
)

func (sl *scrapeLoop) report(app storage.Appender, start time.Time, timeout, duration time.Duration, scraped, added, seriesAdded int, scrapeErr error) (err error) {
	sl.scraper.Report(start, duration, scrapeErr)

	ts := timestamp.FromTime(start)

	var health float64
	if scrapeErr == nil {
		health = 1
	}

	if err = sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil {
		return
	}
	if err = sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds()); err != nil {
		return
	}
	if err = sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scraped)); err != nil {
		return
	}
	if err = sl.addReportSample(app, samplesPostRelabelMetricName, ts, float64(added)); err != nil {
		return
	}
	if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded)); err != nil {
		return
	}
	if sl.reportScrapeTimeout {
		if err = sl.addReportSample(app, scrapeTimeoutMetricName, ts, timeout.Seconds()); err != nil {
			return
		}
		if err = sl.addReportSample(app, scrapeSampleLimitMetricName, ts, float64(sl.sampleLimit)); err != nil {
			return
		}
	}
	return
}

func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err error) {
	ts := timestamp.FromTime(start)

	stale := math.Float64frombits(value.StaleNaN)

	if err = sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil {
		return
	}
	if err = sl.addReportSample(app, scrapeDurationMetricName, ts, stale); err != nil {
		return
	}
	if err = sl.addReportSample(app, scrapeSamplesMetricName, ts, stale); err != nil {
		return
	}
	if err = sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale); err != nil {
		return
	}
	if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale); err != nil {
		return
	}
	if sl.reportScrapeTimeout {
		if err = sl.addReportSample(app, scrapeTimeoutMetricName, ts, stale); err != nil {
			return
		}
		if err = sl.addReportSample(app, scrapeSampleLimitMetricName, ts, stale); err != nil {
			return
		}
	}
	return
}
func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
	ce, ok := sl.cache.get(s)
	var ref uint64
	var lset labels.Labels
	if ok {
		ref = ce.ref
		lset = ce.lset
	} else {
		lset = labels.Labels{
			// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
			// with scraped metrics in the cache.
			// We have to drop it when building the actual metric.
			labels.Label{Name: labels.MetricName, Value: s[:len(s)-1]},
		}
		lset = sl.reportSampleMutator(lset)
	}

	ref, err := app.Append(ref, lset, t, v)
	switch errors.Cause(err) {
	case nil:
		if !ok {
			sl.cache.addRef(s, ref, lset, lset.Hash())
		}
		return nil
	case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
		// Do not log here, as this is expected if a target goes away and comes back
		// again with a new scrape loop.
		return nil
	default:
		return err
	}
}

// zeroConfig returns a new scrape config that only contains configuration items
// that alter metrics.
func zeroConfig(c *config.ScrapeConfig) *config.ScrapeConfig {
	z := *c
	// We zero out the fields that for sure don't affect scrape.
	z.ScrapeInterval = 0
	z.ScrapeTimeout = 0
	z.SampleLimit = 0
	z.LabelLimit = 0
	z.LabelNameLengthLimit = 0
	z.LabelValueLengthLimit = 0
	z.HTTPClientConfig = config_util.HTTPClientConfig{}
	return &z
}

// reusableCache compares two scrape configs and tells whether the cache is still
// valid.
func reusableCache(r, l *config.ScrapeConfig) bool {
	if r == nil || l == nil {
		return false
	}
	return reflect.DeepEqual(zeroConfig(r), zeroConfig(l))
}
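// For example, a reload that only changes scrape_interval, scrape_timeout,
// sample or label limits, or the HTTP client settings keeps the existing
// scrape cache (reusableCache returns true), while a reload that changes
// metric_relabel_configs or honor_labels invalidates it.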