1package querier
2
3import (
4	"context"
5	"fmt"
6	"strings"
7	"testing"
8	"time"
9
10	"github.com/go-kit/log"
11	"github.com/grafana/dskit/flagext"
12	"github.com/grafana/dskit/kv/consul"
13	"github.com/grafana/dskit/services"
14	"github.com/oklog/ulid"
15	"github.com/prometheus/client_golang/prometheus"
16	"github.com/prometheus/client_golang/prometheus/testutil"
17	"github.com/stretchr/testify/assert"
18	"github.com/stretchr/testify/require"
19
20	"github.com/cortexproject/cortex/pkg/ring"
21	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
22	"github.com/cortexproject/cortex/pkg/util"
23	"github.com/cortexproject/cortex/pkg/util/test"
24)
25
26func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) {
27	// The following block IDs have been picked to have increasing hash values
28	// in order to simplify the tests.
29	block1 := ulid.MustNew(1, nil) // hash: 283204220
30	block2 := ulid.MustNew(2, nil) // hash: 444110359
31	block3 := ulid.MustNew(5, nil) // hash: 2931974232
32	block4 := ulid.MustNew(6, nil) // hash: 3092880371
33
34	block1Hash := cortex_tsdb.HashBlockID(block1)
35	block2Hash := cortex_tsdb.HashBlockID(block2)
36	block3Hash := cortex_tsdb.HashBlockID(block3)
37	block4Hash := cortex_tsdb.HashBlockID(block4)
38
39	userID := "user-A"
40	registeredAt := time.Now()
41
42	tests := map[string]struct {
43		shardingStrategy  string
44		tenantShardSize   int
45		replicationFactor int
46		setup             func(*ring.Desc)
47		queryBlocks       []ulid.ULID
48		exclude           map[ulid.ULID][]string
49		expectedClients   map[string][]ulid.ULID
50		expectedErr       error
51	}{
52		//
53		// Sharding strategy: default
54		//
55		"default sharding, single instance in the ring with RF = 1": {
56			shardingStrategy:  util.ShardingStrategyDefault,
57			replicationFactor: 1,
58			setup: func(d *ring.Desc) {
59				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
60			},
61			queryBlocks: []ulid.ULID{block1, block2},
62			expectedClients: map[string][]ulid.ULID{
63				"127.0.0.1": {block1, block2},
64			},
65		},
66		"default sharding, single instance in the ring with RF = 1 but excluded": {
67			shardingStrategy:  util.ShardingStrategyDefault,
68			replicationFactor: 1,
69			setup: func(d *ring.Desc) {
70				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
71			},
72			queryBlocks: []ulid.ULID{block1, block2},
73			exclude: map[ulid.ULID][]string{
74				block1: {"127.0.0.1"},
75			},
76			expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()),
77		},
78		"default sharding, single instance in the ring with RF = 1 but excluded for non queried block": {
79			shardingStrategy:  util.ShardingStrategyDefault,
80			replicationFactor: 1,
81			setup: func(d *ring.Desc) {
82				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
83			},
84			queryBlocks: []ulid.ULID{block1, block2},
85			exclude: map[ulid.ULID][]string{
86				block3: {"127.0.0.1"},
87			},
88			expectedClients: map[string][]ulid.ULID{
89				"127.0.0.1": {block1, block2},
90			},
91		},
92		"default sharding, single instance in the ring with RF = 2": {
93			shardingStrategy:  util.ShardingStrategyDefault,
94			replicationFactor: 2,
95			setup: func(d *ring.Desc) {
96				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
97			},
98			queryBlocks: []ulid.ULID{block1, block2},
99			expectedClients: map[string][]ulid.ULID{
100				"127.0.0.1": {block1, block2},
101			},
102		},
103		"default sharding, multiple instances in the ring with each requested block belonging to a different store-gateway and RF = 1": {
104			shardingStrategy:  util.ShardingStrategyDefault,
105			replicationFactor: 1,
106			setup: func(d *ring.Desc) {
107				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
108				d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt)
109				d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt)
110				d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt)
111			},
112			queryBlocks: []ulid.ULID{block1, block3, block4},
113			expectedClients: map[string][]ulid.ULID{
114				"127.0.0.1": {block1},
115				"127.0.0.3": {block3},
116				"127.0.0.4": {block4},
117			},
118		},
119		"default sharding, multiple instances in the ring with each requested block belonging to a different store-gateway and RF = 1 but excluded": {
120			shardingStrategy:  util.ShardingStrategyDefault,
121			replicationFactor: 1,
122			setup: func(d *ring.Desc) {
123				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
124				d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt)
125				d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt)
126				d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt)
127			},
128			queryBlocks: []ulid.ULID{block1, block3, block4},
129			exclude: map[ulid.ULID][]string{
130				block3: {"127.0.0.3"},
131			},
132			expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block3.String()),
133		},
134		"default sharding, multiple instances in the ring with each requested block belonging to a different store-gateway and RF = 2": {
135			shardingStrategy:  util.ShardingStrategyDefault,
136			replicationFactor: 2,
137			setup: func(d *ring.Desc) {
138				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
139				d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt)
140				d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt)
141				d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt)
142			},
143			queryBlocks: []ulid.ULID{block1, block3, block4},
144			expectedClients: map[string][]ulid.ULID{
145				"127.0.0.1": {block1},
146				"127.0.0.3": {block3},
147				"127.0.0.4": {block4},
148			},
149		},
150		"default sharding, multiple instances in the ring with multiple requested blocks belonging to the same store-gateway and RF = 2": {
151			shardingStrategy:  util.ShardingStrategyDefault,
152			replicationFactor: 2,
153			setup: func(d *ring.Desc) {
154				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
155				d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt)
156			},
157			queryBlocks: []ulid.ULID{block1, block2, block3, block4},
158			expectedClients: map[string][]ulid.ULID{
159				"127.0.0.1": {block1, block4},
160				"127.0.0.2": {block2, block3},
161			},
162		},
163		"default sharding, multiple instances in the ring with each requested block belonging to a different store-gateway and RF = 2 and some blocks excluded but with replacement available": {
164			shardingStrategy:  util.ShardingStrategyDefault,
165			replicationFactor: 2,
166			setup: func(d *ring.Desc) {
167				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
168				d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt)
169				d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt)
170				d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt)
171			},
172			queryBlocks: []ulid.ULID{block1, block3, block4},
173			exclude: map[ulid.ULID][]string{
174				block3: {"127.0.0.3"},
175				block1: {"127.0.0.1"},
176			},
177			expectedClients: map[string][]ulid.ULID{
178				"127.0.0.2": {block1},
179				"127.0.0.4": {block3, block4},
180			},
181		},
182		"default sharding, multiple instances in the ring are JOINING, the requested block + its replicas only belongs to JOINING instances": {
183			shardingStrategy:  util.ShardingStrategyDefault,
184			replicationFactor: 2,
185			setup: func(d *ring.Desc) {
186				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.JOINING, registeredAt)
187				d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.JOINING, registeredAt)
188				d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.JOINING, registeredAt)
189				d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt)
190			},
191			queryBlocks: []ulid.ULID{block1},
192			expectedClients: map[string][]ulid.ULID{
193				"127.0.0.4": {block1},
194			},
195		},
196		//
197		// Sharding strategy: shuffle sharding
198		//
199		"shuffle sharding, single instance in the ring with RF = 1, SS = 1": {
200			shardingStrategy:  util.ShardingStrategyShuffle,
201			tenantShardSize:   1,
202			replicationFactor: 1,
203			setup: func(d *ring.Desc) {
204				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
205			},
206			queryBlocks: []ulid.ULID{block1, block2},
207			expectedClients: map[string][]ulid.ULID{
208				"127.0.0.1": {block1, block2},
209			},
210		},
211		"shuffle sharding, single instance in the ring with RF = 1, SS = 1 but excluded": {
212			shardingStrategy:  util.ShardingStrategyShuffle,
213			tenantShardSize:   1,
214			replicationFactor: 1,
215			setup: func(d *ring.Desc) {
216				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
217			},
218			queryBlocks: []ulid.ULID{block1, block2},
219			exclude: map[ulid.ULID][]string{
220				block1: {"127.0.0.1"},
221			},
222			expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()),
223		},
224		"shuffle sharding, single instance in the ring with RF = 2, SS = 2": {
225			shardingStrategy:  util.ShardingStrategyShuffle,
226			tenantShardSize:   2,
227			replicationFactor: 2,
228			setup: func(d *ring.Desc) {
229				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
230			},
231			queryBlocks: []ulid.ULID{block1, block2},
232			expectedClients: map[string][]ulid.ULID{
233				"127.0.0.1": {block1, block2},
234			},
235		},
236		"shuffle sharding, multiple instances in the ring with RF = 1, SS = 1": {
237			shardingStrategy:  util.ShardingStrategyShuffle,
238			tenantShardSize:   1,
239			replicationFactor: 1,
240			setup: func(d *ring.Desc) {
241				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
242				d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt)
243				d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt)
244				d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt)
245			},
246			queryBlocks: []ulid.ULID{block1, block2, block4},
247			expectedClients: map[string][]ulid.ULID{
248				"127.0.0.1": {block1, block2, block4},
249			},
250		},
251		"shuffle sharding, multiple instances in the ring with RF = 1, SS = 2": {
252			shardingStrategy:  util.ShardingStrategyShuffle,
253			tenantShardSize:   2,
254			replicationFactor: 1,
255			setup: func(d *ring.Desc) {
256				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
257				d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt)
258				d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt)
259				d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt)
260			},
261			queryBlocks: []ulid.ULID{block1, block2, block4},
262			expectedClients: map[string][]ulid.ULID{
263				"127.0.0.1": {block1, block4},
264				"127.0.0.3": {block2},
265			},
266		},
267		"shuffle sharding, multiple instances in the ring with RF = 1, SS = 4": {
268			shardingStrategy:  util.ShardingStrategyShuffle,
269			tenantShardSize:   4,
270			replicationFactor: 1,
271			setup: func(d *ring.Desc) {
272				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
273				d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt)
274				d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt)
275				d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt)
276			},
277			queryBlocks: []ulid.ULID{block1, block2, block4},
278			expectedClients: map[string][]ulid.ULID{
279				"127.0.0.1": {block1},
280				"127.0.0.2": {block2},
281				"127.0.0.4": {block4},
282			},
283		},
284		"shuffle sharding, multiple instances in the ring with RF = 2, SS = 2 with excluded blocks but some replacement available": {
285			shardingStrategy:  util.ShardingStrategyShuffle,
286			tenantShardSize:   2,
287			replicationFactor: 2,
288			setup: func(d *ring.Desc) {
289				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
290				d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt)
291				d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt)
292				d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt)
293			},
294			queryBlocks: []ulid.ULID{block1, block2},
295			exclude: map[ulid.ULID][]string{
296				block1: {"127.0.0.1"},
297				block2: {"127.0.0.1"},
298			},
299			expectedClients: map[string][]ulid.ULID{
300				"127.0.0.3": {block1, block2},
301			},
302		},
303		"shuffle sharding, multiple instances in the ring with RF = 2, SS = 2 with excluded blocks and no replacement available": {
304			shardingStrategy:  util.ShardingStrategyShuffle,
305			tenantShardSize:   2,
306			replicationFactor: 2,
307			setup: func(d *ring.Desc) {
308				d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
309				d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt)
310				d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE, registeredAt)
311				d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE, registeredAt)
312			},
313			queryBlocks: []ulid.ULID{block1, block2},
314			exclude: map[ulid.ULID][]string{
315				block1: {"127.0.0.1", "127.0.0.3"},
316				block2: {"127.0.0.1"},
317			},
318			expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()),
319		},
320	}
321
322	for testName, testData := range tests {
323		testData := testData
324
325		t.Run(testName, func(t *testing.T) {
326			t.Parallel()
327
328			ctx := context.Background()
329
330			// Setup the ring state.
331			ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
332			t.Cleanup(func() { assert.NoError(t, closer.Close()) })
333
334			require.NoError(t, ringStore.CAS(ctx, "test", func(in interface{}) (interface{}, bool, error) {
335				d := ring.NewDesc()
336				testData.setup(d)
337				return d, true, nil
338			}))
339
340			ringCfg := ring.Config{}
341			flagext.DefaultValues(&ringCfg)
342			ringCfg.ReplicationFactor = testData.replicationFactor
343
344			r, err := ring.NewWithStoreClientAndStrategy(ringCfg, "test", "test", ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy())
345			require.NoError(t, err)
346
347			limits := &blocksStoreLimitsMock{
348				storeGatewayTenantShardSize: testData.tenantShardSize,
349			}
350
351			reg := prometheus.NewPedanticRegistry()
352			s, err := newBlocksStoreReplicationSet(r, testData.shardingStrategy, noLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg)
353			require.NoError(t, err)
354			require.NoError(t, services.StartAndAwaitRunning(ctx, s))
355			defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck
356
357			// Wait until the ring client has initialised the state.
358			test.Poll(t, time.Second, true, func() interface{} {
359				all, err := r.GetAllHealthy(ring.Read)
360				return err == nil && len(all.Instances) > 0
361			})
362
363			clients, err := s.GetClientsFor(userID, testData.queryBlocks, testData.exclude)
364			assert.Equal(t, testData.expectedErr, err)
365
366			if testData.expectedErr == nil {
367				assert.Equal(t, testData.expectedClients, getStoreGatewayClientAddrs(clients))
368
369				assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
370					# HELP cortex_storegateway_clients The current number of store-gateway clients in the pool.
371					# TYPE cortex_storegateway_clients gauge
372					cortex_storegateway_clients{client="querier"} %d
373				`, len(testData.expectedClients))), "cortex_storegateway_clients"))
374			}
375		})
376	}
377}
378
379func TestBlocksStoreReplicationSet_GetClientsFor_ShouldSupportRandomLoadBalancingStrategy(t *testing.T) {
380	const (
381		numRuns      = 1000
382		numInstances = 3
383	)
384
385	ctx := context.Background()
386	userID := "user-A"
387	registeredAt := time.Now()
388	block1 := ulid.MustNew(1, nil)
389
390	// Create a ring.
391	ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
392	t.Cleanup(func() { assert.NoError(t, closer.Close()) })
393
394	require.NoError(t, ringStore.CAS(ctx, "test", func(in interface{}) (interface{}, bool, error) {
395		d := ring.NewDesc()
396		for n := 1; n <= numInstances; n++ {
397			d.AddIngester(fmt.Sprintf("instance-%d", n), fmt.Sprintf("127.0.0.%d", n), "", []uint32{uint32(n)}, ring.ACTIVE, registeredAt)
398		}
399		return d, true, nil
400	}))
401
402	// Configure a replication factor equal to the number of instances, so that every store-gateway gets all blocks.
403	ringCfg := ring.Config{}
404	flagext.DefaultValues(&ringCfg)
405	ringCfg.ReplicationFactor = numInstances
406
407	r, err := ring.NewWithStoreClientAndStrategy(ringCfg, "test", "test", ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy())
408	require.NoError(t, err)
409
410	limits := &blocksStoreLimitsMock{}
411	reg := prometheus.NewPedanticRegistry()
412	s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg)
413	require.NoError(t, err)
414	require.NoError(t, services.StartAndAwaitRunning(ctx, s))
415	defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck
416
417	// Wait until the ring client has initialised the state.
418	test.Poll(t, time.Second, true, func() interface{} {
419		all, err := r.GetAllHealthy(ring.Read)
420		return err == nil && len(all.Instances) > 0
421	})
422
423	// Request the same block multiple times and ensure the distribution of
424	// requests across store-gateways is balanced.
425	distribution := map[string]int{}
426
427	for n := 0; n < numRuns; n++ {
428		clients, err := s.GetClientsFor(userID, []ulid.ULID{block1}, nil)
429		require.NoError(t, err)
430		require.Len(t, clients, 1)
431
432		for addr := range getStoreGatewayClientAddrs(clients) {
433			distribution[addr]++
434		}
435	}
436
437	assert.Len(t, distribution, numInstances)
438	for addr, count := range distribution {
439		// Ensure that the number of times each client is returned is above
440		// the 80% of the perfect even distribution.
441		assert.Greaterf(t, float64(count), (float64(numRuns)/float64(numInstances))*0.8, "store-gateway address: %s", addr)
442	}
443}
444
445func getStoreGatewayClientAddrs(clients map[BlocksStoreClient][]ulid.ULID) map[string][]ulid.ULID {
446	addrs := map[string][]ulid.ULID{}
447	for c, blockIDs := range clients {
448		addrs[c.RemoteAddress()] = blockIDs
449	}
450	return addrs
451}
452