// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retrieval

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
	"reflect"
	"sort"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/prometheus/common/model"
	"golang.org/x/net/context"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/storage"
)

func TestNewScrapePool(t *testing.T) {
	var (
		app = &nopAppender{}
		cfg = &config.ScrapeConfig{}
		sp  = newScrapePool(context.Background(), cfg, app)
	)

	if a, ok := sp.appender.(*nopAppender); !ok || a != app {
		t.Fatalf("Wrong sample appender")
	}
	if sp.config != cfg {
		t.Fatalf("Wrong scrape config")
	}
	if sp.newLoop == nil {
		t.Fatalf("newLoop function not initialized")
	}
}

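// testLoop is a mock implementation of the loop interface whose run and stop
// behavior is injected through startFunc and stopFunc.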
type testLoop struct {
	startFunc func(interval, timeout time.Duration, errc chan<- error)
	stopFunc  func()
}

func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) {
	l.startFunc(interval, timeout, errc)
}

func (l *testLoop) stop() {
	l.stopFunc()
}

func TestScrapePoolStop(t *testing.T) {
	sp := &scrapePool{
		targets: map[uint64]*Target{},
		loops:   map[uint64]loop{},
	}
	var mtx sync.Mutex
	stopped := map[uint64]bool{}
	numTargets := 20

	// Stopping the scrape pool must call stop() on all scrape loops and clean
	// up both the loops and their respective targets. It must wait until each
	// loop's stop function has returned before returning itself.

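	// Each loop's stop function sleeps proportionally to its index, so the
	// pool's total stop time is dominated by the slowest loop.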
	for i := 0; i < numTargets; i++ {
		i := i // Pin the loop variable; the closure below runs after the loop has finished.
		t := &Target{
			labels: model.LabelSet{
				model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)),
			},
		}
		l := &testLoop{}
		l.stopFunc = func() {
			time.Sleep(time.Duration(i*20) * time.Millisecond)

			mtx.Lock()
			stopped[t.hash()] = true
			mtx.Unlock()
		}

		sp.targets[t.hash()] = t
		sp.loops[t.hash()] = l
	}

	done := make(chan struct{})
	stopTime := time.Now()

	go func() {
		sp.stop()
		close(done)
	}()

	select {
	case <-time.After(5 * time.Second):
		t.Fatalf("scrapePool.stop() did not return as expected")
	case <-done:
		// This should have taken at least as long as the last target slept.
		if time.Since(stopTime) < time.Duration((numTargets-1)*20)*time.Millisecond {
			t.Fatalf("scrapePool.stop() exited before all targets stopped")
		}
	}

	mtx.Lock()
	if len(stopped) != numTargets {
		t.Fatalf("Expected %d stopped loops, got %d", numTargets, len(stopped))
	}
	mtx.Unlock()

	if len(sp.targets) > 0 {
		t.Fatalf("Targets were not cleared on stopping: %d left", len(sp.targets))
	}
	if len(sp.loops) > 0 {
		t.Fatalf("Loops were not cleared on stopping: %d left", len(sp.loops))
	}
}

func TestScrapePoolReload(t *testing.T) {
	var mtx sync.Mutex
	numTargets := 20

	stopped := map[uint64]bool{}

	reloadCfg := &config.ScrapeConfig{
		ScrapeInterval: model.Duration(3 * time.Second),
		ScrapeTimeout:  model.Duration(2 * time.Second),
	}
	// When they start running, the new loops created on reload check whether
	// their preceding equivalents have been stopped.
	newLoop := func(ctx context.Context, s scraper, app storage.SampleAppender, tl model.LabelSet, cfg *config.ScrapeConfig) loop {
		l := &testLoop{}
		l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
			if interval != 3*time.Second {
				t.Errorf("Expected scrape interval %v but got %v", 3*time.Second, interval)
			}
			if timeout != 2*time.Second {
				t.Errorf("Expected scrape timeout %v but got %v", 2*time.Second, timeout)
			}
			mtx.Lock()
			if !stopped[s.(*targetScraper).hash()] {
				t.Errorf("Scrape loop for %v not stopped yet", s.(*targetScraper))
			}
			mtx.Unlock()
		}
		return l
	}
	sp := &scrapePool{
		targets: map[uint64]*Target{},
		loops:   map[uint64]loop{},
		newLoop: newLoop,
	}

	// Reloading a scrape pool with a new scrape configuration must stop all
	// scrape loops and start new ones. A new loop must not be started before
	// the preceding one has terminated.

	for i := 0; i < numTargets; i++ {
		i := i // Pin the loop variable; the closure below runs after the loop has finished.
		t := &Target{
			labels: model.LabelSet{
				model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)),
			},
		}
		l := &testLoop{}
		l.stopFunc = func() {
			time.Sleep(time.Duration(i*20) * time.Millisecond)

			mtx.Lock()
			stopped[t.hash()] = true
			mtx.Unlock()
		}

		sp.targets[t.hash()] = t
		sp.loops[t.hash()] = l
	}
	done := make(chan struct{})

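	// Snapshot the current target set; a reload must swap out the loops but
	// leave the targets themselves untouched.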
	beforeTargets := map[uint64]*Target{}
	for h, t := range sp.targets {
		beforeTargets[h] = t
	}

	reloadTime := time.Now()

	go func() {
		sp.reload(reloadCfg)
		close(done)
	}()

	select {
	case <-time.After(5 * time.Second):
		t.Fatalf("scrapePool.reload() did not return as expected")
	case <-done:
		// This should have taken at least as long as the last target slept.
		if time.Since(reloadTime) < time.Duration((numTargets-1)*20)*time.Millisecond {
			t.Fatalf("scrapePool.reload() exited before all targets stopped")
		}
	}

	mtx.Lock()
	if len(stopped) != numTargets {
		t.Fatalf("Expected %d stopped loops, got %d", numTargets, len(stopped))
	}
	mtx.Unlock()

	if !reflect.DeepEqual(sp.targets, beforeTargets) {
		t.Fatalf("Reloading affected target states unexpectedly")
	}
	if len(sp.loops) != numTargets {
		t.Fatalf("Expected %d loops after reload but got %d", numTargets, len(sp.loops))
	}
}

func TestScrapeLoopWrapSampleAppender(t *testing.T) {
	cfg := &config.ScrapeConfig{
		MetricRelabelConfigs: []*config.RelabelConfig{
			{
				Action:       config.RelabelDrop,
				SourceLabels: model.LabelNames{"__name__"},
				Regex:        config.MustNewRegexp("does_not_match_.*"),
			},
			{
				Action:       config.RelabelDrop,
				SourceLabels: model.LabelNames{"__name__"},
				Regex:        config.MustNewRegexp("does_not_match_either_.*"),
			},
		},
	}

	target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
	app := &nopAppender{}

	sp := newScrapePool(context.Background(), cfg, app)

	cfg.HonorLabels = false

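	// With HonorLabels disabled, the loop must wrap the base appender as
	// ruleLabelsAppender -> relabelAppender -> *countingAppender -> base.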
	sl := sp.newLoop(
		sp.ctx,
		&targetScraper{Target: target, client: sp.client},
		sp.appender,
		target.Labels(),
		sp.config,
	).(*scrapeLoop)
	wrapped, _ := sl.wrapAppender(sl.appender)

	rl, ok := wrapped.(ruleLabelsAppender)
	if !ok {
		t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
	}
	re, ok := rl.SampleAppender.(relabelAppender)
	if !ok {
		t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender)
	}
	co, ok := re.SampleAppender.(*countingAppender)
	if !ok {
		t.Fatalf("Expected *countingAppender but got %T", re.SampleAppender)
	}
	if co.SampleAppender != app {
		t.Fatalf("Expected base appender but got %T", co.SampleAppender)
	}

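	// With HonorLabels enabled, honorLabelsAppender replaces ruleLabelsAppender
	// at the top of the chain.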
	cfg.HonorLabels = true
	sl = sp.newLoop(
		sp.ctx,
		&targetScraper{Target: target, client: sp.client},
		sp.appender,
		target.Labels(),
		sp.config,
	).(*scrapeLoop)
	wrapped, _ = sl.wrapAppender(sl.appender)

	hl, ok := wrapped.(honorLabelsAppender)
	if !ok {
		t.Fatalf("Expected honorLabelsAppender but got %T", wrapped)
	}
	re, ok = hl.SampleAppender.(relabelAppender)
	if !ok {
		t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender)
	}
	co, ok = re.SampleAppender.(*countingAppender)
	if !ok {
		t.Fatalf("Expected *countingAppender but got %T", re.SampleAppender)
	}
	if co.SampleAppender != app {
		t.Fatalf("Expected base appender but got %T", co.SampleAppender)
	}
}

func TestScrapeLoopSampleProcessing(t *testing.T) {
	readSamples := model.Samples{
		{
			Metric: model.Metric{"__name__": "a_metric"},
		},
		{
			Metric: model.Metric{"__name__": "b_metric"},
		},
	}

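	// Each case runs the two scraped samples through a scrape config and checks
	// the synthetic report series as well as the post-relabeling sample count.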
	testCases := []struct {
		scrapedSamples                  model.Samples
		scrapeConfig                    *config.ScrapeConfig
		expectedReportedSamples         model.Samples
		expectedPostRelabelSamplesCount int
	}{
		{ // 0
			scrapedSamples: readSamples,
			scrapeConfig:   &config.ScrapeConfig{},
			expectedReportedSamples: model.Samples{
				{
					Metric: model.Metric{"__name__": "up"},
					Value:  1,
				},
				{
					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
					Value:  42,
				},
				{
					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
					Value:  2,
				},
				{
					Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
					Value:  2,
				},
			},
			expectedPostRelabelSamplesCount: 2,
		},
		{ // 1
			scrapedSamples: readSamples,
			scrapeConfig: &config.ScrapeConfig{
				MetricRelabelConfigs: []*config.RelabelConfig{
					{
						Action:       config.RelabelDrop,
						SourceLabels: model.LabelNames{"__name__"},
						Regex:        config.MustNewRegexp("a.*"),
					},
				},
			},
			expectedReportedSamples: model.Samples{
				{
					Metric: model.Metric{"__name__": "up"},
					Value:  1,
				},
				{
					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
					Value:  42,
				},
				{
					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
					Value:  2,
				},
				{
					Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
					Value:  1,
				},
			},
			expectedPostRelabelSamplesCount: 1,
		},
		{ // 2
			scrapedSamples: readSamples,
			scrapeConfig: &config.ScrapeConfig{
				SampleLimit: 1,
				MetricRelabelConfigs: []*config.RelabelConfig{
					{
						Action:       config.RelabelDrop,
						SourceLabels: model.LabelNames{"__name__"},
						Regex:        config.MustNewRegexp("a.*"),
					},
				},
			},
			expectedReportedSamples: model.Samples{
				{
					Metric: model.Metric{"__name__": "up"},
					Value:  1,
				},
				{
					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
					Value:  42,
				},
				{
					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
					Value:  2,
				},
				{
					Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
					Value:  1,
				},
			},
			expectedPostRelabelSamplesCount: 1,
		},
		{ // 3
			scrapedSamples: readSamples,
			scrapeConfig: &config.ScrapeConfig{
				SampleLimit: 1,
			},
			expectedReportedSamples: model.Samples{
				{
					Metric: model.Metric{"__name__": "up"},
					Value:  0,
				},
				{
					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
					Value:  42,
				},
				{
					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
					Value:  2,
				},
				{
					Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
					Value:  2,
				},
			},
			expectedPostRelabelSamplesCount: 2,
		},
	}

	for i, test := range testCases {
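		// Collect everything the scrape loop appends in an in-memory buffer.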
		ingestedSamples := &bufferAppender{buffer: model.Samples{}}

		target := newTestTarget("example.com:80", 10*time.Millisecond, nil)

		scraper := &testScraper{}
		sl := newScrapeLoop(context.Background(), scraper, ingestedSamples, target.Labels(), test.scrapeConfig).(*scrapeLoop)
		num, err := sl.append(test.scrapedSamples)
		sl.report(time.Unix(0, 0), 42*time.Second, len(test.scrapedSamples), num, err)
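		// The buffer now holds the samples that survived relabeling, followed by
		// the synthetic report series appended by report(). Strip the former so
		// that only the reported samples remain.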
		reportedSamples := ingestedSamples.buffer
		if err == nil {
			reportedSamples = reportedSamples[num:]
		}

		if !reflect.DeepEqual(reportedSamples, test.expectedReportedSamples) {
			t.Errorf("Reported samples did not match expected metrics for case %d", i)
			t.Errorf("Expected: %v", test.expectedReportedSamples)
			t.Fatalf("Got: %v", reportedSamples)
		}
		if test.expectedPostRelabelSamplesCount != num {
			t.Fatalf("Case %d: Ingested samples %d did not match expected value %d", i, num, test.expectedPostRelabelSamplesCount)
		}
	}
}

func TestScrapeLoopStop(t *testing.T) {
	scraper := &testScraper{}
	sl := newScrapeLoop(context.Background(), scraper, nil, nil, &config.ScrapeConfig{})

	// The scrape pool synchronizes on stopping scrape loops. However, new
	// scrape loops are started asynchronously. Thus it's possible that a loop
	// is stopped again before it has properly started.
	// Stopping a not-yet-started loop must block until the run method has been
	// called and has exited. The run method must exit immediately.

	stopDone := make(chan struct{})
	go func() {
		sl.stop()
		close(stopDone)
	}()

	select {
	case <-stopDone:
		t.Fatalf("Stopping terminated before run exited successfully")
	case <-time.After(500 * time.Millisecond):
	}

	// Running the terminated scrape loop must exit without calling the scraper even once.
	scraper.scrapeFunc = func(context.Context, time.Time) (model.Samples, error) {
		t.Fatalf("scraper was called for terminated scrape loop")
		return nil, nil
	}

	runDone := make(chan struct{})
	go func() {
		sl.run(1, 0, nil)
		close(runDone)
	}()

	select {
	case <-runDone:
	case <-time.After(1 * time.Second):
		t.Fatalf("Running terminated scrape loop did not exit")
	}

	select {
	case <-stopDone:
	case <-time.After(1 * time.Second):
		t.Fatalf("Stopping did not terminate after running exited")
	}
}

func TestScrapeLoopRun(t *testing.T) {
	var (
		signal = make(chan struct{})
		errc   = make(chan error)

		scraper = &testScraper{}
		app     = &nopAppender{}
	)
	defer close(signal)

	ctx, cancel := context.WithCancel(context.Background())
	sl := newScrapeLoop(ctx, scraper, app, nil, &config.ScrapeConfig{})

	// The loop must terminate during the initial offset if the context
	// is canceled.
	scraper.offsetDur = time.Hour

	go func() {
		sl.run(time.Second, time.Hour, errc)
		signal <- struct{}{}
	}()

	// Wait to make sure we are actually waiting on the offset.
	time.Sleep(1 * time.Second)

	cancel()
	select {
	case <-signal:
	case <-time.After(5 * time.Second):
		t.Fatalf("Cancelation during initial offset failed")
	case err := <-errc:
		t.Fatalf("Unexpected error: %s", err)
	}

	// The provided timeout must cause cancelation of the context passed down to the
	// scraper. The scraper has to respect the context.
	scraper.offsetDur = 0

	block := make(chan struct{})
	scraper.scrapeFunc = func(ctx context.Context, ts time.Time) (model.Samples, error) {
		select {
		case <-block:
		case <-ctx.Done():
			return nil, ctx.Err()
		}
		return nil, nil
	}

	ctx, cancel = context.WithCancel(context.Background())
	sl = newScrapeLoop(ctx, scraper, app, nil, &config.ScrapeConfig{})

	go func() {
		sl.run(time.Second, 100*time.Millisecond, errc)
		signal <- struct{}{}
	}()

	select {
	case err := <-errc:
		if err != context.DeadlineExceeded {
			t.Fatalf("Expected timeout error but got: %s", err)
		}
	case <-time.After(3 * time.Second):
		t.Fatalf("Expected timeout error but got none")
	}

	// We already caught the timeout error and are certainly in the loop.
	// Let scrapes return immediately to cause no further timeout errors
	// and check whether canceling the parent context terminates the loop.
	close(block)
	cancel()

	select {
	case <-signal:
		// Loop terminated as expected.
	case err := <-errc:
		t.Fatalf("Unexpected error: %s", err)
	case <-time.After(3 * time.Second):
		t.Fatalf("Loop did not terminate on context cancelation")
	}
}

func TestTargetScraperScrapeOK(t *testing.T) {
	const (
		configTimeout   = 1500 * time.Millisecond
		expectedTimeout = "1.500000"
	)

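	// The fake target asserts that the scraper announces its timeout via the
	// X-Prometheus-Scrape-Timeout-Seconds header, then serves two samples in
	// the text exposition format.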
	server := httptest.NewServer(
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds")
			if timeout != expectedTimeout {
				t.Errorf("Scrape timeout did not match expected timeout")
				t.Errorf("Expected: %v", expectedTimeout)
				t.Fatalf("Got: %v", timeout)
			}

			w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
			w.Write([]byte("metric_a 1\nmetric_b 2\n"))
		}),
	)
	defer server.Close()

	serverURL, err := url.Parse(server.URL)
	if err != nil {
		panic(err)
	}

	ts := &targetScraper{
		Target: &Target{
			labels: model.LabelSet{
				model.SchemeLabel:  model.LabelValue(serverURL.Scheme),
				model.AddressLabel: model.LabelValue(serverURL.Host),
			},
		},
		client:  http.DefaultClient,
		timeout: configTimeout,
	}
	now := time.Now()

	samples, err := ts.scrape(context.Background(), now)
	if err != nil {
		t.Fatalf("Unexpected scrape error: %s", err)
	}

	expectedSamples := model.Samples{
		{
			Metric:    model.Metric{"__name__": "metric_a"},
			Timestamp: model.TimeFromUnixNano(now.UnixNano()),
			Value:     1,
		},
		{
			Metric:    model.Metric{"__name__": "metric_b"},
			Timestamp: model.TimeFromUnixNano(now.UnixNano()),
			Value:     2,
		},
	}
	sort.Sort(expectedSamples)
	sort.Sort(samples)

	if !reflect.DeepEqual(samples, expectedSamples) {
		t.Errorf("Scraped samples did not match served metrics")
		t.Errorf("Expected: %v", expectedSamples)
		t.Fatalf("Got: %v", samples)
	}
}

func TestTargetScrapeScrapeCancel(t *testing.T) {
	block := make(chan struct{})

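	// The handler blocks until the test finishes, so the scrape can only end
	// through cancelation of its context.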
	server := httptest.NewServer(
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			<-block
		}),
	)
	defer server.Close()

	serverURL, err := url.Parse(server.URL)
	if err != nil {
		panic(err)
	}

	ts := &targetScraper{
		Target: &Target{
			labels: model.LabelSet{
				model.SchemeLabel:  model.LabelValue(serverURL.Scheme),
				model.AddressLabel: model.LabelValue(serverURL.Host),
			},
		},
		client: http.DefaultClient,
	}
	ctx, cancel := context.WithCancel(context.Background())

	errc := make(chan error)

	go func() {
		time.Sleep(1 * time.Second)
		cancel()
	}()

	go func() {
		if _, err := ts.scrape(ctx, time.Now()); err != context.Canceled {
			errc <- fmt.Errorf("Expected context cancelation error but got: %s", err)
		}
		close(errc)
	}()

	select {
	case <-time.After(5 * time.Second):
		t.Fatalf("Scrape function did not return in time")
	case err := <-errc:
		if err != nil {
			t.Fatal(err)
		}
	}
	// If block were closed in a defer above, the test server would not
	// terminate and the test wouldn't complete.
	close(block)
}

func TestTargetScrapeScrapeNotFound(t *testing.T) {
	server := httptest.NewServer(
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusNotFound)
		}),
	)
	defer server.Close()

	serverURL, err := url.Parse(server.URL)
	if err != nil {
		panic(err)
	}

	ts := &targetScraper{
		Target: &Target{
			labels: model.LabelSet{
				model.SchemeLabel:  model.LabelValue(serverURL.Scheme),
				model.AddressLabel: model.LabelValue(serverURL.Host),
			},
		},
		client: http.DefaultClient,
	}

	if _, err := ts.scrape(context.Background(), time.Now()); err == nil || !strings.Contains(err.Error(), "404") {
		t.Fatalf("Expected \"404 Not Found\" error but got: %v", err)
	}
}

// testScraper implements the scraper interface and allows setting values
// returned by its methods. It also allows setting a custom scrape function.
type testScraper struct {
	offsetDur time.Duration

	lastStart    time.Time
	lastDuration time.Duration
	lastError    error

	samples    model.Samples
	scrapeErr  error
	scrapeFunc func(context.Context, time.Time) (model.Samples, error)
}

func (ts *testScraper) offset(interval time.Duration) time.Duration {
	return ts.offsetDur
}

func (ts *testScraper) report(start time.Time, duration time.Duration, err error) {
	ts.lastStart = start
	ts.lastDuration = duration
	ts.lastError = err
}

func (ts *testScraper) scrape(ctx context.Context, t time.Time) (model.Samples, error) {
	if ts.scrapeFunc != nil {
		return ts.scrapeFunc(ctx, t)
	}
	return ts.samples, ts.scrapeErr
}