1// Copyright 2016 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package scrape 15 16import ( 17 "bufio" 18 "bytes" 19 "compress/gzip" 20 "context" 21 "fmt" 22 "io" 23 "io/ioutil" 24 "math" 25 "net/http" 26 "sync" 27 "time" 28 "unsafe" 29 30 "github.com/go-kit/kit/log" 31 "github.com/go-kit/kit/log/level" 32 "github.com/pkg/errors" 33 "github.com/prometheus/client_golang/prometheus" 34 config_util "github.com/prometheus/common/config" 35 "github.com/prometheus/common/model" 36 "github.com/prometheus/common/version" 37 38 "github.com/prometheus/prometheus/config" 39 "github.com/prometheus/prometheus/discovery/targetgroup" 40 "github.com/prometheus/prometheus/pkg/labels" 41 "github.com/prometheus/prometheus/pkg/pool" 42 "github.com/prometheus/prometheus/pkg/relabel" 43 "github.com/prometheus/prometheus/pkg/textparse" 44 "github.com/prometheus/prometheus/pkg/timestamp" 45 "github.com/prometheus/prometheus/pkg/value" 46 "github.com/prometheus/prometheus/storage" 47) 48 49var ( 50 targetIntervalLength = prometheus.NewSummaryVec( 51 prometheus.SummaryOpts{ 52 Name: "prometheus_target_interval_length_seconds", 53 Help: "Actual intervals between scrapes.", 54 Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001}, 55 }, 56 []string{"interval"}, 57 ) 58 targetReloadIntervalLength = prometheus.NewSummaryVec( 59 prometheus.SummaryOpts{ 60 Name: "prometheus_target_reload_length_seconds", 61 Help: 
"Actual interval to reload the scrape pool with a given configuration.", 62 Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001}, 63 }, 64 []string{"interval"}, 65 ) 66 targetScrapePools = prometheus.NewCounter( 67 prometheus.CounterOpts{ 68 Name: "prometheus_target_scrape_pools_total", 69 Help: "Total number of scrape pool creation atttempts.", 70 }, 71 ) 72 targetScrapePoolsFailed = prometheus.NewCounter( 73 prometheus.CounterOpts{ 74 Name: "prometheus_target_scrape_pools_failed_total", 75 Help: "Total number of scrape pool creations that failed.", 76 }, 77 ) 78 targetScrapePoolReloads = prometheus.NewCounter( 79 prometheus.CounterOpts{ 80 Name: "prometheus_target_scrape_pool_reloads_total", 81 Help: "Total number of scrape loop reloads.", 82 }, 83 ) 84 targetScrapePoolReloadsFailed = prometheus.NewCounter( 85 prometheus.CounterOpts{ 86 Name: "prometheus_target_scrape_pool_reloads_failed_total", 87 Help: "Total number of failed scrape loop reloads.", 88 }, 89 ) 90 targetSyncIntervalLength = prometheus.NewSummaryVec( 91 prometheus.SummaryOpts{ 92 Name: "prometheus_target_sync_length_seconds", 93 Help: "Actual interval to sync the scrape pool.", 94 Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001}, 95 }, 96 []string{"scrape_job"}, 97 ) 98 targetScrapePoolSyncsCounter = prometheus.NewCounterVec( 99 prometheus.CounterOpts{ 100 Name: "prometheus_target_scrape_pool_sync_total", 101 Help: "Total number of syncs that were executed on a scrape pool.", 102 }, 103 []string{"scrape_job"}, 104 ) 105 targetScrapeSampleLimit = prometheus.NewCounter( 106 prometheus.CounterOpts{ 107 Name: "prometheus_target_scrapes_exceeded_sample_limit_total", 108 Help: "Total number of scrapes that hit the sample limit and were rejected.", 109 }, 110 ) 111 targetScrapeSampleDuplicate = prometheus.NewCounter( 112 prometheus.CounterOpts{ 113 Name: "prometheus_target_scrapes_sample_duplicate_timestamp_total", 114 
Help: "Total number of samples rejected due to duplicate timestamps but different values", 115 }, 116 ) 117 targetScrapeSampleOutOfOrder = prometheus.NewCounter( 118 prometheus.CounterOpts{ 119 Name: "prometheus_target_scrapes_sample_out_of_order_total", 120 Help: "Total number of samples rejected due to not being out of the expected order", 121 }, 122 ) 123 targetScrapeSampleOutOfBounds = prometheus.NewCounter( 124 prometheus.CounterOpts{ 125 Name: "prometheus_target_scrapes_sample_out_of_bounds_total", 126 Help: "Total number of samples rejected due to timestamp falling outside of the time bounds", 127 }, 128 ) 129 targetScrapeCacheFlushForced = prometheus.NewCounter( 130 prometheus.CounterOpts{ 131 Name: "prometheus_target_scrapes_cache_flush_forced_total", 132 Help: "How many times a scrape cache was flushed due to getting big while scrapes are failing.", 133 }, 134 ) 135) 136 137func init() { 138 prometheus.MustRegister(targetIntervalLength) 139 prometheus.MustRegister(targetReloadIntervalLength) 140 prometheus.MustRegister(targetScrapePools) 141 prometheus.MustRegister(targetScrapePoolsFailed) 142 prometheus.MustRegister(targetScrapePoolReloads) 143 prometheus.MustRegister(targetScrapePoolReloadsFailed) 144 prometheus.MustRegister(targetSyncIntervalLength) 145 prometheus.MustRegister(targetScrapePoolSyncsCounter) 146 prometheus.MustRegister(targetScrapeSampleLimit) 147 prometheus.MustRegister(targetScrapeSampleDuplicate) 148 prometheus.MustRegister(targetScrapeSampleOutOfOrder) 149 prometheus.MustRegister(targetScrapeSampleOutOfBounds) 150 prometheus.MustRegister(targetScrapeCacheFlushForced) 151} 152 153// scrapePool manages scrapes for sets of targets. 154type scrapePool struct { 155 appendable Appendable 156 logger log.Logger 157 158 mtx sync.RWMutex 159 config *config.ScrapeConfig 160 client *http.Client 161 // Targets and loops must always be synchronized to have the same 162 // set of hashes. 
163 activeTargets map[uint64]*Target 164 droppedTargets []*Target 165 loops map[uint64]loop 166 cancel context.CancelFunc 167 168 // Constructor for new scrape loops. This is settable for testing convenience. 169 newLoop func(scrapeLoopOptions) loop 170} 171 172type scrapeLoopOptions struct { 173 target *Target 174 scraper scraper 175 limit int 176 honorLabels bool 177 honorTimestamps bool 178 mrc []*relabel.Config 179} 180 181const maxAheadTime = 10 * time.Minute 182 183type labelsMutator func(labels.Labels) labels.Labels 184 185func newScrapePool(cfg *config.ScrapeConfig, app Appendable, jitterSeed uint64, logger log.Logger) (*scrapePool, error) { 186 targetScrapePools.Inc() 187 if logger == nil { 188 logger = log.NewNopLogger() 189 } 190 191 client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName) 192 if err != nil { 193 targetScrapePoolsFailed.Inc() 194 return nil, errors.Wrap(err, "error creating HTTP client") 195 } 196 197 buffers := pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }) 198 199 ctx, cancel := context.WithCancel(context.Background()) 200 sp := &scrapePool{ 201 cancel: cancel, 202 appendable: app, 203 config: cfg, 204 client: client, 205 activeTargets: map[uint64]*Target{}, 206 loops: map[uint64]loop{}, 207 logger: logger, 208 } 209 sp.newLoop = func(opts scrapeLoopOptions) loop { 210 // Update the targets retrieval function for metadata to a new scrape cache. 
211 cache := newScrapeCache() 212 opts.target.setMetadataStore(cache) 213 214 return newScrapeLoop( 215 ctx, 216 opts.scraper, 217 log.With(logger, "target", opts.target), 218 buffers, 219 func(l labels.Labels) labels.Labels { 220 return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc) 221 }, 222 func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) }, 223 func() storage.Appender { 224 app, err := app.Appender() 225 if err != nil { 226 panic(err) 227 } 228 return appender(app, opts.limit) 229 }, 230 cache, 231 jitterSeed, 232 opts.honorTimestamps, 233 ) 234 } 235 236 return sp, nil 237} 238 239func (sp *scrapePool) ActiveTargets() []*Target { 240 sp.mtx.Lock() 241 defer sp.mtx.Unlock() 242 243 var tActive []*Target 244 for _, t := range sp.activeTargets { 245 tActive = append(tActive, t) 246 } 247 return tActive 248} 249 250func (sp *scrapePool) DroppedTargets() []*Target { 251 sp.mtx.Lock() 252 defer sp.mtx.Unlock() 253 return sp.droppedTargets 254} 255 256// stop terminates all scrape loops and returns after they all terminated. 257func (sp *scrapePool) stop() { 258 sp.cancel() 259 var wg sync.WaitGroup 260 261 sp.mtx.Lock() 262 defer sp.mtx.Unlock() 263 264 for fp, l := range sp.loops { 265 wg.Add(1) 266 267 go func(l loop) { 268 l.stop() 269 wg.Done() 270 }(l) 271 272 delete(sp.loops, fp) 273 delete(sp.activeTargets, fp) 274 } 275 wg.Wait() 276 sp.client.CloseIdleConnections() 277} 278 279// reload the scrape pool with the given scrape configuration. The target state is preserved 280// but all scrape loops are restarted with the new scrape configuration. 281// This method returns after all scrape loops that were stopped have stopped scraping. 
282func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error { 283 targetScrapePoolReloads.Inc() 284 start := time.Now() 285 286 sp.mtx.Lock() 287 defer sp.mtx.Unlock() 288 289 client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName) 290 if err != nil { 291 targetScrapePoolReloadsFailed.Inc() 292 return errors.Wrap(err, "error creating HTTP client") 293 } 294 sp.config = cfg 295 oldClient := sp.client 296 sp.client = client 297 298 var ( 299 wg sync.WaitGroup 300 interval = time.Duration(sp.config.ScrapeInterval) 301 timeout = time.Duration(sp.config.ScrapeTimeout) 302 limit = int(sp.config.SampleLimit) 303 honorLabels = sp.config.HonorLabels 304 honorTimestamps = sp.config.HonorTimestamps 305 mrc = sp.config.MetricRelabelConfigs 306 ) 307 308 for fp, oldLoop := range sp.loops { 309 var ( 310 t = sp.activeTargets[fp] 311 s = &targetScraper{Target: t, client: sp.client, timeout: timeout} 312 newLoop = sp.newLoop(scrapeLoopOptions{ 313 target: t, 314 scraper: s, 315 limit: limit, 316 honorLabels: honorLabels, 317 honorTimestamps: honorTimestamps, 318 mrc: mrc, 319 }) 320 ) 321 wg.Add(1) 322 323 go func(oldLoop, newLoop loop) { 324 oldLoop.stop() 325 wg.Done() 326 327 go newLoop.run(interval, timeout, nil) 328 }(oldLoop, newLoop) 329 330 sp.loops[fp] = newLoop 331 } 332 333 wg.Wait() 334 oldClient.CloseIdleConnections() 335 targetReloadIntervalLength.WithLabelValues(interval.String()).Observe( 336 time.Since(start).Seconds(), 337 ) 338 return nil 339} 340 341// Sync converts target groups into actual scrape targets and synchronizes 342// the currently running scraper with the resulting set and returns all scraped and dropped targets. 
343func (sp *scrapePool) Sync(tgs []*targetgroup.Group) { 344 start := time.Now() 345 346 var all []*Target 347 sp.mtx.Lock() 348 sp.droppedTargets = []*Target{} 349 for _, tg := range tgs { 350 targets, err := targetsFromGroup(tg, sp.config) 351 if err != nil { 352 level.Error(sp.logger).Log("msg", "creating targets failed", "err", err) 353 continue 354 } 355 for _, t := range targets { 356 if t.Labels().Len() > 0 { 357 all = append(all, t) 358 } else if t.DiscoveredLabels().Len() > 0 { 359 sp.droppedTargets = append(sp.droppedTargets, t) 360 } 361 } 362 } 363 sp.mtx.Unlock() 364 sp.sync(all) 365 366 targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe( 367 time.Since(start).Seconds(), 368 ) 369 targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc() 370} 371 372// sync takes a list of potentially duplicated targets, deduplicates them, starts 373// scrape loops for new targets, and stops scrape loops for disappeared targets. 374// It returns after all stopped scrape loops terminated. 
375func (sp *scrapePool) sync(targets []*Target) { 376 sp.mtx.Lock() 377 defer sp.mtx.Unlock() 378 379 var ( 380 uniqueTargets = map[uint64]struct{}{} 381 interval = time.Duration(sp.config.ScrapeInterval) 382 timeout = time.Duration(sp.config.ScrapeTimeout) 383 limit = int(sp.config.SampleLimit) 384 honorLabels = sp.config.HonorLabels 385 honorTimestamps = sp.config.HonorTimestamps 386 mrc = sp.config.MetricRelabelConfigs 387 ) 388 389 for _, t := range targets { 390 t := t 391 hash := t.hash() 392 uniqueTargets[hash] = struct{}{} 393 394 if _, ok := sp.activeTargets[hash]; !ok { 395 s := &targetScraper{Target: t, client: sp.client, timeout: timeout} 396 l := sp.newLoop(scrapeLoopOptions{ 397 target: t, 398 scraper: s, 399 limit: limit, 400 honorLabels: honorLabels, 401 honorTimestamps: honorTimestamps, 402 mrc: mrc, 403 }) 404 405 sp.activeTargets[hash] = t 406 sp.loops[hash] = l 407 408 go l.run(interval, timeout, nil) 409 } else { 410 // Need to keep the most updated labels information 411 // for displaying it in the Service Discovery web page. 412 sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels()) 413 } 414 } 415 416 var wg sync.WaitGroup 417 418 // Stop and remove old targets and scraper loops. 419 for hash := range sp.activeTargets { 420 if _, ok := uniqueTargets[hash]; !ok { 421 wg.Add(1) 422 go func(l loop) { 423 424 l.stop() 425 426 wg.Done() 427 }(sp.loops[hash]) 428 429 delete(sp.loops, hash) 430 delete(sp.activeTargets, hash) 431 } 432 } 433 434 // Wait for all potentially stopped scrapers to terminate. 435 // This covers the case of flapping targets. If the server is under high load, a new scraper 436 // may be active and tries to insert. The old scraper that didn't terminate yet could still 437 // be inserting a previous sample set. 
438 wg.Wait() 439} 440 441func mutateSampleLabels(lset labels.Labels, target *Target, honor bool, rc []*relabel.Config) labels.Labels { 442 lb := labels.NewBuilder(lset) 443 444 if honor { 445 for _, l := range target.Labels() { 446 if !lset.Has(l.Name) { 447 lb.Set(l.Name, l.Value) 448 } 449 } 450 } else { 451 for _, l := range target.Labels() { 452 lv := lset.Get(l.Name) 453 if lv != "" { 454 lb.Set(model.ExportedLabelPrefix+l.Name, lv) 455 } 456 lb.Set(l.Name, l.Value) 457 } 458 } 459 460 for _, l := range lb.Labels() { 461 if l.Value == "" { 462 lb.Del(l.Name) 463 } 464 } 465 466 res := lb.Labels() 467 468 if len(rc) > 0 { 469 res = relabel.Process(res, rc...) 470 } 471 472 return res 473} 474 475func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels { 476 lb := labels.NewBuilder(lset) 477 478 for _, l := range target.Labels() { 479 lv := lset.Get(l.Name) 480 if lv != "" { 481 lb.Set(model.ExportedLabelPrefix+l.Name, lv) 482 } 483 lb.Set(l.Name, l.Value) 484 } 485 486 return lb.Labels() 487} 488 489// appender returns an appender for ingested samples from the target. 490func appender(app storage.Appender, limit int) storage.Appender { 491 app = &timeLimitAppender{ 492 Appender: app, 493 maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), 494 } 495 496 // The limit is applied after metrics are potentially dropped via relabeling. 497 if limit > 0 { 498 app = &limitAppender{ 499 Appender: app, 500 limit: limit, 501 } 502 } 503 return app 504} 505 506// A scraper retrieves samples and accepts a status report at the end. 507type scraper interface { 508 scrape(ctx context.Context, w io.Writer) (string, error) 509 report(start time.Time, dur time.Duration, err error) 510 offset(interval time.Duration, jitterSeed uint64) time.Duration 511} 512 513// targetScraper implements the scraper interface for a target. 
514type targetScraper struct { 515 *Target 516 517 client *http.Client 518 req *http.Request 519 timeout time.Duration 520 521 gzipr *gzip.Reader 522 buf *bufio.Reader 523} 524 525const acceptHeader = `application/openmetrics-text; version=0.0.1,text/plain;version=0.0.4;q=0.5,*/*;q=0.1` 526 527var userAgentHeader = fmt.Sprintf("Prometheus/%s", version.Version) 528 529func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) { 530 if s.req == nil { 531 req, err := http.NewRequest("GET", s.URL().String(), nil) 532 if err != nil { 533 return "", err 534 } 535 req.Header.Add("Accept", acceptHeader) 536 req.Header.Add("Accept-Encoding", "gzip") 537 req.Header.Set("User-Agent", userAgentHeader) 538 req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", fmt.Sprintf("%f", s.timeout.Seconds())) 539 540 s.req = req 541 } 542 543 resp, err := s.client.Do(s.req.WithContext(ctx)) 544 if err != nil { 545 return "", err 546 } 547 defer func() { 548 io.Copy(ioutil.Discard, resp.Body) 549 resp.Body.Close() 550 }() 551 552 if resp.StatusCode != http.StatusOK { 553 return "", errors.Errorf("server returned HTTP status %s", resp.Status) 554 } 555 556 if resp.Header.Get("Content-Encoding") != "gzip" { 557 _, err = io.Copy(w, resp.Body) 558 if err != nil { 559 return "", err 560 } 561 return resp.Header.Get("Content-Type"), nil 562 } 563 564 if s.gzipr == nil { 565 s.buf = bufio.NewReader(resp.Body) 566 s.gzipr, err = gzip.NewReader(s.buf) 567 if err != nil { 568 return "", err 569 } 570 } else { 571 s.buf.Reset(resp.Body) 572 if err = s.gzipr.Reset(s.buf); err != nil { 573 return "", err 574 } 575 } 576 577 _, err = io.Copy(w, s.gzipr) 578 s.gzipr.Close() 579 if err != nil { 580 return "", err 581 } 582 return resp.Header.Get("Content-Type"), nil 583} 584 585// A loop can run and be stopped again. It must not be reused after it was stopped. 
type loop interface {
	run(interval, timeout time.Duration, errc chan<- error)
	stop()
}

// cacheEntry is what scrapeCache stores per exposed metric string.
type cacheEntry struct {
	ref      uint64        // Storage reference returned by app.Add for this series.
	lastIter uint64        // Cache iteration in which the entry was last seen.
	hash     uint64        // Hash of the label set as scraped, before sampleMutator ran.
	lset     labels.Labels // Final label set after sampleMutator.
}

// scrapeLoop periodically scrapes one target and appends the results.
type scrapeLoop struct {
	scraper scraper
	l       log.Logger
	cache   *scrapeCache
	// lastScrapeSize is the length of the last non-empty successful scrape
	// body; it is used to size the buffer requested from buffers.
	lastScrapeSize  int
	buffers         *pool.Pool
	jitterSeed      uint64
	honorTimestamps bool

	appender            func() storage.Appender
	sampleMutator       labelsMutator
	reportSampleMutator labelsMutator

	// ctx is the parent context. When it is canceled, run and
	// endOfRunStaleness return without writing stale markers.
	ctx context.Context
	// scrapeCtx is derived from ctx and canceled by stop(); it ends the scrape
	// loop but still lets end-of-run stale markers be written.
	scrapeCtx context.Context
	cancel    func()
	// stopped is closed once run has left its scraping loop.
	stopped chan struct{}
}

// scrapeCache tracks mappings of exposed metric strings to label sets and
// storage references. Additionally, it tracks staleness of series between
// scrapes.
type scrapeCache struct {
	iter uint64 // Current scrape iteration.

	// How many series and metadata entries there were at the last success.
	successfulCount int

	// Parsed string to an entry with information about the actual label set
	// and its storage reference.
	series map[string]*cacheEntry

	// Cache of dropped metric strings and their iteration. The iteration must
	// be a pointer so getDropped() can update it in place, without re-inserting
	// the entry under the unsafe yoloString key used for the lookup.
	droppedSeries map[string]*uint64

	// seriesCur and seriesPrev store the labels of series that were seen
	// in the current and previous scrape.
	// We hold two maps and swap them out to save allocations.
	seriesCur  map[uint64]labels.Labels
	seriesPrev map[uint64]labels.Labels

	// metaMtx guards metadata, which is also read by getMetadata/listMetadata.
	metaMtx  sync.Mutex
	metadata map[string]*metaEntry
}

// metaEntry holds meta information about a metric.
type metaEntry struct {
	lastIter uint64 // Last scrape iteration the entry was observed at.
	typ      textparse.MetricType
	help     string
	unit     string
}

// newScrapeCache returns an empty scrapeCache with all maps initialized.
func newScrapeCache() *scrapeCache {
	return &scrapeCache{
		series:        map[string]*cacheEntry{},
		droppedSeries: map[string]*uint64{},
		seriesCur:     map[uint64]labels.Labels{},
		seriesPrev:    map[uint64]labels.Labels{},
		metadata:      map[string]*metaEntry{},
	}
}

// iterDone finishes a scrape iteration. append passes flushCache=true for a
// non-empty scrape: the current cache size is recorded as the last successful
// size and entries not seen in this iteration are evicted. Otherwise eviction
// is forced only when the caches have grown past twice the last successful
// size plus 1000. The iteration counter advances only when a flush happens.
// In every case the current/previous staleness sets are swapped and the new
// current set is cleared.
func (c *scrapeCache) iterDone(flushCache bool) {
	c.metaMtx.Lock()
	count := len(c.series) + len(c.droppedSeries) + len(c.metadata)
	c.metaMtx.Unlock()

	if flushCache {
		c.successfulCount = count
	} else if count > c.successfulCount*2+1000 {
		// If a target had varying labels in scrapes that ultimately failed,
		// the caches would grow indefinitely. Force a flush when this happens.
		// We use the heuristic that this is a doubling of the cache size
		// since the last scrape, and allow an additional 1000 in case
		// initial scrapes all fail.
		flushCache = true
		targetScrapeCacheFlushForced.Inc()
	}

	if flushCache {
		// All caches may grow over time through series churn
		// or multiple string representations of the same metric. Clean up entries
		// that haven't appeared in the last scrape.
		for s, e := range c.series {
			if c.iter != e.lastIter {
				delete(c.series, s)
			}
		}
		for s, iter := range c.droppedSeries {
			if c.iter != *iter {
				delete(c.droppedSeries, s)
			}
		}
		c.metaMtx.Lock()
		for m, e := range c.metadata {
			// Keep metadata around for 10 scrapes after its metric disappeared.
			if c.iter-e.lastIter > 10 {
				delete(c.metadata, m)
			}
		}
		c.metaMtx.Unlock()

		c.iter++
	}

	// Swap current and previous series.
	c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev

	// We have to delete every single key in the map.
	for k := range c.seriesCur {
		delete(c.seriesCur, k)
	}
}

// get returns the cached entry for met, marking it as seen in the current
// iteration.
func (c *scrapeCache) get(met string) (*cacheEntry, bool) {
	e, ok := c.series[met]
	if !ok {
		return nil, false
	}
	e.lastIter = c.iter
	return e, true
}

// addRef caches the storage reference and final label set for met. A zero ref
// is not cached.
func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash uint64) {
	if ref == 0 {
		return
	}
	c.series[met] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
}

// addDropped remembers that met was dropped by relabeling in this iteration.
// met is stored as a map key, so callers must pass a string that owns its memory.
func (c *scrapeCache) addDropped(met string) {
	iter := c.iter
	c.droppedSeries[met] = &iter
}

// getDropped reports whether met is known to be dropped and, if so, marks it
// as seen in the current iteration.
func (c *scrapeCache) getDropped(met string) bool {
	iterp, ok := c.droppedSeries[met]
	if ok {
		*iterp = c.iter
	}
	return ok
}

// trackStaleness records that the series with the given hash was seen in the
// current scrape.
func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) {
	c.seriesCur[hash] = lset
}

// forEachStale calls f for every series seen in the previous scrape but not in
// the current one, stopping early when f returns false.
func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
	for h, lset := range c.seriesPrev {
		if _, ok := c.seriesCur[h]; !ok {
			if !f(lset) {
				break
			}
		}
	}
}

// setType records the metric type for metric. The map lookup uses yoloString to
// avoid an allocation; the key is copied with string() only when a new entry is
// inserted, since metric presumably aliases the reusable scrape buffer.
func (c *scrapeCache) setType(metric []byte, t textparse.MetricType) {
	c.metaMtx.Lock()

	e, ok := c.metadata[yoloString(metric)]
	if !ok {
		e = &metaEntry{typ: textparse.MetricTypeUnknown}
		c.metadata[string(metric)] = e
	}
	e.typ = t
	e.lastIter = c.iter

	c.metaMtx.Unlock()
}

// setHelp records the help text for metric, copying it only when it changed.
func (c *scrapeCache) setHelp(metric, help []byte) {
	c.metaMtx.Lock()

	e, ok := c.metadata[yoloString(metric)]
	if !ok {
		e = &metaEntry{typ: textparse.MetricTypeUnknown}
		c.metadata[string(metric)] = e
	}
	if e.help != yoloString(help) {
		e.help = string(help)
	}
	e.lastIter = c.iter

	c.metaMtx.Unlock()
}

// setUnit records the unit for metric, copying it only when it changed.
func (c *scrapeCache) setUnit(metric, unit []byte) {
	c.metaMtx.Lock()

	e, ok := c.metadata[yoloString(metric)]
	if !ok {
		e = &metaEntry{typ: textparse.MetricTypeUnknown}
		c.metadata[string(metric)] = e
	}
	if e.unit != yoloString(unit) {
		e.unit = string(unit)
	}
	e.lastIter = c.iter

	c.metaMtx.Unlock()
}

// getMetadata returns the cached metadata for metric, if any.
func (c *scrapeCache) getMetadata(metric string) (MetricMetadata, bool) {
	c.metaMtx.Lock()
	defer c.metaMtx.Unlock()

	m, ok := c.metadata[metric]
	if !ok {
		return MetricMetadata{}, false
	}
	return MetricMetadata{
		Metric: metric,
		Type:   m.typ,
		Help:   m.help,
		Unit:   m.unit,
	}, true
}

// listMetadata returns a snapshot of all cached metadata in unspecified order.
func (c *scrapeCache) listMetadata() []MetricMetadata {
	c.metaMtx.Lock()
	defer c.metaMtx.Unlock()

	res := make([]MetricMetadata, 0, len(c.metadata))

	for m, e := range c.metadata {
		res = append(res, MetricMetadata{
			Metric: m,
			Type:   e.typ,
			Help:   e.help,
			Unit:   e.unit,
		})
	}
	return res
}

// newScrapeLoop builds a scrapeLoop. A nil logger, buffer pool or cache is
// replaced by a no-op logger, a fresh 1KB–1MB buffer pool, or a fresh cache.
func newScrapeLoop(ctx context.Context,
	sc scraper,
	l log.Logger,
	buffers *pool.Pool,
	sampleMutator labelsMutator,
	reportSampleMutator labelsMutator,
	appender func() storage.Appender,
	cache *scrapeCache,
	jitterSeed uint64,
	honorTimestamps bool,
) *scrapeLoop {
	if l == nil {
		l = log.NewNopLogger()
	}
	if buffers == nil {
		buffers = pool.New(1e3, 1e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
	}
	if cache == nil {
		cache = newScrapeCache()
	}
	sl := &scrapeLoop{
		scraper:             sc,
		buffers:             buffers,
		cache:               cache,
		appender:            appender,
		sampleMutator:       sampleMutator,
		reportSampleMutator: reportSampleMutator,
		stopped:             make(chan struct{}),
		jitterSeed:          jitterSeed,
		l:                   l,
		ctx:                 ctx,
		honorTimestamps:     honorTimestamps,
	}
	sl.scrapeCtx, sl.cancel = context.WithCancel(ctx)

	return sl
}

// run scrapes every interval (after an initial jitter offset) until stop() is
// called or the parent context is canceled. Each scrape is bounded by timeout.
// When errc is non-nil, scrape errors are sent on it (blocking).
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
	select {
	case <-time.After(sl.scraper.offset(interval, sl.jitterSeed)):
		// Continue after a scraping offset.
	case <-sl.scrapeCtx.Done():
		close(sl.stopped)
		return
	}

	var last time.Time

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

mainLoop:
	for {
		select {
		case <-sl.ctx.Done():
			close(sl.stopped)
			return
		case <-sl.scrapeCtx.Done():
			break mainLoop
		default:
		}

		// The per-scrape context derives from sl.ctx, not sl.scrapeCtx, so
		// stop() does not abort an in-flight scrape; stop() waits for it instead.
		var (
			start             = time.Now()
			scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout)
		)

		// Only record after the first scrape.
		if !last.IsZero() {
			targetIntervalLength.WithLabelValues(interval.String()).Observe(
				time.Since(last).Seconds(),
			)
		}

		b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
		buf := bytes.NewBuffer(b)

		contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
		cancel()

		if scrapeErr == nil {
			b = buf.Bytes()
			// NOTE: There were issues with misbehaving clients in the past
			// that occasionally returned empty results. We don't want those
			// to falsely reset our buffer size.
			if len(b) > 0 {
				sl.lastScrapeSize = len(b)
			}
		} else {
			level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
			if errc != nil {
				errc <- scrapeErr
			}
		}

		// A failed scrape is the same as an empty scrape,
		// we still call sl.append to trigger stale markers.
		// (On failure b is still the slice obtained from the pool; this assumes
		// the pool hands out zero-length slices — TODO confirm in pkg/pool.)
		total, added, seriesAdded, appErr := sl.append(b, contentType, start)
		if appErr != nil {
			level.Warn(sl.l).Log("msg", "append failed", "err", appErr)
			// The append failed, probably due to a parse error or sample limit.
			// Call sl.append again with an empty scrape to trigger stale markers.
			if _, _, _, err := sl.append([]byte{}, "", start); err != nil {
				level.Warn(sl.l).Log("msg", "append failed", "err", err)
			}
		}

		sl.buffers.Put(b)

		// Report the scrape error if there was one, otherwise the append error.
		if scrapeErr == nil {
			scrapeErr = appErr
		}

		if err := sl.report(start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
			level.Warn(sl.l).Log("msg", "appending scrape report failed", "err", err)
		}
		last = start

		select {
		case <-sl.ctx.Done():
			close(sl.stopped)
			return
		case <-sl.scrapeCtx.Done():
			break mainLoop
		case <-ticker.C:
		}
	}

	// Unblock stop() before waiting to write stale markers; this is why stop()
	// may return while stale markers are still pending.
	close(sl.stopped)

	sl.endOfRunStaleness(last, ticker, interval)
}

// endOfRunStaleness writes stale markers for all series of the last scrape,
// unless the parent context is canceled first.
func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) {
	// Scraping has stopped. We want to write stale markers but
	// the target may be recreated, so we wait just over 2 scrape intervals
	// before creating them.
	// If the context is canceled, we presume the server is shutting down
	// and will restart where it was. We do not attempt to write stale markers
	// in this case.

	if last.IsZero() {
		// There never was a scrape, so there will be no stale markers.
		return
	}

	// Wait for when the next scrape would have been, record its timestamp.
	var staleTime time.Time
	select {
	case <-sl.ctx.Done():
		return
	case <-ticker.C:
		staleTime = time.Now()
	}

	// Wait for when the next scrape would have been, if the target was recreated
	// samples should have been ingested by now.
	select {
	case <-sl.ctx.Done():
		return
	case <-ticker.C:
	}

	// Wait for an extra 10% of the interval, just to be safe.
	select {
	case <-sl.ctx.Done():
		return
	case <-time.After(interval / 10):
	}

	// Call sl.append again with an empty scrape to trigger stale markers.
	// If the target has since been recreated and scraped, the
	// stale markers will be out of order and ignored.
	if _, _, _, err := sl.append([]byte{}, "", staleTime); err != nil {
		level.Error(sl.l).Log("msg", "stale append failed", "err", err)
	}
	if err := sl.reportStale(staleTime); err != nil {
		level.Error(sl.l).Log("msg", "stale report failed", "err", err)
	}
}

// Stop the scraping. May still write data and stale markers after it has
// returned. Cancel the context to stop all writes.
func (sl *scrapeLoop) stop() {
	sl.cancel()
	<-sl.stopped
}

type sample struct {
	metric labels.Labels
	t      int64
	v      float64
}

// samples implements sort.Interface, ordering by label set and then timestamp.
//lint:ignore U1000 staticcheck falsely reports that samples is unused.
type samples []sample

func (s samples) Len() int      { return len(s) }
func (s samples) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

func (s samples) Less(i, j int) bool {
	d := labels.Compare(s[i].metric, s[j].metric)
	if d < 0 {
		return true
	} else if d > 0 {
		return false
	}
	return s[i].t < s[j].t
}

// append parses the scrape body b and appends its samples in one transaction.
// Samples without an explicit (honored) timestamp use ts. Out-of-order,
// duplicate and out-of-bounds samples are counted and skipped. Stale markers
// are written for series present in the previous scrape but missing now. On any
// remaining error, including hitting the sample limit, the transaction is rolled
// back. It returns the number of parsed samples (total), of samples counted as
// added, and of newly cached series.
func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
	var (
		app            = sl.appender()
		p              = textparse.New(b, contentType)
		defTime        = timestamp.FromTime(ts)
		numOutOfOrder  = 0
		numDuplicates  = 0
		numOutOfBounds = 0
	)
	var sampleLimitErr error

loop:
	for {
		var et textparse.Entry
		// Note: err is overwritten here on every iteration, which clears any
		// error left behind by a "continue" further down.
		if et, err = p.Next(); err != nil {
			if err == io.EOF {
				err = nil
			}
			break
		}
		switch et {
		case textparse.EntryType:
			sl.cache.setType(p.Type())
			continue
		case textparse.EntryHelp:
			sl.cache.setHelp(p.Help())
			continue
		case textparse.EntryUnit:
			sl.cache.setUnit(p.Unit())
			continue
		case textparse.EntryComment:
			continue
		default:
		}
		total++

		t := defTime
		met, tp, v := p.Series()
		if !sl.honorTimestamps {
			tp = nil
		}
		if tp != nil {
			t = *tp
		}

		if sl.cache.getDropped(yoloString(met)) {
			continue
		}
		// Fast path: the series is cached, so append by storage reference.
		ce, ok := sl.cache.get(yoloString(met))
		if ok {
			switch err = app.AddFast(ce.lset, ce.ref, t, v); err {
			case nil:
				if tp == nil {
					sl.cache.trackStaleness(ce.hash, ce.lset)
				}
			case storage.ErrNotFound:
				// The reference is no longer valid; fall through to the slow path.
				ok = false
			case storage.ErrOutOfOrderSample:
				numOutOfOrder++
				level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
				targetScrapeSampleOutOfOrder.Inc()
				continue
			case storage.ErrDuplicateSampleForTimestamp:
				numDuplicates++
				level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
				targetScrapeSampleDuplicate.Inc()
				continue
			case storage.ErrOutOfBounds:
				numOutOfBounds++
				level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
				targetScrapeSampleOutOfBounds.Inc()
				continue
			case errSampleLimit:
				// Keep on parsing output if we hit the limit, so we report the correct
				// total number of samples scraped.
				sampleLimitErr = err
				added++
				continue
			default:
				break loop
			}
		}
		// Slow path: build the label set, relabel it and append by labels.
		if !ok {
			var lset labels.Labels

			mets := p.Metric(&lset)
			hash := lset.Hash()

			// Hash label set as it is seen local to the target. Then add target labels
			// and relabeling and store the final label set.
			lset = sl.sampleMutator(lset)

			// The label set may be set to nil to indicate dropping.
			if lset == nil {
				sl.cache.addDropped(mets)
				continue
			}

			var ref uint64
			ref, err = app.Add(lset, t, v)
			switch err {
			case nil:
			case storage.ErrOutOfOrderSample:
				err = nil
				numOutOfOrder++
				level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
				targetScrapeSampleOutOfOrder.Inc()
				continue
			case storage.ErrDuplicateSampleForTimestamp:
				err = nil
				numDuplicates++
				level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
				targetScrapeSampleDuplicate.Inc()
				continue
			case storage.ErrOutOfBounds:
				err = nil
				numOutOfBounds++
				level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
				targetScrapeSampleOutOfBounds.Inc()
				continue
			case errSampleLimit:
				sampleLimitErr = err
				added++
				continue
			default:
				level.Debug(sl.l).Log("msg", "unexpected error", "series", string(met), "err", err)
				break loop
			}
			if tp == nil {
				// Only series without an explicit timestamp take part in
				// staleness tracking; explicit timestamps bypass it.
				sl.cache.trackStaleness(hash, lset)
			}
			sl.cache.addRef(mets, ref, lset, hash)
			seriesAdded++
		}
		added++
	}
	// Hitting the sample limit turns into the returned error (unless another
	// error already occurred), which makes the whole scrape roll back below.
	if sampleLimitErr != nil {
		if err == nil {
			err = sampleLimitErr
		}
		// We only want to increment this once per scrape, so this is Inc'd outside the loop.
		targetScrapeSampleLimit.Inc()
	}
	if numOutOfOrder > 0 {
		level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", numOutOfOrder)
	}
	if numDuplicates > 0 {
		level.Warn(sl.l).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", numDuplicates)
	}
	if numOutOfBounds > 0 {
		level.Warn(sl.l).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", numOutOfBounds)
	}
	if err == nil {
		sl.cache.forEachStale(func(lset labels.Labels) bool {
			// Series no longer exposed, mark it stale.
			_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
			switch err {
			case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
				// Do not count these in logging, as this is expected if a target
				// goes away and comes back again with a new scrape loop.
				err = nil
			}
			return err == nil
		})
	}
	if err != nil {
		app.Rollback()
		return total, added, seriesAdded, err
	}
	if err := app.Commit(); err != nil {
		return total, added, seriesAdded, err
	}

	// Only perform cache cleaning if the scrape was not empty.
	// An empty scrape (usually) is used to indicate a failed scrape.
	sl.cache.iterDone(len(b) > 0)

	return total, added, seriesAdded, nil
}

// yoloString converts b to a string without copying, by reinterpreting the
// slice header via unsafe. The result shares b's memory, so it must not be
// retained once b is modified or reused; map keys that outlive the call are
// copied with string(b) instead.
func yoloString(b []byte) string {
	return *((*string)(unsafe.Pointer(&b)))
}

// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
// with scraped metrics in the cache.
1235const ( 1236 scrapeHealthMetricName = "up" + "\xff" 1237 scrapeDurationMetricName = "scrape_duration_seconds" + "\xff" 1238 scrapeSamplesMetricName = "scrape_samples_scraped" + "\xff" 1239 samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling" + "\xff" 1240 scrapeSeriesAddedMetricName = "scrape_series_added" + "\xff" 1241) 1242 1243func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended, seriesAdded int, err error) error { 1244 sl.scraper.report(start, duration, err) 1245 1246 ts := timestamp.FromTime(start) 1247 1248 var health float64 1249 if err == nil { 1250 health = 1 1251 } 1252 app := sl.appender() 1253 1254 if err := sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil { 1255 app.Rollback() 1256 return err 1257 } 1258 if err := sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds()); err != nil { 1259 app.Rollback() 1260 return err 1261 } 1262 if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scraped)); err != nil { 1263 app.Rollback() 1264 return err 1265 } 1266 if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, float64(appended)); err != nil { 1267 app.Rollback() 1268 return err 1269 } 1270 if err := sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded)); err != nil { 1271 app.Rollback() 1272 return err 1273 } 1274 return app.Commit() 1275} 1276 1277func (sl *scrapeLoop) reportStale(start time.Time) error { 1278 ts := timestamp.FromTime(start) 1279 app := sl.appender() 1280 1281 stale := math.Float64frombits(value.StaleNaN) 1282 1283 if err := sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil { 1284 app.Rollback() 1285 return err 1286 } 1287 if err := sl.addReportSample(app, scrapeDurationMetricName, ts, stale); err != nil { 1288 app.Rollback() 1289 return err 1290 } 1291 if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, stale); err != nil { 1292 app.Rollback() 1293 return 
err 1294 } 1295 if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale); err != nil { 1296 app.Rollback() 1297 return err 1298 } 1299 if err := sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale); err != nil { 1300 app.Rollback() 1301 return err 1302 } 1303 return app.Commit() 1304} 1305 1306func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error { 1307 ce, ok := sl.cache.get(s) 1308 if ok { 1309 err := app.AddFast(ce.lset, ce.ref, t, v) 1310 switch err { 1311 case nil: 1312 return nil 1313 case storage.ErrNotFound: 1314 // Try an Add. 1315 case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: 1316 // Do not log here, as this is expected if a target goes away and comes back 1317 // again with a new scrape loop. 1318 return nil 1319 default: 1320 return err 1321 } 1322 } 1323 lset := labels.Labels{ 1324 // The constants are suffixed with the invalid \xff unicode rune to avoid collisions 1325 // with scraped metrics in the cache. 1326 // We have to drop it when building the actual metric. 1327 labels.Label{Name: labels.MetricName, Value: s[:len(s)-1]}, 1328 } 1329 1330 hash := lset.Hash() 1331 lset = sl.reportSampleMutator(lset) 1332 1333 ref, err := app.Add(lset, t, v) 1334 switch err { 1335 case nil: 1336 sl.cache.addRef(s, ref, lset, hash) 1337 return nil 1338 case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: 1339 return nil 1340 default: 1341 return err 1342 } 1343} 1344