1// Copyright (c) The Thanos Authors. 2// Licensed under the Apache License 2.0. 3 4package replicate 5 6import ( 7 "bytes" 8 "context" 9 "encoding/json" 10 "io" 11 "math/rand" 12 "os" 13 "path" 14 "testing" 15 "time" 16 17 "github.com/go-kit/kit/log" 18 "github.com/go-kit/kit/log/level" 19 "github.com/oklog/ulid" 20 "github.com/prometheus/prometheus/pkg/labels" 21 "github.com/prometheus/prometheus/tsdb" 22 "github.com/thanos-io/thanos/pkg/block" 23 "github.com/thanos-io/thanos/pkg/block/metadata" 24 "github.com/thanos-io/thanos/pkg/compact" 25 "github.com/thanos-io/thanos/pkg/objstore" 26 "github.com/thanos-io/thanos/pkg/testutil" 27) 28 29func testLogger(testName string) log.Logger { 30 return log.With( 31 level.NewFilter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), level.AllowDebug()), 32 "test", testName, 33 ) 34} 35 36func testULID(inc int64) ulid.ULID { 37 timestamp := time.Unix(1000000+inc, 0) 38 entropy := ulid.Monotonic(rand.New(rand.NewSource(timestamp.UnixNano())), 0) 39 ulid := ulid.MustNew(ulid.Timestamp(timestamp), entropy) 40 41 return ulid 42} 43 44func testMeta(ulid ulid.ULID) *metadata.Meta { 45 return &metadata.Meta{ 46 Thanos: metadata.Thanos{ 47 Labels: map[string]string{ 48 "test-labelname": "test-labelvalue", 49 }, 50 Downsample: metadata.ThanosDownsample{ 51 Resolution: int64(compact.ResolutionLevelRaw), 52 }, 53 }, 54 BlockMeta: tsdb.BlockMeta{ 55 ULID: ulid, 56 Compaction: tsdb.BlockMetaCompaction{ 57 Level: 1, 58 }, 59 Version: metadata.TSDBVersion1, 60 }, 61 } 62} 63 64func TestReplicationSchemeAll(t *testing.T) { 65 testBlockID := testULID(0) 66 var cases = []struct { 67 name string 68 selector labels.Selector 69 blockIDs []ulid.ULID 70 prepare func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) 71 assert func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) 72 }{ 73 { 74 name: "EmptyOrigin", 75 prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {}, 76 assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {}, 77 }, 78 { 79 name: "NoMeta", 80 prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 81 _ = originBucket.Upload(ctx, path.Join(testULID(0).String(), "chunks", "000001"), bytes.NewReader(nil)) 82 }, 83 assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 84 if len(targetBucket.Objects()) != 0 { 85 t.Fatal("TargetBucket should have been empty but is not.") 86 } 87 }, 88 }, 89 { 90 name: "PartialMeta", 91 prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 92 _ = originBucket.Upload(ctx, path.Join(testULID(0).String(), "meta.json"), bytes.NewReader([]byte("{"))) 93 }, 94 assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 95 if len(targetBucket.Objects()) != 0 { 96 t.Fatal("TargetBucket should have been empty but is not.") 97 } 98 }, 99 }, 100 { 101 name: "FullBlock", 102 prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 103 ulid := testULID(0) 104 meta := testMeta(ulid) 105 106 b, err := json.Marshal(meta) 107 testutil.Ok(t, err) 108 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) 109 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) 110 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) 111 }, 112 assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 113 if len(targetBucket.Objects()) != 3 { 114 t.Fatal("TargetBucket should have one block made up of three objects replicated.") 115 } 116 }, 117 }, 118 { 119 name: "PreviousPartialUpload", 120 prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 121 ulid := testULID(0) 122 meta := testMeta(ulid) 123 124 b, err := json.Marshal(meta) 125 testutil.Ok(t, err) 126 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) 127 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) 128 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) 129 130 _ = targetBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), io.LimitReader(bytes.NewReader(b), int64(len(b)-10))) 131 _ = targetBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) 132 _ = targetBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) 133 }, 134 assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 135 for k := range originBucket.Objects() { 136 if !bytes.Equal(originBucket.Objects()[k], targetBucket.Objects()[k]) { 137 t.Fatalf("Object %s not equal in origin and target bucket.", k) 138 } 139 } 140 }, 141 }, 142 { 143 name: "OnlyUploadsRaw", 144 prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 145 ulid := testULID(0) 146 meta := testMeta(ulid) 147 148 b, err := json.Marshal(meta) 149 testutil.Ok(t, err) 150 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) 151 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) 152 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) 153 154 ulid = testULID(1) 155 meta = testMeta(ulid) 156 meta.Thanos.Downsample.Resolution = int64(compact.ResolutionLevel5m) 157 158 b, err = json.Marshal(meta) 159 testutil.Ok(t, err) 160 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) 161 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) 162 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) 163 }, 164 assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 165 expected := 3 166 got := len(targetBucket.Objects()) 167 if got != expected { 168 t.Fatalf("TargetBucket should have one block made up of three objects replicated. Got %d but expected %d objects.", got, expected) 169 } 170 }, 171 }, 172 { 173 name: "UploadMultipleCandidatesWhenPresent", 174 prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 175 ulid := testULID(0) 176 meta := testMeta(ulid) 177 178 b, err := json.Marshal(meta) 179 testutil.Ok(t, err) 180 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) 181 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) 182 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) 183 184 ulid = testULID(1) 185 meta = testMeta(ulid) 186 187 b, err = json.Marshal(meta) 188 testutil.Ok(t, err) 189 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) 190 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) 191 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) 192 }, 193 assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 194 expected := 6 195 got := len(targetBucket.Objects()) 196 if got != expected { 197 t.Fatalf("TargetBucket should have two blocks made up of three objects replicated. Got %d but expected %d objects.", got, expected) 198 } 199 }, 200 }, 201 { 202 name: "LabelSelector", 203 prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 204 ulid := testULID(0) 205 meta := testMeta(ulid) 206 207 b, err := json.Marshal(meta) 208 testutil.Ok(t, err) 209 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) 210 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) 211 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) 212 213 ulid = testULID(1) 214 meta = testMeta(ulid) 215 meta.Thanos.Labels["test-labelname"] = "non-selected-value" 216 217 b, err = json.Marshal(meta) 218 testutil.Ok(t, err) 219 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) 220 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) 221 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) 222 }, 223 assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 224 expected := 3 225 got := len(targetBucket.Objects()) 226 if got != expected { 227 t.Fatalf("TargetBucket should have one block made up of three objects replicated. Got %d but expected %d objects.", got, expected) 228 } 229 }, 230 }, 231 { 232 name: "NonZeroCompaction", 233 prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 234 ulid := testULID(0) 235 meta := testMeta(ulid) 236 meta.BlockMeta.Compaction.Level = 2 237 238 b, err := json.Marshal(meta) 239 testutil.Ok(t, err) 240 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) 241 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) 242 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) 243 }, 244 assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 245 if len(targetBucket.Objects()) != 0 { 246 t.Fatal("TargetBucket should have been empty but is not.") 247 } 248 }, 249 }, 250 { 251 name: "Regression", 252 selector: labels.Selector{}, 253 prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 254 b := []byte(`{ 255 "ulid": "01DQYXMK8G108CEBQ79Y84DYVY", 256 "minTime": 1571911200000, 257 "maxTime": 1571918400000, 258 "stats": { 259 "numSamples": 90793, 260 "numSeries": 3703, 261 "numChunks": 3746 262 }, 263 "compaction": { 264 "level": 1, 265 "sources": [ 266 "01DQYXMK8G108CEBQ79Y84DYVY" 267 ] 268 }, 269 "version": 1, 270 "thanos": { 271 "labels": { 272 "receive": "true", 273 "replica": "thanos-receive-default-0" 274 }, 275 "downsample": { 276 "resolution": 0 277 }, 278 "source": "receive" 279 } 280}`) 281 282 _ = originBucket.Upload(ctx, path.Join("01DQYXMK8G108CEBQ79Y84DYVY", "meta.json"), bytes.NewReader(b)) 283 _ = originBucket.Upload(ctx, path.Join("01DQYXMK8G108CEBQ79Y84DYVY", "chunks", "000001"), bytes.NewReader(nil)) 284 _ = originBucket.Upload(ctx, path.Join("01DQYXMK8G108CEBQ79Y84DYVY", "index"), bytes.NewReader(nil)) 285 }, 286 assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 287 if len(targetBucket.Objects()) != 3 { 288 t.Fatal("TargetBucket should have one block does not.") 289 } 290 291 expected := originBucket.Objects()["01DQYXMK8G108CEBQ79Y84DYVY/meta.json"] 292 got := targetBucket.Objects()["01DQYXMK8G108CEBQ79Y84DYVY/meta.json"] 293 testutil.Equals(t, expected, got) 294 }, 295 }, 296 { 297 name: "BlockIDs", 298 blockIDs: []ulid.ULID{testBlockID}, 299 prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 300 meta := testMeta(testBlockID) 301 302 b, err := json.Marshal(meta) 303 testutil.Ok(t, err) 304 _ = originBucket.Upload(ctx, path.Join(testBlockID.String(), "meta.json"), bytes.NewReader(b)) 305 _ = originBucket.Upload(ctx, path.Join(testBlockID.String(), "chunks", "000001"), bytes.NewReader(nil)) 306 _ = originBucket.Upload(ctx, path.Join(testBlockID.String(), "index"), bytes.NewReader(nil)) 307 308 ulid := testULID(1) 309 meta = testMeta(ulid) 310 311 b, err = json.Marshal(meta) 312 testutil.Ok(t, err) 313 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) 314 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) 315 _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) 316 }, 317 assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) { 318 expected := 3 319 got := len(targetBucket.Objects()) 320 if got != expected { 321 t.Fatalf("TargetBucket should have one block made up of three objects replicated. Got %d but expected %d objects.", got, expected) 322 } 323 }, 324 }, 325 } 326 327 for _, c := range cases { 328 ctx := context.Background() 329 originBucket := objstore.NewInMemBucket() 330 targetBucket := objstore.NewInMemBucket() 331 logger := testLogger(t.Name() + "/" + c.name) 332 333 c.prepare(ctx, t, originBucket, targetBucket) 334 335 matcher, err := labels.NewMatcher(labels.MatchEqual, "test-labelname", "test-labelvalue") 336 testutil.Ok(t, err) 337 338 selector := labels.Selector{ 339 matcher, 340 } 341 if c.selector != nil { 342 selector = c.selector 343 } 344 345 filter := NewBlockFilter(logger, selector, []compact.ResolutionLevel{compact.ResolutionLevelRaw}, []int{1}, c.blockIDs).Filter 346 fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil, nil) 347 testutil.Ok(t, err) 348 349 r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, nil) 350 351 err = r.execute(ctx) 352 testutil.Ok(t, err) 353 354 c.assert(ctx, t, originBucket, targetBucket) 355 } 356} 357