// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scrape

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"net/http"
	"sync"
	"time"
	"unsafe"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	config_util "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/prometheus/common/version"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/pkg/pool"
	"github.com/prometheus/prometheus/pkg/relabel"
	"github.com/prometheus/prometheus/pkg/textparse"
	"github.com/prometheus/prometheus/pkg/timestamp"
	"github.com/prometheus/prometheus/pkg/value"
	"github.com/prometheus/prometheus/storage"
)

var (
	targetIntervalLength = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "prometheus_target_interval_length_seconds",
			Help:       "Actual intervals between scrapes.",
			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
		},
		[]string{"interval"},
	)
	targetReloadIntervalLength = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "prometheus_target_reload_length_seconds",
			Help:       "Actual interval to reload the scrape pool with a given configuration.",
			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
		},
		[]string{"interval"},
	)
	targetScrapePools = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrape_pools_total",
			Help: "Total number of scrape pool creation attempts.",
		},
	)
	targetScrapePoolsFailed = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrape_pools_failed_total",
			Help: "Total number of scrape pool creations that failed.",
		},
	)
	targetScrapePoolReloads = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrape_pool_reloads_total",
			Help: "Total number of scrape loop reloads.",
		},
	)
	targetScrapePoolReloadsFailed = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrape_pool_reloads_failed_total",
			Help: "Total number of failed scrape loop reloads.",
		},
	)
	targetSyncIntervalLength = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "prometheus_target_sync_length_seconds",
			Help:       "Actual interval to sync the scrape pool.",
			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
		},
		[]string{"scrape_job"},
	)
	targetScrapePoolSyncsCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrape_pool_sync_total",
			Help: "Total number of syncs that were executed on a scrape pool.",
		},
		[]string{"scrape_job"},
	)
	targetScrapeSampleLimit = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_exceeded_sample_limit_total",
			Help: "Total number of scrapes that hit the sample limit and were rejected.",
		},
	)
	targetScrapeSampleDuplicate = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_sample_duplicate_timestamp_total",
			Help: "Total number of samples rejected due to duplicate timestamps but different values.",
		},
	)
	targetScrapeSampleOutOfOrder = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_sample_out_of_order_total",
			Help: "Total number of samples rejected due to being out of the expected order.",
		},
	)
	targetScrapeSampleOutOfBounds = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_sample_out_of_bounds_total",
			Help: "Total number of samples rejected due to timestamps falling outside the valid time bounds.",
		},
	)
	targetScrapeCacheFlushForced = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_cache_flush_forced_total",
			Help: "How many times a scrape cache was flushed due to getting big while scrapes are failing.",
		},
	)
)

func init() {
	prometheus.MustRegister(targetIntervalLength)
	prometheus.MustRegister(targetReloadIntervalLength)
	prometheus.MustRegister(targetScrapePools)
	prometheus.MustRegister(targetScrapePoolsFailed)
	prometheus.MustRegister(targetScrapePoolReloads)
	prometheus.MustRegister(targetScrapePoolReloadsFailed)
	prometheus.MustRegister(targetSyncIntervalLength)
	prometheus.MustRegister(targetScrapePoolSyncsCounter)
	prometheus.MustRegister(targetScrapeSampleLimit)
	prometheus.MustRegister(targetScrapeSampleDuplicate)
	prometheus.MustRegister(targetScrapeSampleOutOfOrder)
	prometheus.MustRegister(targetScrapeSampleOutOfBounds)
	prometheus.MustRegister(targetScrapeCacheFlushForced)
}

// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
	appendable Appendable
	logger     log.Logger

	mtx    sync.RWMutex
	config *config.ScrapeConfig
	client *http.Client
	// Targets and loops must always be synchronized to have the same
	// set of hashes.
	activeTargets  map[uint64]*Target
	droppedTargets []*Target
	loops          map[uint64]loop
	cancel         context.CancelFunc

	// Constructor for new scrape loops. This is settable for testing convenience.
	newLoop func(scrapeLoopOptions) loop
}

type scrapeLoopOptions struct {
	target          *Target
	scraper         scraper
	limit           int
	honorLabels     bool
	honorTimestamps bool
	mrc             []*relabel.Config
}

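// maxAheadTime caps how far ahead of the current time a sample's timestamp may
// point; the timeLimitAppender constructed in appender() uses it to bound
// sample timestamps to now + maxAheadTime.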
const maxAheadTime = 10 * time.Minute

type labelsMutator func(labels.Labels) labels.Labels

func newScrapePool(cfg *config.ScrapeConfig, app Appendable, jitterSeed uint64, logger log.Logger) (*scrapePool, error) {
	targetScrapePools.Inc()
	if logger == nil {
		logger = log.NewNopLogger()
	}

	client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName)
	if err != nil {
		targetScrapePoolsFailed.Inc()
		return nil, errors.Wrap(err, "error creating HTTP client")
	}

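	// The scrape buffers come from a bucketed pool: roughly, bucket sizes grow
	// by a factor of 3 from 1KB up to 100MB, and Get hands out a buffer from
	// the smallest bucket that fits the requested size, avoiding large
	// per-scrape allocations.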
	buffers := pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })

	ctx, cancel := context.WithCancel(context.Background())
	sp := &scrapePool{
		cancel:        cancel,
		appendable:    app,
		config:        cfg,
		client:        client,
		activeTargets: map[uint64]*Target{},
		loops:         map[uint64]loop{},
		logger:        logger,
	}
	sp.newLoop = func(opts scrapeLoopOptions) loop {
		// Give each target a fresh scrape cache, which also serves as its
		// metadata store.
		cache := newScrapeCache()
		opts.target.setMetadataStore(cache)

		return newScrapeLoop(
			ctx,
			opts.scraper,
			log.With(logger, "target", opts.target),
			buffers,
			func(l labels.Labels) labels.Labels {
				return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc)
			},
			func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
			func() storage.Appender {
				app, err := app.Appender()
				if err != nil {
					panic(err)
				}
				return appender(app, opts.limit)
			},
			cache,
			jitterSeed,
			opts.honorTimestamps,
		)
	}

	return sp, nil
}

func (sp *scrapePool) ActiveTargets() []*Target {
	sp.mtx.Lock()
	defer sp.mtx.Unlock()

	var tActive []*Target
	for _, t := range sp.activeTargets {
		tActive = append(tActive, t)
	}
	return tActive
}

func (sp *scrapePool) DroppedTargets() []*Target {
	sp.mtx.Lock()
	defer sp.mtx.Unlock()
	return sp.droppedTargets
}

// stop terminates all scrape loops and returns once they have all terminated.
func (sp *scrapePool) stop() {
	sp.cancel()
	var wg sync.WaitGroup

	sp.mtx.Lock()
	defer sp.mtx.Unlock()

	for fp, l := range sp.loops {
		wg.Add(1)

		go func(l loop) {
			l.stop()
			wg.Done()
		}(l)

		delete(sp.loops, fp)
		delete(sp.activeTargets, fp)
	}
	wg.Wait()
	sp.client.CloseIdleConnections()
}

// reload the scrape pool with the given scrape configuration. The target state is preserved
// but all scrape loops are restarted with the new scrape configuration.
// This method returns once all scrape loops that were stopped have terminated.
func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
	targetScrapePoolReloads.Inc()
	start := time.Now()

	sp.mtx.Lock()
	defer sp.mtx.Unlock()

	client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName)
	if err != nil {
		targetScrapePoolReloadsFailed.Inc()
		return errors.Wrap(err, "error creating HTTP client")
	}
	sp.config = cfg
	oldClient := sp.client
	sp.client = client

	var (
		wg              sync.WaitGroup
		interval        = time.Duration(sp.config.ScrapeInterval)
		timeout         = time.Duration(sp.config.ScrapeTimeout)
		limit           = int(sp.config.SampleLimit)
		honorLabels     = sp.config.HonorLabels
		honorTimestamps = sp.config.HonorTimestamps
		mrc             = sp.config.MetricRelabelConfigs
	)

	for fp, oldLoop := range sp.loops {
		var (
			t       = sp.activeTargets[fp]
			s       = &targetScraper{Target: t, client: sp.client, timeout: timeout}
			newLoop = sp.newLoop(scrapeLoopOptions{
				target:          t,
				scraper:         s,
				limit:           limit,
				honorLabels:     honorLabels,
				honorTimestamps: honorTimestamps,
				mrc:             mrc,
			})
		)
		wg.Add(1)

		go func(oldLoop, newLoop loop) {
			oldLoop.stop()
			wg.Done()

			go newLoop.run(interval, timeout, nil)
		}(oldLoop, newLoop)

		sp.loops[fp] = newLoop
	}

	wg.Wait()
	oldClient.CloseIdleConnections()
	targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
		time.Since(start).Seconds(),
	)
	return nil
}

// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scrape loops with the resulting set. Active and dropped
// targets are recorded on the pool for the ActiveTargets and DroppedTargets accessors.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
	start := time.Now()

	var all []*Target
	sp.mtx.Lock()
	sp.droppedTargets = []*Target{}
	for _, tg := range tgs {
		targets, err := targetsFromGroup(tg, sp.config)
		if err != nil {
			level.Error(sp.logger).Log("msg", "creating targets failed", "err", err)
			continue
		}
		for _, t := range targets {
			if t.Labels().Len() > 0 {
				all = append(all, t)
			} else if t.DiscoveredLabels().Len() > 0 {
				sp.droppedTargets = append(sp.droppedTargets, t)
			}
		}
	}
	sp.mtx.Unlock()
	sp.sync(all)

	targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
		time.Since(start).Seconds(),
	)
	targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}

// sync takes a list of potentially duplicated targets, deduplicates them, starts
// scrape loops for new targets, and stops scrape loops for disappeared targets.
// It returns after all stopped scrape loops have terminated.
func (sp *scrapePool) sync(targets []*Target) {
	sp.mtx.Lock()
	defer sp.mtx.Unlock()

	var (
		uniqueTargets   = map[uint64]struct{}{}
		interval        = time.Duration(sp.config.ScrapeInterval)
		timeout         = time.Duration(sp.config.ScrapeTimeout)
		limit           = int(sp.config.SampleLimit)
		honorLabels     = sp.config.HonorLabels
		honorTimestamps = sp.config.HonorTimestamps
		mrc             = sp.config.MetricRelabelConfigs
	)

	for _, t := range targets {
		t := t
		hash := t.hash()
		uniqueTargets[hash] = struct{}{}

		if _, ok := sp.activeTargets[hash]; !ok {
			s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
			l := sp.newLoop(scrapeLoopOptions{
				target:          t,
				scraper:         s,
				limit:           limit,
				honorLabels:     honorLabels,
				honorTimestamps: honorTimestamps,
				mrc:             mrc,
			})

			sp.activeTargets[hash] = t
			sp.loops[hash] = l

			go l.run(interval, timeout, nil)
		} else {
			// Keep the most recently discovered labels so the Service
			// Discovery web page shows up-to-date information.
			sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
		}
	}

	var wg sync.WaitGroup

	// Stop and remove old targets and scrape loops.
	for hash := range sp.activeTargets {
		if _, ok := uniqueTargets[hash]; !ok {
			wg.Add(1)
			go func(l loop) {
				l.stop()
				wg.Done()
			}(sp.loops[hash])

			delete(sp.loops, hash)
			delete(sp.activeTargets, hash)
		}
	}

	// Wait for all potentially stopped scrapers to terminate. This covers the
	// case of flapping targets: if the server is under high load, a new scraper
	// may already be active and inserting, while the old scraper, not yet
	// terminated, could still be inserting a previous sample set.
	wg.Wait()
}

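// For example (illustrative values): given target labels {job="node"} and a
// scraped sample labeled {job="custom"}, honor=true keeps job="custom", while
// honor=false stores the sample as {job="node", exported_job="custom"}.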
func mutateSampleLabels(lset labels.Labels, target *Target, honor bool, rc []*relabel.Config) labels.Labels {
	lb := labels.NewBuilder(lset)

	if honor {
		for _, l := range target.Labels() {
			if !lset.Has(l.Name) {
				lb.Set(l.Name, l.Value)
			}
		}
	} else {
		for _, l := range target.Labels() {
			lv := lset.Get(l.Name)
			if lv != "" {
				lb.Set(model.ExportedLabelPrefix+l.Name, lv)
			}
			lb.Set(l.Name, l.Value)
		}
	}

	for _, l := range lb.Labels() {
		if l.Value == "" {
			lb.Del(l.Name)
		}
	}

	res := lb.Labels()

	if len(rc) > 0 {
		res = relabel.Process(res, rc...)
	}

	return res
}

func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels {
	lb := labels.NewBuilder(lset)

	for _, l := range target.Labels() {
		lv := lset.Get(l.Name)
		if lv != "" {
			lb.Set(model.ExportedLabelPrefix+l.Name, lv)
		}
		lb.Set(l.Name, l.Value)
	}

	return lb.Labels()
}

// appender returns an appender for ingested samples from the target.
func appender(app storage.Appender, limit int) storage.Appender {
	app = &timeLimitAppender{
		Appender: app,
		maxTime:  timestamp.FromTime(time.Now().Add(maxAheadTime)),
	}

	// The limit is applied after metrics are potentially dropped via relabeling.
	if limit > 0 {
		app = &limitAppender{
			Appender: app,
			limit:    limit,
		}
	}
	return app
}

// A scraper retrieves samples and accepts a status report at the end.
type scraper interface {
	scrape(ctx context.Context, w io.Writer) (string, error)
	report(start time.Time, dur time.Duration, err error)
	offset(interval time.Duration, jitterSeed uint64) time.Duration
}
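
// A typical scrape cycle, roughly as driven by scrapeLoop.run:
//
//	var buf bytes.Buffer
//	ctx, cancel := context.WithTimeout(ctx, timeout)
//	contentType, err := s.scrape(ctx, &buf)
//	cancel()
//	s.report(start, time.Since(start), err)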

// targetScraper implements the scraper interface for a target.
type targetScraper struct {
	*Target

	client  *http.Client
	req     *http.Request
	timeout time.Duration

	gzipr *gzip.Reader
	buf   *bufio.Reader
}

const acceptHeader = `application/openmetrics-text; version=0.0.1,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`

var userAgentHeader = fmt.Sprintf("Prometheus/%s", version.Version)

func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
	if s.req == nil {
		req, err := http.NewRequest("GET", s.URL().String(), nil)
		if err != nil {
			return "", err
		}
		req.Header.Add("Accept", acceptHeader)
		req.Header.Add("Accept-Encoding", "gzip")
		req.Header.Set("User-Agent", userAgentHeader)
		req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", fmt.Sprintf("%f", s.timeout.Seconds()))

		s.req = req
	}

	resp, err := s.client.Do(s.req.WithContext(ctx))
	if err != nil {
		return "", err
	}
	defer func() {
		io.Copy(ioutil.Discard, resp.Body)
		resp.Body.Close()
	}()

	if resp.StatusCode != http.StatusOK {
		return "", errors.Errorf("server returned HTTP status %s", resp.Status)
	}

	if resp.Header.Get("Content-Encoding") != "gzip" {
		_, err = io.Copy(w, resp.Body)
		if err != nil {
			return "", err
		}
		return resp.Header.Get("Content-Type"), nil
	}

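	// The bufio and gzip readers are reused across scrapes of this target to
	// avoid per-scrape allocations; Reset re-points them at the new response body.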
	if s.gzipr == nil {
		s.buf = bufio.NewReader(resp.Body)
		s.gzipr, err = gzip.NewReader(s.buf)
		if err != nil {
			return "", err
		}
	} else {
		s.buf.Reset(resp.Body)
		if err = s.gzipr.Reset(s.buf); err != nil {
			return "", err
		}
	}

	_, err = io.Copy(w, s.gzipr)
	s.gzipr.Close()
	if err != nil {
		return "", err
	}
	return resp.Header.Get("Content-Type"), nil
}

// A loop can run and be stopped again. It must not be reused once it has been stopped.
type loop interface {
	run(interval, timeout time.Duration, errc chan<- error)
	stop()
}

type cacheEntry struct {
	ref      uint64
	lastIter uint64
	hash     uint64
	lset     labels.Labels
}

type scrapeLoop struct {
	scraper         scraper
	l               log.Logger
	cache           *scrapeCache
	lastScrapeSize  int
	buffers         *pool.Pool
	jitterSeed      uint64
	honorTimestamps bool

	appender            func() storage.Appender
	sampleMutator       labelsMutator
	reportSampleMutator labelsMutator

	ctx       context.Context
	scrapeCtx context.Context
	cancel    func()
	stopped   chan struct{}
}

// scrapeCache tracks mappings of exposed metric strings to label sets and
// storage references. Additionally, it tracks staleness of series between
// scrapes.
type scrapeCache struct {
	iter uint64 // Current scrape iteration.

	// How many series and metadata entries there were at the last success.
	successfulCount int

	// Parsed string to an entry with information about the actual label set
	// and its storage reference.
	series map[string]*cacheEntry

	// Cache of dropped metric strings and their iteration. The iteration must
	// be a pointer so we can update it without setting a new entry with an unsafe
	// string in addDropped().
	droppedSeries map[string]*uint64

	// seriesCur and seriesPrev store the labels of series that were seen
	// in the current and previous scrape.
	// We hold two maps and swap them out to save allocations.
	seriesCur  map[uint64]labels.Labels
	seriesPrev map[uint64]labels.Labels

	metaMtx  sync.Mutex
	metadata map[string]*metaEntry
}
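
// Rough per-scrape lifecycle (see scrapeLoop.append): each exposed line is
// checked against getDropped and get; new series are registered via addRef or
// addDropped, and every series seen without an explicit timestamp is recorded
// with trackStaleness. Afterwards, iterDone cleans up entries that were not
// seen (on successful scrapes, or when the cache has grown too large) and
// swaps seriesCur/seriesPrev so forEachStale can report vanished series.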

// metaEntry holds meta information about a metric.
type metaEntry struct {
	lastIter uint64 // Last scrape iteration the entry was observed at.
	typ      textparse.MetricType
	help     string
	unit     string
}

func newScrapeCache() *scrapeCache {
	return &scrapeCache{
		series:        map[string]*cacheEntry{},
		droppedSeries: map[string]*uint64{},
		seriesCur:     map[uint64]labels.Labels{},
		seriesPrev:    map[uint64]labels.Labels{},
		metadata:      map[string]*metaEntry{},
	}
}

func (c *scrapeCache) iterDone(flushCache bool) {
	c.metaMtx.Lock()
	count := len(c.series) + len(c.droppedSeries) + len(c.metadata)
	c.metaMtx.Unlock()

	if flushCache {
		c.successfulCount = count
	} else if count > c.successfulCount*2+1000 {
		// If a target had varying labels in scrapes that ultimately failed,
		// the caches would grow indefinitely. Force a flush when this happens.
		// We use the heuristic that this is a doubling of the cache size
		// since the last scrape, and allow an additional 1000 in case
		// initial scrapes all fail.
		flushCache = true
		targetScrapeCacheFlushForced.Inc()
	}

	if flushCache {
		// All caches may grow over time through series churn
		// or multiple string representations of the same metric. Clean up entries
		// that haven't appeared in the last scrape.
		for s, e := range c.series {
			if c.iter != e.lastIter {
				delete(c.series, s)
			}
		}
		for s, iter := range c.droppedSeries {
			if c.iter != *iter {
				delete(c.droppedSeries, s)
			}
		}
		c.metaMtx.Lock()
		for m, e := range c.metadata {
			// Keep metadata around for 10 scrapes after its metric disappeared.
			if c.iter-e.lastIter > 10 {
				delete(c.metadata, m)
			}
		}
		c.metaMtx.Unlock()

		c.iter++
	}

	// Swap current and previous series.
	c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev

	// We have to delete every single key in the map.
	for k := range c.seriesCur {
		delete(c.seriesCur, k)
	}
}

func (c *scrapeCache) get(met string) (*cacheEntry, bool) {
	e, ok := c.series[met]
	if !ok {
		return nil, false
	}
	e.lastIter = c.iter
	return e, true
}

func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash uint64) {
	if ref == 0 {
		return
	}
	c.series[met] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
}

func (c *scrapeCache) addDropped(met string) {
	iter := c.iter
	c.droppedSeries[met] = &iter
}

func (c *scrapeCache) getDropped(met string) bool {
	iterp, ok := c.droppedSeries[met]
	if ok {
		*iterp = c.iter
	}
	return ok
}

func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) {
	c.seriesCur[hash] = lset
}

func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
	for h, lset := range c.seriesPrev {
		if _, ok := c.seriesCur[h]; !ok {
			if !f(lset) {
				break
			}
		}
	}
}

func (c *scrapeCache) setType(metric []byte, t textparse.MetricType) {
	c.metaMtx.Lock()

	e, ok := c.metadata[yoloString(metric)]
	if !ok {
		e = &metaEntry{typ: textparse.MetricTypeUnknown}
		c.metadata[string(metric)] = e
	}
	e.typ = t
	e.lastIter = c.iter

	c.metaMtx.Unlock()
}

func (c *scrapeCache) setHelp(metric, help []byte) {
	c.metaMtx.Lock()

	e, ok := c.metadata[yoloString(metric)]
	if !ok {
		e = &metaEntry{typ: textparse.MetricTypeUnknown}
		c.metadata[string(metric)] = e
	}
	if e.help != yoloString(help) {
		e.help = string(help)
	}
	e.lastIter = c.iter

	c.metaMtx.Unlock()
}

func (c *scrapeCache) setUnit(metric, unit []byte) {
	c.metaMtx.Lock()

	e, ok := c.metadata[yoloString(metric)]
	if !ok {
		e = &metaEntry{typ: textparse.MetricTypeUnknown}
		c.metadata[string(metric)] = e
	}
	if e.unit != yoloString(unit) {
		e.unit = string(unit)
	}
	e.lastIter = c.iter

	c.metaMtx.Unlock()
}

func (c *scrapeCache) getMetadata(metric string) (MetricMetadata, bool) {
	c.metaMtx.Lock()
	defer c.metaMtx.Unlock()

	m, ok := c.metadata[metric]
	if !ok {
		return MetricMetadata{}, false
	}
	return MetricMetadata{
		Metric: metric,
		Type:   m.typ,
		Help:   m.help,
		Unit:   m.unit,
	}, true
}

func (c *scrapeCache) listMetadata() []MetricMetadata {
	c.metaMtx.Lock()
	defer c.metaMtx.Unlock()

	res := make([]MetricMetadata, 0, len(c.metadata))

	for m, e := range c.metadata {
		res = append(res, MetricMetadata{
			Metric: m,
			Type:   e.typ,
			Help:   e.help,
			Unit:   e.unit,
		})
	}
	return res
}

func newScrapeLoop(ctx context.Context,
	sc scraper,
	l log.Logger,
	buffers *pool.Pool,
	sampleMutator labelsMutator,
	reportSampleMutator labelsMutator,
	appender func() storage.Appender,
	cache *scrapeCache,
	jitterSeed uint64,
	honorTimestamps bool,
) *scrapeLoop {
	if l == nil {
		l = log.NewNopLogger()
	}
	if buffers == nil {
		buffers = pool.New(1e3, 1e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
	}
	if cache == nil {
		cache = newScrapeCache()
	}
	sl := &scrapeLoop{
		scraper:             sc,
		buffers:             buffers,
		cache:               cache,
		appender:            appender,
		sampleMutator:       sampleMutator,
		reportSampleMutator: reportSampleMutator,
		stopped:             make(chan struct{}),
		jitterSeed:          jitterSeed,
		l:                   l,
		ctx:                 ctx,
		honorTimestamps:     honorTimestamps,
	}
	sl.scrapeCtx, sl.cancel = context.WithCancel(ctx)

	return sl
}

func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
	select {
	case <-time.After(sl.scraper.offset(interval, sl.jitterSeed)):
		// Continue after a scraping offset.
	case <-sl.scrapeCtx.Done():
		close(sl.stopped)
		return
	}

	var last time.Time

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

mainLoop:
	for {
		select {
		case <-sl.ctx.Done():
			close(sl.stopped)
			return
		case <-sl.scrapeCtx.Done():
			break mainLoop
		default:
		}

		var (
			start             = time.Now()
			scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout)
		)

		// Only record after the first scrape.
		if !last.IsZero() {
			targetIntervalLength.WithLabelValues(interval.String()).Observe(
				time.Since(last).Seconds(),
			)
		}

		b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
		buf := bytes.NewBuffer(b)

		contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
		cancel()

		if scrapeErr == nil {
			b = buf.Bytes()
			// NOTE: There were issues with misbehaving clients in the past
			// that occasionally returned empty results. We don't want those
			// to falsely reset our buffer size.
			if len(b) > 0 {
				sl.lastScrapeSize = len(b)
			}
		} else {
			level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
			if errc != nil {
				errc <- scrapeErr
			}
		}

		// A failed scrape is treated like an empty scrape: we still call
		// sl.append to trigger stale markers.
		total, added, seriesAdded, appErr := sl.append(b, contentType, start)
		if appErr != nil {
			level.Warn(sl.l).Log("msg", "append failed", "err", appErr)
			// The append failed, probably due to a parse error or sample limit.
			// Call sl.append again with an empty scrape to trigger stale markers.
			if _, _, _, err := sl.append([]byte{}, "", start); err != nil {
				level.Warn(sl.l).Log("msg", "append failed", "err", err)
			}
		}

		sl.buffers.Put(b)

		if scrapeErr == nil {
			scrapeErr = appErr
		}

		if err := sl.report(start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
			level.Warn(sl.l).Log("msg", "appending scrape report failed", "err", err)
		}
		last = start

		select {
		case <-sl.ctx.Done():
			close(sl.stopped)
			return
		case <-sl.scrapeCtx.Done():
			break mainLoop
		case <-ticker.C:
		}
	}

	close(sl.stopped)

	sl.endOfRunStaleness(last, ticker, interval)
}

func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) {
	// Scraping has stopped. We want to write stale markers, but the target
	// may be recreated, so we wait just over two scrape intervals before
	// creating them.
	// If the context is canceled, we presume the server is shutting down
	// and will restart where it was. We do not attempt to write stale markers
	// in this case.

	if last.IsZero() {
		// There never was a scrape, so there will be no stale markers.
		return
	}

	// Wait until the next scrape would have happened and record its timestamp.
	var staleTime time.Time
	select {
	case <-sl.ctx.Done():
		return
	case <-ticker.C:
		staleTime = time.Now()
	}

	// Wait until the scrape after that would have happened; if the target was
	// recreated, its samples should have been ingested by now.
	select {
	case <-sl.ctx.Done():
		return
	case <-ticker.C:
	}

	// Wait for an extra 10% of the interval, just to be safe.
	select {
	case <-sl.ctx.Done():
		return
	case <-time.After(interval / 10):
	}

	// Call sl.append again with an empty scrape to trigger stale markers.
	// If the target has since been recreated and scraped, the
	// stale markers will be out of order and ignored.
	if _, _, _, err := sl.append([]byte{}, "", staleTime); err != nil {
		level.Error(sl.l).Log("msg", "stale append failed", "err", err)
	}
	if err := sl.reportStale(staleTime); err != nil {
		level.Error(sl.l).Log("msg", "stale report failed", "err", err)
	}
}

// Stop the scraping. Data and stale markers may still be written after it
// has returned; cancel the context to stop all writes.
func (sl *scrapeLoop) stop() {
	sl.cancel()
	<-sl.stopped
}

type sample struct {
	metric labels.Labels
	t      int64
	v      float64
}

// samples implements sort.Interface, ordering by label set and then timestamp.
//lint:ignore U1000 staticcheck falsely reports that samples is unused.
type samples []sample

func (s samples) Len() int      { return len(s) }
func (s samples) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

func (s samples) Less(i, j int) bool {
	d := labels.Compare(s[i].metric, s[j].metric)
	if d < 0 {
		return true
	} else if d > 0 {
		return false
	}
	return s[i].t < s[j].t
}

func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
	var (
		app            = sl.appender()
		p              = textparse.New(b, contentType)
		defTime        = timestamp.FromTime(ts)
		numOutOfOrder  = 0
		numDuplicates  = 0
		numOutOfBounds = 0
	)
	var sampleLimitErr error

loop:
	for {
		var et textparse.Entry
		if et, err = p.Next(); err != nil {
			if err == io.EOF {
				err = nil
			}
			break
		}
		switch et {
		case textparse.EntryType:
			sl.cache.setType(p.Type())
			continue
		case textparse.EntryHelp:
			sl.cache.setHelp(p.Help())
			continue
		case textparse.EntryUnit:
			sl.cache.setUnit(p.Unit())
			continue
		case textparse.EntryComment:
			continue
		default:
		}
		total++

		t := defTime
		met, tp, v := p.Series()
		if !sl.honorTimestamps {
			tp = nil
		}
		if tp != nil {
			t = *tp
		}

		if sl.cache.getDropped(yoloString(met)) {
			continue
		}
		ce, ok := sl.cache.get(yoloString(met))
		if ok {
			switch err = app.AddFast(ce.lset, ce.ref, t, v); err {
			case nil:
				if tp == nil {
					sl.cache.trackStaleness(ce.hash, ce.lset)
				}
			case storage.ErrNotFound:
				ok = false
			case storage.ErrOutOfOrderSample:
				numOutOfOrder++
				level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
				targetScrapeSampleOutOfOrder.Inc()
				continue
			case storage.ErrDuplicateSampleForTimestamp:
				numDuplicates++
				level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
				targetScrapeSampleDuplicate.Inc()
				continue
			case storage.ErrOutOfBounds:
				numOutOfBounds++
				level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
				targetScrapeSampleOutOfBounds.Inc()
				continue
			case errSampleLimit:
				// Keep on parsing output if we hit the limit, so we report the correct
				// total number of samples scraped.
				sampleLimitErr = err
				added++
				continue
			default:
				break loop
			}
		}
		if !ok {
			var lset labels.Labels

			mets := p.Metric(&lset)
			hash := lset.Hash()

			// Hash the label set as it is exposed by the target. Then add
			// target labels, apply metric relabeling, and store the final
			// label set.
			lset = sl.sampleMutator(lset)

			// Relabeling may return nil to indicate that the sample should
			// be dropped.
			if lset == nil {
				sl.cache.addDropped(mets)
				continue
			}

			var ref uint64
			ref, err = app.Add(lset, t, v)
			switch err {
			case nil:
			case storage.ErrOutOfOrderSample:
				err = nil
				numOutOfOrder++
				level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
				targetScrapeSampleOutOfOrder.Inc()
				continue
			case storage.ErrDuplicateSampleForTimestamp:
				err = nil
				numDuplicates++
				level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
				targetScrapeSampleDuplicate.Inc()
				continue
			case storage.ErrOutOfBounds:
				err = nil
				numOutOfBounds++
				level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
				targetScrapeSampleOutOfBounds.Inc()
				continue
			case errSampleLimit:
				sampleLimitErr = err
				added++
				continue
			default:
				level.Debug(sl.l).Log("msg", "unexpected error", "series", string(met), "err", err)
				break loop
			}
			if tp == nil {
				// Bypass staleness logic if there is an explicit timestamp.
				sl.cache.trackStaleness(hash, lset)
			}
			sl.cache.addRef(mets, ref, lset, hash)
			seriesAdded++
		}
		added++
	}
	if sampleLimitErr != nil {
		if err == nil {
			err = sampleLimitErr
		}
		// We only want to increment this once per scrape, so it is incremented
		// outside of the parsing loop.
		targetScrapeSampleLimit.Inc()
	}
	if numOutOfOrder > 0 {
		level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", numOutOfOrder)
	}
	if numDuplicates > 0 {
		level.Warn(sl.l).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", numDuplicates)
	}
	if numOutOfBounds > 0 {
		level.Warn(sl.l).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", numOutOfBounds)
	}
	if err == nil {
		sl.cache.forEachStale(func(lset labels.Labels) bool {
			// Series no longer exposed, mark it stale.
			_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
			switch err {
			case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
				// Do not count these in logging, as this is expected if a target
				// goes away and comes back again with a new scrape loop.
				err = nil
			}
			return err == nil
		})
	}
	if err != nil {
		app.Rollback()
		return total, added, seriesAdded, err
	}
	if err := app.Commit(); err != nil {
		return total, added, seriesAdded, err
	}

	// Only perform cache cleanup if the scrape was not empty.
	// An empty scrape (usually) indicates a failed scrape.
	sl.cache.iterDone(len(b) > 0)

	return total, added, seriesAdded, nil
}

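// yoloString converts a byte slice to a string without copying. The returned
// string aliases b's memory, so it must not be retained after b is reused or
// mutated; callers that need a stable key copy explicitly via string(b), as
// setType, setHelp, and setUnit do when inserting into the metadata map.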
func yoloString(b []byte) string {
	return *((*string)(unsafe.Pointer(&b)))
}

// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
// with scraped metrics in the cache.
const (
	scrapeHealthMetricName       = "up" + "\xff"
	scrapeDurationMetricName     = "scrape_duration_seconds" + "\xff"
	scrapeSamplesMetricName      = "scrape_samples_scraped" + "\xff"
	samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling" + "\xff"
	scrapeSeriesAddedMetricName  = "scrape_series_added" + "\xff"
)

func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended, seriesAdded int, err error) error {
	sl.scraper.report(start, duration, err)

	ts := timestamp.FromTime(start)

	var health float64
	if err == nil {
		health = 1
	}
	app := sl.appender()

	if err := sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil {
		app.Rollback()
		return err
	}
	if err := sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds()); err != nil {
		app.Rollback()
		return err
	}
	if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scraped)); err != nil {
		app.Rollback()
		return err
	}
	if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, float64(appended)); err != nil {
		app.Rollback()
		return err
	}
	if err := sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded)); err != nil {
		app.Rollback()
		return err
	}
	return app.Commit()
}

func (sl *scrapeLoop) reportStale(start time.Time) error {
	ts := timestamp.FromTime(start)
	app := sl.appender()

	stale := math.Float64frombits(value.StaleNaN)

	if err := sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil {
		app.Rollback()
		return err
	}
	if err := sl.addReportSample(app, scrapeDurationMetricName, ts, stale); err != nil {
		app.Rollback()
		return err
	}
	if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, stale); err != nil {
		app.Rollback()
		return err
	}
	if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale); err != nil {
		app.Rollback()
		return err
	}
	if err := sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale); err != nil {
		app.Rollback()
		return err
	}
	return app.Commit()
}

func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
	ce, ok := sl.cache.get(s)
	if ok {
		err := app.AddFast(ce.lset, ce.ref, t, v)
		switch err {
		case nil:
			return nil
		case storage.ErrNotFound:
			// Try an Add.
		case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
			// Do not log here, as this is expected if a target goes away and comes back
			// again with a new scrape loop.
			return nil
		default:
			return err
		}
	}
	lset := labels.Labels{
		// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
		// with scraped metrics in the cache.
		// We have to drop it when building the actual metric.
		labels.Label{Name: labels.MetricName, Value: s[:len(s)-1]},
	}

	hash := lset.Hash()
	lset = sl.reportSampleMutator(lset)

	ref, err := app.Add(lset, t, v)
	switch err {
	case nil:
		sl.cache.addRef(s, ref, lset, hash)
		return nil
	case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
		return nil
	default:
		return err
	}
}