// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cacheutil

import (
	"context"
	"errors"
	"sync"
	"testing"
	"time"

	"github.com/bradfitz/gomemcache/memcache"
	"github.com/fortytw2/leaktest"
	"github.com/go-kit/kit/log"
	prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/thanos-io/thanos/pkg/testutil"
)

func TestMemcachedClientConfig_validate(t *testing.T) {
	tests := map[string]struct {
		config   MemcachedClientConfig
		expected error
	}{
		"should pass on valid config": {
			config: MemcachedClientConfig{
				Addresses: []string{"127.0.0.1:11211"},
			},
			expected: nil,
		},
		"should fail on no addresses": {
			config: MemcachedClientConfig{
				Addresses: []string{},
			},
			expected: errMemcachedConfigNoAddrs,
		},
	}

	for testName, testData := range tests {
		t.Run(testName, func(t *testing.T) {
			testutil.Equals(t, testData.expected, testData.config.validate())
		})
	}
}

func TestNewMemcachedClient(t *testing.T) {
	defer leaktest.CheckTimeout(t, 10*time.Second)()

	// Should return an error on an empty YAML config.
	conf := []byte{}
	cache, err := NewMemcachedClient(log.NewNopLogger(), "test", conf, nil)
	testutil.NotOk(t, err)
	testutil.Equals(t, (*memcachedClient)(nil), cache)

	// Should return an error on an invalid YAML config.
	conf = []byte("invalid")
	cache, err = NewMemcachedClient(log.NewNopLogger(), "test", conf, nil)
	testutil.NotOk(t, err)
	testutil.Equals(t, (*memcachedClient)(nil), cache)

	// Should instantiate a memcached client with the minimal YAML config.
	conf = []byte(`
addresses:
  - 127.0.0.1:11211
  - 127.0.0.2:11211
`)
	cache, err = NewMemcachedClient(log.NewNopLogger(), "test", conf, nil)
	testutil.Ok(t, err)
	defer cache.Stop()

	testutil.Equals(t, []string{"127.0.0.1:11211", "127.0.0.2:11211"}, cache.config.Addresses)
	testutil.Equals(t, defaultMemcachedClientConfig.Timeout, cache.config.Timeout)
	testutil.Equals(t, defaultMemcachedClientConfig.MaxIdleConnections, cache.config.MaxIdleConnections)
	testutil.Equals(t, defaultMemcachedClientConfig.MaxAsyncConcurrency, cache.config.MaxAsyncConcurrency)
	testutil.Equals(t, defaultMemcachedClientConfig.MaxAsyncBufferSize, cache.config.MaxAsyncBufferSize)
	testutil.Equals(t, defaultMemcachedClientConfig.DNSProviderUpdateInterval, cache.config.DNSProviderUpdateInterval)
	testutil.Equals(t, defaultMemcachedClientConfig.MaxGetMultiConcurrency, cache.config.MaxGetMultiConcurrency)
	testutil.Equals(t, defaultMemcachedClientConfig.MaxGetMultiBatchSize, cache.config.MaxGetMultiBatchSize)

	// Should instantiate a memcached client with a fully customized YAML config.
	conf = []byte(`
addresses:
  - 127.0.0.1:11211
  - 127.0.0.2:11211
timeout: 1s
max_idle_connections: 1
max_async_concurrency: 1
max_async_buffer_size: 1
max_get_multi_concurrency: 1
max_get_multi_batch_size: 1
dns_provider_update_interval: 1s
`)
	cache, err = NewMemcachedClient(log.NewNopLogger(), "test", conf, nil)
	testutil.Ok(t, err)
	defer cache.Stop()

	testutil.Equals(t, []string{"127.0.0.1:11211", "127.0.0.2:11211"}, cache.config.Addresses)
	testutil.Equals(t, 1*time.Second, cache.config.Timeout)
	testutil.Equals(t, 1, cache.config.MaxIdleConnections)
	testutil.Equals(t, 1, cache.config.MaxAsyncConcurrency)
	testutil.Equals(t, 1, cache.config.MaxAsyncBufferSize)
	testutil.Equals(t, 1*time.Second, cache.config.DNSProviderUpdateInterval)
	testutil.Equals(t, 1, cache.config.MaxGetMultiConcurrency)
	testutil.Equals(t, 1, cache.config.MaxGetMultiBatchSize)
}

func TestMemcachedClient_SetAsync(t *testing.T) {
	defer leaktest.CheckTimeout(t, 10*time.Second)()

	ctx := context.Background()
	config := defaultMemcachedClientConfig
	config.Addresses = []string{"127.0.0.1:11211"}
	backendMock := newMemcachedClientBackendMock()

	client, err := prepare(config, backendMock)
	testutil.Ok(t, err)
	defer client.Stop()

	testutil.Ok(t, client.SetAsync(ctx, "key-1", []byte("value-1"), time.Second))
	testutil.Ok(t, client.SetAsync(ctx, "key-2", []byte("value-2"), time.Second))
	testutil.Ok(t, backendMock.waitItems(2))

	testutil.Equals(t, 2.0, prom_testutil.ToFloat64(client.operations.WithLabelValues(opSet)))
	testutil.Equals(t, 0.0, prom_testutil.ToFloat64(client.operations.WithLabelValues(opGetMulti)))
	testutil.Equals(t, 0.0, prom_testutil.ToFloat64(client.failures.WithLabelValues(opSet)))
}

func TestMemcachedClient_GetMulti(t *testing.T) {
	defer leaktest.CheckTimeout(t, 10*time.Second)()

	tests := map[string]struct {
		maxBatchSize          int
		maxConcurrency        int
		mockedGetMultiErrors  int
		initialItems          []memcache.Item
		getKeys               []string
		expectedHits          map[string][]byte
		expectedGetMultiCount int
	}{
		"should fetch keys in a single batch if the number of input keys is <= the max batch size": {
			maxBatchSize:   2,
			maxConcurrency: 5,
			initialItems: []memcache.Item{
				{Key: "key-1", Value: []byte("value-1")},
				{Key: "key-2", Value: []byte("value-2")},
			},
			getKeys: []string{"key-1", "key-2"},
			expectedHits: map[string][]byte{
				"key-1": []byte("value-1"),
				"key-2": []byte("value-2"),
			},
			expectedGetMultiCount: 1,
		},
		"should fetch keys in multiple batches if the number of input keys is > the max batch size": {
			maxBatchSize:   2,
			maxConcurrency: 5,
			initialItems: []memcache.Item{
				{Key: "key-1", Value: []byte("value-1")},
				{Key: "key-2", Value: []byte("value-2")},
				{Key: "key-3", Value: []byte("value-3")},
			},
			getKeys: []string{"key-1", "key-2", "key-3"},
			expectedHits: map[string][]byte{
				"key-1": []byte("value-1"),
				"key-2": []byte("value-2"),
				"key-3": []byte("value-3"),
			},
			expectedGetMultiCount: 2,
		},
		"should fetch keys in multiple batches if the number of input keys is an exact multiple of the max batch size": {
			maxBatchSize:   2,
			maxConcurrency: 5,
			initialItems: []memcache.Item{
				{Key: "key-1", Value: []byte("value-1")},
				{Key: "key-2", Value: []byte("value-2")},
				{Key: "key-3", Value: []byte("value-3")},
				{Key: "key-4", Value: []byte("value-4")},
			},
			getKeys: []string{"key-1", "key-2", "key-3", "key-4"},
			expectedHits: map[string][]byte{
				"key-1": []byte("value-1"),
				"key-2": []byte("value-2"),
				"key-3": []byte("value-3"),
				"key-4": []byte("value-4"),
			},
			expectedGetMultiCount: 2,
		},
		"should fetch keys in multiple batches if the number of input keys is an exact multiple of the max batch size and max concurrency is disabled (0)": {
			maxBatchSize:   2,
			maxConcurrency: 0,
			initialItems: []memcache.Item{
				{Key: "key-1", Value: []byte("value-1")},
				{Key: "key-2", Value: []byte("value-2")},
				{Key: "key-3", Value: []byte("value-3")},
				{Key: "key-4", Value: []byte("value-4")},
			},
			getKeys: []string{"key-1", "key-2", "key-3", "key-4"},
			expectedHits: map[string][]byte{
				"key-1": []byte("value-1"),
				"key-2": []byte("value-2"),
				"key-3": []byte("value-3"),
				"key-4": []byte("value-4"),
			},
			expectedGetMultiCount: 2,
		},
		"should fetch keys in multiple batches if the number of input keys is an exact multiple of the max batch size and max concurrency is lower than the number of batches": {
			maxBatchSize:   1,
			maxConcurrency: 1,
			initialItems: []memcache.Item{
				{Key: "key-1", Value: []byte("value-1")},
				{Key: "key-2", Value: []byte("value-2")},
				{Key: "key-3", Value: []byte("value-3")},
				{Key: "key-4", Value: []byte("value-4")},
			},
			getKeys: []string{"key-1", "key-2", "key-3", "key-4"},
			expectedHits: map[string][]byte{
				"key-1": []byte("value-1"),
				"key-2": []byte("value-2"),
				"key-3": []byte("value-3"),
				"key-4": []byte("value-4"),
			},
			expectedGetMultiCount: 4,
		},
		"should fetch keys in a single batch if max batch size is disabled (0)": {
			maxBatchSize:   0,
			maxConcurrency: 5,
			initialItems: []memcache.Item{
				{Key: "key-1", Value: []byte("value-1")},
				{Key: "key-2", Value: []byte("value-2")},
				{Key: "key-3", Value: []byte("value-3")},
				{Key: "key-4", Value: []byte("value-4")},
			},
			getKeys: []string{"key-1", "key-2", "key-3", "key-4"},
			expectedHits: map[string][]byte{
				"key-1": []byte("value-1"),
				"key-2": []byte("value-2"),
				"key-3": []byte("value-3"),
				"key-4": []byte("value-4"),
			},
			expectedGetMultiCount: 1,
		},
		"should fetch keys in a single batch if max batch size is disabled (0) and max concurrency is disabled (0)": {
			maxBatchSize:   0,
			maxConcurrency: 0,
			initialItems: []memcache.Item{
				{Key: "key-1", Value: []byte("value-1")},
				{Key: "key-2", Value: []byte("value-2")},
				{Key: "key-3", Value: []byte("value-3")},
				{Key: "key-4", Value: []byte("value-4")},
			},
			getKeys: []string{"key-1", "key-2", "key-3", "key-4"},
			expectedHits: map[string][]byte{
				"key-1": []byte("value-1"),
				"key-2": []byte("value-2"),
				"key-3": []byte("value-3"),
				"key-4": []byte("value-4"),
			},
			expectedGetMultiCount: 1,
		},
		"should return hits only for the keys that exist when some requested keys are missing": {
			maxBatchSize:   2,
			maxConcurrency: 5,
			initialItems: []memcache.Item{
				{Key: "key-1", Value: []byte("value-1")},
				{Key: "key-2", Value: []byte("value-2")},
			},
			getKeys: []string{"key-1", "key-2", "key-3", "key-4"},
			expectedHits: map[string][]byte{
				"key-1": []byte("value-1"),
				"key-2": []byte("value-2"),
			},
			expectedGetMultiCount: 2,
		},
		"should return no hits on partial errors while fetching batches and no items found": {
			maxBatchSize:         2,
			maxConcurrency:       5,
			mockedGetMultiErrors: 1,
			initialItems: []memcache.Item{
				{Key: "key-1", Value: []byte("value-1")},
				{Key: "key-2", Value: []byte("value-2")},
				{Key: "key-3", Value: []byte("value-3")},
			},
			getKeys:               []string{"key-5", "key-6", "key-7"},
			expectedHits:          map[string][]byte{},
			expectedGetMultiCount: 2,
		},
		"should return no hits on all errors while fetching batches": {
			maxBatchSize:         2,
			maxConcurrency:       5,
			mockedGetMultiErrors: 2,
			initialItems: []memcache.Item{
				{Key: "key-1", Value: []byte("value-1")},
				{Key: "key-2", Value: []byte("value-2")},
				{Key: "key-3", Value: []byte("value-3")},
			},
			getKeys:               []string{"key-5", "key-6", "key-7"},
			expectedHits:          nil,
			expectedGetMultiCount: 2,
		},
	}

	for testName, testData := range tests {
		t.Run(testName, func(t *testing.T) {
			ctx := context.Background()
			config := defaultMemcachedClientConfig
			config.Addresses = []string{"127.0.0.1:11211"}
			config.MaxGetMultiBatchSize = testData.maxBatchSize
			config.MaxGetMultiConcurrency = testData.maxConcurrency

			backendMock := newMemcachedClientBackendMock()
			backendMock.getMultiErrors = testData.mockedGetMultiErrors

			client, err := prepare(config, backendMock)
			testutil.Ok(t, err)
			defer client.Stop()

			// Populate memcached with the initial items.
			for _, item := range testData.initialItems {
				testutil.Ok(t, client.SetAsync(ctx, item.Key, item.Value, time.Second))
			}

			// Wait until initial items have been added.
			testutil.Ok(t, backendMock.waitItems(len(testData.initialItems)))

			// Read back the items.
			testutil.Equals(t, testData.expectedHits, client.GetMulti(ctx, testData.getKeys))

			// Ensure the client has interacted with the backend as expected.
			backendMock.lock.Lock()
			defer backendMock.lock.Unlock()
			testutil.Equals(t, testData.expectedGetMultiCount, backendMock.getMultiCount)

			// Ensure metrics are tracked.
			testutil.Equals(t, float64(testData.expectedGetMultiCount), prom_testutil.ToFloat64(client.operations.WithLabelValues(opGetMulti)))
			testutil.Equals(t, float64(testData.mockedGetMultiErrors), prom_testutil.ToFloat64(client.failures.WithLabelValues(opGetMulti)))
		})
	}
}
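
// prepare builds a memcachedClient backed by the given mock, using the jump
// hash selector and a no-op logger, so tests can exercise the client without
// a real memcached server.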
func prepare(config MemcachedClientConfig, backendMock *memcachedClientBackendMock) (*memcachedClient, error) {
	logger := log.NewNopLogger()
	selector := &MemcachedJumpHashSelector{}

	return newMemcachedClient(logger, "test", backendMock, selector, config, nil)
}
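
// memcachedClientBackendMock is an in-memory replacement for the gomemcache
// client: it keeps items in a map guarded by a mutex, counts GetMulti calls,
// and can be configured to fail the first getMultiErrors GetMulti calls.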
type memcachedClientBackendMock struct {
	lock           sync.Mutex
	items          map[string]*memcache.Item
	getMultiCount  int
	getMultiErrors int
}

func newMemcachedClientBackendMock() *memcachedClientBackendMock {
	return &memcachedClientBackendMock{
		items: map[string]*memcache.Item{},
	}
}
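
// GetMulti returns the subset of the requested keys currently stored in the
// mock. The first getMultiErrors calls fail with a mocked error, which lets
// tests simulate partial or total backend failures.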
func (c *memcachedClientBackendMock) GetMulti(keys []string) (map[string]*memcache.Item, error) {
	c.lock.Lock()
	defer c.lock.Unlock()

	c.getMultiCount++
	if c.getMultiCount <= c.getMultiErrors {
		return nil, errors.New("mocked GetMulti error")
	}

	items := make(map[string]*memcache.Item)
	for _, key := range keys {
		if item, ok := c.items[key]; ok {
			items[key] = item
		}
	}

	return items, nil
}
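
// Set stores the item in the mock, overwriting any existing value for its key.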
func (c *memcachedClientBackendMock) Set(item *memcache.Item) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	c.items[item.Key] = item

	return nil
}
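
// waitItems polls the mock until at least the expected number of items has
// been stored, returning an error if the 1 second deadline expires first.
// Tests use it to synchronize with the client's asynchronous set path.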
func (c *memcachedClientBackendMock) waitItems(expected int) error {
	deadline := time.Now().Add(1 * time.Second)

	for time.Now().Before(deadline) {
		c.lock.Lock()
		count := len(c.items)
		c.lock.Unlock()

		if count >= expected {
			return nil
		}

		// Yield between polls instead of busy-spinning on the lock.
		time.Sleep(10 * time.Millisecond)
	}

	return errors.New("timeout expired while waiting for items in the memcached mock")
}
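
// The following test is an illustrative sketch, not part of the original
// suite: it exercises the waitItems timeout path directly, relying only on
// helpers defined above. The test name is our own.
func TestMemcachedClientBackendMock_waitItemsTimeout(t *testing.T) {
	backendMock := newMemcachedClientBackendMock()

	// No items are ever written, so waitItems must hit its 1 second deadline
	// and report a timeout error.
	testutil.NotOk(t, backendMock.waitItems(1))
}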