1// Copyright (c) The Thanos Authors. 2// Licensed under the Apache License 2.0. 3 4package e2e_test 5 6import ( 7 "context" 8 "log" 9 "net/http" 10 "net/http/httputil" 11 "net/url" 12 "testing" 13 "time" 14 15 "github.com/cortexproject/cortex/integration/e2e" 16 "github.com/prometheus/common/model" 17 "github.com/thanos-io/thanos/pkg/promclient" 18 "github.com/thanos-io/thanos/pkg/receive" 19 "github.com/thanos-io/thanos/pkg/testutil" 20 "github.com/thanos-io/thanos/test/e2e/e2ethanos" 21) 22 23type ReverseProxyConfig struct { 24 tenantId string 25 port string 26 target string 27} 28 29type DebugTransport struct{} 30 31func (DebugTransport) RoundTrip(r *http.Request) (*http.Response, error) { 32 _, err := httputil.DumpRequestOut(r, false) 33 if err != nil { 34 return nil, err 35 } 36 return http.DefaultTransport.RoundTrip(r) 37} 38 39func generateProxy(conf ReverseProxyConfig) { 40 targetURL, _ := url.Parse(conf.target) 41 proxy := httputil.NewSingleHostReverseProxy(targetURL) 42 d := proxy.Director 43 proxy.Director = func(r *http.Request) { 44 d(r) // call default director 45 r.Header.Add("THANOS-TENANT", conf.tenantId) 46 } 47 proxy.ErrorHandler = ErrorHandler 48 proxy.Transport = DebugTransport{} 49 log.Fatal(http.ListenAndServe(conf.port, proxy)) 50} 51 52func ErrorHandler(_ http.ResponseWriter, _ *http.Request, err error) { 53 log.Print("Response from receiver") 54 log.Print(err) 55} 56 57func TestReceive(t *testing.T) { 58 t.Parallel() 59 60 t.Run("hashring", func(t *testing.T) { 61 t.Parallel() 62 63 s, err := e2e.NewScenario("e2e_test_receive_hashring") 64 testutil.Ok(t, err) 65 t.Cleanup(e2ethanos.CleanScenario(t, s)) 66 67 // The hashring suite creates three receivers, each with a Prometheus 68 // remote-writing data to it. However, due to the hashing of the labels, 69 // the time series from the Prometheus is forwarded to a different 70 // receiver in the hashring than the one handling the request. 71 // The querier queries all the receivers and the test verifies 72 // the time series are forwarded to the correct receive node. 73 r1, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1) 74 testutil.Ok(t, err) 75 r2, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "2", 1) 76 testutil.Ok(t, err) 77 r3, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "3", 1) 78 testutil.Ok(t, err) 79 80 h := receive.HashringConfig{ 81 Endpoints: []string{ 82 r1.GRPCNetworkEndpointFor(s.NetworkName()), 83 r2.GRPCNetworkEndpointFor(s.NetworkName()), 84 r3.GRPCNetworkEndpointFor(s.NetworkName()), 85 }, 86 } 87 88 // Recreate again, but with hashring config. 89 r1, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1, h) 90 testutil.Ok(t, err) 91 r2, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "2", 1, h) 92 testutil.Ok(t, err) 93 r3, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "3", 1, h) 94 testutil.Ok(t, err) 95 testutil.Ok(t, s.StartAndWaitReady(r1, r2, r3)) 96 97 prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) 98 testutil.Ok(t, err) 99 prom2, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "2", defaultPromConfig("prom2", 0, e2ethanos.RemoteWriteEndpoint(r2.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) 100 testutil.Ok(t, err) 101 prom3, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "3", defaultPromConfig("prom3", 0, e2ethanos.RemoteWriteEndpoint(r3.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) 102 testutil.Ok(t, err) 103 testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3)) 104 105 q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, nil, nil, nil, "", "") 106 testutil.Ok(t, err) 107 testutil.Ok(t, s.StartAndWaitReady(q)) 108 109 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) 110 t.Cleanup(cancel) 111 112 testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics)) 113 114 queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ 115 Deduplicate: false, 116 }, []model.Metric{ 117 { 118 "job": "myself", 119 "prometheus": "prom1", 120 "receive": "2", 121 "replica": "0", 122 "tenant_id": "default-tenant", 123 }, 124 { 125 "job": "myself", 126 "prometheus": "prom2", 127 "receive": "1", 128 "replica": "0", 129 "tenant_id": "default-tenant", 130 }, 131 { 132 "job": "myself", 133 "prometheus": "prom3", 134 "receive": "2", 135 "replica": "0", 136 "tenant_id": "default-tenant", 137 }, 138 }) 139 }) 140 141 t.Run("hashring with config watcher", func(t *testing.T) { 142 t.Parallel() 143 144 s, err := e2e.NewScenario("e2e_test_receive_hashring") 145 testutil.Ok(t, err) 146 t.Cleanup(e2ethanos.CleanScenario(t, s)) 147 148 r1, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1) 149 testutil.Ok(t, err) 150 r2, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "2", 1) 151 testutil.Ok(t, err) 152 r3, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "3", 1) 153 testutil.Ok(t, err) 154 155 h := receive.HashringConfig{ 156 Endpoints: []string{ 157 r1.GRPCNetworkEndpointFor(s.NetworkName()), 158 r2.GRPCNetworkEndpointFor(s.NetworkName()), 159 r3.GRPCNetworkEndpointFor(s.NetworkName()), 160 }, 161 } 162 163 // Recreate again, but with hashring config. 164 // TODO(kakkoyun): Update config file and wait config watcher to reconcile hashring. 165 r1, err = e2ethanos.NewReceiverWithConfigWatcher(s.SharedDir(), s.NetworkName(), "1", 1, h) 166 testutil.Ok(t, err) 167 r2, err = e2ethanos.NewReceiverWithConfigWatcher(s.SharedDir(), s.NetworkName(), "2", 1, h) 168 testutil.Ok(t, err) 169 r3, err = e2ethanos.NewReceiverWithConfigWatcher(s.SharedDir(), s.NetworkName(), "3", 1, h) 170 testutil.Ok(t, err) 171 testutil.Ok(t, s.StartAndWaitReady(r1, r2, r3)) 172 173 prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) 174 testutil.Ok(t, err) 175 prom2, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "2", defaultPromConfig("prom2", 0, e2ethanos.RemoteWriteEndpoint(r2.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) 176 testutil.Ok(t, err) 177 prom3, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "3", defaultPromConfig("prom3", 0, e2ethanos.RemoteWriteEndpoint(r3.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) 178 testutil.Ok(t, err) 179 testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3)) 180 181 q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, nil, nil, nil, "", "") 182 testutil.Ok(t, err) 183 testutil.Ok(t, s.StartAndWaitReady(q)) 184 185 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) 186 t.Cleanup(cancel) 187 188 testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics)) 189 190 queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ 191 Deduplicate: false, 192 }, []model.Metric{ 193 { 194 "job": "myself", 195 "prometheus": "prom1", 196 "receive": "2", 197 "replica": "0", 198 "tenant_id": "default-tenant", 199 }, 200 { 201 "job": "myself", 202 "prometheus": "prom2", 203 "receive": "1", 204 "replica": "0", 205 "tenant_id": "default-tenant", 206 }, 207 { 208 "job": "myself", 209 "prometheus": "prom3", 210 "receive": "2", 211 "replica": "0", 212 "tenant_id": "default-tenant", 213 }, 214 }) 215 }) 216 217 t.Run("replication", func(t *testing.T) { 218 t.Parallel() 219 220 s, err := e2e.NewScenario("e2e_test_receive_replication") 221 testutil.Ok(t, err) 222 t.Cleanup(e2ethanos.CleanScenario(t, s)) 223 224 // The replication suite creates three receivers but only one 225 // receives Prometheus remote-written data. The querier queries all 226 // receivers and the test verifies that the time series are 227 // replicated to all of the nodes. 228 r1, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 3) 229 testutil.Ok(t, err) 230 r2, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "2", 3) 231 testutil.Ok(t, err) 232 r3, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "3", 3) 233 testutil.Ok(t, err) 234 235 h := receive.HashringConfig{ 236 Endpoints: []string{ 237 r1.GRPCNetworkEndpointFor(s.NetworkName()), 238 r2.GRPCNetworkEndpointFor(s.NetworkName()), 239 r3.GRPCNetworkEndpointFor(s.NetworkName()), 240 }, 241 } 242 243 // Recreate again, but with hashring config. 244 r1, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 3, h) 245 testutil.Ok(t, err) 246 r2, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "2", 3, h) 247 testutil.Ok(t, err) 248 r3, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "3", 3, h) 249 testutil.Ok(t, err) 250 testutil.Ok(t, s.StartAndWaitReady(r1, r2, r3)) 251 252 prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) 253 testutil.Ok(t, err) 254 testutil.Ok(t, s.StartAndWaitReady(prom1)) 255 256 q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, nil, nil, nil, "", "") 257 testutil.Ok(t, err) 258 testutil.Ok(t, s.StartAndWaitReady(q)) 259 260 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) 261 t.Cleanup(cancel) 262 263 testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics)) 264 265 queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ 266 Deduplicate: false, 267 }, []model.Metric{ 268 { 269 "job": "myself", 270 "prometheus": "prom1", 271 "receive": "1", 272 "replica": "0", 273 "tenant_id": "default-tenant", 274 }, 275 { 276 "job": "myself", 277 "prometheus": "prom1", 278 "receive": "2", 279 "replica": "0", 280 "tenant_id": "default-tenant", 281 }, 282 { 283 "job": "myself", 284 "prometheus": "prom1", 285 "receive": "3", 286 "replica": "0", 287 "tenant_id": "default-tenant", 288 }, 289 }) 290 }) 291 292 t.Run("replication_with_outage", func(t *testing.T) { 293 t.Parallel() 294 295 s, err := e2e.NewScenario("e2e_test_receive_replication_with_outage") 296 testutil.Ok(t, err) 297 t.Cleanup(e2ethanos.CleanScenario(t, s)) 298 299 // The replication suite creates a three-node hashring but one of the 300 // receivers is dead. In this case, replication should still 301 // succeed and the time series should be replicated to the other nodes. 302 r1, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 3) 303 testutil.Ok(t, err) 304 r2, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "2", 3) 305 testutil.Ok(t, err) 306 notRunningR3, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "3", 3) 307 testutil.Ok(t, err) 308 309 h := receive.HashringConfig{ 310 Endpoints: []string{ 311 r1.GRPCNetworkEndpointFor(s.NetworkName()), 312 r2.GRPCNetworkEndpointFor(s.NetworkName()), 313 notRunningR3.GRPCNetworkEndpointFor(s.NetworkName()), 314 }, 315 } 316 317 // Recreate again, but with hashring config. 318 r1, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 3, h) 319 testutil.Ok(t, err) 320 r2, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "2", 3, h) 321 testutil.Ok(t, err) 322 testutil.Ok(t, s.StartAndWaitReady(r1, r2)) 323 324 prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) 325 testutil.Ok(t, err) 326 testutil.Ok(t, s.StartAndWaitReady(prom1)) 327 328 q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint()}, nil, nil, nil, nil, nil, "", "") 329 testutil.Ok(t, err) 330 testutil.Ok(t, s.StartAndWaitReady(q)) 331 332 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) 333 t.Cleanup(cancel) 334 335 testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics)) 336 337 queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ 338 Deduplicate: false, 339 }, []model.Metric{ 340 { 341 "job": "myself", 342 "prometheus": "prom1", 343 "receive": "1", 344 "replica": "0", 345 "tenant_id": "default-tenant", 346 }, 347 { 348 "job": "myself", 349 "prometheus": "prom1", 350 "receive": "2", 351 "replica": "0", 352 "tenant_id": "default-tenant", 353 }, 354 }) 355 }) 356 357 t.Run("multitenancy", func(t *testing.T) { 358 t.Parallel() 359 360 s, err := e2e.NewScenario("e2e_test_for_multitenancy") 361 testutil.Ok(t, err) 362 t.Cleanup(e2ethanos.CleanScenario(t, s)) 363 364 // The replication suite creates a three-node hashring but one of the 365 // receivers is dead. In this case, replication should still 366 // succeed and the time series should be replicated to the other nodes. 367 r1, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1) 368 testutil.Ok(t, err) 369 370 h := receive.HashringConfig{ 371 Endpoints: []string{ 372 r1.GRPCNetworkEndpointFor(s.NetworkName()), 373 }, 374 } 375 376 // Recreate again, but with hashring config. 377 r1, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1, h) 378 testutil.Ok(t, err) 379 testutil.Ok(t, s.StartAndWaitReady(r1)) 380 testutil.Ok(t, err) 381 382 conf1 := ReverseProxyConfig{ 383 tenantId: "tenant-1", 384 port: ":9097", 385 target: "http://" + r1.Endpoint(8081), 386 } 387 conf2 := ReverseProxyConfig{ 388 tenantId: "tenant-2", 389 port: ":9098", 390 target: "http://" + r1.Endpoint(8081), 391 } 392 393 go generateProxy(conf1) 394 go generateProxy(conf2) 395 396 prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, "http://172.17.0.1:9097/api/v1/receive", ""), e2ethanos.DefaultPrometheusImage()) 397 testutil.Ok(t, err) 398 prom2, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "2", defaultPromConfig("prom1", 0, "http://172.17.0.1:9098/api/v1/receive", ""), e2ethanos.DefaultPrometheusImage()) 399 testutil.Ok(t, err) 400 testutil.Ok(t, s.StartAndWaitReady(prom1)) 401 testutil.Ok(t, s.StartAndWaitReady(prom2)) 402 403 q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint()}, nil, nil, nil, nil, nil, "", "") 404 testutil.Ok(t, err) 405 testutil.Ok(t, s.StartAndWaitReady(q)) 406 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) 407 t.Cleanup(cancel) 408 409 testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics)) 410 queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ 411 Deduplicate: false, 412 }, []model.Metric{ 413 { 414 "job": "myself", 415 "prometheus": "prom1", 416 "receive": "1", 417 "replica": "0", 418 "tenant_id": "tenant-1", 419 }, 420 { 421 "job": "myself", 422 "prometheus": "prom1", 423 "receive": "1", 424 "replica": "0", 425 "tenant_id": "tenant-2", 426 }, 427 }) 428 }) 429} 430