1package querier 2 3import ( 4 "context" 5 "fmt" 6 "strings" 7 "testing" 8 "time" 9 10 "github.com/go-kit/log" 11 "github.com/grafana/dskit/flagext" 12 "github.com/grafana/dskit/kv/consul" 13 "github.com/grafana/dskit/services" 14 "github.com/oklog/ulid" 15 "github.com/prometheus/client_golang/prometheus" 16 "github.com/prometheus/client_golang/prometheus/testutil" 17 "github.com/stretchr/testify/assert" 18 "github.com/stretchr/testify/require" 19 20 "github.com/cortexproject/cortex/pkg/ring" 21 cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" 22 "github.com/cortexproject/cortex/pkg/util" 23 "github.com/cortexproject/cortex/pkg/util/test" 24) 25 26func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { 27 // The following block IDs have been picked to have increasing hash values 28 // in order to simplify the tests. 29 block1 := ulid.MustNew(1, nil) // hash: 283204220 30 block2 := ulid.MustNew(2, nil) // hash: 444110359 31 block3 := ulid.MustNew(5, nil) // hash: 2931974232 32 block4 := ulid.MustNew(6, nil) // hash: 3092880371 33 34 block1Hash := cortex_tsdb.HashBlockID(block1) 35 block2Hash := cortex_tsdb.HashBlockID(block2) 36 block3Hash := cortex_tsdb.HashBlockID(block3) 37 block4Hash := cortex_tsdb.HashBlockID(block4) 38 39 userID := "user-A" 40 registeredAt := time.Now() 41 42 tests := map[string]struct { 43 shardingStrategy string 44 tenantShardSize int 45 replicationFactor int 46 setup func(*ring.Desc) 47 queryBlocks []ulid.ULID 48 exclude map[ulid.ULID][]string 49 expectedClients map[string][]ulid.ULID 50 expectedErr error 51 }{ 52 // 53 // Sharding strategy: default 54 // 55 "default sharding, single instance in the ring with RF = 1": { 56 shardingStrategy: util.ShardingStrategyDefault, 57 replicationFactor: 1, 58 setup: func(d *ring.Desc) { 59 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) 60 }, 61 queryBlocks: []ulid.ULID{block1, block2}, 62 expectedClients: map[string][]ulid.ULID{ 63 "127.0.0.1": {block1, block2}, 64 }, 65 }, 66 "default sharding, single instance in the ring with RF = 1 but excluded": { 67 shardingStrategy: util.ShardingStrategyDefault, 68 replicationFactor: 1, 69 setup: func(d *ring.Desc) { 70 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) 71 }, 72 queryBlocks: []ulid.ULID{block1, block2}, 73 exclude: map[ulid.ULID][]string{ 74 block1: {"127.0.0.1"}, 75 }, 76 expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()), 77 }, 78 "default sharding, single instance in the ring with RF = 1 but excluded for non queried block": { 79 shardingStrategy: util.ShardingStrategyDefault, 80 replicationFactor: 1, 81 setup: func(d *ring.Desc) { 82 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) 83 }, 84 queryBlocks: []ulid.ULID{block1, block2}, 85 exclude: map[ulid.ULID][]string{ 86 block3: {"127.0.0.1"}, 87 }, 88 expectedClients: map[string][]ulid.ULID{ 89 "127.0.0.1": {block1, block2}, 90 }, 91 }, 92 "default sharding, single instance in the ring with RF = 2": { 93 shardingStrategy: util.ShardingStrategyDefault, 94 replicationFactor: 2, 95 setup: func(d *ring.Desc) { 96 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) 97 }, 98 queryBlocks: []ulid.ULID{block1, block2}, 99 expectedClients: map[string][]ulid.ULID{ 100 "127.0.0.1": {block1, block2}, 101 }, 102 }, 103 "default sharding, multiple instances in the ring with each requested block belonging to a different store-gateway and RF = 1": { 104 shardingStrategy: util.ShardingStrategyDefault, 105 replicationFactor: 1, 106 setup: func(d *ring.Desc) { 107 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) 108 d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) 109 d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) 110 d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) 111 }, 112 queryBlocks: []ulid.ULID{block1, block3, block4}, 113 expectedClients: map[string][]ulid.ULID{ 114 "127.0.0.1": {block1}, 115 "127.0.0.3": {block3}, 116 "127.0.0.4": {block4}, 117 }, 118 }, 119 "default sharding, multiple instances in the ring with each requested block belonging to a different store-gateway and RF = 1 but excluded": { 120 shardingStrategy: util.ShardingStrategyDefault, 121 replicationFactor: 1, 122 setup: func(d *ring.Desc) { 123 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) 124 d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) 125 d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) 126 d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) 127 }, 128 queryBlocks: []ulid.ULID{block1, block3, block4}, 129 exclude: map[ulid.ULID][]string{ 130 block3: {"127.0.0.3"}, 131 }, 132 expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block3.String()), 133 }, 134 "default sharding, multiple instances in the ring with each requested block belonging to a different store-gateway and RF = 2": { 135 shardingStrategy: util.ShardingStrategyDefault, 136 replicationFactor: 2, 137 setup: func(d *ring.Desc) { 138 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) 139 d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) 140 d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) 141 d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) 142 }, 143 queryBlocks: []ulid.ULID{block1, block3, block4}, 144 expectedClients: map[string][]ulid.ULID{ 145 "127.0.0.1": {block1}, 146 "127.0.0.3": {block3}, 147 "127.0.0.4": {block4}, 148 }, 149 }, 150 "default sharding, multiple instances in the ring with multiple requested blocks belonging to the same store-gateway and RF = 2": { 151 shardingStrategy: util.ShardingStrategyDefault, 152 replicationFactor: 2, 153 setup: func(d *ring.Desc) { 154 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) 155 d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) 156 }, 157 queryBlocks: []ulid.ULID{block1, block2, block3, block4}, 158 expectedClients: map[string][]ulid.ULID{ 159 "127.0.0.1": {block1, block4}, 160 "127.0.0.2": {block2, block3}, 161 }, 162 }, 163 "default sharding, multiple instances in the ring with each requested block belonging to a different store-gateway and RF = 2 and some blocks excluded but with replacement available": { 164 shardingStrategy: util.ShardingStrategyDefault, 165 replicationFactor: 2, 166 setup: func(d *ring.Desc) { 167 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) 168 d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) 169 d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) 170 d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) 171 }, 172 queryBlocks: []ulid.ULID{block1, block3, block4}, 173 exclude: map[ulid.ULID][]string{ 174 block3: {"127.0.0.3"}, 175 block1: {"127.0.0.1"}, 176 }, 177 expectedClients: map[string][]ulid.ULID{ 178 "127.0.0.2": {block1}, 179 "127.0.0.4": {block3, block4}, 180 }, 181 }, 182 "default sharding, multiple instances in the ring are JOINING, the requested block + its replicas only belongs to JOINING instances": { 183 shardingStrategy: util.ShardingStrategyDefault, 184 replicationFactor: 2, 185 setup: func(d *ring.Desc) { 186 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.JOINING, registeredAt) 187 d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.JOINING, registeredAt) 188 d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.JOINING, registeredAt) 189 d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) 190 }, 191 queryBlocks: []ulid.ULID{block1}, 192 expectedClients: map[string][]ulid.ULID{ 193 "127.0.0.4": {block1}, 194 }, 195 }, 196 // 197 // Sharding strategy: shuffle sharding 198 // 199 "shuffle sharding, single instance in the ring with RF = 1, SS = 1": { 200 shardingStrategy: util.ShardingStrategyShuffle, 201 tenantShardSize: 1, 202 replicationFactor: 1, 203 setup: func(d *ring.Desc) { 204 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) 205 }, 206 queryBlocks: []ulid.ULID{block1, block2}, 207 expectedClients: map[string][]ulid.ULID{ 208 "127.0.0.1": {block1, block2}, 209 }, 210 }, 211 "shuffle sharding, single instance in the ring with RF = 1, SS = 1 but excluded": { 212 shardingStrategy: util.ShardingStrategyShuffle, 213 tenantShardSize: 1, 214 replicationFactor: 1, 215 setup: func(d *ring.Desc) { 216 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) 217 }, 218 queryBlocks: []ulid.ULID{block1, block2}, 219 exclude: map[ulid.ULID][]string{ 220 block1: {"127.0.0.1"}, 221 }, 222 expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()), 223 }, 224 "shuffle sharding, single instance in the ring with RF = 2, SS = 2": { 225 shardingStrategy: util.ShardingStrategyShuffle, 226 tenantShardSize: 2, 227 replicationFactor: 2, 228 setup: func(d *ring.Desc) { 229 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) 230 }, 231 queryBlocks: []ulid.ULID{block1, block2}, 232 expectedClients: map[string][]ulid.ULID{ 233 "127.0.0.1": {block1, block2}, 234 }, 235 }, 236 "shuffle sharding, multiple instances in the ring with RF = 1, SS = 1": { 237 shardingStrategy: util.ShardingStrategyShuffle, 238 tenantShardSize: 1, 239 replicationFactor: 1, 240 setup: func(d *ring.Desc) { 241 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) 242 d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) 243 d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) 244 d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) 245 }, 246 queryBlocks: []ulid.ULID{block1, block2, block4}, 247 expectedClients: map[string][]ulid.ULID{ 248 "127.0.0.1": {block1, block2, block4}, 249 }, 250 }, 251 "shuffle sharding, multiple instances in the ring with RF = 1, SS = 2": { 252 shardingStrategy: util.ShardingStrategyShuffle, 253 tenantShardSize: 2, 254 replicationFactor: 1, 255 setup: func(d *ring.Desc) { 256 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) 257 d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) 258 d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) 259 d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) 260 }, 261 queryBlocks: []ulid.ULID{block1, block2, block4}, 262 expectedClients: map[string][]ulid.ULID{ 263 "127.0.0.1": {block1, block4}, 264 "127.0.0.3": {block2}, 265 }, 266 }, 267 "shuffle sharding, multiple instances in the ring with RF = 1, SS = 4": { 268 shardingStrategy: util.ShardingStrategyShuffle, 269 tenantShardSize: 4, 270 replicationFactor: 1, 271 setup: func(d *ring.Desc) { 272 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) 273 d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) 274 d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) 275 d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) 276 }, 277 queryBlocks: []ulid.ULID{block1, block2, block4}, 278 expectedClients: map[string][]ulid.ULID{ 279 "127.0.0.1": {block1}, 280 "127.0.0.2": {block2}, 281 "127.0.0.4": {block4}, 282 }, 283 }, 284 "shuffle sharding, multiple instances in the ring with RF = 2, SS = 2 with excluded blocks but some replacement available": { 285 shardingStrategy: util.ShardingStrategyShuffle, 286 tenantShardSize: 2, 287 replicationFactor: 2, 288 setup: func(d *ring.Desc) { 289 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) 290 d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) 291 d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) 292 d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) 293 }, 294 queryBlocks: []ulid.ULID{block1, block2}, 295 exclude: map[ulid.ULID][]string{ 296 block1: {"127.0.0.1"}, 297 block2: {"127.0.0.1"}, 298 }, 299 expectedClients: map[string][]ulid.ULID{ 300 "127.0.0.3": {block1, block2}, 301 }, 302 }, 303 "shuffle sharding, multiple instances in the ring with RF = 2, SS = 2 with excluded blocks and no replacement available": { 304 shardingStrategy: util.ShardingStrategyShuffle, 305 tenantShardSize: 2, 306 replicationFactor: 2, 307 setup: func(d *ring.Desc) { 308 d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) 309 d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) 310 d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt) 311 d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt) 312 }, 313 queryBlocks: []ulid.ULID{block1, block2}, 314 exclude: map[ulid.ULID][]string{ 315 block1: {"127.0.0.1", "127.0.0.3"}, 316 block2: {"127.0.0.1"}, 317 }, 318 expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()), 319 }, 320 } 321 322 for testName, testData := range tests { 323 testData := testData 324 325 t.Run(testName, func(t *testing.T) { 326 t.Parallel() 327 328 ctx := context.Background() 329 330 // Setup the ring state. 331 ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) 332 t.Cleanup(func() { assert.NoError(t, closer.Close()) }) 333 334 require.NoError(t, ringStore.CAS(ctx, "test", func(in interface{}) (interface{}, bool, error) { 335 d := ring.NewDesc() 336 testData.setup(d) 337 return d, true, nil 338 })) 339 340 ringCfg := ring.Config{} 341 flagext.DefaultValues(&ringCfg) 342 ringCfg.ReplicationFactor = testData.replicationFactor 343 344 r, err := ring.NewWithStoreClientAndStrategy(ringCfg, "test", "test", ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy()) 345 require.NoError(t, err) 346 347 limits := &blocksStoreLimitsMock{ 348 storeGatewayTenantShardSize: testData.tenantShardSize, 349 } 350 351 reg := prometheus.NewPedanticRegistry() 352 s, err := newBlocksStoreReplicationSet(r, testData.shardingStrategy, noLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg) 353 require.NoError(t, err) 354 require.NoError(t, services.StartAndAwaitRunning(ctx, s)) 355 defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck 356 357 // Wait until the ring client has initialised the state. 358 test.Poll(t, time.Second, true, func() interface{} { 359 all, err := r.GetAllHealthy(ring.Read) 360 return err == nil && len(all.Instances) > 0 361 }) 362 363 clients, err := s.GetClientsFor(userID, testData.queryBlocks, testData.exclude) 364 assert.Equal(t, testData.expectedErr, err) 365 366 if testData.expectedErr == nil { 367 assert.Equal(t, testData.expectedClients, getStoreGatewayClientAddrs(clients)) 368 369 assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` 370 # HELP cortex_storegateway_clients The current number of store-gateway clients in the pool. 371 # TYPE cortex_storegateway_clients gauge 372 cortex_storegateway_clients{client="querier"} %d 373 `, len(testData.expectedClients))), "cortex_storegateway_clients")) 374 } 375 }) 376 } 377} 378 379func TestBlocksStoreReplicationSet_GetClientsFor_ShouldSupportRandomLoadBalancingStrategy(t *testing.T) { 380 const ( 381 numRuns = 1000 382 numInstances = 3 383 ) 384 385 ctx := context.Background() 386 userID := "user-A" 387 registeredAt := time.Now() 388 block1 := ulid.MustNew(1, nil) 389 390 // Create a ring. 391 ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) 392 t.Cleanup(func() { assert.NoError(t, closer.Close()) }) 393 394 require.NoError(t, ringStore.CAS(ctx, "test", func(in interface{}) (interface{}, bool, error) { 395 d := ring.NewDesc() 396 for n := 1; n <= numInstances; n++ { 397 d.AddIngester(fmt.Sprintf("instance-%d", n), fmt.Sprintf("127.0.0.%d", n), "", []uint32{uint32(n)}, ring.ACTIVE, registeredAt) 398 } 399 return d, true, nil 400 })) 401 402 // Configure a replication factor equal to the number of instances, so that every store-gateway gets all blocks. 403 ringCfg := ring.Config{} 404 flagext.DefaultValues(&ringCfg) 405 ringCfg.ReplicationFactor = numInstances 406 407 r, err := ring.NewWithStoreClientAndStrategy(ringCfg, "test", "test", ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy()) 408 require.NoError(t, err) 409 410 limits := &blocksStoreLimitsMock{} 411 reg := prometheus.NewPedanticRegistry() 412 s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg) 413 require.NoError(t, err) 414 require.NoError(t, services.StartAndAwaitRunning(ctx, s)) 415 defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck 416 417 // Wait until the ring client has initialised the state. 418 test.Poll(t, time.Second, true, func() interface{} { 419 all, err := r.GetAllHealthy(ring.Read) 420 return err == nil && len(all.Instances) > 0 421 }) 422 423 // Request the same block multiple times and ensure the distribution of 424 // requests across store-gateways is balanced. 425 distribution := map[string]int{} 426 427 for n := 0; n < numRuns; n++ { 428 clients, err := s.GetClientsFor(userID, []ulid.ULID{block1}, nil) 429 require.NoError(t, err) 430 require.Len(t, clients, 1) 431 432 for addr := range getStoreGatewayClientAddrs(clients) { 433 distribution[addr]++ 434 } 435 } 436 437 assert.Len(t, distribution, numInstances) 438 for addr, count := range distribution { 439 // Ensure that the number of times each client is returned is above 440 // the 80% of the perfect even distribution. 441 assert.Greaterf(t, float64(count), (float64(numRuns)/float64(numInstances))*0.8, "store-gateway address: %s", addr) 442 } 443} 444 445func getStoreGatewayClientAddrs(clients map[BlocksStoreClient][]ulid.ULID) map[string][]ulid.ULID { 446 addrs := map[string][]ulid.ULID{} 447 for c, blockIDs := range clients { 448 addrs[c.RemoteAddress()] = blockIDs 449 } 450 return addrs 451} 452