1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
4package e2e_test
5
6import (
7	"context"
8	"fmt"
9	"net/http/httptest"
10	"net/url"
11	"os"
12	"path/filepath"
13	"sort"
14	"strings"
15	"testing"
16	"time"
17
18	"github.com/chromedp/cdproto/network"
19	"github.com/chromedp/chromedp"
20	"github.com/cortexproject/cortex/integration/e2e"
21	"github.com/go-kit/kit/log"
22	"github.com/pkg/errors"
23	"github.com/prometheus/common/model"
24	"github.com/prometheus/prometheus/pkg/labels"
25	"github.com/prometheus/prometheus/pkg/timestamp"
26	"github.com/thanos-io/thanos/pkg/store/storepb"
27
28	"github.com/thanos-io/thanos/pkg/promclient"
29	"github.com/thanos-io/thanos/pkg/runutil"
30	"github.com/thanos-io/thanos/pkg/testutil"
31	"github.com/thanos-io/thanos/test/e2e/e2ethanos"
32)
33
// queryUpWithoutInstance is the PromQL expression used by the query tests: it
// sums `up` while dropping the `instance` label.
// NOTE: by using aggregation all results are now unsorted, which is why
// instantQuery sorts the returned vector before it is compared.
const queryUpWithoutInstance = "sum(up) without (instance)"
36
// defaultPromConfig renders a Prometheus configuration for e2e tests that:
// * exposes 2 external labels, `prometheus` (set to name) and `replica`.
// * scrapes scrapeTargets (or localhost:9090 when none are given) every second
//   under the job name 'myself', dropping any target whose address ends in :80.
// * optionally adds a remote write endpoint when remoteWriteEndpoint is non-empty.
// * optionally adds a rule_files entry when ruleFile is non-empty.
func defaultPromConfig(name string, replica int, remoteWriteEndpoint, ruleFile string, scrapeTargets ...string) string {
	const baseTmpl = `
global:
  external_labels:
    prometheus: %v
    replica: %v
scrape_configs:
- job_name: 'myself'
  # Quick scrapes for test purposes.
  scrape_interval: 1s
  scrape_timeout: 1s
  static_configs:
  - targets: [%s]
  relabel_configs:
  - source_labels: ['__address__']
    regex: '^.+:80$'
    action: drop
`
	const remoteWriteTmpl = `
%s
remote_write:
- url: "%s"
  # Don't spam receiver on mistake.
  queue_config:
    min_backoff: 2s
    max_backoff: 10s
`
	const ruleFilesTmpl = `
%s
rule_files:
-  "%s"
`

	// Fall back to the default target only when the caller passed none.
	targetList := strings.Join(scrapeTargets, ",")
	if len(scrapeTargets) == 0 {
		targetList = "localhost:9090"
	}

	// Each optional section wraps the configuration rendered so far.
	cfg := fmt.Sprintf(baseTmpl, name, replica, targetList)
	if remoteWriteEndpoint != "" {
		cfg = fmt.Sprintf(remoteWriteTmpl, cfg, remoteWriteEndpoint)
	}
	if ruleFile != "" {
		cfg = fmt.Sprintf(ruleFilesTmpl, cfg, ruleFile)
	}
	return cfg
}
86
87func sortResults(res model.Vector) {
88	sort.Slice(res, func(i, j int) bool {
89		return res[i].String() < res[j].String()
90	})
91}
92
93func TestQuery(t *testing.T) {
94	t.Parallel()
95
96	s, err := e2e.NewScenario("e2e_test_query")
97	testutil.Ok(t, err)
98	t.Cleanup(e2ethanos.CleanScenario(t, s))
99
100	receiver, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1)
101	testutil.Ok(t, err)
102	testutil.Ok(t, s.StartAndWaitReady(receiver))
103
104	prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "alone", defaultPromConfig("prom-alone", 0, "", ""), e2ethanos.DefaultPrometheusImage())
105	testutil.Ok(t, err)
106	prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "remote-and-sidecar", defaultPromConfig("prom-both-remote-write-and-sidecar", 1234, e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage())
107	testutil.Ok(t, err)
108	prom3, sidecar3, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "ha1", defaultPromConfig("prom-ha", 0, "", filepath.Join(e2e.ContainerSharedDir, "", "*.yaml")), e2ethanos.DefaultPrometheusImage())
109	testutil.Ok(t, err)
110	prom4, sidecar4, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "ha2", defaultPromConfig("prom-ha", 1, "", filepath.Join(e2e.ContainerSharedDir, "", "*.yaml")), e2ethanos.DefaultPrometheusImage())
111	testutil.Ok(t, err)
112	testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2, prom3, sidecar3, prom4, sidecar4))
113
114	// Querier. Both fileSD and directly by flags.
115	q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()}, []string{sidecar3.GRPCNetworkEndpoint(), sidecar4.GRPCNetworkEndpoint()}, nil, nil, nil, nil, "", "")
116	testutil.Ok(t, err)
117	testutil.Ok(t, s.StartAndWaitReady(q))
118
119	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
120	t.Cleanup(cancel)
121
122	testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(5), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics))
123
124	queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{
125		Deduplicate: false,
126	}, []model.Metric{
127		{
128			"job":        "myself",
129			"prometheus": "prom-alone",
130			"replica":    "0",
131		},
132		{
133			"job":        "myself",
134			"prometheus": "prom-both-remote-write-and-sidecar",
135			"receive":    "1",
136			"replica":    "1234",
137			"tenant_id":  "default-tenant",
138		},
139		{
140			"job":        "myself",
141			"prometheus": "prom-both-remote-write-and-sidecar",
142			"replica":    "1234",
143		},
144		{
145			"job":        "myself",
146			"prometheus": "prom-ha",
147			"replica":    "0",
148		},
149		{
150			"job":        "myself",
151			"prometheus": "prom-ha",
152			"replica":    "1",
153		},
154	})
155
156	// With deduplication.
157	queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{
158		Deduplicate: true,
159	}, []model.Metric{
160		{
161			"job":        "myself",
162			"prometheus": "prom-alone",
163		},
164		{
165			"job":        "myself",
166			"prometheus": "prom-both-remote-write-and-sidecar",
167			"receive":    "1",
168			"tenant_id":  "default-tenant",
169		},
170		{
171			"job":        "myself",
172			"prometheus": "prom-both-remote-write-and-sidecar",
173		},
174		{
175			"job":        "myself",
176			"prometheus": "prom-ha",
177		},
178	})
179}
180
181func TestQueryExternalPrefixWithoutReverseProxy(t *testing.T) {
182	t.Parallel()
183
184	s, err := e2e.NewScenario("e2e_test_query_route_prefix")
185	testutil.Ok(t, err)
186	t.Cleanup(e2ethanos.CleanScenario(t, s))
187
188	externalPrefix := "test"
189
190	q, err := e2ethanos.NewQuerier(
191		s.SharedDir(), "1",
192		nil,
193		nil,
194		nil,
195		nil,
196		nil,
197		nil,
198		"",
199		externalPrefix,
200	)
201	testutil.Ok(t, err)
202	testutil.Ok(t, s.StartAndWaitReady(q))
203
204	checkNetworkRequests(t, "http://"+q.HTTPEndpoint()+"/"+externalPrefix+"/graph")
205}
206
207func TestQueryExternalPrefix(t *testing.T) {
208	t.Parallel()
209
210	s, err := e2e.NewScenario("e2e_test_query_external_prefix")
211	testutil.Ok(t, err)
212	t.Cleanup(e2ethanos.CleanScenario(t, s))
213
214	externalPrefix := "thanos"
215
216	q, err := e2ethanos.NewQuerier(
217		s.SharedDir(), "1",
218		nil,
219		nil,
220		nil,
221		nil,
222		nil,
223		nil,
224		"",
225		externalPrefix,
226	)
227	testutil.Ok(t, err)
228	testutil.Ok(t, s.StartAndWaitReady(q))
229
230	querierURL := mustURLParse(t, "http://"+q.HTTPEndpoint()+"/"+externalPrefix)
231
232	querierProxy := httptest.NewServer(e2ethanos.NewSingleHostReverseProxy(querierURL, externalPrefix))
233	t.Cleanup(querierProxy.Close)
234
235	checkNetworkRequests(t, querierProxy.URL+"/"+externalPrefix+"/graph")
236}
237
238func TestQueryExternalPrefixAndRoutePrefix(t *testing.T) {
239	t.Parallel()
240
241	s, err := e2e.NewScenario("e2e_test_query_external_prefix_and_route_prefix")
242	testutil.Ok(t, err)
243	t.Cleanup(e2ethanos.CleanScenario(t, s))
244
245	externalPrefix := "thanos"
246	routePrefix := "test"
247
248	q, err := e2ethanos.NewQuerier(
249		s.SharedDir(), "1",
250		nil,
251		nil,
252		nil,
253		nil,
254		nil,
255		nil,
256		routePrefix,
257		externalPrefix,
258	)
259	testutil.Ok(t, err)
260	testutil.Ok(t, s.StartAndWaitReady(q))
261
262	querierURL := mustURLParse(t, "http://"+q.HTTPEndpoint()+"/"+routePrefix)
263
264	querierProxy := httptest.NewServer(e2ethanos.NewSingleHostReverseProxy(querierURL, externalPrefix))
265	t.Cleanup(querierProxy.Close)
266
267	checkNetworkRequests(t, querierProxy.URL+"/"+externalPrefix+"/graph")
268}
269
270func TestQueryLabelNames(t *testing.T) {
271	t.Parallel()
272
273	s, err := e2e.NewScenario("e2e_test_query_label_names")
274	testutil.Ok(t, err)
275	t.Cleanup(e2ethanos.CleanScenario(t, s))
276
277	receiver, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1)
278	testutil.Ok(t, err)
279	testutil.Ok(t, s.StartAndWaitReady(receiver))
280
281	prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), s.NetworkName(), "alone", defaultPromConfig("prom-alone", 0, "", ""), e2ethanos.DefaultPrometheusImage())
282	testutil.Ok(t, err)
283	prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), s.NetworkName(), "remote-and-sidecar", defaultPromConfig("prom-both-remote-write-and-sidecar", 1234, e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage())
284	testutil.Ok(t, err)
285	testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2))
286
287	q, err := e2ethanos.NewQuerier(
288		s.SharedDir(),
289		"1",
290		[]string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()},
291		[]string{},
292		nil,
293		nil,
294		nil,
295		nil,
296		"",
297		"",
298	)
299
300	testutil.Ok(t, err)
301	testutil.Ok(t, s.StartAndWaitReady(q))
302
303	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
304	t.Cleanup(cancel)
305
306	now := time.Now()
307	labelNames(t, ctx, q.HTTPEndpoint(), nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
308		return len(res) > 0
309	})
310
311	// Outside time range.
312	labelNames(t, ctx, q.HTTPEndpoint(), nil, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(-23*time.Hour)), func(res []string) bool {
313		return len(res) == 0
314	})
315
316	labelNames(t, ctx, q.HTTPEndpoint(), []storepb.LabelMatcher{{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "up"}},
317		timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
318			// Expected result: [__name__, instance, job, prometheus, replica]
319			return len(res) == 7
320		},
321	)
322
323	// There is no matched series.
324	labelNames(t, ctx, q.HTTPEndpoint(), []storepb.LabelMatcher{{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "foobar"}},
325		timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
326			return len(res) == 0
327		},
328	)
329}
330
331func TestQueryLabelValues(t *testing.T) {
332	t.Parallel()
333
334	s, err := e2e.NewScenario("e2e_test_query_label_values")
335	testutil.Ok(t, err)
336	t.Cleanup(e2ethanos.CleanScenario(t, s))
337
338	receiver, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1)
339	testutil.Ok(t, err)
340	testutil.Ok(t, s.StartAndWaitReady(receiver))
341
342	prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), s.NetworkName(), "alone", defaultPromConfig("prom-alone", 0, "", ""), e2ethanos.DefaultPrometheusImage())
343	testutil.Ok(t, err)
344	prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), s.NetworkName(), "remote-and-sidecar", defaultPromConfig("prom-both-remote-write-and-sidecar", 1234, e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage())
345	testutil.Ok(t, err)
346	testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2))
347
348	q, err := e2ethanos.NewQuerier(
349		s.SharedDir(),
350		"1",
351		[]string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()},
352		[]string{},
353		nil,
354		nil,
355		nil,
356		nil,
357		"",
358		"",
359	)
360
361	testutil.Ok(t, err)
362	testutil.Ok(t, s.StartAndWaitReady(q))
363
364	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
365	t.Cleanup(cancel)
366
367	now := time.Now()
368	labelValues(t, ctx, q.HTTPEndpoint(), "instance", nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
369		return len(res) == 1 && res[0] == "localhost:9090"
370	})
371
372	// Outside time range.
373	labelValues(t, ctx, q.HTTPEndpoint(), "instance", nil, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(-23*time.Hour)), func(res []string) bool {
374		return len(res) == 0
375	})
376
377	labelValues(t, ctx, q.HTTPEndpoint(), "__name__", []storepb.LabelMatcher{{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "up"}},
378		timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
379			return len(res) == 1 && res[0] == "up"
380		},
381	)
382
383	labelValues(t, ctx, q.HTTPEndpoint(), "__name__", []storepb.LabelMatcher{{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "foobar"}},
384		timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
385			return len(res) == 0
386		},
387	)
388}
389
390func checkNetworkRequests(t *testing.T, addr string) {
391	ctx, cancel := chromedp.NewContext(context.Background())
392	t.Cleanup(cancel)
393
394	testutil.Ok(t, runutil.Retry(1*time.Minute, ctx.Done(), func() error {
395		var networkErrors []string
396
397		// Listen for failed network requests and push them to an array.
398		chromedp.ListenTarget(ctx, func(ev interface{}) {
399			switch ev := ev.(type) {
400			case *network.EventLoadingFailed:
401				networkErrors = append(networkErrors, ev.ErrorText)
402			}
403		})
404
405		err := chromedp.Run(ctx,
406			network.Enable(),
407			chromedp.Navigate(addr),
408			chromedp.WaitVisible(`body`),
409		)
410
411		if err != nil {
412			return err
413		}
414
415		if len(networkErrors) > 0 {
416			err = fmt.Errorf("some network requests failed: %s", strings.Join(networkErrors, "; "))
417		}
418		return err
419	}))
420}
421
422func mustURLParse(t *testing.T, addr string) *url.URL {
423	u, err := url.Parse(addr)
424	testutil.Ok(t, err)
425
426	return u
427}
428
429func instantQuery(t *testing.T, ctx context.Context, addr string, q string, opts promclient.QueryOptions, expectedSeriesLen int) model.Vector {
430	t.Helper()
431
432	fmt.Println("queryAndAssert: Waiting for", expectedSeriesLen, "results for query", q)
433	var result model.Vector
434
435	logger := log.NewLogfmtLogger(os.Stdout)
436	logger = log.With(logger, "ts", log.DefaultTimestampUTC)
437	testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error {
438		res, warnings, err := promclient.NewDefaultClient().QueryInstant(ctx, mustURLParse(t, "http://"+addr), q, time.Now(), opts)
439		if err != nil {
440			return err
441		}
442
443		if len(warnings) > 0 {
444			return errors.Errorf("unexpected warnings %s", warnings)
445		}
446
447		if len(res) != expectedSeriesLen {
448			return errors.Errorf("unexpected result size, expected %d; result %d: %v", expectedSeriesLen, len(res), res)
449		}
450		result = res
451		return nil
452	}))
453	sortResults(result)
454	return result
455}
456
457func queryAndAssertSeries(t *testing.T, ctx context.Context, addr string, q string, opts promclient.QueryOptions, expected []model.Metric) {
458	t.Helper()
459
460	result := instantQuery(t, ctx, addr, q, opts, len(expected))
461	for i, exp := range expected {
462		testutil.Equals(t, exp, result[i].Metric)
463	}
464}
465
466func queryAndAssert(t *testing.T, ctx context.Context, addr string, q string, opts promclient.QueryOptions, expected model.Vector) {
467	t.Helper()
468
469	sortResults(expected)
470	result := instantQuery(t, ctx, addr, q, opts, len(expected))
471	for _, r := range result {
472		r.Timestamp = 0 // Does not matter for us.
473	}
474	testutil.Equals(t, expected, result)
475}
476
477func labelNames(t *testing.T, ctx context.Context, addr string, matchers []storepb.LabelMatcher, start, end int64, check func(res []string) bool) {
478	t.Helper()
479
480	logger := log.NewLogfmtLogger(os.Stdout)
481	logger = log.With(logger, "ts", log.DefaultTimestampUTC)
482	testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error {
483		res, err := promclient.NewDefaultClient().LabelNamesInGRPC(ctx, mustURLParse(t, "http://"+addr), matchers, start, end)
484		if err != nil {
485			return err
486		}
487		if check(res) {
488			return nil
489		}
490
491		return errors.Errorf("unexpected results %v", res)
492	}))
493}
494
495//nolint:unparam
496func labelValues(t *testing.T, ctx context.Context, addr, label string, matchers []storepb.LabelMatcher, start, end int64, check func(res []string) bool) {
497	t.Helper()
498
499	logger := log.NewLogfmtLogger(os.Stdout)
500	logger = log.With(logger, "ts", log.DefaultTimestampUTC)
501	testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error {
502		res, err := promclient.NewDefaultClient().LabelValuesInGRPC(ctx, mustURLParse(t, "http://"+addr), label, matchers, start, end)
503		if err != nil {
504			return err
505		}
506		if check(res) {
507			return nil
508		}
509
510		return errors.Errorf("unexpected results %v", res)
511	}))
512}
513
514func series(t *testing.T, ctx context.Context, addr string, matchers []*labels.Matcher, start int64, end int64, check func(res []map[string]string) bool) {
515	t.Helper()
516
517	logger := log.NewLogfmtLogger(os.Stdout)
518	logger = log.With(logger, "ts", log.DefaultTimestampUTC)
519	testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error {
520		res, err := promclient.NewDefaultClient().SeriesInGRPC(ctx, mustURLParse(t, "http://"+addr), matchers, start, end)
521		if err != nil {
522			return err
523		}
524		if check(res) {
525			return nil
526		}
527
528		return errors.Errorf("unexpected results %v", res)
529	}))
530}
531
532//nolint:unparam
533func rangeQuery(t *testing.T, ctx context.Context, addr string, q string, start, end, step int64, opts promclient.QueryOptions, check func(res model.Matrix) bool) {
534	t.Helper()
535
536	logger := log.NewLogfmtLogger(os.Stdout)
537	logger = log.With(logger, "ts", log.DefaultTimestampUTC)
538	testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error {
539		res, warnings, err := promclient.NewDefaultClient().QueryRange(ctx, mustURLParse(t, "http://"+addr), q, start, end, step, opts)
540		if err != nil {
541			return err
542		}
543
544		if len(warnings) > 0 {
545			return errors.Errorf("unexpected warnings %s", warnings)
546		}
547
548		if check(res) {
549			return nil
550		}
551
552		return errors.Errorf("unexpected results size %d", len(res))
553	}))
554}
555