1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
4package e2e_test
5
6import (
7	"context"
8	"log"
9	"net/http"
10	"net/http/httputil"
11	"net/url"
12	"testing"
13	"time"
14
15	"github.com/cortexproject/cortex/integration/e2e"
16	"github.com/prometheus/common/model"
17	"github.com/thanos-io/thanos/pkg/promclient"
18	"github.com/thanos-io/thanos/pkg/receive"
19	"github.com/thanos-io/thanos/pkg/testutil"
20	"github.com/thanos-io/thanos/test/e2e/e2ethanos"
21)
22
// ReverseProxyConfig describes one tenant-injecting reverse proxy started by
// generateProxy for the multitenancy test.
type ReverseProxyConfig struct {
	// tenantId is the value written into the THANOS-TENANT header of every
	// request the proxy forwards.
	tenantId string
	// port is the listen address handed to http.ListenAndServe (for example
	// ":9097"); target is the base URL that requests are forwarded to.
	port, target string
}
28
// DebugTransport is an http.RoundTripper that first renders each outgoing
// request (headers only, body excluded) with httputil.DumpRequestOut and then
// delegates the actual exchange to http.DefaultTransport. If the request
// cannot be rendered, the round trip is aborted with that error.
//
// NOTE(review): the rendered dump is discarded, so this transport currently
// produces no debug output; confirm whether it was meant to be logged.
type DebugTransport struct{}

// RoundTrip implements http.RoundTripper.
func (DebugTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	if _, dumpErr := httputil.DumpRequestOut(req, false); dumpErr != nil {
		return nil, dumpErr
	}
	next := http.DefaultTransport
	return next.RoundTrip(req)
}
38
39func generateProxy(conf ReverseProxyConfig) {
40	targetURL, _ := url.Parse(conf.target)
41	proxy := httputil.NewSingleHostReverseProxy(targetURL)
42	d := proxy.Director
43	proxy.Director = func(r *http.Request) {
44		d(r) // call default director
45		r.Header.Add("THANOS-TENANT", conf.tenantId)
46	}
47	proxy.ErrorHandler = ErrorHandler
48	proxy.Transport = DebugTransport{}
49	log.Fatal(http.ListenAndServe(conf.port, proxy))
50}
51
// ErrorHandler is the reverse proxy's error callback, invoked when forwarding
// a request to the receiver fails (for example, the receiver is unreachable).
// It logs the failed request and answers the client with 502 Bad Gateway.
//
// Writing a status is essential: if the handler writes nothing, net/http
// replies 200 OK with an empty body, and a remote-write client would treat
// the failed write as a success and drop the samples instead of retrying.
func ErrorHandler(w http.ResponseWriter, r *http.Request, err error) {
	log.Printf("proxying %s %s to receiver failed: %v", r.Method, r.URL, err)
	w.WriteHeader(http.StatusBadGateway)
}
56
57func TestReceive(t *testing.T) {
58	t.Parallel()
59
60	t.Run("hashring", func(t *testing.T) {
61		t.Parallel()
62
63		s, err := e2e.NewScenario("e2e_test_receive_hashring")
64		testutil.Ok(t, err)
65		t.Cleanup(e2ethanos.CleanScenario(t, s))
66
67		// The hashring suite creates three receivers, each with a Prometheus
68		// remote-writing data to it. However, due to the hashing of the labels,
69		// the time series from the Prometheus is forwarded to a different
70		// receiver in the hashring than the one handling the request.
71		// The querier queries all the receivers and the test verifies
72		// the time series are forwarded to the correct receive node.
73		r1, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1)
74		testutil.Ok(t, err)
75		r2, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "2", 1)
76		testutil.Ok(t, err)
77		r3, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "3", 1)
78		testutil.Ok(t, err)
79
80		h := receive.HashringConfig{
81			Endpoints: []string{
82				r1.GRPCNetworkEndpointFor(s.NetworkName()),
83				r2.GRPCNetworkEndpointFor(s.NetworkName()),
84				r3.GRPCNetworkEndpointFor(s.NetworkName()),
85			},
86		}
87
88		// Recreate again, but with hashring config.
89		r1, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1, h)
90		testutil.Ok(t, err)
91		r2, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "2", 1, h)
92		testutil.Ok(t, err)
93		r3, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "3", 1, h)
94		testutil.Ok(t, err)
95		testutil.Ok(t, s.StartAndWaitReady(r1, r2, r3))
96
97		prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage())
98		testutil.Ok(t, err)
99		prom2, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "2", defaultPromConfig("prom2", 0, e2ethanos.RemoteWriteEndpoint(r2.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage())
100		testutil.Ok(t, err)
101		prom3, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "3", defaultPromConfig("prom3", 0, e2ethanos.RemoteWriteEndpoint(r3.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage())
102		testutil.Ok(t, err)
103		testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3))
104
105		q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, nil, nil, nil, "", "")
106		testutil.Ok(t, err)
107		testutil.Ok(t, s.StartAndWaitReady(q))
108
109		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
110		t.Cleanup(cancel)
111
112		testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics))
113
114		queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{
115			Deduplicate: false,
116		}, []model.Metric{
117			{
118				"job":        "myself",
119				"prometheus": "prom1",
120				"receive":    "2",
121				"replica":    "0",
122				"tenant_id":  "default-tenant",
123			},
124			{
125				"job":        "myself",
126				"prometheus": "prom2",
127				"receive":    "1",
128				"replica":    "0",
129				"tenant_id":  "default-tenant",
130			},
131			{
132				"job":        "myself",
133				"prometheus": "prom3",
134				"receive":    "2",
135				"replica":    "0",
136				"tenant_id":  "default-tenant",
137			},
138		})
139	})
140
141	t.Run("hashring with config watcher", func(t *testing.T) {
142		t.Parallel()
143
144		s, err := e2e.NewScenario("e2e_test_receive_hashring")
145		testutil.Ok(t, err)
146		t.Cleanup(e2ethanos.CleanScenario(t, s))
147
148		r1, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1)
149		testutil.Ok(t, err)
150		r2, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "2", 1)
151		testutil.Ok(t, err)
152		r3, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "3", 1)
153		testutil.Ok(t, err)
154
155		h := receive.HashringConfig{
156			Endpoints: []string{
157				r1.GRPCNetworkEndpointFor(s.NetworkName()),
158				r2.GRPCNetworkEndpointFor(s.NetworkName()),
159				r3.GRPCNetworkEndpointFor(s.NetworkName()),
160			},
161		}
162
163		// Recreate again, but with hashring config.
164		// TODO(kakkoyun): Update config file and wait config watcher to reconcile hashring.
165		r1, err = e2ethanos.NewReceiverWithConfigWatcher(s.SharedDir(), s.NetworkName(), "1", 1, h)
166		testutil.Ok(t, err)
167		r2, err = e2ethanos.NewReceiverWithConfigWatcher(s.SharedDir(), s.NetworkName(), "2", 1, h)
168		testutil.Ok(t, err)
169		r3, err = e2ethanos.NewReceiverWithConfigWatcher(s.SharedDir(), s.NetworkName(), "3", 1, h)
170		testutil.Ok(t, err)
171		testutil.Ok(t, s.StartAndWaitReady(r1, r2, r3))
172
173		prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage())
174		testutil.Ok(t, err)
175		prom2, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "2", defaultPromConfig("prom2", 0, e2ethanos.RemoteWriteEndpoint(r2.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage())
176		testutil.Ok(t, err)
177		prom3, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "3", defaultPromConfig("prom3", 0, e2ethanos.RemoteWriteEndpoint(r3.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage())
178		testutil.Ok(t, err)
179		testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3))
180
181		q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, nil, nil, nil, "", "")
182		testutil.Ok(t, err)
183		testutil.Ok(t, s.StartAndWaitReady(q))
184
185		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
186		t.Cleanup(cancel)
187
188		testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics))
189
190		queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{
191			Deduplicate: false,
192		}, []model.Metric{
193			{
194				"job":        "myself",
195				"prometheus": "prom1",
196				"receive":    "2",
197				"replica":    "0",
198				"tenant_id":  "default-tenant",
199			},
200			{
201				"job":        "myself",
202				"prometheus": "prom2",
203				"receive":    "1",
204				"replica":    "0",
205				"tenant_id":  "default-tenant",
206			},
207			{
208				"job":        "myself",
209				"prometheus": "prom3",
210				"receive":    "2",
211				"replica":    "0",
212				"tenant_id":  "default-tenant",
213			},
214		})
215	})
216
217	t.Run("replication", func(t *testing.T) {
218		t.Parallel()
219
220		s, err := e2e.NewScenario("e2e_test_receive_replication")
221		testutil.Ok(t, err)
222		t.Cleanup(e2ethanos.CleanScenario(t, s))
223
224		// The replication suite creates three receivers but only one
225		// receives Prometheus remote-written data. The querier queries all
226		// receivers and the test verifies that the time series are
227		// replicated to all of the nodes.
228		r1, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 3)
229		testutil.Ok(t, err)
230		r2, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "2", 3)
231		testutil.Ok(t, err)
232		r3, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "3", 3)
233		testutil.Ok(t, err)
234
235		h := receive.HashringConfig{
236			Endpoints: []string{
237				r1.GRPCNetworkEndpointFor(s.NetworkName()),
238				r2.GRPCNetworkEndpointFor(s.NetworkName()),
239				r3.GRPCNetworkEndpointFor(s.NetworkName()),
240			},
241		}
242
243		// Recreate again, but with hashring config.
244		r1, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 3, h)
245		testutil.Ok(t, err)
246		r2, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "2", 3, h)
247		testutil.Ok(t, err)
248		r3, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "3", 3, h)
249		testutil.Ok(t, err)
250		testutil.Ok(t, s.StartAndWaitReady(r1, r2, r3))
251
252		prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage())
253		testutil.Ok(t, err)
254		testutil.Ok(t, s.StartAndWaitReady(prom1))
255
256		q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, nil, nil, nil, "", "")
257		testutil.Ok(t, err)
258		testutil.Ok(t, s.StartAndWaitReady(q))
259
260		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
261		t.Cleanup(cancel)
262
263		testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics))
264
265		queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{
266			Deduplicate: false,
267		}, []model.Metric{
268			{
269				"job":        "myself",
270				"prometheus": "prom1",
271				"receive":    "1",
272				"replica":    "0",
273				"tenant_id":  "default-tenant",
274			},
275			{
276				"job":        "myself",
277				"prometheus": "prom1",
278				"receive":    "2",
279				"replica":    "0",
280				"tenant_id":  "default-tenant",
281			},
282			{
283				"job":        "myself",
284				"prometheus": "prom1",
285				"receive":    "3",
286				"replica":    "0",
287				"tenant_id":  "default-tenant",
288			},
289		})
290	})
291
292	t.Run("replication_with_outage", func(t *testing.T) {
293		t.Parallel()
294
295		s, err := e2e.NewScenario("e2e_test_receive_replication_with_outage")
296		testutil.Ok(t, err)
297		t.Cleanup(e2ethanos.CleanScenario(t, s))
298
299		// The replication suite creates a three-node hashring but one of the
300		// receivers is dead. In this case, replication should still
301		// succeed and the time series should be replicated to the other nodes.
302		r1, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 3)
303		testutil.Ok(t, err)
304		r2, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "2", 3)
305		testutil.Ok(t, err)
306		notRunningR3, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "3", 3)
307		testutil.Ok(t, err)
308
309		h := receive.HashringConfig{
310			Endpoints: []string{
311				r1.GRPCNetworkEndpointFor(s.NetworkName()),
312				r2.GRPCNetworkEndpointFor(s.NetworkName()),
313				notRunningR3.GRPCNetworkEndpointFor(s.NetworkName()),
314			},
315		}
316
317		// Recreate again, but with hashring config.
318		r1, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 3, h)
319		testutil.Ok(t, err)
320		r2, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "2", 3, h)
321		testutil.Ok(t, err)
322		testutil.Ok(t, s.StartAndWaitReady(r1, r2))
323
324		prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage())
325		testutil.Ok(t, err)
326		testutil.Ok(t, s.StartAndWaitReady(prom1))
327
328		q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint()}, nil, nil, nil, nil, nil, "", "")
329		testutil.Ok(t, err)
330		testutil.Ok(t, s.StartAndWaitReady(q))
331
332		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
333		t.Cleanup(cancel)
334
335		testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics))
336
337		queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{
338			Deduplicate: false,
339		}, []model.Metric{
340			{
341				"job":        "myself",
342				"prometheus": "prom1",
343				"receive":    "1",
344				"replica":    "0",
345				"tenant_id":  "default-tenant",
346			},
347			{
348				"job":        "myself",
349				"prometheus": "prom1",
350				"receive":    "2",
351				"replica":    "0",
352				"tenant_id":  "default-tenant",
353			},
354		})
355	})
356
357	t.Run("multitenancy", func(t *testing.T) {
358		t.Parallel()
359
360		s, err := e2e.NewScenario("e2e_test_for_multitenancy")
361		testutil.Ok(t, err)
362		t.Cleanup(e2ethanos.CleanScenario(t, s))
363
364		// The replication suite creates a three-node hashring but one of the
365		// receivers is dead. In this case, replication should still
366		// succeed and the time series should be replicated to the other nodes.
367		r1, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1)
368		testutil.Ok(t, err)
369
370		h := receive.HashringConfig{
371			Endpoints: []string{
372				r1.GRPCNetworkEndpointFor(s.NetworkName()),
373			},
374		}
375
376		// Recreate again, but with hashring config.
377		r1, err = e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1, h)
378		testutil.Ok(t, err)
379		testutil.Ok(t, s.StartAndWaitReady(r1))
380		testutil.Ok(t, err)
381
382		conf1 := ReverseProxyConfig{
383			tenantId: "tenant-1",
384			port:     ":9097",
385			target:   "http://" + r1.Endpoint(8081),
386		}
387		conf2 := ReverseProxyConfig{
388			tenantId: "tenant-2",
389			port:     ":9098",
390			target:   "http://" + r1.Endpoint(8081),
391		}
392
393		go generateProxy(conf1)
394		go generateProxy(conf2)
395
396		prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, "http://172.17.0.1:9097/api/v1/receive", ""), e2ethanos.DefaultPrometheusImage())
397		testutil.Ok(t, err)
398		prom2, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "2", defaultPromConfig("prom1", 0, "http://172.17.0.1:9098/api/v1/receive", ""), e2ethanos.DefaultPrometheusImage())
399		testutil.Ok(t, err)
400		testutil.Ok(t, s.StartAndWaitReady(prom1))
401		testutil.Ok(t, s.StartAndWaitReady(prom2))
402
403		q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint()}, nil, nil, nil, nil, nil, "", "")
404		testutil.Ok(t, err)
405		testutil.Ok(t, s.StartAndWaitReady(q))
406		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
407		t.Cleanup(cancel)
408
409		testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics))
410		queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{
411			Deduplicate: false,
412		}, []model.Metric{
413			{
414				"job":        "myself",
415				"prometheus": "prom1",
416				"receive":    "1",
417				"replica":    "0",
418				"tenant_id":  "tenant-1",
419			},
420			{
421				"job":        "myself",
422				"prometheus": "prom1",
423				"receive":    "1",
424				"replica":    "0",
425				"tenant_id":  "tenant-2",
426			},
427		})
428	})
429}
430