1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
4package targets
5
6import (
7	"context"
8	"fmt"
9	"net/http"
10	"net/url"
11	"testing"
12	"time"
13
14	"github.com/go-kit/kit/log"
15	"github.com/gogo/protobuf/proto"
16	"github.com/pkg/errors"
17	"github.com/prometheus/prometheus/pkg/labels"
18	"github.com/prometheus/prometheus/storage"
19	"github.com/thanos-io/thanos/pkg/promclient"
20	"github.com/thanos-io/thanos/pkg/runutil"
21	"github.com/thanos-io/thanos/pkg/store/labelpb"
22	"github.com/thanos-io/thanos/pkg/targets/targetspb"
23	"github.com/thanos-io/thanos/pkg/testutil"
24	"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
25)
26
27func TestPrometheus_Targets_e2e(t *testing.T) {
28	p, err := e2eutil.NewPrometheus()
29	testutil.Ok(t, err)
30	defer func() { testutil.Ok(t, p.Stop()) }()
31
32	p.SetConfig(fmt.Sprintf(`
33global:
34  external_labels:
35    region: eu-west
36scrape_configs:
37- job_name: 'myself'
38  # Quick scrapes for test purposes.
39  scrape_interval: 1s
40  scrape_timeout: 1s
41  static_configs:
42  - targets: ['%s','localhost:80']
43  relabel_configs:
44  - source_labels: ['__address__']
45    regex: '^.+:80$'
46    action: drop
47`, e2eutil.PromAddrPlaceHolder))
48	testutil.Ok(t, p.Start())
49
50	ctx, cancel := context.WithCancel(context.Background())
51	defer cancel()
52
53	upctx, upcancel := context.WithTimeout(ctx, 10*time.Second)
54	defer upcancel()
55
56	logger := log.NewNopLogger()
57	err = p.WaitPrometheusUp(upctx, logger)
58	testutil.Ok(t, err)
59
60	u, err := url.Parse("http://" + p.Addr())
61	testutil.Ok(t, err)
62
63	c := promclient.NewClient(http.DefaultClient, logger, "")
64
65	// Wait targets response to be ready as Prometheus scrapes targets.
66	testutil.Ok(t, runutil.Retry(3*time.Second, ctx.Done(), func() error {
67		targets, err := c.TargetsInGRPC(ctx, u, "")
68		testutil.Ok(t, err)
69		if len(targets.ActiveTargets) > 0 {
70			return nil
71		}
72		return errors.New("empty targets response from Prometheus")
73	}))
74
75	promTargets := NewPrometheus(u, promclient.NewDefaultClient(), func() labels.Labels {
76		return labels.FromStrings("replica", "test1")
77	})
78
79	expected := &targetspb.TargetDiscovery{
80		ActiveTargets: []*targetspb.ActiveTarget{
81			{
82				DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
83					{Name: "__address__", Value: p.Addr()},
84					{Name: "__metrics_path__", Value: "/metrics"},
85					{Name: "__scheme__", Value: "http"},
86					{Name: "job", Value: "myself"},
87					{Name: "replica", Value: "test1"},
88				}},
89				Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
90					{Name: "instance", Value: p.Addr()},
91					{Name: "job", Value: "myself"},
92					{Name: "replica", Value: "test1"},
93				}},
94				ScrapePool:         "myself",
95				ScrapeUrl:          fmt.Sprintf("http://%s/metrics", p.Addr()),
96				GlobalUrl:          "",
97				Health:             targetspb.TargetHealth_UP,
98				LastScrape:         time.Time{},
99				LastScrapeDuration: 0,
100			},
101		},
102		DroppedTargets: []*targetspb.DroppedTarget{
103			{
104				DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
105					{Name: "__address__", Value: "localhost:80"},
106					{Name: "__metrics_path__", Value: "/metrics"},
107					{Name: "__scheme__", Value: "http"},
108					{Name: "job", Value: "myself"},
109					{Name: "replica", Value: "test1"},
110				}},
111			},
112		},
113	}
114
115	grpcClient := NewGRPCClientWithDedup(promTargets, nil)
116	for _, tcase := range []struct {
117		requestedState targetspb.TargetsRequest_State
118		expectedErr    error
119	}{
120		{
121			requestedState: targetspb.TargetsRequest_ANY,
122		},
123		{
124			requestedState: targetspb.TargetsRequest_ACTIVE,
125		},
126		{
127			requestedState: targetspb.TargetsRequest_DROPPED,
128		},
129	} {
130		t.Run(tcase.requestedState.String(), func(t *testing.T) {
131			targets, w, err := grpcClient.Targets(context.Background(), &targetspb.TargetsRequest{
132				State: tcase.requestedState,
133			})
134			testutil.Equals(t, storage.Warnings(nil), w)
135			if tcase.expectedErr != nil {
136				testutil.NotOk(t, err)
137				testutil.Equals(t, tcase.expectedErr.Error(), err.Error())
138				return
139			}
140			testutil.Ok(t, err)
141
142			expectedTargets := proto.Clone(expected).(*targetspb.TargetDiscovery)
143
144			switch tcase.requestedState {
145			case targetspb.TargetsRequest_ACTIVE:
146				expectedTargets.DroppedTargets = expectedTargets.DroppedTargets[:0]
147			case targetspb.TargetsRequest_DROPPED:
148				expectedTargets.ActiveTargets = expectedTargets.ActiveTargets[:0]
149			}
150
151			for i := range targets.ActiveTargets {
152				targets.ActiveTargets[i].LastScrapeDuration = 0
153				targets.ActiveTargets[i].LastScrape = time.Time{}
154				targets.ActiveTargets[i].LastError = ""
155				targets.ActiveTargets[i].GlobalUrl = ""
156			}
157
158			testutil.Equals(t, expectedTargets.ActiveTargets, targets.ActiveTargets)
159			testutil.Equals(t, expectedTargets.DroppedTargets, targets.DroppedTargets)
160		})
161	}
162}
163