1// Copyright 2016 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package scrape
15
16import (
17	"bufio"
18	"bytes"
19	"compress/gzip"
20	"context"
21	"fmt"
22	"io"
23	"io/ioutil"
24	"math"
25	"net/http"
26	"reflect"
27	"strconv"
28	"sync"
29	"time"
30	"unsafe"
31
32	"github.com/go-kit/log"
33	"github.com/go-kit/log/level"
34	"github.com/pkg/errors"
35	"github.com/prometheus/client_golang/prometheus"
36	config_util "github.com/prometheus/common/config"
37	"github.com/prometheus/common/model"
38	"github.com/prometheus/common/version"
39
40	"github.com/prometheus/prometheus/config"
41	"github.com/prometheus/prometheus/discovery/targetgroup"
42	"github.com/prometheus/prometheus/pkg/exemplar"
43	"github.com/prometheus/prometheus/pkg/labels"
44	"github.com/prometheus/prometheus/pkg/pool"
45	"github.com/prometheus/prometheus/pkg/relabel"
46	"github.com/prometheus/prometheus/pkg/textparse"
47	"github.com/prometheus/prometheus/pkg/timestamp"
48	"github.com/prometheus/prometheus/pkg/value"
49	"github.com/prometheus/prometheus/storage"
50)
51
// ScrapeTimestampTolerance is the tolerance for aligning scrape append
// timestamps, to enable better compression at the TSDB level.
// See https://github.com/prometheus/prometheus/issues/7846
var ScrapeTimestampTolerance = 2 * time.Millisecond
56
// AlignScrapeTimestamps enables aligning scrape timestamps within the tolerance
// described above.
var AlignScrapeTimestamps = true
59
60var errNameLabelMandatory = fmt.Errorf("missing metric name (%s label)", labels.MetricName)
61
62var (
63	targetIntervalLength = prometheus.NewSummaryVec(
64		prometheus.SummaryOpts{
65			Name:       "prometheus_target_interval_length_seconds",
66			Help:       "Actual intervals between scrapes.",
67			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
68		},
69		[]string{"interval"},
70	)
71	targetReloadIntervalLength = prometheus.NewSummaryVec(
72		prometheus.SummaryOpts{
73			Name:       "prometheus_target_reload_length_seconds",
74			Help:       "Actual interval to reload the scrape pool with a given configuration.",
75			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
76		},
77		[]string{"interval"},
78	)
79	targetScrapePools = prometheus.NewCounter(
80		prometheus.CounterOpts{
81			Name: "prometheus_target_scrape_pools_total",
82			Help: "Total number of scrape pool creation attempts.",
83		},
84	)
85	targetScrapePoolsFailed = prometheus.NewCounter(
86		prometheus.CounterOpts{
87			Name: "prometheus_target_scrape_pools_failed_total",
88			Help: "Total number of scrape pool creations that failed.",
89		},
90	)
91	targetScrapePoolReloads = prometheus.NewCounter(
92		prometheus.CounterOpts{
93			Name: "prometheus_target_scrape_pool_reloads_total",
94			Help: "Total number of scrape pool reloads.",
95		},
96	)
97	targetScrapePoolReloadsFailed = prometheus.NewCounter(
98		prometheus.CounterOpts{
99			Name: "prometheus_target_scrape_pool_reloads_failed_total",
100			Help: "Total number of failed scrape pool reloads.",
101		},
102	)
103	targetScrapePoolExceededTargetLimit = prometheus.NewCounter(
104		prometheus.CounterOpts{
105			Name: "prometheus_target_scrape_pool_exceeded_target_limit_total",
106			Help: "Total number of times scrape pools hit the target limit, during sync or config reload.",
107		},
108	)
109	targetScrapePoolTargetLimit = prometheus.NewGaugeVec(
110		prometheus.GaugeOpts{
111			Name: "prometheus_target_scrape_pool_target_limit",
112			Help: "Maximum number of targets allowed in this scrape pool.",
113		},
114		[]string{"scrape_job"},
115	)
116	targetScrapePoolTargetsAdded = prometheus.NewGaugeVec(
117		prometheus.GaugeOpts{
118			Name: "prometheus_target_scrape_pool_targets",
119			Help: "Current number of targets in this scrape pool.",
120		},
121		[]string{"scrape_job"},
122	)
123	targetSyncIntervalLength = prometheus.NewSummaryVec(
124		prometheus.SummaryOpts{
125			Name:       "prometheus_target_sync_length_seconds",
126			Help:       "Actual interval to sync the scrape pool.",
127			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
128		},
129		[]string{"scrape_job"},
130	)
131	targetScrapePoolSyncsCounter = prometheus.NewCounterVec(
132		prometheus.CounterOpts{
133			Name: "prometheus_target_scrape_pool_sync_total",
134			Help: "Total number of syncs that were executed on a scrape pool.",
135		},
136		[]string{"scrape_job"},
137	)
138	targetScrapeExceededBodySizeLimit = prometheus.NewCounter(
139		prometheus.CounterOpts{
140			Name: "prometheus_target_scrapes_exceeded_body_size_limit_total",
			Help: "Total number of scrapes that hit the body size limit.",
142		},
143	)
144	targetScrapeSampleLimit = prometheus.NewCounter(
145		prometheus.CounterOpts{
146			Name: "prometheus_target_scrapes_exceeded_sample_limit_total",
147			Help: "Total number of scrapes that hit the sample limit and were rejected.",
148		},
149	)
150	targetScrapeSampleDuplicate = prometheus.NewCounter(
151		prometheus.CounterOpts{
152			Name: "prometheus_target_scrapes_sample_duplicate_timestamp_total",
153			Help: "Total number of samples rejected due to duplicate timestamps but different values.",
154		},
155	)
156	targetScrapeSampleOutOfOrder = prometheus.NewCounter(
157		prometheus.CounterOpts{
158			Name: "prometheus_target_scrapes_sample_out_of_order_total",
			Help: "Total number of samples rejected due to being out of the expected order.",
160		},
161	)
162	targetScrapeSampleOutOfBounds = prometheus.NewCounter(
163		prometheus.CounterOpts{
164			Name: "prometheus_target_scrapes_sample_out_of_bounds_total",
165			Help: "Total number of samples rejected due to timestamp falling outside of the time bounds.",
166		},
167	)
168	targetScrapeCacheFlushForced = prometheus.NewCounter(
169		prometheus.CounterOpts{
170			Name: "prometheus_target_scrapes_cache_flush_forced_total",
171			Help: "How many times a scrape cache was flushed due to getting big while scrapes are failing.",
172		},
173	)
174	targetScrapeExemplarOutOfOrder = prometheus.NewCounter(
175		prometheus.CounterOpts{
176			Name: "prometheus_target_scrapes_exemplar_out_of_order_total",
			Help: "Total number of exemplars rejected due to being out of the expected order.",
178		},
179	)
180	targetScrapePoolExceededLabelLimits = prometheus.NewCounter(
181		prometheus.CounterOpts{
182			Name: "prometheus_target_scrape_pool_exceeded_label_limits_total",
183			Help: "Total number of times scrape pools hit the label limits, during sync or config reload.",
184		},
185	)
186	targetSyncFailed = prometheus.NewCounterVec(
187		prometheus.CounterOpts{
188			Name: "prometheus_target_sync_failed_total",
189			Help: "Total number of target sync failures.",
190		},
191		[]string{"scrape_job"},
192	)
193)
194
195func init() {
196	prometheus.MustRegister(
197		targetIntervalLength,
198		targetReloadIntervalLength,
199		targetScrapePools,
200		targetScrapePoolsFailed,
201		targetScrapePoolReloads,
202		targetScrapePoolReloadsFailed,
203		targetSyncIntervalLength,
204		targetScrapePoolSyncsCounter,
205		targetScrapeExceededBodySizeLimit,
206		targetScrapeSampleLimit,
207		targetScrapeSampleDuplicate,
208		targetScrapeSampleOutOfOrder,
209		targetScrapeSampleOutOfBounds,
210		targetScrapePoolExceededTargetLimit,
211		targetScrapePoolTargetLimit,
212		targetScrapePoolTargetsAdded,
213		targetScrapeCacheFlushForced,
214		targetMetadataCache,
215		targetScrapeExemplarOutOfOrder,
216		targetScrapePoolExceededLabelLimits,
217		targetSyncFailed,
218	)
219}
220
221// scrapePool manages scrapes for sets of targets.
222type scrapePool struct {
223	appendable storage.Appendable
224	logger     log.Logger
225	cancel     context.CancelFunc
226
227	// mtx must not be taken after targetMtx.
228	mtx    sync.Mutex
229	config *config.ScrapeConfig
230	client *http.Client
231	loops  map[uint64]loop
232
233	targetMtx sync.Mutex
234	// activeTargets and loops must always be synchronized to have the same
235	// set of hashes.
236	activeTargets  map[uint64]*Target
237	droppedTargets []*Target
238
239	// Constructor for new scrape loops. This is settable for testing convenience.
240	newLoop func(scrapeLoopOptions) loop
241}
242
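// labelLimits bundles the per-scrape limits on the number of labels and on
// label name and value lengths. A zero value for any field disables that limit.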
243type labelLimits struct {
244	labelLimit            int
245	labelNameLengthLimit  int
246	labelValueLengthLimit int
247}
248
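// scrapeLoopOptions carries the per-target settings needed to construct a
// scrape loop: the target and scraper to use, sample and label limits,
// honor_labels/honor_timestamps behaviour, interval, timeout, metric relabel
// configs and an optional pre-existing scrape cache.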
249type scrapeLoopOptions struct {
250	target          *Target
251	scraper         scraper
252	sampleLimit     int
253	labelLimits     *labelLimits
254	honorLabels     bool
255	honorTimestamps bool
256	interval        time.Duration
257	timeout         time.Duration
258	mrc             []*relabel.Config
259	cache           *scrapeCache
260}
261
262const maxAheadTime = 10 * time.Minute
263
264type labelsMutator func(labels.Labels) labels.Labels
265
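// newScrapePool creates a scrape pool for the given scrape configuration. It
// builds the HTTP client, the shared buffer pool and the constructor used to
// create one scrape loop per target.
//
// A minimal usage sketch, assuming a storage.Appendable named app, a parsed
// *config.ScrapeConfig named cfg and discovered groups of type
// []*targetgroup.Group named tgs (all hypothetical names):
//
//	sp, err := newScrapePool(cfg, app, 0, nil, false)
//	if err != nil {
//		// handle the error
//	}
//	sp.Sync(tgs)
//	defer sp.stop()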
266func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger, reportScrapeTimeout bool) (*scrapePool, error) {
267	targetScrapePools.Inc()
268	if logger == nil {
269		logger = log.NewNopLogger()
270	}
271
272	client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, config_util.WithHTTP2Disabled())
273	if err != nil {
274		targetScrapePoolsFailed.Inc()
275		return nil, errors.Wrap(err, "error creating HTTP client")
276	}
277
278	buffers := pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
279
280	ctx, cancel := context.WithCancel(context.Background())
281	sp := &scrapePool{
282		cancel:        cancel,
283		appendable:    app,
284		config:        cfg,
285		client:        client,
286		activeTargets: map[uint64]*Target{},
287		loops:         map[uint64]loop{},
288		logger:        logger,
289	}
290	sp.newLoop = func(opts scrapeLoopOptions) loop {
		// Use the cache supplied in the options, or create a new one, and make it
		// the target's metadata store.
292		cache := opts.cache
293		if cache == nil {
294			cache = newScrapeCache()
295		}
296		opts.target.SetMetadataStore(cache)
297
298		return newScrapeLoop(
299			ctx,
300			opts.scraper,
301			log.With(logger, "target", opts.target),
302			buffers,
303			func(l labels.Labels) labels.Labels {
304				return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc)
305			},
306			func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
307			func(ctx context.Context) storage.Appender { return appender(app.Appender(ctx), opts.sampleLimit) },
308			cache,
309			jitterSeed,
310			opts.honorTimestamps,
311			opts.sampleLimit,
312			opts.labelLimits,
313			opts.interval,
314			opts.timeout,
315			reportScrapeTimeout,
316		)
317	}
318
319	return sp, nil
320}
321
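// ActiveTargets returns a snapshot of the currently active targets of the pool.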
322func (sp *scrapePool) ActiveTargets() []*Target {
323	sp.targetMtx.Lock()
324	defer sp.targetMtx.Unlock()
325
326	var tActive []*Target
327	for _, t := range sp.activeTargets {
328		tActive = append(tActive, t)
329	}
330	return tActive
331}
332
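// DroppedTargets returns the targets that were dropped during relabeling.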
333func (sp *scrapePool) DroppedTargets() []*Target {
334	sp.targetMtx.Lock()
335	defer sp.targetMtx.Unlock()
336	return sp.droppedTargets
337}
338
// stop terminates all scrape loops and returns after they have all terminated.
340func (sp *scrapePool) stop() {
341	sp.mtx.Lock()
342	defer sp.mtx.Unlock()
343	sp.cancel()
344	var wg sync.WaitGroup
345
346	sp.targetMtx.Lock()
347
348	for fp, l := range sp.loops {
349		wg.Add(1)
350
351		go func(l loop) {
352			l.stop()
353			wg.Done()
354		}(l)
355
356		delete(sp.loops, fp)
357		delete(sp.activeTargets, fp)
358	}
359
360	sp.targetMtx.Unlock()
361
362	wg.Wait()
363	sp.client.CloseIdleConnections()
364
365	if sp.config != nil {
366		targetScrapePoolSyncsCounter.DeleteLabelValues(sp.config.JobName)
367		targetScrapePoolTargetLimit.DeleteLabelValues(sp.config.JobName)
368		targetScrapePoolTargetsAdded.DeleteLabelValues(sp.config.JobName)
369		targetSyncIntervalLength.DeleteLabelValues(sp.config.JobName)
370		targetSyncFailed.DeleteLabelValues(sp.config.JobName)
371	}
372}
373
// reload the scrape pool with the given scrape configuration. The target state is preserved
// but all scrape loops are restarted with the new scrape configuration.
// This method returns after all scrape loops that were stopped have terminated.
377func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
378	sp.mtx.Lock()
379	defer sp.mtx.Unlock()
380	targetScrapePoolReloads.Inc()
381	start := time.Now()
382
383	client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, config_util.WithHTTP2Disabled())
384	if err != nil {
385		targetScrapePoolReloadsFailed.Inc()
386		return errors.Wrap(err, "error creating HTTP client")
387	}
388
389	reuseCache := reusableCache(sp.config, cfg)
390	sp.config = cfg
391	oldClient := sp.client
392	sp.client = client
393
394	targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))
395
396	var (
397		wg            sync.WaitGroup
398		interval      = time.Duration(sp.config.ScrapeInterval)
399		timeout       = time.Duration(sp.config.ScrapeTimeout)
400		bodySizeLimit = int64(sp.config.BodySizeLimit)
401		sampleLimit   = int(sp.config.SampleLimit)
402		labelLimits   = &labelLimits{
403			labelLimit:            int(sp.config.LabelLimit),
404			labelNameLengthLimit:  int(sp.config.LabelNameLengthLimit),
405			labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
406		}
407		honorLabels     = sp.config.HonorLabels
408		honorTimestamps = sp.config.HonorTimestamps
409		mrc             = sp.config.MetricRelabelConfigs
410	)
411
412	sp.targetMtx.Lock()
413
414	forcedErr := sp.refreshTargetLimitErr()
415	for fp, oldLoop := range sp.loops {
416		var cache *scrapeCache
417		if oc := oldLoop.getCache(); reuseCache && oc != nil {
418			oldLoop.disableEndOfRunStalenessMarkers()
419			cache = oc
420		} else {
421			cache = newScrapeCache()
422		}
423
424		var (
425			t       = sp.activeTargets[fp]
426			s       = &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit}
427			newLoop = sp.newLoop(scrapeLoopOptions{
428				target:          t,
429				scraper:         s,
430				sampleLimit:     sampleLimit,
431				labelLimits:     labelLimits,
432				honorLabels:     honorLabels,
433				honorTimestamps: honorTimestamps,
434				mrc:             mrc,
435				cache:           cache,
436				interval:        interval,
437				timeout:         timeout,
438			})
439		)
440		wg.Add(1)
441
442		go func(oldLoop, newLoop loop) {
443			oldLoop.stop()
444			wg.Done()
445
446			newLoop.setForcedError(forcedErr)
447			newLoop.run(nil)
448		}(oldLoop, newLoop)
449
450		sp.loops[fp] = newLoop
451	}
452
453	sp.targetMtx.Unlock()
454
455	wg.Wait()
456	oldClient.CloseIdleConnections()
457	targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
458		time.Since(start).Seconds(),
459	)
460	return nil
461}
462
// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scrape loops with the resulting set of active and dropped targets.
465func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
466	sp.mtx.Lock()
467	defer sp.mtx.Unlock()
468	start := time.Now()
469
470	sp.targetMtx.Lock()
471	var all []*Target
472	sp.droppedTargets = []*Target{}
473	for _, tg := range tgs {
474		targets, failures := targetsFromGroup(tg, sp.config)
475		for _, err := range failures {
476			level.Error(sp.logger).Log("msg", "Creating target failed", "err", err)
477		}
478		targetSyncFailed.WithLabelValues(sp.config.JobName).Add(float64(len(failures)))
479		for _, t := range targets {
480			if t.Labels().Len() > 0 {
481				all = append(all, t)
482			} else if t.DiscoveredLabels().Len() > 0 {
483				sp.droppedTargets = append(sp.droppedTargets, t)
484			}
485		}
486	}
487	sp.targetMtx.Unlock()
488	sp.sync(all)
489
490	targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
491		time.Since(start).Seconds(),
492	)
493	targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
494}
495
// sync takes a list of potentially duplicated targets, deduplicates them, starts
// scrape loops for new targets, and stops scrape loops for disappeared targets.
// It returns after all stopped scrape loops have terminated.
499func (sp *scrapePool) sync(targets []*Target) {
500	var (
501		uniqueLoops   = make(map[uint64]loop)
502		interval      = time.Duration(sp.config.ScrapeInterval)
503		timeout       = time.Duration(sp.config.ScrapeTimeout)
504		bodySizeLimit = int64(sp.config.BodySizeLimit)
505		sampleLimit   = int(sp.config.SampleLimit)
506		labelLimits   = &labelLimits{
507			labelLimit:            int(sp.config.LabelLimit),
508			labelNameLengthLimit:  int(sp.config.LabelNameLengthLimit),
509			labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
510		}
511		honorLabels     = sp.config.HonorLabels
512		honorTimestamps = sp.config.HonorTimestamps
513		mrc             = sp.config.MetricRelabelConfigs
514	)
515
516	sp.targetMtx.Lock()
517	for _, t := range targets {
518		hash := t.hash()
519
520		if _, ok := sp.activeTargets[hash]; !ok {
521			// The scrape interval and timeout labels are set to the config's values initially,
522			// so whether changed via relabeling or not, they'll exist and hold the correct values
523			// for every target.
524			var err error
525			interval, timeout, err = t.intervalAndTimeout(interval, timeout)
526
527			s := &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit}
528			l := sp.newLoop(scrapeLoopOptions{
529				target:          t,
530				scraper:         s,
531				sampleLimit:     sampleLimit,
532				labelLimits:     labelLimits,
533				honorLabels:     honorLabels,
534				honorTimestamps: honorTimestamps,
535				mrc:             mrc,
536				interval:        interval,
537				timeout:         timeout,
538			})
539			if err != nil {
540				l.setForcedError(err)
541			}
542
543			sp.activeTargets[hash] = t
544			sp.loops[hash] = l
545
546			uniqueLoops[hash] = l
547		} else {
548			// This might be a duplicated target.
549			if _, ok := uniqueLoops[hash]; !ok {
550				uniqueLoops[hash] = nil
551			}
			// Need to keep the most up-to-date labels information
			// for displaying it on the Service Discovery web page.
554			sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
555		}
556	}
557
558	var wg sync.WaitGroup
559
560	// Stop and remove old targets and scraper loops.
561	for hash := range sp.activeTargets {
562		if _, ok := uniqueLoops[hash]; !ok {
563			wg.Add(1)
564			go func(l loop) {
565				l.stop()
566				wg.Done()
567			}(sp.loops[hash])
568
569			delete(sp.loops, hash)
570			delete(sp.activeTargets, hash)
571		}
572	}
573
574	sp.targetMtx.Unlock()
575
576	targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
577	forcedErr := sp.refreshTargetLimitErr()
578	for _, l := range sp.loops {
579		l.setForcedError(forcedErr)
580	}
581	for _, l := range uniqueLoops {
582		if l != nil {
583			go l.run(nil)
584		}
585	}
	// Wait for all potentially stopped scrapers to terminate.
	// This covers the case of flapping targets. If the server is under high load, a new scraper
	// may be active and try to insert, while the old scraper that has not yet terminated could
	// still be inserting a previous sample set.
590	wg.Wait()
591}
592
593// refreshTargetLimitErr returns an error that can be passed to the scrape loops
594// if the number of targets exceeds the configured limit.
595func (sp *scrapePool) refreshTargetLimitErr() error {
596	if sp.config == nil || sp.config.TargetLimit == 0 {
597		return nil
598	}
599	if l := len(sp.activeTargets); l > int(sp.config.TargetLimit) {
600		targetScrapePoolExceededTargetLimit.Inc()
601		return fmt.Errorf("target_limit exceeded (number of targets: %d, limit: %d)", l, sp.config.TargetLimit)
602	}
603	return nil
604}
605
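// verifyLabelLimits returns an error if the given label set violates any of the
// configured label limits (label count, name length or value length).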
606func verifyLabelLimits(lset labels.Labels, limits *labelLimits) error {
607	if limits == nil {
608		return nil
609	}
610
611	met := lset.Get(labels.MetricName)
612	if limits.labelLimit > 0 {
613		nbLabels := len(lset)
614		if nbLabels > int(limits.labelLimit) {
			return fmt.Errorf("label_limit exceeded (metric: %.50s, number of labels: %d, limit: %d)", met, nbLabels, limits.labelLimit)
616		}
617	}
618
619	if limits.labelNameLengthLimit == 0 && limits.labelValueLengthLimit == 0 {
620		return nil
621	}
622
623	for _, l := range lset {
624		if limits.labelNameLengthLimit > 0 {
625			nameLength := len(l.Name)
626			if nameLength > int(limits.labelNameLengthLimit) {
627				return fmt.Errorf("label_name_length_limit exceeded (metric: %.50s, label: %.50v, name length: %d, limit: %d)", met, l, nameLength, limits.labelNameLengthLimit)
628			}
629		}
630
631		if limits.labelValueLengthLimit > 0 {
632			valueLength := len(l.Value)
633			if valueLength > int(limits.labelValueLengthLimit) {
634				return fmt.Errorf("label_value_length_limit exceeded (metric: %.50s, label: %.50v, value length: %d, limit: %d)", met, l, valueLength, limits.labelValueLengthLimit)
635			}
636		}
637	}
638	return nil
639}
640
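// mutateSampleLabels merges the target labels into the scraped label set,
// honoring or renaming conflicting labels depending on honor_labels, and then
// applies the metric relabel configs.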
641func mutateSampleLabels(lset labels.Labels, target *Target, honor bool, rc []*relabel.Config) labels.Labels {
642	lb := labels.NewBuilder(lset)
643
644	if honor {
645		for _, l := range target.Labels() {
646			if !lset.Has(l.Name) {
647				lb.Set(l.Name, l.Value)
648			}
649		}
650	} else {
651		for _, l := range target.Labels() {
652			// existingValue will be empty if l.Name doesn't exist.
653			existingValue := lset.Get(l.Name)
654			if existingValue != "" {
655				lb.Set(model.ExportedLabelPrefix+l.Name, existingValue)
656			}
657			// It is now safe to set the target label.
658			lb.Set(l.Name, l.Value)
659		}
660	}
661
662	res := lb.Labels()
663
664	if len(rc) > 0 {
665		res = relabel.Process(res, rc...)
666	}
667
668	return res
669}
670
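// mutateReportSampleLabels builds the label set for a synthetic report sample:
// target labels are applied on top of the scraped labels, and pre-existing
// values are preserved under their "exported_"-prefixed names.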
671func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels {
672	lb := labels.NewBuilder(lset)
673
674	for _, l := range target.Labels() {
675		lb.Set(model.ExportedLabelPrefix+l.Name, lset.Get(l.Name))
676		lb.Set(l.Name, l.Value)
677	}
678
679	return lb.Labels()
680}
681
682// appender returns an appender for ingested samples from the target.
683func appender(app storage.Appender, limit int) storage.Appender {
684	app = &timeLimitAppender{
685		Appender: app,
686		maxTime:  timestamp.FromTime(time.Now().Add(maxAheadTime)),
687	}
688
689	// The limit is applied after metrics are potentially dropped via relabeling.
690	if limit > 0 {
691		app = &limitAppender{
692			Appender: app,
693			limit:    limit,
694		}
695	}
696	return app
697}
698
699// A scraper retrieves samples and accepts a status report at the end.
700type scraper interface {
701	scrape(ctx context.Context, w io.Writer) (string, error)
702	Report(start time.Time, dur time.Duration, err error)
703	offset(interval time.Duration, jitterSeed uint64) time.Duration
704}
705
706// targetScraper implements the scraper interface for a target.
707type targetScraper struct {
708	*Target
709
710	client  *http.Client
711	req     *http.Request
712	timeout time.Duration
713
714	gzipr *gzip.Reader
715	buf   *bufio.Reader
716
717	bodySizeLimit int64
718}
719
720var errBodySizeLimit = errors.New("body size limit exceeded")
721
722const acceptHeader = `application/openmetrics-text; version=0.0.1,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`
723
724var userAgentHeader = fmt.Sprintf("Prometheus/%s", version.Version)
725
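// scrape performs a single HTTP scrape of the target. It writes the (possibly
// gzip-decompressed) response body to w, enforcing bodySizeLimit, and returns
// the Content-Type of the response.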
726func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
727	if s.req == nil {
728		req, err := http.NewRequest("GET", s.URL().String(), nil)
729		if err != nil {
730			return "", err
731		}
732		req.Header.Add("Accept", acceptHeader)
733		req.Header.Add("Accept-Encoding", "gzip")
734		req.Header.Set("User-Agent", userAgentHeader)
735		req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", strconv.FormatFloat(s.timeout.Seconds(), 'f', -1, 64))
736
737		s.req = req
738	}
739
740	resp, err := s.client.Do(s.req.WithContext(ctx))
741	if err != nil {
742		return "", err
743	}
744	defer func() {
745		io.Copy(ioutil.Discard, resp.Body)
746		resp.Body.Close()
747	}()
748
749	if resp.StatusCode != http.StatusOK {
750		return "", errors.Errorf("server returned HTTP status %s", resp.Status)
751	}
752
753	if s.bodySizeLimit <= 0 {
754		s.bodySizeLimit = math.MaxInt64
755	}
756	if resp.Header.Get("Content-Encoding") != "gzip" {
757		n, err := io.Copy(w, io.LimitReader(resp.Body, s.bodySizeLimit))
758		if err != nil {
759			return "", err
760		}
761		if n >= s.bodySizeLimit {
762			targetScrapeExceededBodySizeLimit.Inc()
763			return "", errBodySizeLimit
764		}
765		return resp.Header.Get("Content-Type"), nil
766	}
767
768	if s.gzipr == nil {
769		s.buf = bufio.NewReader(resp.Body)
770		s.gzipr, err = gzip.NewReader(s.buf)
771		if err != nil {
772			return "", err
773		}
774	} else {
775		s.buf.Reset(resp.Body)
776		if err = s.gzipr.Reset(s.buf); err != nil {
777			return "", err
778		}
779	}
780
781	n, err := io.Copy(w, io.LimitReader(s.gzipr, s.bodySizeLimit))
782	s.gzipr.Close()
783	if err != nil {
784		return "", err
785	}
786	if n >= s.bodySizeLimit {
787		targetScrapeExceededBodySizeLimit.Inc()
788		return "", errBodySizeLimit
789	}
790	return resp.Header.Get("Content-Type"), nil
791}
792
// A loop can run and be stopped again. It must not be reused after it has been stopped.
794type loop interface {
795	run(errc chan<- error)
796	setForcedError(err error)
797	stop()
798	getCache() *scrapeCache
799	disableEndOfRunStalenessMarkers()
800}
801
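// cacheEntry ties an exposed metric string to the parsed label set, its hash
// and the storage reference of the series; lastIter records the scrape
// iteration in which the entry was last seen.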
802type cacheEntry struct {
803	ref      uint64
804	lastIter uint64
805	hash     uint64
806	lset     labels.Labels
807}
808
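// scrapeLoop manages the periodic scraping of a single target and the
// appending of the scraped samples, report samples and stale markers.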
809type scrapeLoop struct {
810	scraper         scraper
811	l               log.Logger
812	cache           *scrapeCache
813	lastScrapeSize  int
814	buffers         *pool.Pool
815	jitterSeed      uint64
816	honorTimestamps bool
817	forcedErr       error
818	forcedErrMtx    sync.Mutex
819	sampleLimit     int
820	labelLimits     *labelLimits
821	interval        time.Duration
822	timeout         time.Duration
823
824	appender            func(ctx context.Context) storage.Appender
825	sampleMutator       labelsMutator
826	reportSampleMutator labelsMutator
827
828	parentCtx context.Context
829	ctx       context.Context
830	cancel    func()
831	stopped   chan struct{}
832
833	disabledEndOfRunStalenessMarkers bool
834
835	reportScrapeTimeout bool
836}
837
838// scrapeCache tracks mappings of exposed metric strings to label sets and
839// storage references. Additionally, it tracks staleness of series between
840// scrapes.
841type scrapeCache struct {
842	iter uint64 // Current scrape iteration.
843
844	// How many series and metadata entries there were at the last success.
845	successfulCount int
846
847	// Parsed string to an entry with information about the actual label set
848	// and its storage reference.
849	series map[string]*cacheEntry
850
851	// Cache of dropped metric strings and their iteration. The iteration must
852	// be a pointer so we can update it without setting a new entry with an unsafe
853	// string in addDropped().
854	droppedSeries map[string]*uint64
855
856	// seriesCur and seriesPrev store the labels of series that were seen
857	// in the current and previous scrape.
858	// We hold two maps and swap them out to save allocations.
859	seriesCur  map[uint64]labels.Labels
860	seriesPrev map[uint64]labels.Labels
861
862	metaMtx  sync.Mutex
863	metadata map[string]*metaEntry
864}
865
866// metaEntry holds meta information about a metric.
867type metaEntry struct {
868	lastIter uint64 // Last scrape iteration the entry was observed at.
869	typ      textparse.MetricType
870	help     string
871	unit     string
872}
873
874func (m *metaEntry) size() int {
	// The attribute lastIter, although part of the struct, is not metadata.
876	return len(m.help) + len(m.unit) + len(m.typ)
877}
878
879func newScrapeCache() *scrapeCache {
880	return &scrapeCache{
881		series:        map[string]*cacheEntry{},
882		droppedSeries: map[string]*uint64{},
883		seriesCur:     map[uint64]labels.Labels{},
884		seriesPrev:    map[uint64]labels.Labels{},
885		metadata:      map[string]*metaEntry{},
886	}
887}
888
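// iterDone finishes the current scrape iteration. If flushCache is true, or the
// cache has grown suspiciously large while scrapes were failing, entries that
// were not seen in the last scrape are evicted and the iteration counter is
// advanced. The current and previous staleness-tracking maps are always swapped.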
889func (c *scrapeCache) iterDone(flushCache bool) {
890	c.metaMtx.Lock()
891	count := len(c.series) + len(c.droppedSeries) + len(c.metadata)
892	c.metaMtx.Unlock()
893
894	if flushCache {
895		c.successfulCount = count
896	} else if count > c.successfulCount*2+1000 {
897		// If a target had varying labels in scrapes that ultimately failed,
898		// the caches would grow indefinitely. Force a flush when this happens.
899		// We use the heuristic that this is a doubling of the cache size
900		// since the last scrape, and allow an additional 1000 in case
901		// initial scrapes all fail.
902		flushCache = true
903		targetScrapeCacheFlushForced.Inc()
904	}
905
906	if flushCache {
907		// All caches may grow over time through series churn
908		// or multiple string representations of the same metric. Clean up entries
909		// that haven't appeared in the last scrape.
910		for s, e := range c.series {
911			if c.iter != e.lastIter {
912				delete(c.series, s)
913			}
914		}
915		for s, iter := range c.droppedSeries {
916			if c.iter != *iter {
917				delete(c.droppedSeries, s)
918			}
919		}
920		c.metaMtx.Lock()
921		for m, e := range c.metadata {
922			// Keep metadata around for 10 scrapes after its metric disappeared.
923			if c.iter-e.lastIter > 10 {
924				delete(c.metadata, m)
925			}
926		}
927		c.metaMtx.Unlock()
928
929		c.iter++
930	}
931
932	// Swap current and previous series.
933	c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev
934
935	// We have to delete every single key in the map.
936	for k := range c.seriesCur {
937		delete(c.seriesCur, k)
938	}
939}
940
941func (c *scrapeCache) get(met string) (*cacheEntry, bool) {
942	e, ok := c.series[met]
943	if !ok {
944		return nil, false
945	}
946	e.lastIter = c.iter
947	return e, true
948}
949
950func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash uint64) {
951	if ref == 0 {
952		return
953	}
954	c.series[met] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
955}
956
957func (c *scrapeCache) addDropped(met string) {
958	iter := c.iter
959	c.droppedSeries[met] = &iter
960}
961
962func (c *scrapeCache) getDropped(met string) bool {
963	iterp, ok := c.droppedSeries[met]
964	if ok {
965		*iterp = c.iter
966	}
967	return ok
968}
969
970func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) {
971	c.seriesCur[hash] = lset
972}
973
974func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
975	for h, lset := range c.seriesPrev {
976		if _, ok := c.seriesCur[h]; !ok {
977			if !f(lset) {
978				break
979			}
980		}
981	}
982}
983
984func (c *scrapeCache) setType(metric []byte, t textparse.MetricType) {
985	c.metaMtx.Lock()
986
987	e, ok := c.metadata[yoloString(metric)]
988	if !ok {
989		e = &metaEntry{typ: textparse.MetricTypeUnknown}
990		c.metadata[string(metric)] = e
991	}
992	e.typ = t
993	e.lastIter = c.iter
994
995	c.metaMtx.Unlock()
996}
997
998func (c *scrapeCache) setHelp(metric, help []byte) {
999	c.metaMtx.Lock()
1000
1001	e, ok := c.metadata[yoloString(metric)]
1002	if !ok {
1003		e = &metaEntry{typ: textparse.MetricTypeUnknown}
1004		c.metadata[string(metric)] = e
1005	}
1006	if e.help != yoloString(help) {
1007		e.help = string(help)
1008	}
1009	e.lastIter = c.iter
1010
1011	c.metaMtx.Unlock()
1012}
1013
1014func (c *scrapeCache) setUnit(metric, unit []byte) {
1015	c.metaMtx.Lock()
1016
1017	e, ok := c.metadata[yoloString(metric)]
1018	if !ok {
1019		e = &metaEntry{typ: textparse.MetricTypeUnknown}
1020		c.metadata[string(metric)] = e
1021	}
1022	if e.unit != yoloString(unit) {
1023		e.unit = string(unit)
1024	}
1025	e.lastIter = c.iter
1026
1027	c.metaMtx.Unlock()
1028}
1029
1030func (c *scrapeCache) GetMetadata(metric string) (MetricMetadata, bool) {
1031	c.metaMtx.Lock()
1032	defer c.metaMtx.Unlock()
1033
1034	m, ok := c.metadata[metric]
1035	if !ok {
1036		return MetricMetadata{}, false
1037	}
1038	return MetricMetadata{
1039		Metric: metric,
1040		Type:   m.typ,
1041		Help:   m.help,
1042		Unit:   m.unit,
1043	}, true
1044}
1045
1046func (c *scrapeCache) ListMetadata() []MetricMetadata {
1047	c.metaMtx.Lock()
1048	defer c.metaMtx.Unlock()
1049
1050	res := make([]MetricMetadata, 0, len(c.metadata))
1051
1052	for m, e := range c.metadata {
1053		res = append(res, MetricMetadata{
1054			Metric: m,
1055			Type:   e.typ,
1056			Help:   e.help,
1057			Unit:   e.unit,
1058		})
1059	}
1060	return res
1061}
1062
// SizeMetadata returns the size of the metadata cache.
1064func (c *scrapeCache) SizeMetadata() (s int) {
1065	c.metaMtx.Lock()
1066	defer c.metaMtx.Unlock()
1067	for _, e := range c.metadata {
1068		s += e.size()
1069	}
1070
1071	return s
1072}
1073
// LengthMetadata returns the number of metadata entries in the cache.
1075func (c *scrapeCache) LengthMetadata() int {
1076	c.metaMtx.Lock()
1077	defer c.metaMtx.Unlock()
1078
1079	return len(c.metadata)
1080}
1081
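// newScrapeLoop constructs a scrape loop for a single target. A nil logger,
// buffer pool or cache is replaced with a usable default.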
1082func newScrapeLoop(ctx context.Context,
1083	sc scraper,
1084	l log.Logger,
1085	buffers *pool.Pool,
1086	sampleMutator labelsMutator,
1087	reportSampleMutator labelsMutator,
1088	appender func(ctx context.Context) storage.Appender,
1089	cache *scrapeCache,
1090	jitterSeed uint64,
1091	honorTimestamps bool,
1092	sampleLimit int,
1093	labelLimits *labelLimits,
1094	interval time.Duration,
1095	timeout time.Duration,
1096	reportScrapeTimeout bool,
1097) *scrapeLoop {
1098	if l == nil {
1099		l = log.NewNopLogger()
1100	}
1101	if buffers == nil {
1102		buffers = pool.New(1e3, 1e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
1103	}
1104	if cache == nil {
1105		cache = newScrapeCache()
1106	}
1107	sl := &scrapeLoop{
1108		scraper:             sc,
1109		buffers:             buffers,
1110		cache:               cache,
1111		appender:            appender,
1112		sampleMutator:       sampleMutator,
1113		reportSampleMutator: reportSampleMutator,
1114		stopped:             make(chan struct{}),
1115		jitterSeed:          jitterSeed,
1116		l:                   l,
1117		parentCtx:           ctx,
1118		honorTimestamps:     honorTimestamps,
1119		sampleLimit:         sampleLimit,
1120		labelLimits:         labelLimits,
1121		interval:            interval,
1122		timeout:             timeout,
1123		reportScrapeTimeout: reportScrapeTimeout,
1124	}
1125	sl.ctx, sl.cancel = context.WithCancel(ctx)
1126
1127	return sl
1128}
1129
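// run starts the scrape loop: it waits for the jitter-derived offset, then
// scrapes on every interval tick until the loop context is canceled, and
// finally writes end-of-run staleness markers unless they have been disabled.
// Scrape errors are sent to errc if it is non-nil.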
1130func (sl *scrapeLoop) run(errc chan<- error) {
1131	select {
1132	case <-time.After(sl.scraper.offset(sl.interval, sl.jitterSeed)):
1133		// Continue after a scraping offset.
1134	case <-sl.ctx.Done():
1135		close(sl.stopped)
1136		return
1137	}
1138
1139	var last time.Time
1140
1141	alignedScrapeTime := time.Now().Round(0)
1142	ticker := time.NewTicker(sl.interval)
1143	defer ticker.Stop()
1144
1145mainLoop:
1146	for {
1147		select {
1148		case <-sl.parentCtx.Done():
1149			close(sl.stopped)
1150			return
1151		case <-sl.ctx.Done():
1152			break mainLoop
1153		default:
1154		}
1155
1156		// Temporary workaround for a jitter in go timers that causes disk space
1157		// increase in TSDB.
1158		// See https://github.com/prometheus/prometheus/issues/7846
1159		// Calling Round ensures the time used is the wall clock, as otherwise .Sub
1160		// and .Add on time.Time behave differently (see time package docs).
1161		scrapeTime := time.Now().Round(0)
1162		if AlignScrapeTimestamps && sl.interval > 100*ScrapeTimestampTolerance {
1163			// For some reason, a tick might have been skipped, in which case we
1164			// would call alignedScrapeTime.Add(interval) multiple times.
1165			for scrapeTime.Sub(alignedScrapeTime) >= sl.interval {
1166				alignedScrapeTime = alignedScrapeTime.Add(sl.interval)
1167			}
			// Align the scrape time if we are within the tolerance boundaries.
1169			if scrapeTime.Sub(alignedScrapeTime) <= ScrapeTimestampTolerance {
1170				scrapeTime = alignedScrapeTime
1171			}
1172		}
1173
1174		last = sl.scrapeAndReport(sl.interval, sl.timeout, last, scrapeTime, errc)
1175
1176		select {
1177		case <-sl.parentCtx.Done():
1178			close(sl.stopped)
1179			return
1180		case <-sl.ctx.Done():
1181			break mainLoop
1182		case <-ticker.C:
1183		}
1184	}
1185
1186	close(sl.stopped)
1187
1188	if !sl.disabledEndOfRunStalenessMarkers {
1189		sl.endOfRunStaleness(last, ticker, sl.interval)
1190	}
1191}
1192
// scrapeAndReport performs a scrape and then appends the result to the storage
// together with the reporting metrics, using as few appenders as possible.
// In the happy scenario, a single appender is used.
// This function uses sl.parentCtx instead of sl.ctx on purpose. A scrape should
// only be cancelled on shutdown, not on reloads.
1198func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last, appendTime time.Time, errc chan<- error) time.Time {
1199	start := time.Now()
1200
1201	// Only record after the first scrape.
1202	if !last.IsZero() {
1203		targetIntervalLength.WithLabelValues(interval.String()).Observe(
1204			time.Since(last).Seconds(),
1205		)
1206	}
1207
1208	b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
1209	defer sl.buffers.Put(b)
1210	buf := bytes.NewBuffer(b)
1211
1212	var total, added, seriesAdded int
1213	var err, appErr, scrapeErr error
1214
1215	app := sl.appender(sl.parentCtx)
1216	defer func() {
1217		if err != nil {
1218			app.Rollback()
1219			return
1220		}
1221		err = app.Commit()
1222		if err != nil {
1223			level.Error(sl.l).Log("msg", "Scrape commit failed", "err", err)
1224		}
1225	}()
1226
1227	defer func() {
1228		if err = sl.report(app, appendTime, timeout, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
1229			level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err)
1230		}
1231	}()
1232
1233	if forcedErr := sl.getForcedError(); forcedErr != nil {
1234		scrapeErr = forcedErr
1235		// Add stale markers.
1236		if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
1237			app.Rollback()
1238			app = sl.appender(sl.parentCtx)
1239			level.Warn(sl.l).Log("msg", "Append failed", "err", err)
1240		}
1241		if errc != nil {
1242			errc <- forcedErr
1243		}
1244
1245		return start
1246	}
1247
1248	var contentType string
1249	scrapeCtx, cancel := context.WithTimeout(sl.parentCtx, timeout)
1250	contentType, scrapeErr = sl.scraper.scrape(scrapeCtx, buf)
1251	cancel()
1252
1253	if scrapeErr == nil {
1254		b = buf.Bytes()
1255		// NOTE: There were issues with misbehaving clients in the past
1256		// that occasionally returned empty results. We don't want those
1257		// to falsely reset our buffer size.
1258		if len(b) > 0 {
1259			sl.lastScrapeSize = len(b)
1260		}
1261	} else {
1262		level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr)
1263		if errc != nil {
1264			errc <- scrapeErr
1265		}
1266	}
1267
	// A failed scrape is the same as an empty scrape;
	// we still call sl.append to trigger stale markers.
1270	total, added, seriesAdded, appErr = sl.append(app, b, contentType, appendTime)
1271	if appErr != nil {
1272		app.Rollback()
1273		app = sl.appender(sl.parentCtx)
1274		level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
1275		// The append failed, probably due to a parse error or sample limit.
1276		// Call sl.append again with an empty scrape to trigger stale markers.
1277		if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
1278			app.Rollback()
1279			app = sl.appender(sl.parentCtx)
1280			level.Warn(sl.l).Log("msg", "Append failed", "err", err)
1281		}
1282	}
1283
1284	if scrapeErr == nil {
1285		scrapeErr = appErr
1286	}
1287
1288	return start
1289}
1290
1291func (sl *scrapeLoop) setForcedError(err error) {
1292	sl.forcedErrMtx.Lock()
1293	defer sl.forcedErrMtx.Unlock()
1294	sl.forcedErr = err
1295}
1296
1297func (sl *scrapeLoop) getForcedError() error {
1298	sl.forcedErrMtx.Lock()
1299	defer sl.forcedErrMtx.Unlock()
1300	return sl.forcedErr
1301}
1302
1303func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) {
1304	// Scraping has stopped. We want to write stale markers but
1305	// the target may be recreated, so we wait just over 2 scrape intervals
1306	// before creating them.
	// If the context is canceled, we presume the server is shutting down
	// and will restart where it was. We do not attempt to write stale markers
	// in this case.
1310
1311	if last.IsZero() {
1312		// There never was a scrape, so there will be no stale markers.
1313		return
1314	}
1315
1316	// Wait for when the next scrape would have been, record its timestamp.
1317	var staleTime time.Time
1318	select {
1319	case <-sl.parentCtx.Done():
1320		return
1321	case <-ticker.C:
1322		staleTime = time.Now()
1323	}
1324
1325	// Wait for when the next scrape would have been, if the target was recreated
1326	// samples should have been ingested by now.
1327	select {
1328	case <-sl.parentCtx.Done():
1329		return
1330	case <-ticker.C:
1331	}
1332
1333	// Wait for an extra 10% of the interval, just to be safe.
1334	select {
1335	case <-sl.parentCtx.Done():
1336		return
1337	case <-time.After(interval / 10):
1338	}
1339
1340	// Call sl.append again with an empty scrape to trigger stale markers.
1341	// If the target has since been recreated and scraped, the
1342	// stale markers will be out of order and ignored.
1343	app := sl.appender(sl.ctx)
1344	var err error
1345	defer func() {
1346		if err != nil {
1347			app.Rollback()
1348			return
1349		}
1350		err = app.Commit()
1351		if err != nil {
1352			level.Warn(sl.l).Log("msg", "Stale commit failed", "err", err)
1353		}
1354	}()
1355	if _, _, _, err = sl.append(app, []byte{}, "", staleTime); err != nil {
1356		app.Rollback()
1357		app = sl.appender(sl.ctx)
1358		level.Warn(sl.l).Log("msg", "Stale append failed", "err", err)
1359	}
1360	if err = sl.reportStale(app, staleTime); err != nil {
1361		level.Warn(sl.l).Log("msg", "Stale report failed", "err", err)
1362	}
1363}
1364
1365// Stop the scraping. May still write data and stale markers after it has
1366// returned. Cancel the context to stop all writes.
1367func (sl *scrapeLoop) stop() {
1368	sl.cancel()
1369	<-sl.stopped
1370}
1371
1372func (sl *scrapeLoop) disableEndOfRunStalenessMarkers() {
1373	sl.disabledEndOfRunStalenessMarkers = true
1374}
1375
1376func (sl *scrapeLoop) getCache() *scrapeCache {
1377	return sl.cache
1378}
1379
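// appendErrors counts the recoverable errors encountered while appending the
// samples and exemplars of a single scrape.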
1380type appendErrors struct {
1381	numOutOfOrder         int
1382	numDuplicates         int
1383	numOutOfBounds        int
1384	numExemplarOutOfOrder int
1385}
1386
1387func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
1388	var (
1389		p              = textparse.New(b, contentType)
1390		defTime        = timestamp.FromTime(ts)
1391		appErrs        = appendErrors{}
1392		sampleLimitErr error
1393		e              exemplar.Exemplar // escapes to heap so hoisted out of loop
1394	)
1395
1396	defer func() {
1397		if err != nil {
1398			return
1399		}
1400		// Only perform cache cleaning if the scrape was not empty.
1401		// An empty scrape (usually) is used to indicate a failed scrape.
1402		sl.cache.iterDone(len(b) > 0)
1403	}()
1404
1405loop:
1406	for {
1407		var (
1408			et          textparse.Entry
1409			sampleAdded bool
1410		)
1411		if et, err = p.Next(); err != nil {
1412			if err == io.EOF {
1413				err = nil
1414			}
1415			break
1416		}
1417		switch et {
1418		case textparse.EntryType:
1419			sl.cache.setType(p.Type())
1420			continue
1421		case textparse.EntryHelp:
1422			sl.cache.setHelp(p.Help())
1423			continue
1424		case textparse.EntryUnit:
1425			sl.cache.setUnit(p.Unit())
1426			continue
1427		case textparse.EntryComment:
1428			continue
1429		default:
1430		}
1431		total++
1432
1433		t := defTime
1434		met, tp, v := p.Series()
1435		if !sl.honorTimestamps {
1436			tp = nil
1437		}
1438		if tp != nil {
1439			t = *tp
1440		}
1441
1442		if sl.cache.getDropped(yoloString(met)) {
1443			continue
1444		}
1445		ce, ok := sl.cache.get(yoloString(met))
1446		var (
1447			ref  uint64
1448			lset labels.Labels
1449			mets string
1450			hash uint64
1451		)
1452
1453		if ok {
1454			ref = ce.ref
1455			lset = ce.lset
1456		} else {
1457			mets = p.Metric(&lset)
1458			hash = lset.Hash()
1459
			// Hash the label set as it is seen locally to the target. Then add target
			// labels and apply relabeling, and store the final label set.
1462			lset = sl.sampleMutator(lset)
1463
1464			// The label set may be set to nil to indicate dropping.
1465			if lset == nil {
1466				sl.cache.addDropped(mets)
1467				continue
1468			}
1469
1470			if !lset.Has(labels.MetricName) {
1471				err = errNameLabelMandatory
1472				break loop
1473			}
1474
			// If any label limit is exceeded, the scrape should fail.
1476			if err = verifyLabelLimits(lset, sl.labelLimits); err != nil {
1477				targetScrapePoolExceededLabelLimits.Inc()
1478				break loop
1479			}
1480		}
1481
1482		ref, err = app.Append(ref, lset, t, v)
1483		sampleAdded, err = sl.checkAddError(ce, met, tp, err, &sampleLimitErr, &appErrs)
1484		if err != nil {
1485			if err != storage.ErrNotFound {
1486				level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
1487			}
1488			break loop
1489		}
1490
1491		if !ok {
1492			if tp == nil {
				// Only track staleness when there is no explicit timestamp;
				// samples with explicit timestamps bypass the staleness logic.
1494				sl.cache.trackStaleness(hash, lset)
1495			}
1496			sl.cache.addRef(mets, ref, lset, hash)
1497			if sampleAdded && sampleLimitErr == nil {
1498				seriesAdded++
1499			}
1500		}
1501
1502		// Increment added even if there's an error so we correctly report the
1503		// number of samples remaining after relabeling.
1504		added++
1505
1506		if hasExemplar := p.Exemplar(&e); hasExemplar {
1507			if !e.HasTs {
1508				e.Ts = t
1509			}
1510			_, exemplarErr := app.AppendExemplar(ref, lset, e)
1511			exemplarErr = sl.checkAddExemplarError(exemplarErr, e, &appErrs)
1512			if exemplarErr != nil {
1513				// Since exemplar storage is still experimental, we don't fail the scrape on ingestion errors.
1514				level.Debug(sl.l).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr)
1515			}
1516			e = exemplar.Exemplar{} // reset for next time round loop
1517		}
1518
1519	}
1520	if sampleLimitErr != nil {
1521		if err == nil {
1522			err = sampleLimitErr
1523		}
1524		// We only want to increment this once per scrape, so this is Inc'd outside the loop.
1525		targetScrapeSampleLimit.Inc()
1526	}
1527	if appErrs.numOutOfOrder > 0 {
1528		level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", appErrs.numOutOfOrder)
1529	}
1530	if appErrs.numDuplicates > 0 {
1531		level.Warn(sl.l).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", appErrs.numDuplicates)
1532	}
1533	if appErrs.numOutOfBounds > 0 {
1534		level.Warn(sl.l).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", appErrs.numOutOfBounds)
1535	}
1536	if appErrs.numExemplarOutOfOrder > 0 {
1537		level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder)
1538	}
1539	if err == nil {
1540		sl.cache.forEachStale(func(lset labels.Labels) bool {
1541			// Series no longer exposed, mark it stale.
1542			_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
1543			switch errors.Cause(err) {
1544			case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
1545				// Do not count these in logging, as this is expected if a target
1546				// goes away and comes back again with a new scrape loop.
1547				err = nil
1548			}
1549			return err == nil
1550		})
1551	}
1552	return
1553}
1554
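// yoloString converts a byte slice to a string without copying. The caller must
// ensure the bytes are not modified or reused while the string is referenced;
// cache keys derived from it are re-allocated with string(...) before being
// stored.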
1555func yoloString(b []byte) string {
1556	return *((*string)(unsafe.Pointer(&b)))
1557}
1558
// checkAddError checks the error returned by an append, updates the relevant
// error counters, and reports whether the sample was added. A non-nil returned
// error tells the caller to stop processing further samples; sample limit
// errors are stored in *sampleLimitErr instead so that parsing can continue.
1561func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr *error, appErrs *appendErrors) (bool, error) {
1562	switch errors.Cause(err) {
1563	case nil:
1564		if tp == nil && ce != nil {
1565			sl.cache.trackStaleness(ce.hash, ce.lset)
1566		}
1567		return true, nil
1568	case storage.ErrNotFound:
1569		return false, storage.ErrNotFound
1570	case storage.ErrOutOfOrderSample:
1571		appErrs.numOutOfOrder++
1572		level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
1573		targetScrapeSampleOutOfOrder.Inc()
1574		return false, nil
1575	case storage.ErrDuplicateSampleForTimestamp:
1576		appErrs.numDuplicates++
1577		level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
1578		targetScrapeSampleDuplicate.Inc()
1579		return false, nil
1580	case storage.ErrOutOfBounds:
1581		appErrs.numOutOfBounds++
1582		level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
1583		targetScrapeSampleOutOfBounds.Inc()
1584		return false, nil
1585	case errSampleLimit:
1586		// Keep on parsing output if we hit the limit, so we report the correct
1587		// total number of samples scraped.
1588		*sampleLimitErr = err
1589		return false, nil
1590	default:
1591		return false, err
1592	}
1593}
1594
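// checkAddExemplarError checks the error returned when appending an exemplar,
// counting out-of-order exemplars, and returns any error the caller should
// still handle.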
1595func (sl *scrapeLoop) checkAddExemplarError(err error, e exemplar.Exemplar, appErrs *appendErrors) error {
1596	switch errors.Cause(err) {
1597	case storage.ErrNotFound:
1598		return storage.ErrNotFound
1599	case storage.ErrOutOfOrderExemplar:
1600		appErrs.numExemplarOutOfOrder++
1601		level.Debug(sl.l).Log("msg", "Out of order exemplar", "exemplar", fmt.Sprintf("%+v", e))
1602		targetScrapeExemplarOutOfOrder.Inc()
1603		return nil
1604	default:
1605		return err
1606	}
1607}
1608
1609// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
1610// with scraped metrics in the cache.
1611const (
1612	scrapeHealthMetricName       = "up" + "\xff"
1613	scrapeDurationMetricName     = "scrape_duration_seconds" + "\xff"
1614	scrapeSamplesMetricName      = "scrape_samples_scraped" + "\xff"
1615	samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling" + "\xff"
1616	scrapeSeriesAddedMetricName  = "scrape_series_added" + "\xff"
1617	scrapeTimeoutMetricName      = "scrape_timeout_seconds" + "\xff"
1618	scrapeSampleLimitMetricName  = "scrape_sample_limit" + "\xff"
1619)
1620
1621func (sl *scrapeLoop) report(app storage.Appender, start time.Time, timeout, duration time.Duration, scraped, added, seriesAdded int, scrapeErr error) (err error) {
1622	sl.scraper.Report(start, duration, scrapeErr)
1623
1624	ts := timestamp.FromTime(start)
1625
1626	var health float64
1627	if scrapeErr == nil {
1628		health = 1
1629	}
1630
1631	if err = sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil {
1632		return
1633	}
1634	if err = sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds()); err != nil {
1635		return
1636	}
1637	if err = sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scraped)); err != nil {
1638		return
1639	}
1640	if err = sl.addReportSample(app, samplesPostRelabelMetricName, ts, float64(added)); err != nil {
1641		return
1642	}
1643	if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded)); err != nil {
1644		return
1645	}
1646	if sl.reportScrapeTimeout {
1647		if err = sl.addReportSample(app, scrapeTimeoutMetricName, ts, timeout.Seconds()); err != nil {
1648			return
1649		}
1650		if err = sl.addReportSample(app, scrapeSampleLimitMetricName, ts, float64(sl.sampleLimit)); err != nil {
1651			return
1652		}
1653	}
1654	return
1655}
1656
1657func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err error) {
1658	ts := timestamp.FromTime(start)
1659
1660	stale := math.Float64frombits(value.StaleNaN)
1661
1662	if err = sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil {
1663		return
1664	}
1665	if err = sl.addReportSample(app, scrapeDurationMetricName, ts, stale); err != nil {
1666		return
1667	}
1668	if err = sl.addReportSample(app, scrapeSamplesMetricName, ts, stale); err != nil {
1669		return
1670	}
1671	if err = sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale); err != nil {
1672		return
1673	}
1674	if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale); err != nil {
1675		return
1676	}
1677	if sl.reportScrapeTimeout {
1678		if err = sl.addReportSample(app, scrapeTimeoutMetricName, ts, stale); err != nil {
1679			return
1680		}
1681		if err = sl.addReportSample(app, scrapeSampleLimitMetricName, ts, stale); err != nil {
1682			return
1683		}
1684	}
1685	return
1686}
1687
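// addReportSample appends a single report sample, reusing the cached storage
// reference when available and stripping the trailing \xff from the metric name.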
1688func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
1689	ce, ok := sl.cache.get(s)
1690	var ref uint64
1691	var lset labels.Labels
1692	if ok {
1693		ref = ce.ref
1694		lset = ce.lset
1695	} else {
1696		lset = labels.Labels{
1697			// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
1698			// with scraped metrics in the cache.
1699			// We have to drop it when building the actual metric.
1700			labels.Label{Name: labels.MetricName, Value: s[:len(s)-1]},
1701		}
1702		lset = sl.reportSampleMutator(lset)
1703	}
1704
1705	ref, err := app.Append(ref, lset, t, v)
1706	switch errors.Cause(err) {
1707	case nil:
1708		if !ok {
1709			sl.cache.addRef(s, ref, lset, lset.Hash())
1710		}
1711		return nil
1712	case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
1713		// Do not log here, as this is expected if a target goes away and comes back
1714		// again with a new scrape loop.
1715		return nil
1716	default:
1717		return err
1718	}
1719}
1720
// zeroConfig returns a copy of the scrape config in which all configuration
// items that do not alter the scraped metrics are zeroed out.
func zeroConfig(c *config.ScrapeConfig) *config.ScrapeConfig {
	z := *c
	// We zero out the fields that for sure don't affect the resulting metrics.
1726	z.ScrapeInterval = 0
1727	z.ScrapeTimeout = 0
1728	z.SampleLimit = 0
1729	z.LabelLimit = 0
1730	z.LabelNameLengthLimit = 0
1731	z.LabelValueLengthLimit = 0
1732	z.HTTPClientConfig = config_util.HTTPClientConfig{}
1733	return &z
1734}
1735
// reusableCache compares two scrape configs and tells whether the cache is still
// valid.
1738func reusableCache(r, l *config.ScrapeConfig) bool {
1739	if r == nil || l == nil {
1740		return false
1741	}
1742	return reflect.DeepEqual(zeroConfig(r), zeroConfig(l))
1743}
1744