1// Copyright (c) The Thanos Authors. 2// Licensed under the Apache License 2.0. 3 4package e2e_test 5 6import ( 7 "context" 8 "fmt" 9 "net/http/httptest" 10 "net/url" 11 "os" 12 "path/filepath" 13 "sort" 14 "strings" 15 "testing" 16 "time" 17 18 "github.com/chromedp/cdproto/network" 19 "github.com/chromedp/chromedp" 20 "github.com/cortexproject/cortex/integration/e2e" 21 "github.com/go-kit/kit/log" 22 "github.com/pkg/errors" 23 "github.com/prometheus/common/model" 24 "github.com/prometheus/prometheus/pkg/labels" 25 "github.com/prometheus/prometheus/pkg/timestamp" 26 "github.com/thanos-io/thanos/pkg/store/storepb" 27 28 "github.com/thanos-io/thanos/pkg/promclient" 29 "github.com/thanos-io/thanos/pkg/runutil" 30 "github.com/thanos-io/thanos/pkg/testutil" 31 "github.com/thanos-io/thanos/test/e2e/e2ethanos" 32) 33 34// NOTE: by using aggregation all results are now unsorted. 35const queryUpWithoutInstance = "sum(up) without (instance)" 36 37// defaultPromConfig returns Prometheus config that sets Prometheus to: 38// * expose 2 external labels, source and replica. 39// * scrape fake target. This will produce up == 0 metric which we can assert on. 40// * optionally remote write endpoint to write into. 41func defaultPromConfig(name string, replica int, remoteWriteEndpoint, ruleFile string, scrapeTargets ...string) string { 42 targets := "localhost:9090" 43 if len(scrapeTargets) > 0 { 44 targets = strings.Join(scrapeTargets, ",") 45 } 46 config := fmt.Sprintf(` 47global: 48 external_labels: 49 prometheus: %v 50 replica: %v 51scrape_configs: 52- job_name: 'myself' 53 # Quick scrapes for test purposes. 54 scrape_interval: 1s 55 scrape_timeout: 1s 56 static_configs: 57 - targets: [%s] 58 relabel_configs: 59 - source_labels: ['__address__'] 60 regex: '^.+:80$' 61 action: drop 62`, name, replica, targets) 63 64 if remoteWriteEndpoint != "" { 65 config = fmt.Sprintf(` 66%s 67remote_write: 68- url: "%s" 69 # Don't spam receiver on mistake. 70 queue_config: 71 min_backoff: 2s 72 max_backoff: 10s 73`, config, remoteWriteEndpoint) 74 } 75 76 if ruleFile != "" { 77 config = fmt.Sprintf(` 78%s 79rule_files: 80- "%s" 81`, config, ruleFile) 82 } 83 84 return config 85} 86 87func sortResults(res model.Vector) { 88 sort.Slice(res, func(i, j int) bool { 89 return res[i].String() < res[j].String() 90 }) 91} 92 93func TestQuery(t *testing.T) { 94 t.Parallel() 95 96 s, err := e2e.NewScenario("e2e_test_query") 97 testutil.Ok(t, err) 98 t.Cleanup(e2ethanos.CleanScenario(t, s)) 99 100 receiver, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1) 101 testutil.Ok(t, err) 102 testutil.Ok(t, s.StartAndWaitReady(receiver)) 103 104 prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "alone", defaultPromConfig("prom-alone", 0, "", ""), e2ethanos.DefaultPrometheusImage()) 105 testutil.Ok(t, err) 106 prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "remote-and-sidecar", defaultPromConfig("prom-both-remote-write-and-sidecar", 1234, e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) 107 testutil.Ok(t, err) 108 prom3, sidecar3, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "ha1", defaultPromConfig("prom-ha", 0, "", filepath.Join(e2e.ContainerSharedDir, "", "*.yaml")), e2ethanos.DefaultPrometheusImage()) 109 testutil.Ok(t, err) 110 prom4, sidecar4, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), "e2e_test_query", "ha2", defaultPromConfig("prom-ha", 1, "", filepath.Join(e2e.ContainerSharedDir, "", "*.yaml")), e2ethanos.DefaultPrometheusImage()) 111 testutil.Ok(t, err) 112 testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2, prom3, sidecar3, prom4, sidecar4)) 113 114 // Querier. Both fileSD and directly by flags. 115 q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()}, []string{sidecar3.GRPCNetworkEndpoint(), sidecar4.GRPCNetworkEndpoint()}, nil, nil, nil, nil, "", "") 116 testutil.Ok(t, err) 117 testutil.Ok(t, s.StartAndWaitReady(q)) 118 119 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) 120 t.Cleanup(cancel) 121 122 testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(5), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics)) 123 124 queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ 125 Deduplicate: false, 126 }, []model.Metric{ 127 { 128 "job": "myself", 129 "prometheus": "prom-alone", 130 "replica": "0", 131 }, 132 { 133 "job": "myself", 134 "prometheus": "prom-both-remote-write-and-sidecar", 135 "receive": "1", 136 "replica": "1234", 137 "tenant_id": "default-tenant", 138 }, 139 { 140 "job": "myself", 141 "prometheus": "prom-both-remote-write-and-sidecar", 142 "replica": "1234", 143 }, 144 { 145 "job": "myself", 146 "prometheus": "prom-ha", 147 "replica": "0", 148 }, 149 { 150 "job": "myself", 151 "prometheus": "prom-ha", 152 "replica": "1", 153 }, 154 }) 155 156 // With deduplication. 157 queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ 158 Deduplicate: true, 159 }, []model.Metric{ 160 { 161 "job": "myself", 162 "prometheus": "prom-alone", 163 }, 164 { 165 "job": "myself", 166 "prometheus": "prom-both-remote-write-and-sidecar", 167 "receive": "1", 168 "tenant_id": "default-tenant", 169 }, 170 { 171 "job": "myself", 172 "prometheus": "prom-both-remote-write-and-sidecar", 173 }, 174 { 175 "job": "myself", 176 "prometheus": "prom-ha", 177 }, 178 }) 179} 180 181func TestQueryExternalPrefixWithoutReverseProxy(t *testing.T) { 182 t.Parallel() 183 184 s, err := e2e.NewScenario("e2e_test_query_route_prefix") 185 testutil.Ok(t, err) 186 t.Cleanup(e2ethanos.CleanScenario(t, s)) 187 188 externalPrefix := "test" 189 190 q, err := e2ethanos.NewQuerier( 191 s.SharedDir(), "1", 192 nil, 193 nil, 194 nil, 195 nil, 196 nil, 197 nil, 198 "", 199 externalPrefix, 200 ) 201 testutil.Ok(t, err) 202 testutil.Ok(t, s.StartAndWaitReady(q)) 203 204 checkNetworkRequests(t, "http://"+q.HTTPEndpoint()+"/"+externalPrefix+"/graph") 205} 206 207func TestQueryExternalPrefix(t *testing.T) { 208 t.Parallel() 209 210 s, err := e2e.NewScenario("e2e_test_query_external_prefix") 211 testutil.Ok(t, err) 212 t.Cleanup(e2ethanos.CleanScenario(t, s)) 213 214 externalPrefix := "thanos" 215 216 q, err := e2ethanos.NewQuerier( 217 s.SharedDir(), "1", 218 nil, 219 nil, 220 nil, 221 nil, 222 nil, 223 nil, 224 "", 225 externalPrefix, 226 ) 227 testutil.Ok(t, err) 228 testutil.Ok(t, s.StartAndWaitReady(q)) 229 230 querierURL := mustURLParse(t, "http://"+q.HTTPEndpoint()+"/"+externalPrefix) 231 232 querierProxy := httptest.NewServer(e2ethanos.NewSingleHostReverseProxy(querierURL, externalPrefix)) 233 t.Cleanup(querierProxy.Close) 234 235 checkNetworkRequests(t, querierProxy.URL+"/"+externalPrefix+"/graph") 236} 237 238func TestQueryExternalPrefixAndRoutePrefix(t *testing.T) { 239 t.Parallel() 240 241 s, err := e2e.NewScenario("e2e_test_query_external_prefix_and_route_prefix") 242 testutil.Ok(t, err) 243 t.Cleanup(e2ethanos.CleanScenario(t, s)) 244 245 externalPrefix := "thanos" 246 routePrefix := "test" 247 248 q, err := e2ethanos.NewQuerier( 249 s.SharedDir(), "1", 250 nil, 251 nil, 252 nil, 253 nil, 254 nil, 255 nil, 256 routePrefix, 257 externalPrefix, 258 ) 259 testutil.Ok(t, err) 260 testutil.Ok(t, s.StartAndWaitReady(q)) 261 262 querierURL := mustURLParse(t, "http://"+q.HTTPEndpoint()+"/"+routePrefix) 263 264 querierProxy := httptest.NewServer(e2ethanos.NewSingleHostReverseProxy(querierURL, externalPrefix)) 265 t.Cleanup(querierProxy.Close) 266 267 checkNetworkRequests(t, querierProxy.URL+"/"+externalPrefix+"/graph") 268} 269 270func TestQueryLabelNames(t *testing.T) { 271 t.Parallel() 272 273 s, err := e2e.NewScenario("e2e_test_query_label_names") 274 testutil.Ok(t, err) 275 t.Cleanup(e2ethanos.CleanScenario(t, s)) 276 277 receiver, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1) 278 testutil.Ok(t, err) 279 testutil.Ok(t, s.StartAndWaitReady(receiver)) 280 281 prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), s.NetworkName(), "alone", defaultPromConfig("prom-alone", 0, "", ""), e2ethanos.DefaultPrometheusImage()) 282 testutil.Ok(t, err) 283 prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), s.NetworkName(), "remote-and-sidecar", defaultPromConfig("prom-both-remote-write-and-sidecar", 1234, e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) 284 testutil.Ok(t, err) 285 testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2)) 286 287 q, err := e2ethanos.NewQuerier( 288 s.SharedDir(), 289 "1", 290 []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()}, 291 []string{}, 292 nil, 293 nil, 294 nil, 295 nil, 296 "", 297 "", 298 ) 299 300 testutil.Ok(t, err) 301 testutil.Ok(t, s.StartAndWaitReady(q)) 302 303 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) 304 t.Cleanup(cancel) 305 306 now := time.Now() 307 labelNames(t, ctx, q.HTTPEndpoint(), nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { 308 return len(res) > 0 309 }) 310 311 // Outside time range. 312 labelNames(t, ctx, q.HTTPEndpoint(), nil, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(-23*time.Hour)), func(res []string) bool { 313 return len(res) == 0 314 }) 315 316 labelNames(t, ctx, q.HTTPEndpoint(), []storepb.LabelMatcher{{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "up"}}, 317 timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { 318 // Expected result: [__name__, instance, job, prometheus, replica] 319 return len(res) == 7 320 }, 321 ) 322 323 // There is no matched series. 324 labelNames(t, ctx, q.HTTPEndpoint(), []storepb.LabelMatcher{{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "foobar"}}, 325 timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { 326 return len(res) == 0 327 }, 328 ) 329} 330 331func TestQueryLabelValues(t *testing.T) { 332 t.Parallel() 333 334 s, err := e2e.NewScenario("e2e_test_query_label_values") 335 testutil.Ok(t, err) 336 t.Cleanup(e2ethanos.CleanScenario(t, s)) 337 338 receiver, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1) 339 testutil.Ok(t, err) 340 testutil.Ok(t, s.StartAndWaitReady(receiver)) 341 342 prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), s.NetworkName(), "alone", defaultPromConfig("prom-alone", 0, "", ""), e2ethanos.DefaultPrometheusImage()) 343 testutil.Ok(t, err) 344 prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), s.NetworkName(), "remote-and-sidecar", defaultPromConfig("prom-both-remote-write-and-sidecar", 1234, e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) 345 testutil.Ok(t, err) 346 testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2)) 347 348 q, err := e2ethanos.NewQuerier( 349 s.SharedDir(), 350 "1", 351 []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()}, 352 []string{}, 353 nil, 354 nil, 355 nil, 356 nil, 357 "", 358 "", 359 ) 360 361 testutil.Ok(t, err) 362 testutil.Ok(t, s.StartAndWaitReady(q)) 363 364 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) 365 t.Cleanup(cancel) 366 367 now := time.Now() 368 labelValues(t, ctx, q.HTTPEndpoint(), "instance", nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { 369 return len(res) == 1 && res[0] == "localhost:9090" 370 }) 371 372 // Outside time range. 373 labelValues(t, ctx, q.HTTPEndpoint(), "instance", nil, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(-23*time.Hour)), func(res []string) bool { 374 return len(res) == 0 375 }) 376 377 labelValues(t, ctx, q.HTTPEndpoint(), "__name__", []storepb.LabelMatcher{{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "up"}}, 378 timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { 379 return len(res) == 1 && res[0] == "up" 380 }, 381 ) 382 383 labelValues(t, ctx, q.HTTPEndpoint(), "__name__", []storepb.LabelMatcher{{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "foobar"}}, 384 timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { 385 return len(res) == 0 386 }, 387 ) 388} 389 390func checkNetworkRequests(t *testing.T, addr string) { 391 ctx, cancel := chromedp.NewContext(context.Background()) 392 t.Cleanup(cancel) 393 394 testutil.Ok(t, runutil.Retry(1*time.Minute, ctx.Done(), func() error { 395 var networkErrors []string 396 397 // Listen for failed network requests and push them to an array. 398 chromedp.ListenTarget(ctx, func(ev interface{}) { 399 switch ev := ev.(type) { 400 case *network.EventLoadingFailed: 401 networkErrors = append(networkErrors, ev.ErrorText) 402 } 403 }) 404 405 err := chromedp.Run(ctx, 406 network.Enable(), 407 chromedp.Navigate(addr), 408 chromedp.WaitVisible(`body`), 409 ) 410 411 if err != nil { 412 return err 413 } 414 415 if len(networkErrors) > 0 { 416 err = fmt.Errorf("some network requests failed: %s", strings.Join(networkErrors, "; ")) 417 } 418 return err 419 })) 420} 421 422func mustURLParse(t *testing.T, addr string) *url.URL { 423 u, err := url.Parse(addr) 424 testutil.Ok(t, err) 425 426 return u 427} 428 429func instantQuery(t *testing.T, ctx context.Context, addr string, q string, opts promclient.QueryOptions, expectedSeriesLen int) model.Vector { 430 t.Helper() 431 432 fmt.Println("queryAndAssert: Waiting for", expectedSeriesLen, "results for query", q) 433 var result model.Vector 434 435 logger := log.NewLogfmtLogger(os.Stdout) 436 logger = log.With(logger, "ts", log.DefaultTimestampUTC) 437 testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { 438 res, warnings, err := promclient.NewDefaultClient().QueryInstant(ctx, mustURLParse(t, "http://"+addr), q, time.Now(), opts) 439 if err != nil { 440 return err 441 } 442 443 if len(warnings) > 0 { 444 return errors.Errorf("unexpected warnings %s", warnings) 445 } 446 447 if len(res) != expectedSeriesLen { 448 return errors.Errorf("unexpected result size, expected %d; result %d: %v", expectedSeriesLen, len(res), res) 449 } 450 result = res 451 return nil 452 })) 453 sortResults(result) 454 return result 455} 456 457func queryAndAssertSeries(t *testing.T, ctx context.Context, addr string, q string, opts promclient.QueryOptions, expected []model.Metric) { 458 t.Helper() 459 460 result := instantQuery(t, ctx, addr, q, opts, len(expected)) 461 for i, exp := range expected { 462 testutil.Equals(t, exp, result[i].Metric) 463 } 464} 465 466func queryAndAssert(t *testing.T, ctx context.Context, addr string, q string, opts promclient.QueryOptions, expected model.Vector) { 467 t.Helper() 468 469 sortResults(expected) 470 result := instantQuery(t, ctx, addr, q, opts, len(expected)) 471 for _, r := range result { 472 r.Timestamp = 0 // Does not matter for us. 473 } 474 testutil.Equals(t, expected, result) 475} 476 477func labelNames(t *testing.T, ctx context.Context, addr string, matchers []storepb.LabelMatcher, start, end int64, check func(res []string) bool) { 478 t.Helper() 479 480 logger := log.NewLogfmtLogger(os.Stdout) 481 logger = log.With(logger, "ts", log.DefaultTimestampUTC) 482 testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error { 483 res, err := promclient.NewDefaultClient().LabelNamesInGRPC(ctx, mustURLParse(t, "http://"+addr), matchers, start, end) 484 if err != nil { 485 return err 486 } 487 if check(res) { 488 return nil 489 } 490 491 return errors.Errorf("unexpected results %v", res) 492 })) 493} 494 495//nolint:unparam 496func labelValues(t *testing.T, ctx context.Context, addr, label string, matchers []storepb.LabelMatcher, start, end int64, check func(res []string) bool) { 497 t.Helper() 498 499 logger := log.NewLogfmtLogger(os.Stdout) 500 logger = log.With(logger, "ts", log.DefaultTimestampUTC) 501 testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error { 502 res, err := promclient.NewDefaultClient().LabelValuesInGRPC(ctx, mustURLParse(t, "http://"+addr), label, matchers, start, end) 503 if err != nil { 504 return err 505 } 506 if check(res) { 507 return nil 508 } 509 510 return errors.Errorf("unexpected results %v", res) 511 })) 512} 513 514func series(t *testing.T, ctx context.Context, addr string, matchers []*labels.Matcher, start int64, end int64, check func(res []map[string]string) bool) { 515 t.Helper() 516 517 logger := log.NewLogfmtLogger(os.Stdout) 518 logger = log.With(logger, "ts", log.DefaultTimestampUTC) 519 testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error { 520 res, err := promclient.NewDefaultClient().SeriesInGRPC(ctx, mustURLParse(t, "http://"+addr), matchers, start, end) 521 if err != nil { 522 return err 523 } 524 if check(res) { 525 return nil 526 } 527 528 return errors.Errorf("unexpected results %v", res) 529 })) 530} 531 532//nolint:unparam 533func rangeQuery(t *testing.T, ctx context.Context, addr string, q string, start, end, step int64, opts promclient.QueryOptions, check func(res model.Matrix) bool) { 534 t.Helper() 535 536 logger := log.NewLogfmtLogger(os.Stdout) 537 logger = log.With(logger, "ts", log.DefaultTimestampUTC) 538 testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { 539 res, warnings, err := promclient.NewDefaultClient().QueryRange(ctx, mustURLParse(t, "http://"+addr), q, start, end, step, opts) 540 if err != nil { 541 return err 542 } 543 544 if len(warnings) > 0 { 545 return errors.Errorf("unexpected warnings %s", warnings) 546 } 547 548 if check(res) { 549 return nil 550 } 551 552 return errors.Errorf("unexpected results size %d", len(res)) 553 })) 554} 555