// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scrape

import (
	"bytes"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/pkg/errors"
	dto "github.com/prometheus/client_model/go"
	config_util "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/require"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/pkg/exemplar"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/pkg/relabel"
	"github.com/prometheus/prometheus/pkg/textparse"
	"github.com/prometheus/prometheus/pkg/timestamp"
	"github.com/prometheus/prometheus/pkg/value"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/util/teststorage"
	"github.com/prometheus/prometheus/util/testutil"
)

func TestMain(m *testing.M) {
	testutil.TolerantVerifyLeak(m)
}

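// TestNewScrapePool verifies that a freshly constructed pool is wired to the
// given appendable and config and has a loop constructor set.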
func TestNewScrapePool(t *testing.T) {
	var (
		app   = &nopAppendable{}
		cfg   = &config.ScrapeConfig{}
		sp, _ = newScrapePool(cfg, app, 0, nil, false)
	)

	if a, ok := sp.appendable.(*nopAppendable); !ok || a != app {
		t.Fatalf("Wrong sample appender")
	}
	if sp.config != cfg {
		t.Fatalf("Wrong scrape config")
	}
	if sp.newLoop == nil {
		t.Fatalf("newLoop function not initialized")
	}
}

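// TestDroppedTargetsList checks that dropped targets are deduplicated across
// syncs and that their discovered labels are reported as expected.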
func TestDroppedTargetsList(t *testing.T) {
	var (
		app = &nopAppendable{}
		cfg = &config.ScrapeConfig{
			JobName:        "dropMe",
			ScrapeInterval: model.Duration(1),
			RelabelConfigs: []*relabel.Config{
				{
					Action:       relabel.Drop,
					Regex:        relabel.MustNewRegexp("dropMe"),
					SourceLabels: model.LabelNames{"job"},
				},
			},
		}
		tgs = []*targetgroup.Group{
			{
				Targets: []model.LabelSet{
					{model.AddressLabel: "127.0.0.1:9090"},
				},
			},
		}
		sp, _                  = newScrapePool(cfg, app, 0, nil, false)
		expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __scrape_interval__=\"0s\", __scrape_timeout__=\"0s\", job=\"dropMe\"}"
		expectedLength         = 1
	)
	sp.Sync(tgs)
	sp.Sync(tgs)
	if len(sp.droppedTargets) != expectedLength {
		t.Fatalf("Length of dropped targets exceeded expected length, expected %v, got %v", expectedLength, len(sp.droppedTargets))
	}
	if sp.droppedTargets[0].DiscoveredLabels().String() != expectedLabelSetString {
		t.Fatalf("Got %v, expected %v", sp.droppedTargets[0].DiscoveredLabels().String(), expectedLabelSetString)
	}
}

// TestDiscoveredLabelsUpdate checks that DiscoveredLabels are updated
// even when new labels don't affect the target `hash`.
func TestDiscoveredLabelsUpdate(t *testing.T) {
	sp := &scrapePool{}
	// These fields are used when syncing, so they must be set to avoid a panic.
	sp.config = &config.ScrapeConfig{
		ScrapeInterval: model.Duration(1),
		ScrapeTimeout:  model.Duration(1),
	}
	sp.activeTargets = make(map[uint64]*Target)
	t1 := &Target{
		discoveredLabels: labels.Labels{
			labels.Label{
				Name:  "label",
				Value: "name",
			},
		},
	}
	sp.activeTargets[t1.hash()] = t1

	t2 := &Target{
		discoveredLabels: labels.Labels{
			labels.Label{
				Name:  "labelNew",
				Value: "nameNew",
			},
		},
	}
	sp.sync([]*Target{t2})

	require.Equal(t, t2.DiscoveredLabels(), sp.activeTargets[t1.hash()].DiscoveredLabels())
}

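// testLoop is a controllable loop implementation used as a test double: start
// and stop behavior is injected via startFunc and stopFunc, and a forced
// error can be set and read under a mutex.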
type testLoop struct {
	startFunc    func(interval, timeout time.Duration, errc chan<- error)
	stopFunc     func()
	forcedErr    error
	forcedErrMtx sync.Mutex
	runOnce      bool
	interval     time.Duration
	timeout      time.Duration
}

func (l *testLoop) run(errc chan<- error) {
	if l.runOnce {
		panic("loop must be started only once")
	}
	l.runOnce = true
	l.startFunc(l.interval, l.timeout, errc)
}

func (l *testLoop) disableEndOfRunStalenessMarkers() {
}

func (l *testLoop) setForcedError(err error) {
	l.forcedErrMtx.Lock()
	defer l.forcedErrMtx.Unlock()
	l.forcedErr = err
}

func (l *testLoop) getForcedError() error {
	l.forcedErrMtx.Lock()
	defer l.forcedErrMtx.Unlock()
	return l.forcedErr
}

func (l *testLoop) stop() {
	l.stopFunc()
}

func (l *testLoop) getCache() *scrapeCache {
	return nil
}

func TestScrapePoolStop(t *testing.T) {
	sp := &scrapePool{
		activeTargets: map[uint64]*Target{},
		loops:         map[uint64]loop{},
		cancel:        func() {},
		client:        http.DefaultClient,
	}
	var mtx sync.Mutex
	stopped := map[uint64]bool{}
	numTargets := 20

	// Stopping the scrape pool must call stop() on all scrape loops and then
	// clean up the loops and their respective targets. It must wait until each
	// loop's stop function has returned before returning itself.

	for i := 0; i < numTargets; i++ {
		t := &Target{
			labels: labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i)),
		}
		l := &testLoop{}
		l.stopFunc = func() {
			time.Sleep(time.Duration(i*20) * time.Millisecond)

			mtx.Lock()
			stopped[t.hash()] = true
			mtx.Unlock()
		}

		sp.activeTargets[t.hash()] = t
		sp.loops[t.hash()] = l
	}

	done := make(chan struct{})
	stopTime := time.Now()

	go func() {
		sp.stop()
		close(done)
	}()

	select {
	case <-time.After(5 * time.Second):
		t.Fatalf("scrapeLoop.stop() did not return as expected")
	case <-done:
		// This should have taken at least as long as the last target slept.
		if time.Since(stopTime) < time.Duration(numTargets*20)*time.Millisecond {
			t.Fatalf("scrapeLoop.stop() exited before all targets stopped")
		}
	}

	mtx.Lock()
	require.Equal(t, numTargets, len(stopped), "Unexpected number of stopped loops")
	mtx.Unlock()

	require.Equal(t, 0, len(sp.activeTargets), "Targets were not cleared on stopping: %d left", len(sp.activeTargets))
	require.Equal(t, 0, len(sp.loops), "Loops were not cleared on stopping: %d left", len(sp.loops))
}

func TestScrapePoolReload(t *testing.T) {
	var mtx sync.Mutex
	numTargets := 20

	stopped := map[uint64]bool{}

	reloadCfg := &config.ScrapeConfig{
		ScrapeInterval: model.Duration(3 * time.Second),
		ScrapeTimeout:  model.Duration(2 * time.Second),
	}
	// On starting to run, new loops created on reload check whether their preceding
	// equivalents have been stopped.
	newLoop := func(opts scrapeLoopOptions) loop {
		l := &testLoop{interval: time.Duration(reloadCfg.ScrapeInterval), timeout: time.Duration(reloadCfg.ScrapeTimeout)}
		l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
			require.Equal(t, 3*time.Second, interval, "Unexpected scrape interval")
			require.Equal(t, 2*time.Second, timeout, "Unexpected scrape timeout")

			mtx.Lock()
			targetScraper := opts.scraper.(*targetScraper)
			require.True(t, stopped[targetScraper.hash()], "Scrape loop for %v not stopped yet", targetScraper)
			mtx.Unlock()
		}
		return l
	}
	sp := &scrapePool{
		appendable:    &nopAppendable{},
		activeTargets: map[uint64]*Target{},
		loops:         map[uint64]loop{},
		newLoop:       newLoop,
		logger:        nil,
		client:        http.DefaultClient,
	}

	// Reloading a scrape pool with a new scrape configuration must stop all scrape
	// loops and start new ones. A new loop must not be started before the preceding
	// one terminated.

	for i := 0; i < numTargets; i++ {
		labels := labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i))
		t := &Target{
			labels:           labels,
			discoveredLabels: labels,
		}
		l := &testLoop{}
		l.stopFunc = func() {
			time.Sleep(time.Duration(i*20) * time.Millisecond)

			mtx.Lock()
			stopped[t.hash()] = true
			mtx.Unlock()
		}

		sp.activeTargets[t.hash()] = t
		sp.loops[t.hash()] = l
	}
	done := make(chan struct{})

	beforeTargets := map[uint64]*Target{}
	for h, t := range sp.activeTargets {
		beforeTargets[h] = t
	}

	reloadTime := time.Now()

	go func() {
		sp.reload(reloadCfg)
		close(done)
	}()

	select {
	case <-time.After(5 * time.Second):
		t.Fatalf("scrapeLoop.reload() did not return as expected")
	case <-done:
		// This should have taken at least as long as the last target slept.
		if time.Since(reloadTime) < time.Duration(numTargets*20)*time.Millisecond {
			t.Fatalf("scrapeLoop.reload() exited before all targets stopped")
		}
	}

	mtx.Lock()
	require.Equal(t, numTargets, len(stopped), "Unexpected number of stopped loops")
	mtx.Unlock()

	require.Equal(t, sp.activeTargets, beforeTargets, "Reloading affected target states unexpectedly")
	require.Equal(t, numTargets, len(sp.loops), "Unexpected number of stopped loops after reload")
}

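// TestScrapePoolTargetLimit checks that exceeding target_limit sets a forced
// error on every loop and that the error clears once the limit is satisfied.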
func TestScrapePoolTargetLimit(t *testing.T) {
	var wg sync.WaitGroup
	// Each new loop registers itself in the wait group on start so the test
	// can verify that all loops created on reload are actually running.
	newLoop := func(opts scrapeLoopOptions) loop {
		wg.Add(1)
		l := &testLoop{
			startFunc: func(interval, timeout time.Duration, errc chan<- error) {
				wg.Done()
			},
			stopFunc: func() {},
		}
		return l
	}
	sp := &scrapePool{
		appendable:    &nopAppendable{},
		activeTargets: map[uint64]*Target{},
		loops:         map[uint64]loop{},
		newLoop:       newLoop,
		logger:        log.NewNopLogger(),
		client:        http.DefaultClient,
	}

	var tgs = []*targetgroup.Group{}
	for i := 0; i < 50; i++ {
		tgs = append(tgs,
			&targetgroup.Group{
				Targets: []model.LabelSet{
					{model.AddressLabel: model.LabelValue(fmt.Sprintf("127.0.0.1:%d", 9090+i))},
				},
			},
		)
	}

	var limit uint
	reloadWithLimit := func(l uint) {
		limit = l
		require.NoError(t, sp.reload(&config.ScrapeConfig{
			ScrapeInterval: model.Duration(3 * time.Second),
			ScrapeTimeout:  model.Duration(2 * time.Second),
			TargetLimit:    l,
		}))
	}

	var targets int
	loadTargets := func(n int) {
		targets = n
		sp.Sync(tgs[:n])
	}

	validateIsRunning := func() {
		wg.Wait()
		for _, l := range sp.loops {
			require.True(t, l.(*testLoop).runOnce, "loop should be running")
		}
	}

	validateErrorMessage := func(shouldErr bool) {
		for _, l := range sp.loops {
			lerr := l.(*testLoop).getForcedError()
			if shouldErr {
				require.NotNil(t, lerr, "error was expected for %d targets with a limit of %d", targets, limit)
				require.Equal(t, fmt.Sprintf("target_limit exceeded (number of targets: %d, limit: %d)", targets, limit), lerr.Error())
			} else {
				require.Equal(t, nil, lerr)
			}
		}
	}

	reloadWithLimit(0)
	loadTargets(50)
	validateIsRunning()

	// Simulate an initial config with a limit.
	sp.config.TargetLimit = 30
	limit = 30
	loadTargets(50)
	validateIsRunning()
	validateErrorMessage(true)

	reloadWithLimit(50)
	validateIsRunning()
	validateErrorMessage(false)

	reloadWithLimit(40)
	validateIsRunning()
	validateErrorMessage(true)

	loadTargets(30)
	validateIsRunning()
	validateErrorMessage(false)

	loadTargets(40)
	validateIsRunning()
	validateErrorMessage(false)

	loadTargets(41)
	validateIsRunning()
	validateErrorMessage(true)

	reloadWithLimit(0)
	validateIsRunning()
	validateErrorMessage(false)

	reloadWithLimit(51)
	validateIsRunning()
	validateErrorMessage(false)

	tgs = append(tgs,
		&targetgroup.Group{
			Targets: []model.LabelSet{
				{model.AddressLabel: model.LabelValue("127.0.0.1:1090")},
			},
		},
		&targetgroup.Group{
			Targets: []model.LabelSet{
				{model.AddressLabel: model.LabelValue("127.0.0.1:1090")},
			},
		},
	)

	sp.Sync(tgs)
	validateIsRunning()
	validateErrorMessage(false)
}

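// TestScrapePoolAppender checks the appender wrapping: a time limit appender
// is always applied, and a sample limit appender only when a limit is set.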
func TestScrapePoolAppender(t *testing.T) {
	cfg := &config.ScrapeConfig{}
	app := &nopAppendable{}
	sp, _ := newScrapePool(cfg, app, 0, nil, false)

	loop := sp.newLoop(scrapeLoopOptions{
		target: &Target{},
	})
	appl, ok := loop.(*scrapeLoop)
	require.True(t, ok, "Expected scrapeLoop but got %T", loop)

	wrapped := appl.appender(context.Background())

	tl, ok := wrapped.(*timeLimitAppender)
	require.True(t, ok, "Expected timeLimitAppender but got %T", wrapped)

	_, ok = tl.Appender.(nopAppender)
	require.True(t, ok, "Expected base appender but got %T", tl.Appender)

	loop = sp.newLoop(scrapeLoopOptions{
		target:      &Target{},
		sampleLimit: 100,
	})
	appl, ok = loop.(*scrapeLoop)
	require.True(t, ok, "Expected scrapeLoop but got %T", loop)

	wrapped = appl.appender(context.Background())

	sl, ok := wrapped.(*limitAppender)
	require.True(t, ok, "Expected limitAppender but got %T", wrapped)

	tl, ok = sl.Appender.(*timeLimitAppender)
	require.True(t, ok, "Expected timeLimitAppender but got %T", sl.Appender)

	_, ok = tl.Appender.(nopAppender)
	require.True(t, ok, "Expected base appender but got %T", tl.Appender)
}

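// TestScrapePoolRaces exercises Sync, reload, and stop while scrape loops are
// running; it exists mainly to be run under the race detector.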
func TestScrapePoolRaces(t *testing.T) {
	interval, _ := model.ParseDuration("1s")
	timeout, _ := model.ParseDuration("500ms")
	newConfig := func() *config.ScrapeConfig {
		return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout}
	}
	sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, false)
	tgts := []*targetgroup.Group{
		{
			Targets: []model.LabelSet{
				{model.AddressLabel: "127.0.0.1:9090"},
				{model.AddressLabel: "127.0.0.2:9090"},
				{model.AddressLabel: "127.0.0.3:9090"},
				{model.AddressLabel: "127.0.0.4:9090"},
				{model.AddressLabel: "127.0.0.5:9090"},
				{model.AddressLabel: "127.0.0.6:9090"},
				{model.AddressLabel: "127.0.0.7:9090"},
				{model.AddressLabel: "127.0.0.8:9090"},
			},
		},
	}

	sp.Sync(tgts)
	active := sp.ActiveTargets()
	dropped := sp.DroppedTargets()
	expectedActive, expectedDropped := len(tgts[0].Targets), 0

	require.Equal(t, expectedActive, len(active), "Invalid number of active targets")
	require.Equal(t, expectedDropped, len(dropped), "Invalid number of dropped targets")

	for i := 0; i < 20; i++ {
		time.Sleep(time.Duration(10 * time.Millisecond))
		sp.reload(newConfig())
	}
	sp.stop()
}

func TestScrapePoolScrapeLoopsStarted(t *testing.T) {
	var wg sync.WaitGroup
	newLoop := func(opts scrapeLoopOptions) loop {
		wg.Add(1)
		l := &testLoop{
			startFunc: func(interval, timeout time.Duration, errc chan<- error) {
				wg.Done()
			},
			stopFunc: func() {},
		}
		return l
	}
	sp := &scrapePool{
		appendable:    &nopAppendable{},
		activeTargets: map[uint64]*Target{},
		loops:         map[uint64]loop{},
		newLoop:       newLoop,
		logger:        nil,
		client:        http.DefaultClient,
	}

	tgs := []*targetgroup.Group{
		{
			Targets: []model.LabelSet{
				{model.AddressLabel: model.LabelValue("127.0.0.1:9090")},
			},
		},
		{
			Targets: []model.LabelSet{
				{model.AddressLabel: model.LabelValue("127.0.0.1:9090")},
			},
		},
	}

	require.NoError(t, sp.reload(&config.ScrapeConfig{
		ScrapeInterval: model.Duration(3 * time.Second),
		ScrapeTimeout:  model.Duration(2 * time.Second),
	}))
	sp.Sync(tgs)

	require.Equal(t, 1, len(sp.loops))

	wg.Wait()
	for _, l := range sp.loops {
		require.True(t, l.(*testLoop).runOnce, "loop should be running")
	}
}

func TestScrapeLoopStopBeforeRun(t *testing.T) {
	scraper := &testScraper{}

	sl := newScrapeLoop(context.Background(),
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		nil, nil, 0,
		true,
		0,
		nil,
		1,
		0,
		false,
	)

	// The scrape pool synchronizes on stopping scrape loops. However, new scrape
	// loops are started asynchronously. Thus it's possible that a loop is stopped
	// again before having started properly.
	// Stopping not-yet-started loops must block until the run method has been
	// called and has exited. The run method must exit immediately.

	stopDone := make(chan struct{})
	go func() {
		sl.stop()
		close(stopDone)
	}()

	select {
	case <-stopDone:
		t.Fatalf("Stopping terminated before run exited successfully")
	case <-time.After(500 * time.Millisecond):
	}

	// Running the scrape loop must exit before calling the scraper even once.
	scraper.scrapeFunc = func(context.Context, io.Writer) error {
		t.Fatalf("scraper was called for terminated scrape loop")
		return nil
	}

	runDone := make(chan struct{})
	go func() {
		sl.run(nil)
		close(runDone)
	}()

	select {
	case <-runDone:
	case <-time.After(1 * time.Second):
		t.Fatalf("Running terminated scrape loop did not exit")
	}

	select {
	case <-stopDone:
	case <-time.After(1 * time.Second):
		t.Fatalf("Stopping did not terminate after running exited")
	}
}

func nopMutator(l labels.Labels) labels.Labels { return l }

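// TestScrapeLoopStop verifies that stopping a running loop flushes a final
// set of stale markers and that all samples of a scrape share one timestamp.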
func TestScrapeLoopStop(t *testing.T) {
	var (
		signal   = make(chan struct{}, 1)
		appender = &collectResultAppender{}
		scraper  = &testScraper{}
		app      = func(ctx context.Context) storage.Appender { return appender }
	)

	sl := newScrapeLoop(context.Background(),
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		app,
		nil,
		0,
		true,
		0,
		nil,
		10*time.Millisecond,
		time.Hour,
		false,
	)

	// Terminate loop after 2 scrapes.
	numScrapes := 0

	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		numScrapes++
		if numScrapes == 2 {
			go sl.stop()
			<-sl.ctx.Done()
		}
		w.Write([]byte("metric_a 42\n"))
		return ctx.Err()
	}

	go func() {
		sl.run(nil)
		signal <- struct{}{}
	}()

	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		t.Fatalf("Scrape wasn't stopped.")
	}

	// We expect 1 actual sample for each scrape plus 5 report samples.
	// At least 2 scrapes were made, plus the final stale markers.
	if len(appender.result) < 6*3 || len(appender.result)%6 != 0 {
		t.Fatalf("Expected at least 3 scrapes with 6 samples each, got %d samples", len(appender.result))
	}
	// All samples in a scrape must have the same timestamp.
	var ts int64
	for i, s := range appender.result {
		if i%6 == 0 {
			ts = s.t
		} else if s.t != ts {
			t.Fatalf("Unexpected multiple timestamps within single scrape")
		}
	}
	// All samples from the last scrape must be stale markers.
	for _, s := range appender.result[len(appender.result)-5:] {
		if !value.IsStaleNaN(s.v) {
			t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(s.v))
		}
	}
}

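// TestScrapeLoopRun checks the loop's context handling: cancellation during
// the initial offset, scrape timeouts, and cancellation of a running loop.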
func TestScrapeLoopRun(t *testing.T) {
	var (
		signal = make(chan struct{}, 1)
		errc   = make(chan error)

		scraper = &testScraper{}
		app     = func(ctx context.Context) storage.Appender { return &nopAppender{} }
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newScrapeLoop(ctx,
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		app,
		nil,
		0,
		true,
		0,
		nil,
		time.Second,
		time.Hour,
		false,
	)

	// The loop must terminate during the initial offset if the context
	// is canceled.
	scraper.offsetDur = time.Hour

	go func() {
		sl.run(errc)
		signal <- struct{}{}
	}()

	// Wait to make sure we are actually waiting on the offset.
	time.Sleep(1 * time.Second)

	cancel()
	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		t.Fatalf("Cancellation during initial offset failed")
	case err := <-errc:
		t.Fatalf("Unexpected error: %s", err)
	}

	// The provided timeout must cause cancellation of the context passed down to the
	// scraper. The scraper has to respect the context.
	scraper.offsetDur = 0

	block := make(chan struct{})
	scraper.scrapeFunc = func(ctx context.Context, _ io.Writer) error {
		select {
		case <-block:
		case <-ctx.Done():
			return ctx.Err()
		}
		return nil
	}

	ctx, cancel = context.WithCancel(context.Background())
	sl = newScrapeLoop(ctx,
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		app,
		nil,
		0,
		true,
		0,
		nil,
		time.Second,
		100*time.Millisecond,
		false,
	)

	go func() {
		sl.run(errc)
		signal <- struct{}{}
	}()

	select {
	case err := <-errc:
		if err != context.DeadlineExceeded {
			t.Fatalf("Expected timeout error but got: %s", err)
		}
	case <-time.After(3 * time.Second):
		t.Fatalf("Expected timeout error but got none")
	}

	// We already caught the timeout error and are certainly in the loop.
	// Let the scrape return immediately to cause no further timeout errors
	// and check whether canceling the parent context terminates the loop.
	close(block)
	cancel()

	select {
	case <-signal:
		// Loop terminated as expected.
	case err := <-errc:
		t.Fatalf("Unexpected error: %s", err)
	case <-time.After(3 * time.Second):
		t.Fatalf("Loop did not terminate on context cancellation")
	}
}

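// TestScrapeLoopForcedErr verifies that a forced error is surfaced on the
// error channel and suppresses actual scrapes.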
func TestScrapeLoopForcedErr(t *testing.T) {
	var (
		signal = make(chan struct{}, 1)
		errc   = make(chan error)

		scraper = &testScraper{}
		app     = func(ctx context.Context) storage.Appender { return &nopAppender{} }
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newScrapeLoop(ctx,
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		app,
		nil,
		0,
		true,
		0,
		nil,
		time.Second,
		time.Hour,
		false,
	)

	forcedErr := fmt.Errorf("forced err")
	sl.setForcedError(forcedErr)

	scraper.scrapeFunc = func(context.Context, io.Writer) error {
		t.Fatalf("should not be scraped")
		return nil
	}

	go func() {
		sl.run(errc)
		signal <- struct{}{}
	}()

	select {
	case err := <-errc:
		if err != forcedErr {
			t.Fatalf("Expected forced error but got: %s", err)
		}
	case <-time.After(3 * time.Second):
		t.Fatalf("Expected forced error but got none")
	}
	cancel()

	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		t.Fatalf("Scrape not stopped")
	}
}

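// TestScrapeLoopMetadata checks that TYPE, HELP, and UNIT metadata parsed
// from an OpenMetrics exposition ends up in the scrape cache.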
func TestScrapeLoopMetadata(t *testing.T) {
	var (
		signal  = make(chan struct{})
		scraper = &testScraper{}
		cache   = newScrapeCache()
	)
	defer close(signal)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newScrapeLoop(ctx,
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		func(ctx context.Context) storage.Appender { return nopAppender{} },
		cache,
		0,
		true,
		0,
		nil,
		0,
		0,
		false,
	)
	defer cancel()

	slApp := sl.appender(ctx)
	total, _, _, err := sl.append(slApp, []byte(`# TYPE test_metric counter
# HELP test_metric some help text
# UNIT test_metric metric
test_metric 1
# TYPE test_metric_no_help gauge
# HELP test_metric_no_type other help text
# EOF`), "application/openmetrics-text", time.Now())
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())
	require.Equal(t, 1, total)

	md, ok := cache.GetMetadata("test_metric")
	require.True(t, ok, "expected metadata to be present")
	require.Equal(t, textparse.MetricTypeCounter, md.Type, "unexpected metric type")
	require.Equal(t, "some help text", md.Help)
	require.Equal(t, "metric", md.Unit)

	md, ok = cache.GetMetadata("test_metric_no_help")
	require.True(t, ok, "expected metadata to be present")
	require.Equal(t, textparse.MetricTypeGauge, md.Type, "unexpected metric type")
	require.Equal(t, "", md.Help)
	require.Equal(t, "", md.Unit)

	md, ok = cache.GetMetadata("test_metric_no_type")
	require.True(t, ok, "expected metadata to be present")
	require.Equal(t, textparse.MetricTypeUnknown, md.Type, "unexpected metric type")
	require.Equal(t, "other help text", md.Help)
	require.Equal(t, "", md.Unit)
}

func simpleTestScrapeLoop(t testing.TB) (context.Context, *scrapeLoop) {
	// Need a full storage for correct Add/AddFast semantics.
	s := teststorage.New(t)
	t.Cleanup(func() { s.Close() })

	ctx, cancel := context.WithCancel(context.Background())
	sl := newScrapeLoop(ctx,
		&testScraper{},
		nil, nil,
		nopMutator,
		nopMutator,
		s.Appender,
		nil,
		0,
		true,
		0,
		nil,
		0,
		0,
		false,
	)
	t.Cleanup(func() { cancel() })

	return ctx, sl
}

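// TestScrapeLoopSeriesAdded checks that seriesAdded counts only series that
// are new to the storage, not every appended sample.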
func TestScrapeLoopSeriesAdded(t *testing.T) {
	ctx, sl := simpleTestScrapeLoop(t)

	slApp := sl.appender(ctx)
	total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())
	require.Equal(t, 1, total)
	require.Equal(t, 1, added)
	require.Equal(t, 1, seriesAdded)

	slApp = sl.appender(ctx)
	total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
	require.NoError(t, slApp.Commit())
	require.NoError(t, err)
	require.Equal(t, 1, total)
	require.Equal(t, 1, added)
	require.Equal(t, 0, seriesAdded)
}

func makeTestMetrics(n int) []byte {
	// Construct a metrics string to parse.
	sb := bytes.Buffer{}
	for i := 0; i < n; i++ {
		fmt.Fprintf(&sb, "# TYPE metric_a gauge\n")
		fmt.Fprintf(&sb, "# HELP metric_a help text\n")
		fmt.Fprintf(&sb, "metric_a{foo=\"%d\",bar=\"%d\"} 1\n", i, i*100)
	}
	return sb.Bytes()
}

func BenchmarkScrapeLoopAppend(b *testing.B) {
	ctx, sl := simpleTestScrapeLoop(b)

	slApp := sl.appender(ctx)
	metrics := makeTestMetrics(100)
	ts := time.Time{}

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		ts = ts.Add(time.Second)
		_, _, _, _ = sl.append(slApp, metrics, "", ts)
	}
}

func BenchmarkScrapeLoopAppendOM(b *testing.B) {
	ctx, sl := simpleTestScrapeLoop(b)

	slApp := sl.appender(ctx)
	metrics := makeTestMetrics(100)
	ts := time.Time{}

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		ts = ts.Add(time.Second)
		_, _, _, _ = sl.append(slApp, metrics, "application/openmetrics-text", ts)
	}
}

func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
	appender := &collectResultAppender{}
	var (
		signal  = make(chan struct{}, 1)
		scraper = &testScraper{}
		app     = func(ctx context.Context) storage.Appender { return appender }
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newScrapeLoop(ctx,
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		app,
		nil,
		0,
		true,
		0,
		nil,
		10*time.Millisecond,
		time.Hour,
		false,
	)
	// Succeed once, several failures, then stop.
	numScrapes := 0

	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		numScrapes++

		if numScrapes == 1 {
			w.Write([]byte("metric_a 42\n"))
			return nil
		} else if numScrapes == 5 {
			cancel()
		}
		return errors.New("scrape failed")
	}

	go func() {
		sl.run(nil)
		signal <- struct{}{}
	}()

	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		t.Fatalf("Scrape wasn't stopped.")
	}

	// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
	// each scrape successful or not.
	require.Equal(t, 27, len(appender.result), "Appended samples not as expected")
	require.Equal(t, 42.0, appender.result[0].v, "Appended first sample not as expected")
	require.True(t, value.IsStaleNaN(appender.result[6].v),
		"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[6].v))
}

func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
	appender := &collectResultAppender{}
	var (
		signal     = make(chan struct{}, 1)
		scraper    = &testScraper{}
		app        = func(ctx context.Context) storage.Appender { return appender }
		numScrapes = 0
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newScrapeLoop(ctx,
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		app,
		nil,
		0,
		true,
		0,
		nil,
		10*time.Millisecond,
		time.Hour,
		false,
	)

	// Succeed once, several failures, then stop.
	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		numScrapes++

		if numScrapes == 1 {
			w.Write([]byte("metric_a 42\n"))
			return nil
		} else if numScrapes == 2 {
			w.Write([]byte("7&-\n"))
			return nil
		} else if numScrapes == 3 {
			cancel()
		}
		return errors.New("scrape failed")
	}

	go func() {
		sl.run(nil)
		signal <- struct{}{}
	}()

	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		t.Fatalf("Scrape wasn't stopped.")
	}

	// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
	// each scrape successful or not.
	require.Equal(t, 17, len(appender.result), "Appended samples not as expected")
	require.Equal(t, 42.0, appender.result[0].v, "Appended first sample not as expected")
	require.True(t, value.IsStaleNaN(appender.result[6].v),
		"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[6].v))
}

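// TestScrapeLoopCache checks that the scrape cache keeps series seen in the
// previous scrape and evicts series that have disappeared.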
func TestScrapeLoopCache(t *testing.T) {
	s := teststorage.New(t)
	defer s.Close()

	appender := &collectResultAppender{}
	var (
		signal  = make(chan struct{}, 1)
		scraper = &testScraper{}
		app     = func(ctx context.Context) storage.Appender { appender.next = s.Appender(ctx); return appender }
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newScrapeLoop(ctx,
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		app,
		nil,
		0,
		true,
		0,
		nil,
		10*time.Millisecond,
		time.Hour,
		false,
	)

	numScrapes := 0

	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		if numScrapes == 1 || numScrapes == 2 {
			if _, ok := sl.cache.series["metric_a"]; !ok {
				t.Errorf("metric_a missing from cache after scrape %d", numScrapes)
			}
			if _, ok := sl.cache.series["metric_b"]; !ok {
				t.Errorf("metric_b missing from cache after scrape %d", numScrapes)
			}
		} else if numScrapes == 3 {
			if _, ok := sl.cache.series["metric_a"]; !ok {
				t.Errorf("metric_a missing from cache after scrape %d", numScrapes)
			}
			if _, ok := sl.cache.series["metric_b"]; ok {
				t.Errorf("metric_b present in cache after scrape %d", numScrapes)
			}
		}

		numScrapes++

		if numScrapes == 1 {
			w.Write([]byte("metric_a 42\nmetric_b 43\n"))
			return nil
		} else if numScrapes == 3 {
			w.Write([]byte("metric_a 44\n"))
			return nil
		} else if numScrapes == 4 {
			cancel()
		}
		return fmt.Errorf("scrape failed")
	}

	go func() {
		sl.run(nil)
		signal <- struct{}{}
	}()

	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		t.Fatalf("Scrape wasn't stopped.")
	}

	// 2 scraped samples, 2 stale markers after the first failed scrape, then 1
	// scraped sample and 1 stale marker, plus 5 report samples for each of the
	// 4 scrapes, successful or not.
	require.Equal(t, 26, len(appender.result), "Appended samples not as expected")
}

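// TestScrapeLoopCacheMemoryExhaustionProtection checks that the scrape cache
// does not grow without bound when series churn on every scrape.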
func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
	s := teststorage.New(t)
	defer s.Close()

	sapp := s.Appender(context.Background())

	appender := &collectResultAppender{next: sapp}
	var (
		signal  = make(chan struct{}, 1)
		scraper = &testScraper{}
		app     = func(ctx context.Context) storage.Appender { return appender }
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newScrapeLoop(ctx,
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		app,
		nil,
		0,
		true,
		0,
		nil,
		10*time.Millisecond,
		time.Hour,
		false,
	)

	numScrapes := 0

	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		numScrapes++
		if numScrapes < 5 {
			s := ""
			for i := 0; i < 500; i++ {
				s = fmt.Sprintf("%smetric_%d_%d 42\n", s, i, numScrapes)
			}
			w.Write([]byte(s + "&"))
		} else {
			cancel()
		}
		return nil
	}

	go func() {
		sl.run(nil)
		signal <- struct{}{}
	}()

	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		t.Fatalf("Scrape wasn't stopped.")
	}

	if len(sl.cache.series) > 2000 {
		t.Fatalf("More than 2000 series cached. Got: %d", len(sl.cache.series))
	}
}

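// TestScrapeLoopAppend covers label mutation during append: collisions with
// target labels, honor_labels handling, empty labels, and NaN values.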
func TestScrapeLoopAppend(t *testing.T) {
	tests := []struct {
		title           string
		honorLabels     bool
		scrapeLabels    string
		discoveryLabels []string
		expLset         labels.Labels
		expValue        float64
	}{
		{
			// When "honor_labels" is not set,
			// a label name collision is handled by adding a prefix.
			title:           "Label name collision",
			honorLabels:     false,
			scrapeLabels:    `metric{n="1"} 0`,
			discoveryLabels: []string{"n", "2"},
			expLset:         labels.FromStrings("__name__", "metric", "exported_n", "1", "n", "2"),
			expValue:        0,
		}, {
			// When "honor_labels" is not set,
			// an exported label from discovery doesn't get overwritten.
			title:           "Label name collision",
			honorLabels:     false,
			scrapeLabels:    `metric 0`,
			discoveryLabels: []string{"n", "2", "exported_n", "2"},
			expLset:         labels.FromStrings("__name__", "metric", "n", "2", "exported_n", "2"),
			expValue:        0,
		}, {
			// Labels with no value need to be removed as these should not be ingested.
			title:           "Delete Empty labels",
			honorLabels:     false,
			scrapeLabels:    `metric{n=""} 0`,
			discoveryLabels: nil,
			expLset:         labels.FromStrings("__name__", "metric"),
			expValue:        0,
		}, {
			// Honor Labels should ignore labels with the same name.
			title:           "Honor Labels",
			honorLabels:     true,
			scrapeLabels:    `metric{n1="1",n2="2"} 0`,
			discoveryLabels: []string{"n1", "0"},
			expLset:         labels.FromStrings("__name__", "metric", "n1", "1", "n2", "2"),
			expValue:        0,
		}, {
			title:           "Stale - NaN",
			honorLabels:     false,
			scrapeLabels:    `metric NaN`,
			discoveryLabels: nil,
			expLset:         labels.FromStrings("__name__", "metric"),
			expValue:        float64(value.NormalNaN),
		},
	}

	for _, test := range tests {
		app := &collectResultAppender{}

		discoveryLabels := &Target{
			labels: labels.FromStrings(test.discoveryLabels...),
		}

		sl := newScrapeLoop(context.Background(),
			nil, nil, nil,
			func(l labels.Labels) labels.Labels {
				return mutateSampleLabels(l, discoveryLabels, test.honorLabels, nil)
			},
			func(l labels.Labels) labels.Labels {
				return mutateReportSampleLabels(l, discoveryLabels)
			},
			func(ctx context.Context) storage.Appender { return app },
			nil,
			0,
			true,
			0,
			nil,
			0,
			0,
			false,
		)

		now := time.Now()

		slApp := sl.appender(context.Background())
		_, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", now)
		require.NoError(t, err)
		require.NoError(t, slApp.Commit())

		expected := []sample{
			{
				metric: test.expLset,
				t:      timestamp.FromTime(now),
				v:      test.expValue,
			},
		}

		// When the expected value is NaN,
		// DeepEqual will report NaNs as being different,
		// so replace it with the expected one.
		if test.expValue == float64(value.NormalNaN) {
			app.result[0].v = expected[0].v
		}

		t.Logf("Test: %s", test.title)
		require.Equal(t, expected, app.result)
	}
}

func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
	// collectResultAppender's AddFast always returns ErrNotFound if we don't give it a next.
	app := &collectResultAppender{}

	sl := newScrapeLoop(context.Background(),
		nil, nil, nil,
		nopMutator,
		nopMutator,
		func(ctx context.Context) storage.Appender { return app },
		nil,
		0,
		true,
		0,
		nil,
		0,
		0,
		false,
	)

	fakeRef := uint64(1)
	expValue := float64(1)
	metric := `metric{n="1"} 1`
	p := textparse.New([]byte(metric), "")

	var lset labels.Labels
	p.Next()
	mets := p.Metric(&lset)
	hash := lset.Hash()

	// Create a fake entry in the cache.
	sl.cache.addRef(mets, fakeRef, lset, hash)
	now := time.Now()

	slApp := sl.appender(context.Background())
	_, _, _, err := sl.append(slApp, []byte(metric), "", now)
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	expected := []sample{
		{
			metric: lset,
			t:      timestamp.FromTime(now),
			v:      expValue,
		},
	}

	require.Equal(t, expected, app.result)
}

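// TestScrapeLoopAppendSampleLimit checks that exceeding sample_limit aborts
// the append, bumps the corresponding counter once, and reports the counts.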
func TestScrapeLoopAppendSampleLimit(t *testing.T) {
	resApp := &collectResultAppender{}
	app := &limitAppender{Appender: resApp, limit: 1}

	sl := newScrapeLoop(context.Background(),
		nil, nil, nil,
		func(l labels.Labels) labels.Labels {
			if l.Has("deleteme") {
				return nil
			}
			return l
		},
		nopMutator,
		func(ctx context.Context) storage.Appender { return app },
		nil,
		0,
		true,
		app.limit,
		nil,
		0,
		0,
		false,
	)

	// Get the value of the Counter before performing the append.
	beforeMetric := dto.Metric{}
	err := targetScrapeSampleLimit.Write(&beforeMetric)
	require.NoError(t, err)

	beforeMetricValue := beforeMetric.GetCounter().GetValue()

	now := time.Now()
	slApp := sl.appender(context.Background())
	total, added, seriesAdded, err := sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now)
	if err != errSampleLimit {
		t.Fatalf("Did not see expected sample limit error: %s", err)
	}
	require.NoError(t, slApp.Rollback())
	require.Equal(t, 3, total)
	require.Equal(t, 3, added)
	require.Equal(t, 1, seriesAdded)

	// Check that the Counter has been incremented a single time for the scrape,
	// not multiple times for each sample.
	metric := dto.Metric{}
	err = targetScrapeSampleLimit.Write(&metric)
	require.NoError(t, err)

	value := metric.GetCounter().GetValue()
	change := value - beforeMetricValue
	require.Equal(t, 1.0, change, "Unexpected change of sample limit metric: %f", change)

	// And verify that we got the samples that fit under the limit.
	want := []sample{
		{
			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
			t:      timestamp.FromTime(now),
			v:      1,
		},
	}
	require.Equal(t, want, resApp.rolledbackResult, "Appended samples not as expected")

	now = time.Now()
	slApp = sl.appender(context.Background())
	total, added, seriesAdded, err = sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now)
	if err != errSampleLimit {
		t.Fatalf("Did not see expected sample limit error: %s", err)
	}
	require.NoError(t, slApp.Rollback())
	require.Equal(t, 9, total)
	require.Equal(t, 6, added)
	require.Equal(t, 0, seriesAdded)
}

func TestScrapeLoop_ChangingMetricString(t *testing.T) {
	// This is a regression test for the scrape loop cache not properly maintaining
	// IDs when the string representation of a metric changes across a scrape. Thus
	// we use a real storage appender here.
	s := teststorage.New(t)
	defer s.Close()

	capp := &collectResultAppender{}

	sl := newScrapeLoop(context.Background(),
		nil, nil, nil,
		nopMutator,
		nopMutator,
		func(ctx context.Context) storage.Appender { capp.next = s.Appender(ctx); return capp },
		nil,
		0,
		true,
		0,
		nil,
		0,
		0,
		false,
	)

	now := time.Now()
	slApp := sl.appender(context.Background())
	_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", now)
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	slApp = sl.appender(context.Background())
	_, _, _, err = sl.append(slApp, []byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute))
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	// Both samples must map to the same series even though the label order differed.
	want := []sample{
		{
			metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
			t:      timestamp.FromTime(now),
			v:      1,
		},
		{
			metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
			t:      timestamp.FromTime(now.Add(time.Minute)),
			v:      2,
		},
	}
	require.Equal(t, want, capp.result, "Appended samples not as expected")
}

func TestScrapeLoopAppendStaleness(t *testing.T) {
	app := &collectResultAppender{}

	sl := newScrapeLoop(context.Background(),
		nil, nil, nil,
		nopMutator,
		nopMutator,
		func(ctx context.Context) storage.Appender { return app },
		nil,
		0,
		true,
		0,
		nil,
		0,
		0,
		false,
	)

	now := time.Now()
	slApp := sl.appender(context.Background())
	_, _, _, err := sl.append(slApp, []byte("metric_a 1\n"), "", now)
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	slApp = sl.appender(context.Background())
	_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	ingestedNaN := math.Float64bits(app.result[1].v)
	require.Equal(t, value.StaleNaN, ingestedNaN, "Appended stale sample wasn't as expected")

	// DeepEqual will report NaNs as being different, so replace with a different value.
	app.result[1].v = 42
	want := []sample{
		{
			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
			t:      timestamp.FromTime(now),
			v:      1,
		},
		{
			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
			t:      timestamp.FromTime(now.Add(time.Second)),
			v:      42,
		},
	}
	require.Equal(t, want, app.result, "Appended samples not as expected")
}

func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
	app := &collectResultAppender{}
	sl := newScrapeLoop(context.Background(),
		nil, nil, nil,
		nopMutator,
		nopMutator,
		func(ctx context.Context) storage.Appender { return app },
		nil,
		0,
		true,
		0,
		nil,
		0,
		0,
		false,
	)

	now := time.Now()
	slApp := sl.appender(context.Background())
	_, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now)
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	slApp = sl.appender(context.Background())
	_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	want := []sample{
		{
			metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
			t:      1000,
			v:      1,
		},
	}
	require.Equal(t, want, app.result, "Appended samples not as expected")
}

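// TestScrapeLoopAppendExemplar checks that exemplars parsed from OpenMetrics
// expositions are appended alongside their samples, with and without timestamps.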
func TestScrapeLoopAppendExemplar(t *testing.T) {
	tests := []struct {
		title           string
		scrapeText      string
		discoveryLabels []string
		samples         []sample
		exemplars       []exemplar.Exemplar
	}{
		{
			title:           "Metric without exemplars",
			scrapeText:      "metric_total{n=\"1\"} 0\n# EOF",
			discoveryLabels: []string{"n", "2"},
			samples: []sample{{
				metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
				v:      0,
			}},
		},
		{
			title:           "Metric with exemplars",
			scrapeText:      "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0\n# EOF",
			discoveryLabels: []string{"n", "2"},
			samples: []sample{{
				metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
				v:      0,
			}},
			exemplars: []exemplar.Exemplar{
				{Labels: labels.FromStrings("a", "abc"), Value: 1},
			},
		}, {
			title:           "Metric with exemplars and TS",
			scrapeText:      "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0 10000\n# EOF",
			discoveryLabels: []string{"n", "2"},
			samples: []sample{{
				metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
				v:      0,
			}},
			exemplars: []exemplar.Exemplar{
				{Labels: labels.FromStrings("a", "abc"), Value: 1, Ts: 10000000, HasTs: true},
			},
		}, {
			title: "Two metrics and exemplars",
			scrapeText: `metric_total{n="1"} 1 # {t="1"} 1.0 10000
metric_total{n="2"} 2 # {t="2"} 2.0 20000
# EOF`,
			samples: []sample{{
				metric: labels.FromStrings("__name__", "metric_total", "n", "1"),
				v:      1,
			}, {
				metric: labels.FromStrings("__name__", "metric_total", "n", "2"),
				v:      2,
			}},
			exemplars: []exemplar.Exemplar{
				{Labels: labels.FromStrings("t", "1"), Value: 1, Ts: 10000000, HasTs: true},
				{Labels: labels.FromStrings("t", "2"), Value: 2, Ts: 20000000, HasTs: true},
			},
		},
	}

	for _, test := range tests {
		t.Run(test.title, func(t *testing.T) {
			app := &collectResultAppender{}

			discoveryLabels := &Target{
				labels: labels.FromStrings(test.discoveryLabels...),
			}

			sl := newScrapeLoop(context.Background(),
				nil, nil, nil,
				func(l labels.Labels) labels.Labels {
					return mutateSampleLabels(l, discoveryLabels, false, nil)
				},
				func(l labels.Labels) labels.Labels {
					return mutateReportSampleLabels(l, discoveryLabels)
				},
				func(ctx context.Context) storage.Appender { return app },
				nil,
				0,
				true,
				0,
				nil,
				0,
				0,
				false,
			)

			now := time.Now()

			for i := range test.samples {
				test.samples[i].t = timestamp.FromTime(now)
			}

			// We need to set the timestamp for expected exemplars that don't have one.
			for i := range test.exemplars {
				if test.exemplars[i].Ts == 0 {
					test.exemplars[i].Ts = timestamp.FromTime(now)
				}
			}

			_, _, _, err := sl.append(app, []byte(test.scrapeText), "application/openmetrics-text", now)
			require.NoError(t, err)
			require.NoError(t, app.Commit())
			require.Equal(t, test.samples, app.result)
			require.Equal(t, test.exemplars, app.resultExemplars)
		})
	}
}

func TestScrapeLoopAppendExemplarSeries(t *testing.T) {
	scrapeText := []string{`metric_total{n="1"} 1 # {t="1"} 1.0 10000
# EOF`, `metric_total{n="1"} 2 # {t="2"} 2.0 20000
# EOF`}
	samples := []sample{{
		metric: labels.FromStrings("__name__", "metric_total", "n", "1"),
		v:      1,
	}, {
		metric: labels.FromStrings("__name__", "metric_total", "n", "1"),
		v:      2,
	}}
	exemplars := []exemplar.Exemplar{
		{Labels: labels.FromStrings("t", "1"), Value: 1, Ts: 10000000, HasTs: true},
		{Labels: labels.FromStrings("t", "2"), Value: 2, Ts: 20000000, HasTs: true},
	}
	discoveryLabels := &Target{
		labels: labels.FromStrings(),
	}

	app := &collectResultAppender{}

	sl := newScrapeLoop(context.Background(),
		nil, nil, nil,
		func(l labels.Labels) labels.Labels {
			return mutateSampleLabels(l, discoveryLabels, false, nil)
		},
		func(l labels.Labels) labels.Labels {
			return mutateReportSampleLabels(l, discoveryLabels)
		},
		func(ctx context.Context) storage.Appender { return app },
		nil,
		0,
		true,
		0,
		nil,
		0,
		0,
		false,
	)

	now := time.Now()

	for i := range samples {
		ts := now.Add(time.Second * time.Duration(i))
		samples[i].t = timestamp.FromTime(ts)
	}

	// We need to set the timestamp for expected exemplars that don't have one.
	for i := range exemplars {
		if exemplars[i].Ts == 0 {
			ts := now.Add(time.Second * time.Duration(i))
			exemplars[i].Ts = timestamp.FromTime(ts)
		}
	}

	for i, st := range scrapeText {
		_, _, _, err := sl.append(app, []byte(st), "application/openmetrics-text", timestamp.Time(samples[i].t))
		require.NoError(t, err)
		require.NoError(t, app.Commit())
	}

	require.Equal(t, samples, app.result)
	require.Equal(t, exemplars, app.resultExemplars)
}

func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
	var (
		scraper  = &testScraper{}
		appender = &collectResultAppender{}
		app      = func(ctx context.Context) storage.Appender { return appender }
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newScrapeLoop(ctx,
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		app,
		nil,
		0,
		true,
		0,
		nil,
		10*time.Millisecond,
		time.Hour,
		false,
	)

	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		cancel()
		return errors.New("scrape failed")
	}

	sl.run(nil)
	require.Equal(t, 0.0, appender.result[0].v, "bad 'up' value")
}

func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
	var (
		scraper  = &testScraper{}
		appender = &collectResultAppender{}
		app      = func(ctx context.Context) storage.Appender { return appender }
	)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newScrapeLoop(ctx,
		scraper,
		nil, nil,
		nopMutator,
		nopMutator,
		app,
		nil,
		0,
		true,
		0,
		nil,
		10*time.Millisecond,
		time.Hour,
		false,
	)

	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
		cancel()
		w.Write([]byte("a{l=\"\xff\"} 1\n"))
		return nil
	}

	sl.run(nil)
	require.Equal(t, 0.0, appender.result[0].v, "bad 'up' value")
}

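// errorAppender fails appends for specific metric names so error handling in
// the scrape loop can be exercised deterministically.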
type errorAppender struct {
	collectResultAppender
}

func (app *errorAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
	switch lset.Get(model.MetricNameLabel) {
	case "out_of_order":
		return 0, storage.ErrOutOfOrderSample
	case "amend":
		return 0, storage.ErrDuplicateSampleForTimestamp
	case "out_of_bounds":
		return 0, storage.ErrOutOfBounds
	default:
		return app.collectResultAppender.Append(ref, lset, t, v)
	}
}

func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) {
	app := &errorAppender{}

	sl := newScrapeLoop(context.Background(),
		nil,
		nil, nil,
		nopMutator,
		nopMutator,
		func(ctx context.Context) storage.Appender { return app },
		nil,
		0,
		true,
		0,
		nil,
		0,
		0,
		false,
	)

	now := time.Unix(1, 0)
	slApp := sl.appender(context.Background())
	total, added, seriesAdded, err := sl.append(slApp, []byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now)
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())

	want := []sample{
		{
			metric: labels.FromStrings(model.MetricNameLabel, "normal"),
			t:      timestamp.FromTime(now),
			v:      1,
		},
	}
	require.Equal(t, want, app.result, "Appended samples not as expected")
	require.Equal(t, 4, total)
	require.Equal(t, 4, added)
	require.Equal(t, 1, seriesAdded)
}

func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
	app := &collectResultAppender{}
	sl := newScrapeLoop(context.Background(),
		nil,
		nil, nil,
		nopMutator,
		nopMutator,
		func(ctx context.Context) storage.Appender {
			return &timeLimitAppender{
				Appender: app,
				maxTime:  timestamp.FromTime(time.Now().Add(10 * time.Minute)),
			}
		},
		nil,
		0,
		true,
		0,
		nil,
		0,
		0,
		false,
	)

	now := time.Now().Add(20 * time.Minute)
	slApp := sl.appender(context.Background())
	total, added, seriesAdded, err := sl.append(slApp, []byte("normal 1\n"), "", now)
	require.NoError(t, err)
	require.NoError(t, slApp.Commit())
	require.Equal(t, 1, total)
	require.Equal(t, 1, added)
	require.Equal(t, 0, seriesAdded)
}

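// TestTargetScraperScrapeOK checks the scrape request headers and that body
// and content type are returned unchanged on a successful scrape.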
1971func TestTargetScraperScrapeOK(t *testing.T) {
1972	const (
1973		configTimeout   = 1500 * time.Millisecond
1974		expectedTimeout = "1.5"
1975	)
1976
1977	server := httptest.NewServer(
1978		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1979			accept := r.Header.Get("Accept")
1980			if !strings.HasPrefix(accept, "application/openmetrics-text;") {
1981				t.Errorf("Expected Accept header to prefer application/openmetrics-text, got %q", accept)
1982			}
1983
1984			timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds")
1985			if timeout != expectedTimeout {
1986				t.Errorf("Expected scrape timeout header %q, got %q", expectedTimeout, timeout)
1987			}
1988
1989			w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
1990			w.Write([]byte("metric_a 1\nmetric_b 2\n"))
1991		}),
1992	)
1993	defer server.Close()
1994
1995	serverURL, err := url.Parse(server.URL)
1996	if err != nil {
1997		panic(err)
1998	}
1999
2000	ts := &targetScraper{
2001		Target: &Target{
2002			labels: labels.FromStrings(
2003				model.SchemeLabel, serverURL.Scheme,
2004				model.AddressLabel, serverURL.Host,
2005			),
2006		},
2007		client:  http.DefaultClient,
2008		timeout: configTimeout,
2009	}
2010	var buf bytes.Buffer
2011
2012	contentType, err := ts.scrape(context.Background(), &buf)
2013	require.NoError(t, err)
2014	require.Equal(t, "text/plain; version=0.0.4", contentType)
2015	require.Equal(t, "metric_a 1\nmetric_b 2\n", buf.String())
2016}
2017
2018func TestTargetScrapeScrapeCancel(t *testing.T) {
2019	block := make(chan struct{})
2020
2021	server := httptest.NewServer(
2022		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2023			<-block
2024		}),
2025	)
2026	defer server.Close()
2027
2028	serverURL, err := url.Parse(server.URL)
2029	if err != nil {
2030		panic(err)
2031	}
2032
2033	ts := &targetScraper{
2034		Target: &Target{
2035			labels: labels.FromStrings(
2036				model.SchemeLabel, serverURL.Scheme,
2037				model.AddressLabel, serverURL.Host,
2038			),
2039		},
2040		client: http.DefaultClient,
2041	}
2042	ctx, cancel := context.WithCancel(context.Background())
2043
2044	errc := make(chan error, 1)
2045
2046	go func() {
2047		time.Sleep(1 * time.Second)
2048		cancel()
2049	}()
2050
2051	go func() {
2052		_, err := ts.scrape(ctx, ioutil.Discard)
2053		if err == nil {
2054			errc <- errors.New("Expected error but got nil")
2055		} else if ctx.Err() != context.Canceled {
2056			errc <- errors.Errorf("Expected context cancellation error but got: %s", ctx.Err())
2057		} else {
2058			close(errc)
2059		}
2060	}()
2061
2062	select {
2063	case <-time.After(5 * time.Second):
2064		t.Fatalf("Scrape function did not return unexpectedly")
2065	case err := <-errc:
2066		require.NoError(t, err)
2067	}
	// If block were closed in a defer above, the handler would never
	// return, so the test server could not shut down and the test would
	// never complete.
2070	close(block)
2071}
2072
2073func TestTargetScrapeScrapeNotFound(t *testing.T) {
2074	server := httptest.NewServer(
2075		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2076			w.WriteHeader(http.StatusNotFound)
2077		}),
2078	)
2079	defer server.Close()
2080
2081	serverURL, err := url.Parse(server.URL)
2082	if err != nil {
2083		panic(err)
2084	}
2085
2086	ts := &targetScraper{
2087		Target: &Target{
2088			labels: labels.FromStrings(
2089				model.SchemeLabel, serverURL.Scheme,
2090				model.AddressLabel, serverURL.Host,
2091			),
2092		},
2093		client: http.DefaultClient,
2094	}
2095
2096	_, err = ts.scrape(context.Background(), ioutil.Discard)
	require.Contains(t, err.Error(), "404", "Expected \"404 Not Found\" error but got: %s", err)
2098}
2099
2100func TestTargetScraperBodySizeLimit(t *testing.T) {
2101	const (
2102		bodySizeLimit = 15
2103		responseBody  = "metric_a 1\nmetric_b 2\n"
2104	)
2105	var gzipResponse bool
2106	server := httptest.NewServer(
2107		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2108			w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
2109			if gzipResponse {
2110				w.Header().Set("Content-Encoding", "gzip")
2111				gw := gzip.NewWriter(w)
2112				defer gw.Close()
2113				gw.Write([]byte(responseBody))
2114				return
2115			}
2116			w.Write([]byte(responseBody))
2117		}),
2118	)
2119	defer server.Close()
2120
2121	serverURL, err := url.Parse(server.URL)
2122	if err != nil {
2123		panic(err)
2124	}
2125
2126	ts := &targetScraper{
2127		Target: &Target{
2128			labels: labels.FromStrings(
2129				model.SchemeLabel, serverURL.Scheme,
2130				model.AddressLabel, serverURL.Host,
2131			),
2132		},
2133		client:        http.DefaultClient,
2134		bodySizeLimit: bodySizeLimit,
2135	}
2136	var buf bytes.Buffer
2137
	// Target responds with an uncompressed body; scrape with the body size limit enforced.
2139	_, err = ts.scrape(context.Background(), &buf)
2140	require.ErrorIs(t, err, errBodySizeLimit)
2141	require.Equal(t, bodySizeLimit, buf.Len())
	// Target responds with a gzip-compressed body; scrape with the body size limit enforced.
2143	gzipResponse = true
2144	buf.Reset()
2145	_, err = ts.scrape(context.Background(), &buf)
2146	require.ErrorIs(t, err, errBodySizeLimit)
2147	require.Equal(t, bodySizeLimit, buf.Len())
	// Target responds with an uncompressed body; scrape without a body size limit.
2149	gzipResponse = false
2150	buf.Reset()
2151	ts.bodySizeLimit = 0
2152	_, err = ts.scrape(context.Background(), &buf)
2153	require.NoError(t, err)
2154	require.Equal(t, len(responseBody), buf.Len())
	// Target responds with a gzip-compressed body; scrape without a body size limit.
2156	gzipResponse = true
2157	buf.Reset()
2158	_, err = ts.scrape(context.Background(), &buf)
2159	require.NoError(t, err)
2160	require.Equal(t, len(responseBody), buf.Len())
2161}
2162
2163// testScraper implements the scraper interface and allows setting values
2164// returned by its methods. It also allows setting a custom scrape function.
2165type testScraper struct {
2166	offsetDur time.Duration
2167
2168	lastStart    time.Time
2169	lastDuration time.Duration
2170	lastError    error
2171
2172	scrapeErr  error
2173	scrapeFunc func(context.Context, io.Writer) error
2174}
2175
2176func (ts *testScraper) offset(interval time.Duration, jitterSeed uint64) time.Duration {
2177	return ts.offsetDur
2178}
2179
2180func (ts *testScraper) Report(start time.Time, duration time.Duration, err error) {
2181	ts.lastStart = start
2182	ts.lastDuration = duration
2183	ts.lastError = err
2184}
2185
2186func (ts *testScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
2187	if ts.scrapeFunc != nil {
2188		return "", ts.scrapeFunc(ctx, w)
2189	}
2190	return "", ts.scrapeErr
2191}
2192
2193func TestScrapeLoop_RespectTimestamps(t *testing.T) {
2194	s := teststorage.New(t)
2195	defer s.Close()
2196
2197	app := s.Appender(context.Background())
2198
2199	capp := &collectResultAppender{next: app}
2200
2201	sl := newScrapeLoop(context.Background(),
2202		nil, nil, nil,
2203		nopMutator,
2204		nopMutator,
2205		func(ctx context.Context) storage.Appender { return capp },
2206		nil, 0,
2207		true,
2208		0,
2209		nil,
2210		0,
2211		0,
2212		false,
2213	)
2214
2215	now := time.Now()
2216	slApp := sl.appender(context.Background())
2217	_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
2218	require.NoError(t, err)
2219	require.NoError(t, slApp.Commit())
2220
2221	want := []sample{
2222		{
2223			metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
2224			t:      0,
2225			v:      1,
2226		},
2227	}
2228	require.Equal(t, want, capp.result, "Appended samples not as expected")
2229}
2230
2231func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
2232	s := teststorage.New(t)
2233	defer s.Close()
2234
2235	app := s.Appender(context.Background())
2236
2237	capp := &collectResultAppender{next: app}
2238
2239	sl := newScrapeLoop(context.Background(),
2240		nil, nil, nil,
2241		nopMutator,
2242		nopMutator,
2243		func(ctx context.Context) storage.Appender { return capp },
2244		nil, 0,
2245		false,
2246		0,
2247		nil,
2248		0,
2249		0,
2250		false,
2251	)
2252
2253	now := time.Now()
2254	slApp := sl.appender(context.Background())
2255	_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
2256	require.NoError(t, err)
2257	require.NoError(t, slApp.Commit())
2258
2259	want := []sample{
2260		{
2261			metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
2262			t:      timestamp.FromTime(now),
2263			v:      1,
2264		},
2265	}
2266	require.Equal(t, want, capp.result, "Appended samples not as expected")
2267}
2268
2269func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
2270	s := teststorage.New(t)
2271	defer s.Close()
2272
2273	ctx, cancel := context.WithCancel(context.Background())
2274	sl := newScrapeLoop(ctx,
2275		&testScraper{},
2276		nil, nil,
2277		nopMutator,
2278		nopMutator,
2279		s.Appender,
2280		nil,
2281		0,
2282		true,
2283		0,
2284		nil,
2285		0,
2286		0,
2287		false,
2288	)
2289	defer cancel()
2290
2291	// We add a good and a bad metric to check that both are discarded.
2292	slApp := sl.appender(ctx)
2293	_, _, _, err := sl.append(slApp, []byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{})
2294	require.Error(t, err)
2295	require.NoError(t, slApp.Rollback())
2296
2297	q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
2298	require.NoError(t, err)
2299	series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
2300	require.Equal(t, false, series.Next(), "series found in tsdb")
2301	require.NoError(t, series.Err())
2302
2303	// We add a good metric to check that it is recorded.
2304	slApp = sl.appender(ctx)
2305	_, _, _, err = sl.append(slApp, []byte("test_metric{le=\"500\"} 1\n"), "", time.Time{})
2306	require.NoError(t, err)
2307	require.NoError(t, slApp.Commit())
2308
2309	q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0)
2310	require.NoError(t, err)
2311	series = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "le", "500"))
2312	require.Equal(t, true, series.Next(), "series not found in tsdb")
2313	require.NoError(t, series.Err())
2314	require.Equal(t, false, series.Next(), "more than one series found in tsdb")
2315}
2316
2317func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
2318	s := teststorage.New(t)
2319	defer s.Close()
2320
2321	app := s.Appender(context.Background())
2322
2323	ctx, cancel := context.WithCancel(context.Background())
	sl := newScrapeLoop(ctx,
2325		&testScraper{},
2326		nil, nil,
2327		func(l labels.Labels) labels.Labels {
2328			if l.Has("drop") {
2329				return labels.Labels{}
2330			}
2331			return l
2332		},
2333		nopMutator,
2334		func(ctx context.Context) storage.Appender { return app },
2335		nil,
2336		0,
2337		true,
2338		0,
2339		nil,
2340		0,
2341		0,
2342		false,
2343	)
2344	defer cancel()
2345
	slApp := sl.appender(ctx)
2347	_, _, _, err := sl.append(slApp, []byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{})
2348	require.Error(t, err)
2349	require.NoError(t, slApp.Rollback())
2350	require.Equal(t, errNameLabelMandatory, err)
2351
2352	q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
2353	require.NoError(t, err)
2354	series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
2355	require.Equal(t, false, series.Next(), "series found in tsdb")
2356	require.NoError(t, series.Err())
2357}
2358
2359func TestReusableConfig(t *testing.T) {
2360	variants := []*config.ScrapeConfig{
2361		{
2362			JobName:       "prometheus",
2363			ScrapeTimeout: model.Duration(15 * time.Second),
2364		},
2365		{
2366			JobName:       "httpd",
2367			ScrapeTimeout: model.Duration(15 * time.Second),
2368		},
2369		{
2370			JobName:       "prometheus",
2371			ScrapeTimeout: model.Duration(5 * time.Second),
2372		},
2373		{
2374			JobName:     "prometheus",
2375			MetricsPath: "/metrics",
2376		},
2377		{
2378			JobName:     "prometheus",
2379			MetricsPath: "/metrics2",
2380		},
2381		{
2382			JobName:       "prometheus",
2383			ScrapeTimeout: model.Duration(5 * time.Second),
2384			MetricsPath:   "/metrics2",
2385		},
2386		{
2387			JobName:        "prometheus",
2388			ScrapeInterval: model.Duration(5 * time.Second),
2389			MetricsPath:    "/metrics2",
2390		},
2391		{
2392			JobName:        "prometheus",
2393			ScrapeInterval: model.Duration(5 * time.Second),
2394			SampleLimit:    1000,
2395			MetricsPath:    "/metrics2",
2396		},
2397	}
2398
2399	match := [][]int{
2400		{0, 2},
2401		{4, 5},
2402		{4, 6},
2403		{4, 7},
2404		{5, 6},
2405		{5, 7},
2406		{6, 7},
2407	}
2408	noMatch := [][]int{
2409		{1, 2},
2410		{0, 4},
2411		{3, 4},
2412	}
2413
2414	for i, m := range match {
2415		require.Equal(t, true, reusableCache(variants[m[0]], variants[m[1]]), "match test %d", i)
2416		require.Equal(t, true, reusableCache(variants[m[1]], variants[m[0]]), "match test %d", i)
2417		require.Equal(t, true, reusableCache(variants[m[1]], variants[m[1]]), "match test %d", i)
2418		require.Equal(t, true, reusableCache(variants[m[0]], variants[m[0]]), "match test %d", i)
2419	}
2420	for i, m := range noMatch {
2421		require.Equal(t, false, reusableCache(variants[m[0]], variants[m[1]]), "not match test %d", i)
2422		require.Equal(t, false, reusableCache(variants[m[1]], variants[m[0]]), "not match test %d", i)
2423	}
2424}
2425
2426func TestReuseScrapeCache(t *testing.T) {
2427	var (
2428		app = &nopAppendable{}
2429		cfg = &config.ScrapeConfig{
2430			JobName:        "Prometheus",
2431			ScrapeTimeout:  model.Duration(5 * time.Second),
2432			ScrapeInterval: model.Duration(5 * time.Second),
2433			MetricsPath:    "/metrics",
2434		}
2435		sp, _ = newScrapePool(cfg, app, 0, nil, false)
2436		t1    = &Target{
2437			discoveredLabels: labels.Labels{
2438				labels.Label{
2439					Name:  "labelNew",
2440					Value: "nameNew",
2441				},
2442			},
2443		}
2444		proxyURL, _ = url.Parse("http://localhost:2128")
2445	)
2446	defer sp.stop()
2447	sp.sync([]*Target{t1})
2448
2449	steps := []struct {
2450		keep      bool
2451		newConfig *config.ScrapeConfig
2452	}{
2453		{
2454			keep: true,
2455			newConfig: &config.ScrapeConfig{
2456				JobName:        "Prometheus",
2457				ScrapeInterval: model.Duration(5 * time.Second),
2458				ScrapeTimeout:  model.Duration(5 * time.Second),
2459				MetricsPath:    "/metrics",
2460			},
2461		},
2462		{
2463			keep: false,
2464			newConfig: &config.ScrapeConfig{
2465				JobName:        "Prometheus",
2466				ScrapeInterval: model.Duration(5 * time.Second),
2467				ScrapeTimeout:  model.Duration(15 * time.Second),
2468				MetricsPath:    "/metrics2",
2469			},
2470		},
2471		{
2472			keep: true,
2473			newConfig: &config.ScrapeConfig{
2474				JobName:        "Prometheus",
2475				SampleLimit:    400,
2476				ScrapeInterval: model.Duration(5 * time.Second),
2477				ScrapeTimeout:  model.Duration(15 * time.Second),
2478				MetricsPath:    "/metrics2",
2479			},
2480		},
2481		{
2482			keep: false,
2483			newConfig: &config.ScrapeConfig{
2484				JobName:         "Prometheus",
2485				HonorTimestamps: true,
2486				SampleLimit:     400,
2487				ScrapeInterval:  model.Duration(5 * time.Second),
2488				ScrapeTimeout:   model.Duration(15 * time.Second),
2489				MetricsPath:     "/metrics2",
2490			},
2491		},
2492		{
2493			keep: true,
2494			newConfig: &config.ScrapeConfig{
2495				JobName:         "Prometheus",
2496				HonorTimestamps: true,
2497				SampleLimit:     400,
2498				HTTPClientConfig: config_util.HTTPClientConfig{
2499					ProxyURL: config_util.URL{URL: proxyURL},
2500				},
2501				ScrapeInterval: model.Duration(5 * time.Second),
2502				ScrapeTimeout:  model.Duration(15 * time.Second),
2503				MetricsPath:    "/metrics2",
2504			},
2505		},
2506		{
2507			keep: false,
2508			newConfig: &config.ScrapeConfig{
2509				JobName:         "Prometheus",
2510				HonorTimestamps: true,
2511				HonorLabels:     true,
2512				SampleLimit:     400,
2513				ScrapeInterval:  model.Duration(5 * time.Second),
2514				ScrapeTimeout:   model.Duration(15 * time.Second),
2515				MetricsPath:     "/metrics2",
2516			},
2517		},
2518	}
2519
2520	cacheAddr := func(sp *scrapePool) map[uint64]string {
2521		r := make(map[uint64]string)
2522		for fp, l := range sp.loops {
2523			r[fp] = fmt.Sprintf("%p", l.getCache())
2524		}
2525		return r
2526	}
2527
2528	for i, s := range steps {
2529		initCacheAddr := cacheAddr(sp)
2530		sp.reload(s.newConfig)
2531		for fp, newCacheAddr := range cacheAddr(sp) {
2532			if s.keep {
2533				require.Equal(t, initCacheAddr[fp], newCacheAddr, "step %d: old cache and new cache are not the same", i)
2534			} else {
2535				require.NotEqual(t, initCacheAddr[fp], newCacheAddr, "step %d: old cache and new cache are the same", i)
2536			}
2537		}
2538		initCacheAddr = cacheAddr(sp)
2539		sp.reload(s.newConfig)
2540		for fp, newCacheAddr := range cacheAddr(sp) {
2541			require.Equal(t, initCacheAddr[fp], newCacheAddr, "step %d: reloading the exact config invalidates the cache", i)
2542		}
2543	}
2544}
2545
2546func TestScrapeAddFast(t *testing.T) {
2547	s := teststorage.New(t)
2548	defer s.Close()
2549
2550	ctx, cancel := context.WithCancel(context.Background())
2551	sl := newScrapeLoop(ctx,
2552		&testScraper{},
2553		nil, nil,
2554		nopMutator,
2555		nopMutator,
2556		s.Appender,
2557		nil,
2558		0,
2559		true,
2560		0,
2561		nil,
2562		0,
2563		0,
2564		false,
2565	)
2566	defer cancel()
2567
2568	slApp := sl.appender(ctx)
2569	_, _, _, err := sl.append(slApp, []byte("up 1\n"), "", time.Time{})
2570	require.NoError(t, err)
2571	require.NoError(t, slApp.Commit())
2572
	// Poison the cache. There is just one entry, and one series in the
	// storage. Bumping the ref makes the cached ref stale, so the next
	// append has to recover by resolving the series by its labels.
2575	for _, v := range sl.getCache().series {
2576		v.ref++
2577	}
2578
2579	slApp = sl.appender(ctx)
2580	_, _, _, err = sl.append(slApp, []byte("up 1\n"), "", time.Time{}.Add(time.Second))
2581	require.NoError(t, err)
2582	require.NoError(t, slApp.Commit())
2583}
2584
2585func TestReuseCacheRace(t *testing.T) {
2586	var (
2587		app = &nopAppendable{}
2588		cfg = &config.ScrapeConfig{
2589			JobName:        "Prometheus",
2590			ScrapeTimeout:  model.Duration(5 * time.Second),
2591			ScrapeInterval: model.Duration(5 * time.Second),
2592			MetricsPath:    "/metrics",
2593		}
2594		sp, _ = newScrapePool(cfg, app, 0, nil, false)
2595		t1    = &Target{
2596			discoveredLabels: labels.Labels{
2597				labels.Label{
2598					Name:  "labelNew",
2599					Value: "nameNew",
2600				},
2601			},
2602		}
2603	)
2604	defer sp.stop()
2605	sp.sync([]*Target{t1})
2606
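	// Hammer reload for ~5 seconds. Only SampleLimit changes between
	// iterations, so the scrape cache is reused on every reload; under
	// -race this exercises concurrent access to the shared cache.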
2607	start := time.Now()
2608	for i := uint(1); i > 0; i++ {
2609		if time.Since(start) > 5*time.Second {
2610			break
2611		}
2612		sp.reload(&config.ScrapeConfig{
2613			JobName:        "Prometheus",
2614			ScrapeTimeout:  model.Duration(1 * time.Millisecond),
2615			ScrapeInterval: model.Duration(1 * time.Millisecond),
2616			MetricsPath:    "/metrics",
2617			SampleLimit:    i,
2618		})
2619	}
2620}
2621
2622func TestCheckAddError(t *testing.T) {
2623	var appErrs appendErrors
2624	sl := scrapeLoop{l: log.NewNopLogger()}
2625	sl.checkAddError(nil, nil, nil, storage.ErrOutOfOrderSample, nil, &appErrs)
2626	require.Equal(t, 1, appErrs.numOutOfOrder)
2627}
2628
2629func TestScrapeReportSingleAppender(t *testing.T) {
2630	s := teststorage.New(t)
2631	defer s.Close()
2632
2633	var (
2634		signal  = make(chan struct{}, 1)
2635		scraper = &testScraper{}
2636	)
2637
2638	ctx, cancel := context.WithCancel(context.Background())
2639	sl := newScrapeLoop(ctx,
2640		scraper,
2641		nil, nil,
2642		nopMutator,
2643		nopMutator,
2644		s.Appender,
2645		nil,
2646		0,
2647		true,
2648		0,
2649		nil,
2650		10*time.Millisecond,
2651		time.Hour,
2652		false,
2653	)
2654
2655	numScrapes := 0
2656
2657	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
2658		numScrapes++
2659		if numScrapes%4 == 0 {
2660			return fmt.Errorf("scrape failed")
2661		}
2662		w.Write([]byte("metric_a 44\nmetric_b 44\nmetric_c 44\nmetric_d 44\n"))
2663		return nil
2664	}
2665
2666	go func() {
2667		sl.run(nil)
2668		signal <- struct{}{}
2669	}()
2670
2671	start := time.Now()
2672	for time.Since(start) < 3*time.Second {
2673		q, err := s.Querier(ctx, time.Time{}.UnixNano(), time.Now().UnixNano())
2674		require.NoError(t, err)
2675		series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".+"))
2676
2677		c := 0
2678		for series.Next() {
2679			i := series.At().Iterator()
2680			for i.Next() {
2681				c++
2682			}
2683		}
2684
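		// Each successful scrape appends the 4 scraped samples plus the
		// 5 report samples (up, scrape_duration_seconds, and friends)
		// through a single appender, so the running total must stay a
		// multiple of 9.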
2685		require.Equal(t, 0, c%9, "Appended samples not as expected: %d", c)
2686		q.Close()
2687	}
2688	cancel()
2689
2690	select {
2691	case <-signal:
2692	case <-time.After(5 * time.Second):
2693		t.Fatalf("Scrape wasn't stopped.")
2694	}
2695}
2696
2697func TestScrapeLoopLabelLimit(t *testing.T) {
2698	tests := []struct {
2699		title           string
2700		scrapeLabels    string
2701		discoveryLabels []string
2702		labelLimits     labelLimits
2703		expectErr       bool
2704	}{
2705		{
2706			title:           "Valid number of labels",
2707			scrapeLabels:    `metric{l1="1", l2="2"} 0`,
2708			discoveryLabels: nil,
2709			labelLimits:     labelLimits{labelLimit: 5},
2710			expectErr:       false,
2711		}, {
2712			title:           "Too many labels",
2713			scrapeLabels:    `metric{l1="1", l2="2", l3="3", l4="4", l5="5", l6="6"} 0`,
2714			discoveryLabels: nil,
2715			labelLimits:     labelLimits{labelLimit: 5},
2716			expectErr:       true,
2717		}, {
2718			title:           "Too many labels including discovery labels",
2719			scrapeLabels:    `metric{l1="1", l2="2", l3="3", l4="4"} 0`,
2720			discoveryLabels: []string{"l5", "5", "l6", "6"},
2721			labelLimits:     labelLimits{labelLimit: 5},
2722			expectErr:       true,
2723		}, {
2724			title:           "Valid labels name length",
2725			scrapeLabels:    `metric{l1="1", l2="2"} 0`,
2726			discoveryLabels: nil,
2727			labelLimits:     labelLimits{labelNameLengthLimit: 10},
2728			expectErr:       false,
2729		}, {
2730			title:           "Label name too long",
2731			scrapeLabels:    `metric{label_name_too_long="0"} 0`,
2732			discoveryLabels: nil,
2733			labelLimits:     labelLimits{labelNameLengthLimit: 10},
2734			expectErr:       true,
2735		}, {
2736			title:           "Discovery label name too long",
2737			scrapeLabels:    `metric{l1="1", l2="2"} 0`,
2738			discoveryLabels: []string{"label_name_too_long", "0"},
2739			labelLimits:     labelLimits{labelNameLengthLimit: 10},
2740			expectErr:       true,
2741		}, {
2742			title:           "Valid labels value length",
2743			scrapeLabels:    `metric{l1="1", l2="2"} 0`,
2744			discoveryLabels: nil,
2745			labelLimits:     labelLimits{labelValueLengthLimit: 10},
2746			expectErr:       false,
2747		}, {
2748			title:           "Label value too long",
2749			scrapeLabels:    `metric{l1="label_value_too_long"} 0`,
2750			discoveryLabels: nil,
2751			labelLimits:     labelLimits{labelValueLengthLimit: 10},
2752			expectErr:       true,
2753		}, {
2754			title:           "Discovery label value too long",
2755			scrapeLabels:    `metric{l1="1", l2="2"} 0`,
2756			discoveryLabels: []string{"l1", "label_value_too_long"},
2757			labelLimits:     labelLimits{labelValueLengthLimit: 10},
2758			expectErr:       true,
2759		},
2760	}
2761
2762	for _, test := range tests {
2763		app := &collectResultAppender{}
2764
2765		discoveryLabels := &Target{
2766			labels: labels.FromStrings(test.discoveryLabels...),
2767		}
2768
2769		sl := newScrapeLoop(context.Background(),
2770			nil, nil, nil,
2771			func(l labels.Labels) labels.Labels {
2772				return mutateSampleLabels(l, discoveryLabels, false, nil)
2773			},
2774			func(l labels.Labels) labels.Labels {
2775				return mutateReportSampleLabels(l, discoveryLabels)
2776			},
2777			func(ctx context.Context) storage.Appender { return app },
2778			nil,
2779			0,
2780			true,
2781			0,
2782			&test.labelLimits,
2783			0,
2784			0,
2785			false,
2786		)
2787
2788		slApp := sl.appender(context.Background())
2789		_, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", time.Now())
2790
2791		t.Logf("Test:%s", test.title)
2792		if test.expectErr {
2793			require.Error(t, err)
2794		} else {
2795			require.NoError(t, err)
2796			require.NoError(t, slApp.Commit())
2797		}
2798	}
2799}
2800
2801func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) {
2802	interval, _ := model.ParseDuration("2s")
2803	timeout, _ := model.ParseDuration("500ms")
2804	config := &config.ScrapeConfig{
2805		ScrapeInterval: interval,
2806		ScrapeTimeout:  timeout,
2807		RelabelConfigs: []*relabel.Config{
2808			{
2809				SourceLabels: model.LabelNames{model.ScrapeIntervalLabel},
2810				Regex:        relabel.MustNewRegexp("2s"),
2811				Replacement:  "3s",
2812				TargetLabel:  model.ScrapeIntervalLabel,
2813				Action:       relabel.Replace,
2814			},
2815			{
2816				SourceLabels: model.LabelNames{model.ScrapeTimeoutLabel},
2817				Regex:        relabel.MustNewRegexp("500ms"),
2818				Replacement:  "750ms",
2819				TargetLabel:  model.ScrapeTimeoutLabel,
2820				Action:       relabel.Replace,
2821			},
2822		},
2823	}
2824	sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, false)
2825	tgts := []*targetgroup.Group{
2826		{
2827			Targets: []model.LabelSet{{model.AddressLabel: "127.0.0.1:9090"}},
2828		},
2829	}
2830
2831	sp.Sync(tgts)
2832	defer sp.stop()
2833
2834	require.Equal(t, "3s", sp.ActiveTargets()[0].labels.Get(model.ScrapeIntervalLabel))
2835	require.Equal(t, "750ms", sp.ActiveTargets()[0].labels.Get(model.ScrapeTimeoutLabel))
2836}
2837