// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retrieval

import (
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/expfmt"
	"github.com/prometheus/common/log"
	"github.com/prometheus/common/model"
	"github.com/prometheus/common/version"
	"golang.org/x/net/context"
	"golang.org/x/net/context/ctxhttp"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/local"
	"github.com/prometheus/prometheus/util/httputil"
)

const (
	scrapeHealthMetricName       = "up"
	scrapeDurationMetricName     = "scrape_duration_seconds"
	scrapeSamplesMetricName      = "scrape_samples_scraped"
	samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling"
)

var (
	targetIntervalLength = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "prometheus_target_interval_length_seconds",
			Help:       "Actual intervals between scrapes.",
			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
		},
		[]string{"interval"},
	)
	targetSkippedScrapes = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_skipped_scrapes_total",
			Help: "Total number of scrapes that were skipped because the metric storage was throttled.",
		},
	)
	targetReloadIntervalLength = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "prometheus_target_reload_length_seconds",
			Help:       "Actual interval to reload the scrape pool with a given configuration.",
			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
		},
		[]string{"interval"},
	)
	targetSyncIntervalLength = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "prometheus_target_sync_length_seconds",
			Help:       "Actual interval to sync the scrape pool.",
			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
		},
		[]string{"scrape_job"},
	)
	targetScrapePoolSyncsCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrape_pool_sync_total",
			Help: "Total number of syncs that were executed on a scrape pool.",
		},
		[]string{"scrape_job"},
	)
	targetScrapeSampleLimit = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_exceeded_sample_limit_total",
			Help: "Total number of scrapes that hit the sample limit and were rejected.",
		},
	)
)

func init() {
	prometheus.MustRegister(targetIntervalLength)
	prometheus.MustRegister(targetSkippedScrapes)
	prometheus.MustRegister(targetReloadIntervalLength)
	prometheus.MustRegister(targetSyncIntervalLength)
	prometheus.MustRegister(targetScrapePoolSyncsCounter)
	prometheus.MustRegister(targetScrapeSampleLimit)
}

// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
	appender storage.SampleAppender

	ctx context.Context

	mtx    sync.RWMutex
	config *config.ScrapeConfig
	client *http.Client
	// Targets and loops must always be synchronized to have the same
	// set of hashes.
	targets map[uint64]*Target
	loops   map[uint64]loop

	// Constructor for new scrape loops. This is settable for testing convenience.
	newLoop func(context.Context, scraper, storage.SampleAppender, model.LabelSet, *config.ScrapeConfig) loop
}

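// newScrapePool returns a new scrape pool for the given scrape configuration
// that sends all scraped samples to the provided appender.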
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
	client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
	if err != nil {
		// Any errors that could occur here should be caught during config validation.
		log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
	}
	return &scrapePool{
		appender: app,
		config:   cfg,
		ctx:      ctx,
		client:   client,
		targets:  map[uint64]*Target{},
		loops:    map[uint64]loop{},
		newLoop:  newScrapeLoop,
	}
}

// stop terminates all scrape loops and returns once they have all terminated.
func (sp *scrapePool) stop() {
	var wg sync.WaitGroup

	sp.mtx.Lock()
	defer sp.mtx.Unlock()

	for fp, l := range sp.loops {
		wg.Add(1)

		go func(l loop) {
			l.stop()
			wg.Done()
		}(l)

		delete(sp.loops, fp)
		delete(sp.targets, fp)
	}

	wg.Wait()
}

// reload the scrape pool with the given scrape configuration. The target state is preserved
// but all scrape loops are restarted with the new scrape configuration.
// This method returns after all scrape loops that were stopped have fully terminated.
func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
	start := time.Now()

	sp.mtx.Lock()
	defer sp.mtx.Unlock()

	client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
	if err != nil {
		// Any errors that could occur here should be caught during config validation.
		log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
	}
	sp.config = cfg
	sp.client = client

	var (
		wg       sync.WaitGroup
		interval = time.Duration(sp.config.ScrapeInterval)
		timeout  = time.Duration(sp.config.ScrapeTimeout)
	)

	for fp, oldLoop := range sp.loops {
		var (
			t = sp.targets[fp]
			s = &targetScraper{
				Target:  t,
				client:  sp.client,
				timeout: timeout,
			}
			newLoop = sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)
		)
		wg.Add(1)

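		// Stop the old loop before starting the new one, so the new loop for a
		// target only begins scraping after its predecessor has terminated.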
		go func(oldLoop, newLoop loop) {
			oldLoop.stop()
			wg.Done()

			go newLoop.run(interval, timeout, nil)
		}(oldLoop, newLoop)

		sp.loops[fp] = newLoop
	}

	wg.Wait()
	targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
		time.Since(start).Seconds(),
	)
}

// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set.
func (sp *scrapePool) Sync(tgs []*config.TargetGroup) {
	start := time.Now()

	var all []*Target
	for _, tg := range tgs {
		targets, err := targetsFromGroup(tg, sp.config)
		if err != nil {
			log.With("err", err).Error("creating targets failed")
			continue
		}
		all = append(all, targets...)
	}
	sp.sync(all)

	targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
		time.Since(start).Seconds(),
	)
	targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}

// sync takes a list of potentially duplicated targets, deduplicates them, starts
// scrape loops for new targets, and stops scrape loops for disappeared targets.
// It returns once all stopped scrape loops have terminated.
func (sp *scrapePool) sync(targets []*Target) {
	sp.mtx.Lock()
	defer sp.mtx.Unlock()

	var (
		uniqueTargets = map[uint64]struct{}{}
		interval      = time.Duration(sp.config.ScrapeInterval)
		timeout       = time.Duration(sp.config.ScrapeTimeout)
	)

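	// Start a scrape loop for every target not yet known to the pool; targets
	// are deduplicated by the hash of their label set.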
	for _, t := range targets {
		hash := t.hash()
		uniqueTargets[hash] = struct{}{}

		if _, ok := sp.targets[hash]; !ok {
			s := &targetScraper{
				Target:  t,
				client:  sp.client,
				timeout: timeout,
			}

			l := sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)

			sp.targets[hash] = t
			sp.loops[hash] = l

			go l.run(interval, timeout, nil)
		}
	}

	var wg sync.WaitGroup

	// Stop and remove old targets and scraper loops.
	for hash := range sp.targets {
		if _, ok := uniqueTargets[hash]; !ok {
			wg.Add(1)
			go func(l loop) {
				l.stop()
				wg.Done()
			}(sp.loops[hash])

			delete(sp.loops, hash)
			delete(sp.targets, hash)
		}
	}

	// Wait for all potentially stopped scrapers to terminate.
	// This covers the case of flapping targets: if the server is under high load, a new
	// scraper may already be active and inserting samples while the old scraper, which
	// has not terminated yet, is still inserting a previous sample set.
	wg.Wait()
}

// A scraper retrieves samples and accepts a status report at the end.
type scraper interface {
	scrape(ctx context.Context, ts time.Time) (model.Samples, error)
	report(start time.Time, dur time.Duration, err error)
	offset(interval time.Duration) time.Duration
}

// targetScraper implements the scraper interface for a target.
type targetScraper struct {
	*Target
	client  *http.Client
	timeout time.Duration
}

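// acceptHeader advertises the exposition formats Prometheus understands,
// preferring the protobuf format over the plain text format.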
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1`

var userAgentHeader = fmt.Sprintf("Prometheus/%s", version.Version)

func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (model.Samples, error) {
	req, err := http.NewRequest("GET", s.URL().String(), nil)
	if err != nil {
		return nil, err
	}
	req.Header.Add("Accept", acceptHeader)
	req.Header.Set("User-Agent", userAgentHeader)
	req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", fmt.Sprintf("%f", s.timeout.Seconds()))

	resp, err := ctxhttp.Do(ctx, s.client, req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("server returned HTTP status %s", resp.Status)
	}

	var (
		allSamples = make(model.Samples, 0, 200)
		decSamples = make(model.Vector, 0, 50)
	)
	sdec := expfmt.SampleDecoder{
		Dec: expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)),
		Opts: &expfmt.DecodeOptions{
			Timestamp: model.TimeFromUnixNano(ts.UnixNano()),
		},
	}

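	// Decode the response body in chunks of samples until it is exhausted;
	// Decode returns io.EOF once all input has been consumed.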
	for {
		if err = sdec.Decode(&decSamples); err != nil {
			break
		}
		allSamples = append(allSamples, decSamples...)
		decSamples = decSamples[:0]
	}

	if err == io.EOF {
		// Set err to nil since it is used in the scrape health recording.
		err = nil
	}
	return allSamples, err
}

// A loop can run and be stopped again. It must not be reused after it has been stopped.
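// A loop is used by the scrape pool roughly as follows (sketch):
//
//	l := sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)
//	go l.run(interval, timeout, nil)
//	// ... later, on a reload or when the target disappears:
//	l.stop()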
type loop interface {
	run(interval, timeout time.Duration, errc chan<- error)
	stop()
}

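// scrapeLoop drives the periodic scraping of a single target and hands the
// resulting samples to the configured appender.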
type scrapeLoop struct {
	scraper scraper

	// Where samples are ultimately sent.
	appender storage.SampleAppender

	targetLabels         model.LabelSet
	metricRelabelConfigs []*config.RelabelConfig
	honorLabels          bool
	sampleLimit          uint

	done   chan struct{}
	ctx    context.Context
	cancel func()
}

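// newScrapeLoop returns a scrape loop for the given scraper, taking its
// relabeling, honor-labels, and sample-limit settings from the scrape config.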
func newScrapeLoop(
	ctx context.Context,
	sc scraper,
	appender storage.SampleAppender,
	targetLabels model.LabelSet,
	config *config.ScrapeConfig,
) loop {
	sl := &scrapeLoop{
		scraper:              sc,
		appender:             appender,
		targetLabels:         targetLabels,
		metricRelabelConfigs: config.MetricRelabelConfigs,
		honorLabels:          config.HonorLabels,
		sampleLimit:          config.SampleLimit,
		done:                 make(chan struct{}),
	}
	sl.ctx, sl.cancel = context.WithCancel(ctx)

	return sl
}

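// run scrapes the target at the given interval until the loop's context is
// canceled. The first scrape is delayed by a per-target offset so that scrapes
// are spread out over the interval. Scrape and append errors are sent to errc
// if it is not nil.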
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
	defer close(sl.done)

	select {
	case <-time.After(sl.scraper.offset(interval)):
		// Continue after a scraping offset.
	case <-sl.ctx.Done():
		return
	}

	var last time.Time

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-sl.ctx.Done():
			return
		default:
		}

		if !sl.appender.NeedsThrottling() {
			var (
				start                 = time.Now()
				scrapeCtx, cancel     = context.WithTimeout(sl.ctx, timeout)
				numPostRelabelSamples = 0
			)

			// Only record after the first scrape.
			if !last.IsZero() {
				targetIntervalLength.WithLabelValues(interval.String()).Observe(
					time.Since(last).Seconds(),
				)
			}

			samples, err := sl.scraper.scrape(scrapeCtx, start)
			cancel()
			if err == nil {
				numPostRelabelSamples, err = sl.append(samples)
			}
			if err != nil && errc != nil {
				errc <- err
			}
			sl.report(start, time.Since(start), len(samples), numPostRelabelSamples, err)
			last = start
		} else {
			targetSkippedScrapes.Inc()
		}

		select {
		case <-sl.ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

func (sl *scrapeLoop) stop() {
	sl.cancel()
	<-sl.done
}

// wrapAppender wraps a SampleAppender for relabeling. It returns the wrapped
// appender and an innermost countingAppender that counts the samples actually
// appended in the end.
func (sl *scrapeLoop) wrapAppender(app storage.SampleAppender) (storage.SampleAppender, *countingAppender) {
	// Innermost appender is a countingAppender to count how many samples
	// are left in the end.
	countingAppender := &countingAppender{
		SampleAppender: app,
	}
	app = countingAppender

	// The relabelAppender has to be inside the label-modifying appenders so
	// the relabeling rules are applied to the correct label set.
	if len(sl.metricRelabelConfigs) > 0 {
		app = relabelAppender{
			SampleAppender: app,
			relabelings:    sl.metricRelabelConfigs,
		}
	}

	if sl.honorLabels {
		app = honorLabelsAppender{
			SampleAppender: app,
			labels:         sl.targetLabels,
		}
	} else {
		app = ruleLabelsAppender{
			SampleAppender: app,
			labels:         sl.targetLabels,
		}
	}
	return app, countingAppender
}

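// append relabels the scraped samples and forwards them to the appender.
// It returns the number of samples remaining after metric relabeling, or an
// error if the configured sample limit is exceeded.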
func (sl *scrapeLoop) append(samples model.Samples) (int, error) {
	var (
		numOutOfOrder = 0
		numDuplicates = 0
		app           = sl.appender
		countingApp   *countingAppender
	)

	if sl.sampleLimit > 0 {
		// We need to check for the sample limit, so append everything
		// to a wrapped bufferAppender first. Then point samples to the
		// result.
		bufApp := &bufferAppender{buffer: make(model.Samples, 0, len(samples))}
		var wrappedBufApp storage.SampleAppender
		wrappedBufApp, countingApp = sl.wrapAppender(bufApp)
		for _, s := range samples {
			// Ignore errors as the bufferAppender always succeeds.
			wrappedBufApp.Append(s)
		}
		samples = bufApp.buffer
		if uint(countingApp.count) > sl.sampleLimit {
			targetScrapeSampleLimit.Inc()
			return countingApp.count, fmt.Errorf(
				"%d samples exceeded limit of %d", countingApp.count, sl.sampleLimit,
			)
		}
	} else {
		// No need to check for the sample limit. Wrap sl.appender directly.
		app, countingApp = sl.wrapAppender(sl.appender)
	}

	for _, s := range samples {
		if err := app.Append(s); err != nil {
			switch err {
			case local.ErrOutOfOrderSample:
				numOutOfOrder++
				log.With("sample", s).With("error", err).Debug("Sample discarded")
			case local.ErrDuplicateSampleForTimestamp:
				numDuplicates++
				log.With("sample", s).With("error", err).Debug("Sample discarded")
			default:
				log.With("sample", s).With("error", err).Warn("Sample discarded")
			}
		}
	}
	if numOutOfOrder > 0 {
		log.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples")
	}
	if numDuplicates > 0 {
		log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
	}
	return countingApp.count, nil
}

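// report records the synthetic samples describing the scrape itself:
// up, scrape_duration_seconds, scrape_samples_scraped, and
// scrape_samples_post_metric_relabeling.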
func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples, postRelabelSamples int, err error) {
	sl.scraper.report(start, duration, err)

	ts := model.TimeFromUnixNano(start.UnixNano())

	var health model.SampleValue
	if err == nil {
		health = 1
	}

	healthSample := &model.Sample{
		Metric: model.Metric{
			model.MetricNameLabel: scrapeHealthMetricName,
		},
		Timestamp: ts,
		Value:     health,
	}
	durationSample := &model.Sample{
		Metric: model.Metric{
			model.MetricNameLabel: scrapeDurationMetricName,
		},
		Timestamp: ts,
		Value:     model.SampleValue(duration.Seconds()),
	}
	countSample := &model.Sample{
		Metric: model.Metric{
			model.MetricNameLabel: scrapeSamplesMetricName,
		},
		Timestamp: ts,
		Value:     model.SampleValue(scrapedSamples),
	}
	postRelabelSample := &model.Sample{
		Metric: model.Metric{
			model.MetricNameLabel: samplesPostRelabelMetricName,
		},
		Timestamp: ts,
		Value:     model.SampleValue(postRelabelSamples),
	}

	reportAppender := ruleLabelsAppender{
		SampleAppender: sl.appender,
		labels:         sl.targetLabels,
	}

	if err := reportAppender.Append(healthSample); err != nil {
		log.With("sample", healthSample).With("error", err).Warn("Scrape health sample discarded")
	}
	if err := reportAppender.Append(durationSample); err != nil {
		log.With("sample", durationSample).With("error", err).Warn("Scrape duration sample discarded")
	}
	if err := reportAppender.Append(countSample); err != nil {
		log.With("sample", countSample).With("error", err).Warn("Scrape sample count sample discarded")
	}
	if err := reportAppender.Append(postRelabelSample); err != nil {
		log.With("sample", postRelabelSample).With("error", err).Warn("Scrape sample count post-relabeling sample discarded")
	}
}