1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
4package replicate
5
6import (
7	"bytes"
8	"context"
9	"encoding/json"
10	"io"
11	"math/rand"
12	"os"
13	"path"
14	"testing"
15	"time"
16
17	"github.com/go-kit/kit/log"
18	"github.com/go-kit/kit/log/level"
19	"github.com/oklog/ulid"
20	"github.com/prometheus/prometheus/pkg/labels"
21	"github.com/prometheus/prometheus/tsdb"
22	"github.com/thanos-io/thanos/pkg/block"
23	"github.com/thanos-io/thanos/pkg/block/metadata"
24	"github.com/thanos-io/thanos/pkg/compact"
25	"github.com/thanos-io/thanos/pkg/objstore"
26	"github.com/thanos-io/thanos/pkg/testutil"
27)
28
29func testLogger(testName string) log.Logger {
30	return log.With(
31		level.NewFilter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), level.AllowDebug()),
32		"test", testName,
33	)
34}
35
36func testULID(inc int64) ulid.ULID {
37	timestamp := time.Unix(1000000+inc, 0)
38	entropy := ulid.Monotonic(rand.New(rand.NewSource(timestamp.UnixNano())), 0)
39	ulid := ulid.MustNew(ulid.Timestamp(timestamp), entropy)
40
41	return ulid
42}
43
44func testMeta(ulid ulid.ULID) *metadata.Meta {
45	return &metadata.Meta{
46		Thanos: metadata.Thanos{
47			Labels: map[string]string{
48				"test-labelname": "test-labelvalue",
49			},
50			Downsample: metadata.ThanosDownsample{
51				Resolution: int64(compact.ResolutionLevelRaw),
52			},
53		},
54		BlockMeta: tsdb.BlockMeta{
55			ULID: ulid,
56			Compaction: tsdb.BlockMetaCompaction{
57				Level: 1,
58			},
59			Version: metadata.TSDBVersion1,
60		},
61	}
62}
63
64func TestReplicationSchemeAll(t *testing.T) {
65	testBlockID := testULID(0)
66	var cases = []struct {
67		name     string
68		selector labels.Selector
69		blockIDs []ulid.ULID
70		prepare  func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket)
71		assert   func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket)
72	}{
73		{
74			name:    "EmptyOrigin",
75			prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {},
76			assert:  func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {},
77		},
78		{
79			name: "NoMeta",
80			prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
81				_ = originBucket.Upload(ctx, path.Join(testULID(0).String(), "chunks", "000001"), bytes.NewReader(nil))
82			},
83			assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
84				if len(targetBucket.Objects()) != 0 {
85					t.Fatal("TargetBucket should have been empty but is not.")
86				}
87			},
88		},
89		{
90			name: "PartialMeta",
91			prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
92				_ = originBucket.Upload(ctx, path.Join(testULID(0).String(), "meta.json"), bytes.NewReader([]byte("{")))
93			},
94			assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
95				if len(targetBucket.Objects()) != 0 {
96					t.Fatal("TargetBucket should have been empty but is not.")
97				}
98			},
99		},
100		{
101			name: "FullBlock",
102			prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
103				ulid := testULID(0)
104				meta := testMeta(ulid)
105
106				b, err := json.Marshal(meta)
107				testutil.Ok(t, err)
108				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b))
109				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
110				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
111			},
112			assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
113				if len(targetBucket.Objects()) != 3 {
114					t.Fatal("TargetBucket should have one block made up of three objects replicated.")
115				}
116			},
117		},
118		{
119			name: "PreviousPartialUpload",
120			prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
121				ulid := testULID(0)
122				meta := testMeta(ulid)
123
124				b, err := json.Marshal(meta)
125				testutil.Ok(t, err)
126				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b))
127				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
128				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
129
130				_ = targetBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), io.LimitReader(bytes.NewReader(b), int64(len(b)-10)))
131				_ = targetBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
132				_ = targetBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
133			},
134			assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
135				for k := range originBucket.Objects() {
136					if !bytes.Equal(originBucket.Objects()[k], targetBucket.Objects()[k]) {
137						t.Fatalf("Object %s not equal in origin and target bucket.", k)
138					}
139				}
140			},
141		},
142		{
143			name: "OnlyUploadsRaw",
144			prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
145				ulid := testULID(0)
146				meta := testMeta(ulid)
147
148				b, err := json.Marshal(meta)
149				testutil.Ok(t, err)
150				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b))
151				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
152				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
153
154				ulid = testULID(1)
155				meta = testMeta(ulid)
156				meta.Thanos.Downsample.Resolution = int64(compact.ResolutionLevel5m)
157
158				b, err = json.Marshal(meta)
159				testutil.Ok(t, err)
160				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b))
161				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
162				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
163			},
164			assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
165				expected := 3
166				got := len(targetBucket.Objects())
167				if got != expected {
168					t.Fatalf("TargetBucket should have one block made up of three objects replicated. Got %d but expected %d objects.", got, expected)
169				}
170			},
171		},
172		{
173			name: "UploadMultipleCandidatesWhenPresent",
174			prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
175				ulid := testULID(0)
176				meta := testMeta(ulid)
177
178				b, err := json.Marshal(meta)
179				testutil.Ok(t, err)
180				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b))
181				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
182				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
183
184				ulid = testULID(1)
185				meta = testMeta(ulid)
186
187				b, err = json.Marshal(meta)
188				testutil.Ok(t, err)
189				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b))
190				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
191				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
192			},
193			assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
194				expected := 6
195				got := len(targetBucket.Objects())
196				if got != expected {
197					t.Fatalf("TargetBucket should have two blocks made up of three objects replicated. Got %d but expected %d objects.", got, expected)
198				}
199			},
200		},
201		{
202			name: "LabelSelector",
203			prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
204				ulid := testULID(0)
205				meta := testMeta(ulid)
206
207				b, err := json.Marshal(meta)
208				testutil.Ok(t, err)
209				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b))
210				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
211				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
212
213				ulid = testULID(1)
214				meta = testMeta(ulid)
215				meta.Thanos.Labels["test-labelname"] = "non-selected-value"
216
217				b, err = json.Marshal(meta)
218				testutil.Ok(t, err)
219				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b))
220				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
221				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
222			},
223			assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
224				expected := 3
225				got := len(targetBucket.Objects())
226				if got != expected {
227					t.Fatalf("TargetBucket should have one block made up of three objects replicated. Got %d but expected %d objects.", got, expected)
228				}
229			},
230		},
231		{
232			name: "NonZeroCompaction",
233			prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
234				ulid := testULID(0)
235				meta := testMeta(ulid)
236				meta.BlockMeta.Compaction.Level = 2
237
238				b, err := json.Marshal(meta)
239				testutil.Ok(t, err)
240				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b))
241				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
242				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
243			},
244			assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
245				if len(targetBucket.Objects()) != 0 {
246					t.Fatal("TargetBucket should have been empty but is not.")
247				}
248			},
249		},
250		{
251			name:     "Regression",
252			selector: labels.Selector{},
253			prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
254				b := []byte(`{
255        "ulid": "01DQYXMK8G108CEBQ79Y84DYVY",
256        "minTime": 1571911200000,
257        "maxTime": 1571918400000,
258        "stats": {
259                "numSamples": 90793,
260                "numSeries": 3703,
261                "numChunks": 3746
262        },
263        "compaction": {
264                "level": 1,
265                "sources": [
266                        "01DQYXMK8G108CEBQ79Y84DYVY"
267                ]
268        },
269        "version": 1,
270        "thanos": {
271                "labels": {
272                        "receive": "true",
273                        "replica": "thanos-receive-default-0"
274                },
275                "downsample": {
276                        "resolution": 0
277                },
278                "source": "receive"
279        }
280}`)
281
282				_ = originBucket.Upload(ctx, path.Join("01DQYXMK8G108CEBQ79Y84DYVY", "meta.json"), bytes.NewReader(b))
283				_ = originBucket.Upload(ctx, path.Join("01DQYXMK8G108CEBQ79Y84DYVY", "chunks", "000001"), bytes.NewReader(nil))
284				_ = originBucket.Upload(ctx, path.Join("01DQYXMK8G108CEBQ79Y84DYVY", "index"), bytes.NewReader(nil))
285			},
286			assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
287				if len(targetBucket.Objects()) != 3 {
288					t.Fatal("TargetBucket should have one block does not.")
289				}
290
291				expected := originBucket.Objects()["01DQYXMK8G108CEBQ79Y84DYVY/meta.json"]
292				got := targetBucket.Objects()["01DQYXMK8G108CEBQ79Y84DYVY/meta.json"]
293				testutil.Equals(t, expected, got)
294			},
295		},
296		{
297			name:     "BlockIDs",
298			blockIDs: []ulid.ULID{testBlockID},
299			prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
300				meta := testMeta(testBlockID)
301
302				b, err := json.Marshal(meta)
303				testutil.Ok(t, err)
304				_ = originBucket.Upload(ctx, path.Join(testBlockID.String(), "meta.json"), bytes.NewReader(b))
305				_ = originBucket.Upload(ctx, path.Join(testBlockID.String(), "chunks", "000001"), bytes.NewReader(nil))
306				_ = originBucket.Upload(ctx, path.Join(testBlockID.String(), "index"), bytes.NewReader(nil))
307
308				ulid := testULID(1)
309				meta = testMeta(ulid)
310
311				b, err = json.Marshal(meta)
312				testutil.Ok(t, err)
313				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b))
314				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
315				_ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
316			},
317			assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
318				expected := 3
319				got := len(targetBucket.Objects())
320				if got != expected {
321					t.Fatalf("TargetBucket should have one block made up of three objects replicated. Got %d but expected %d objects.", got, expected)
322				}
323			},
324		},
325	}
326
327	for _, c := range cases {
328		ctx := context.Background()
329		originBucket := objstore.NewInMemBucket()
330		targetBucket := objstore.NewInMemBucket()
331		logger := testLogger(t.Name() + "/" + c.name)
332
333		c.prepare(ctx, t, originBucket, targetBucket)
334
335		matcher, err := labels.NewMatcher(labels.MatchEqual, "test-labelname", "test-labelvalue")
336		testutil.Ok(t, err)
337
338		selector := labels.Selector{
339			matcher,
340		}
341		if c.selector != nil {
342			selector = c.selector
343		}
344
345		filter := NewBlockFilter(logger, selector, []compact.ResolutionLevel{compact.ResolutionLevelRaw}, []int{1}, c.blockIDs).Filter
346		fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil, nil)
347		testutil.Ok(t, err)
348
349		r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, nil)
350
351		err = r.execute(ctx)
352		testutil.Ok(t, err)
353
354		c.assert(ctx, t, originBucket, targetBucket)
355	}
356}
357